Commit eb6d217b authored by Paul Rich's avatar Paul Rich
Browse files

Test updates for Cray Port

parent cb0ce41e
......@@ -40,6 +40,8 @@ import types
import unittest
import xmlrpclib
from mock import patch
import Cobalt.Components.cqm
from Cobalt.Components.base import Component, exposed, automatic, query
from Cobalt.Components.cqm import QueueManager, Signal_Map
......@@ -3594,3 +3596,45 @@ class TestCQMIntegration (CQMIntegrationTestBase):
del self.taskman
CQMIntegrationTestBase.teardown(self)
class TestJobStatuses(object):
states = {'Ready': ('queued', 'Q'),
'Preempted': ('preempted', 'P'),
'Job_Prologue': ('starting', 'R'),
'Job_Prologue_Retry': ('starting', 'R'),
'Resource_Prologue':('starting', 'R'),
'Resource_Prologue_Retry': ('starting', 'R'),
'Run_Retry': ('starting', 'R'),
'Running': ('running', 'R'),
'Kill_Retry': ('killing', 'K'),
'Killing': ('killing', 'K'),
'Preempt_Retry': ('preempting', 'P'),
'Preempting': ('preempting', 'P'),
'Preempt_Finalize_Retry': ('preempting', 'P'),
'Preempt_Epilogue': ('preempting', 'P'),
'Job_Prologue_Retry_Release': ('exiting', 'E'),
'Resource_Prologue_Retry_Release': ('exiting', 'E'),
'Finalize_Retry': ('exiting', 'E'),
'Resource_Epilogue': ('exiting', 'E'),
'Resource_Epilogue_Retry': ('exiting', 'E'),
'Job_Epilogue': ('exiting', 'E'),
'Job_Epilogue_Retry': ('exiting', 'E'),
'Terminal': ('done', 'E')
}
def setup(self):
spec = {}
self.job = Cobalt.Components.cqm.Job(spec)
def teardown(self):
del self.job
def test_display_states(self):
for state, printable_status in self.states.items():
with patch.object(Cobalt.Components.cqm.Job, '_sm_state', state):
assert self.job.state == printable_status[0], ("Bad State %s: Expected %s, got %s" %
(state, printable_status[0], self.job.state))
assert self.job.short_state == printable_status[1], ("Bad State %s: Expected %s, got %s" %
(state, printable_status[1], self.job.short_state))
This diff is collapsed.
'''Process Manager for cluster/cray systems tests'''
import time
import logging
import sys
from mock import Mock, MagicMock, patch
import Cobalt.Proxy
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
default_child_data = [{'id': 1}]
def fake_forker(*args, **kwargs):
print args
print kwargs
raise RuntimeError('boom')
#return 1
class InspectMock(MagicMock):
'''allow us to inspect what is going on within a proxy call'''
def __getattr__(self, attr):
if attr == 'get_children':
return MagicMock(return_value=[{'id': 1}])
elif attr == 'fork':
return MagicMock(return_value=1)
return super(InspectMock, self).__getattr__(attr)
class TestProcessManager(object):
'''tests for the base project manager'''
def setup(self):
'''common setup for process group tests'''
self.base_spec = {'args':['arg1', 'arg2'], 'user':'frodo',
'jobid': 1, 'executable': 'job.exe', 'size': 2,
'cwd': '/home/frodo', 'location': 'loc1'
}
self.process_manager = ProcessGroupManager()
self.process_manager.forkers = ['forker1']
self.process_manager.forker_taskcounts = {'forker1':0}
def teardown(self):
'''common teardown for process group tests'''
del self.base_spec
del self.process_manager
def test_process_manager_init_groups_single(self):
'''ProcessGroupManager.init_groups: create a process group and add to process manager'''
specs = [self.base_spec]
self.process_manager.init_groups(specs)
assert self.process_manager.process_groups.get(1, None) is not None, "process group not created"
assert self.process_manager.process_groups[1].forker == 'forker1', "forker not set"
@patch.object(Cobalt.Proxy.DeferredProxyMethod, '__call__', return_value=1)
def test_process_manager_start_groups_single(self, *args, **kwargs):
'''ProcessGroupManager.start_groups: start up a single process group'''
self.base_spec['startup_timeout'] = 120
self.process_manager.init_groups([self.base_spec])
started = self.process_manager.start_groups([1])
assert len(started) == 1, "started %s groups, should have started 1" % len(started)
assert sorted(started) == [1], "wrong groups started."
assert self.process_manager.process_groups[1].startup_timeout == 0, (
"startup_timeout not reset")
@patch('Cobalt.Proxy.DeferredProxy', side_effect=InspectMock)
def test_process_manager_update_groups_timeout(self, *args, **kwargs):
'''ProcessGroupManager.update_groups: startup timeout respected.'''
now = int(time.time())
pgroups = self.process_manager.process_groups
self.process_manager.init_groups([self.base_spec])
pgroups[1].startup_timeout = 120 + now
self.process_manager.update_groups()
pgroups = self.process_manager.process_groups
assert len(pgroups) == 1, "%s groups, should have 1" % len(pgroups)
assert sorted(pgroups.keys()) == [1], "wrong groups."
assert pgroups[1].startup_timeout == now + 120, (
"bad startup timeout: %s" % pgroups[1].startup_timeout)
@patch('Cobalt.Proxy.DeferredProxy', side_effect=InspectMock)
def test_process_manager_update_groups_timeout_exceeded(self, *args, **kwargs):
'''ProcessGroupManager.update_groups: startup timeout exceeded.'''
now = int(time.time())
pgroups = self.process_manager.process_groups
self.process_manager.init_groups([self.base_spec])
pgroups[1].startup_timeout = now - 120
self.process_manager.update_groups()
pgroups = self.process_manager.process_groups
assert len(pgroups) == 0, "%s groups, should have 0" % len(pgroups)
assert sorted(pgroups.keys()) == [], "groups should be empty"
'''Tests for base ProcessGroup class and actions'''
from nose.tools import raises
from mock import Mock, MagicMock, patch
import Cobalt.Exceptions
from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Proxy import ComponentProxy
mock_proxy = MagicMock()
class TestProcessGroup(object):
'''Group together process group tests, and apply common setup'''
def setup(self):
'''common setup for process group tests'''
self.base_spec = {'args':['arg1', 'arg2'], 'user':'frodo',
'jobid': 1, 'executable': 'job.exe', 'size': 2,
'cwd': '/home/frodo', 'location': 'loc1'
}
def teardown(self):
'''common teardown for process group tests'''
del self.base_spec
def test_process_group_init(self):
'''ProcessGroup.__init__: basic initialization'''
pgroup = ProcessGroup(self.base_spec)
assert pgroup is not None, "process group creation failed"
@raises(Cobalt.Exceptions.DataCreationError)
def test_process_group_init_missing_fields(self):
'''ProcessGroup.__init__: exception on bad spec'''
pgroup = ProcessGroup({})
assert False, "Should raise exception"
@patch.object(Cobalt.Proxy.DeferredProxyMethod, '__call__', return_value=1)
def test_process_group_start_base(self, proxy):
'''basic process group startup'''
pgroup = ProcessGroup(self.base_spec)
data = pgroup.prefork()
pgroup.start()
proxy.assert_called_with([pgroup.executable] + pgroup.args, pgroup.tag,
"Job %s/%s/%s" %(pgroup.jobid, pgroup.user, pgroup.id), pgroup.env,
data, pgroup.runid)
"""Tests for general system component classes.
"""
from nose.tools import raises
from testsuite.TestCobalt.Utilities.assert_functions import assert_match, assert_not_match
from Cobalt.Components.system.resource import Resource
from Cobalt.Components.system.ClusterNode import ClusterNode
import Cobalt.Exceptions
import time
def is_match(a, b):
return a is b
class TestSystemResource(object):
def setup(self):
default_spec = {'name': 'res1',
'attributes': {'mem':64, 'cpupn':16, 'pubnet':True},
}
self.resource_list=[]
self.resource_list.append(Resource(default_spec))
self.resource_list[0].managed = True
def teardown(self):
del self.resource_list
def test_resource_init_no_attrs(self):
#initialize a resource with no attributes set, need to get {}.
resource = Resource({'name':'foo'})
assert_match(resource.name, 'foo', "Resource not initialized")
assert_match(resource.attributes, {}, "Improper attributes set")
assert_match(resource.status, 'idle', "Default resource not idle")
assert not resource.managed, "Default resource shouldn't be managed"
def test_resource_init_with_attrs(self):
#expected default initialization
attrs = {'bar':1, 'baz':'hi'}
resource = Resource({'name':'foo', 'attributes': attrs})
assert_match(resource.name, 'foo', "Resource not initialized")
assert_match(resource.attributes, attrs, "Imporper attributes set.")
assert_match(resource.status, 'idle', "Default resource not idle")
assert not resource.managed, "Default resource shouldn't be managed"
@raises(Cobalt.Exceptions.InvalidStatusError)
def test_resource_set_bad_state(self):
#Make sure the resource state constraint is obeyed.
self.resource_list[0].status = 'badstatus'
assert False, "Exception not raised"
def test_resource_state_reserved(self):
#if resource_until is set, the resource is reserved.
assert not self.resource_list[0].reserved, "Resource erroniously reserved."
self.resource_list[0].reserved_until = time.time() + 600
assert self.resource_list[0].reserved, "Resource should be reserved."
def test_resource_reservation_idle(self):
#Can we make a reservation
until = time.time() + 600
assert self.resource_list[0].reserve(until, 'foo', 1), "reservation failed."
assert_match(self.resource_list[0].reserved_until, until,
"reserved_until not set.")
assert_match(self.resource_list[0].reserved_by, 'foo',
"reserved_by not set.")
assert_match(self.resource_list[0].reserved_jobid, 1,
"reserved_jobid not set.")
assert_match(self.resource_list[0].status, 'allocated',
"resource not allocated")
def test_resource_reservation_reserved_bad_user(self):
#Fence the reservation manipulation to user
now = time.time()
until = now + 600
user = 'foo'
jobid = 1
res = self.resource_list[0]
assert res.reserve(now, user, jobid), "failed initial reservation"
try:
res.reserve(until, "bar", 1)
except Cobalt.Exceptions.ResourceReservationFailure:
pass
else:
assert False, "ResourceReservationFailure not raised"
assert_match(res.reserved_until, now, "reserved until modified.")
assert_match(res.reserved_by, user, "reserved user modified.")
assert_match(res.reserved_jobid, jobid, "reserved jobid modified.")
assert_match(res.status, "allocated", "improper status")
def test_resource_reservation_reserved_bad_job(self):
#make sure a bad jobid but right user doesn't re-reserve resource.
now = time.time()
until = now + 600
user = 'foo'
jobid = 1
res = self.resource_list[0]
assert res.reserve(now, user, jobid), "failed initial reservation"
try:
res.reserve(until, user, 2)
except Cobalt.Exceptions.ResourceReservationFailure:
pass
else:
assert False, "ResourceReservationFailure not raised"
assert_match(res.reserved_until, now, "reserved until modified.")
assert_match(res.reserved_by, user, "reserved user modified.")
assert_match(res.reserved_jobid, jobid, "reserved jobid modified.")
assert_match(res.status, "allocated", "improper status")
def test_resource_release(self):
#release resource reservation
res = self.resource_list[0]
until = time.time() + 600
user = 'foo'
jobid = 1
res.reserve(until, user, jobid)
assert res.reserved, "reservation failed"
assert res.release(user, jobid), "release failed"
assert not res.reserved, "still reserved"
assert_match(res.reserved_until, None, "reserved until not unset",
is_match)
assert_match(res.reserved_by, None, "reserved by not unset",
is_match)
assert_match(res.reserved_jobid, None, "reserved jobid not unset.",
is_match)
assert_match(res.status, "cleanup-pending", "improper status")
def test_resource_release_force(self):
#force-release reservation, because admin commands are a thing.
res = self.resource_list[0]
until = time.time() + 600
user = 'foo'
jobid = 1
res.reserve(until, user, jobid)
assert res.reserved, "reservation failed"
assert res.release(force=True), "release failed"
assert not res.reserved, "release failed"
assert_match(res.reserved_until, None, "reserved until not unset",
is_match)
assert_match(res.reserved_by, None, "reserved by not unset",
is_match)
assert_match(res.reserved_jobid, None, "reserved jobid not unset.",
is_match)
assert_match(res.status, "cleanup-pending", "improper status")
def test_resource_release_unreserved(self):
#you can't release something that hasn't been reserved yet.
#However, it would show up as being released anyway.
res = self.resource_list[0]
assert res.release(force=True), "must not show as reserved."
def test_resource_release_bad_user(self):
#keep a user from releasing some one else's reservation
#need both bad user and bad joid
res = self.resource_list[0]
until = time.time() + 600
user = 'foo'
jobid = 1
assert res.reserve(until, user, jobid), "failed to reserve"
assert res.reserved, "reservation failed"
assert res.release('bar', jobid), "release succeeded"
assert_match(res.reserved_until, None, 'reserve until unset')
assert_match(res.reserved_by, None, 'reserve by unset')
assert_match(res.reserved_jobid, None, 'reserve jobid unset')
assert_match(res.status, 'cleanup-pending', 'improper status')
def test_resource_release_bad_jobid(self):
#Release if user good but jobid bad
res = self.resource_list[0]
until = time.time() + 600
user = 'foo'
jobid = 1
assert res.reserve(until, user, jobid), "failed to reserve"
assert res.reserved, "reservation failed"
assert res.release(user, 2), "release succeeded"
assert_match(res.reserved_until, None, 'reserve until unset')
assert_match(res.reserved_by, None, 'reserve by unset')
assert_match(res.reserved_jobid, None, 'reserve jobid unset')
assert_match(res.status, 'cleanup-pending', 'improper status')
def test_resource_release_both_bad(self):
#Do not release if user and job are bad
res = self.resource_list[0]
until = time.time() + 600
user = 'foo'
jobid = 1
assert res.reserve(until, user, jobid), "failed to reserve"
assert res.reserved, "reservation failed"
assert not res.release('bar', 2), "release succeeded"
assert_match(res.reserved_until, until, 'reserve until unset')
assert_match(res.reserved_by, user, 'reserve by unset')
assert_match(res.reserved_jobid, jobid, 'reserve jobid unset')
assert_not_match(res.status, 'allocated', 'improper status')
@raises(Cobalt.Exceptions.UnmanagedResourceError)
def test_unmanaged_reserve(self):
#can't do anything to unmanaged resources
self.resource_list[0].managed = False
self.resource_list[0].reserve(time.time())
@raises(Cobalt.Exceptions.UnmanagedResourceError)
def test_unmanaged_release(self):
#if we're not managing it, we can't release it.
self.resource_list[0].managed = False
self.resource_list[0].release()
class TestClusterNode(object):
def setup(self):
'''Set up common test parameters. Run as a part of every test.'''
self.now = time.time()
def teardown(self):
'''Clean up any default parameters that need it. Run every test.'''
pass
def setup_base_node(self):
'''Generate a bare-bones test node.'''
spec = {'name': 'node1'}
self.nodelist = [ClusterNode(spec)]
self.testnode = self.nodelist[0]
self.testnode.schedulable = True
def test_init(self):
#test basic initialization
spec = {'name': 'node1',
'attributes': {'ncpu': 1, 'mem':4},
'queues': ['foo', 'bar'],
'backfill_epsilon': 600,
}
node = ClusterNode(spec)
assert_match(node.name, 'node1', "wrong name")
assert_match(node.attributes, {'ncpu': 1, 'mem':4}, "Attributes not set.")
assert node.schedulable, "defaulted to not schedulable"
assert_match(node.drain_until, None, "drain_until should not be set",
is_match)
assert_match(node.drain_jobid, None, "drain_jobid should not be set",
is_match)
assert_match(node.queues, ['foo', 'bar'], "queues not set")
assert_match(node.backfill_epsilon, 600, "backfill_epsilon not set")
def test_init_defaults(self):
#test that defaults are being properly set
spec = {'name': 'node1'}
node = ClusterNode(spec)
assert_match(node.attributes, {}, 'bad default attributes')
assert_match(node.queues, ['default'], 'bad default queues')
assert_match(node.backfill_epsilon, 120, 'bad default backfill_epsilon')
def test_set_drain(self):
#Set all draining parameters correctly.
self.setup_base_node()
self.testnode.set_drain(self.now + 500, 1234)
assert self.testnode.draining, "Node not reporting that it is draining"
assert_match(self.testnode.drain_jobid, 1234,
"Draining jobid set incorrectly.")
assert_match(self.testnode.drain_until, int(self.now + 500),
"Drain until set incorrectly.")
def test_clear_drain(self):
#make sure draining gets cleared correctly
self.setup_base_node()
self.testnode.set_drain(self.now + 500, 1234)
self.testnode.clear_drain()
assert not self.testnode.draining, "Node still draining."
assert_match(self.testnode.drain_jobid, None,
"Draining jobid still set.", is_match)
assert_match(self.testnode.drain_until, None, "Drain until still set.",
is_match)
def test_read_only_attrs(self):
#These are read-only attributes. Make sure they stay that way.
self.setup_base_node()
testattrs = ['drain_until', 'drain_jobid', 'draining']
for attr in testattrs:
try:
setattr(self.testnode, attr, 'foo')
except AttributeError:
pass
else:
assert False, ("Read only attribute %s did not raise exception"
% (attr))
@raises(ValueError)
def test_no_negative_backfill_epsilon(self):
#ensure ValueError raised for backfill_epsilon
self.setup_base_node()
self.testnode.backfill_epsilon = -1
@raises(Cobalt.Exceptions.UnschedulableNodeError)
def test_no_drain_down(self):
#don't drain down hardware
self.setup_base_node()
self.testnode.status = 'down'
self.testnode.set_drain(self.now, 1234)
@raises(Cobalt.Exceptions.UnschedulableNodeError)
def test_no_drain_unschedulable(self):
#don't drain on unscheduled hardware
self.setup_base_node()
self.testnode.schedulable = False
self.testnode.set_drain(self.now, 1234)
"""Helper and convenience functions for common assert cases"""
def assert_match(test, expect, desc, comp=None):
'''Assert that we have a match, log failure in convenient way otherwise.
comp should correspond to a boolean __eq__ function, if None, will
use default == comparitor.
'''
if comp is None:
comp = lambda a, b: a == b
assert comp(test, expect), "FAILED MATCH: %s: Expected: %s; Got %s" % (desc, str(expect), str(test))
def assert_not_match(test, expect, desc, comp=None):
'''Assert non-matching test and expected result. Default comparitor
corresponds to __ne__ function. By default will use !=.
'''
if comp is None:
comp = lambda a, b: a != b
assert comp(test, expect), "FAILED NON-MATCH: %s: Got %s for both" % (desc,
str(test))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment