Commit 6b135813 authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch 'master' into 20-draining-backfilling

parents 2ab37ab3 f456d0ed
......@@ -68,6 +68,7 @@ from Cobalt.arg_parser import ArgParse
from Cobalt.Util import get_config_option, init_cobalt_config, sleep
from Cobalt.Proxy import ComponentProxy
import xmlrpclib
import subprocess
__revision__ = '$Revision: 559 $'
__version__ = '$Version$'
......@@ -83,7 +84,7 @@ def on_interrupt(sig, func=None):
"""
Interrupt Handler to cleanup the interactive job if the user interrupts
Will contain two static variables: count and exit.
'count' will keep track how many interruptions happened and
'count' will keep track how many interruptions happened and
'exit' flags whether we completely exit once the interrupt occurs.
"""
on_interrupt.count += 1
......@@ -368,6 +369,29 @@ def parse_options(parser, spec, opts, opt2spec, def_spec):
opts['disable_preboot'] = not spec['script_preboot']
return opt_count
def fetch_pgid(user, jobid, loc, pgid=None):
    '''fetch and set pgid for user shell. Needed for cray systems

    Args:
        user  - user name owning the interactive job
        jobid - Cobalt jobid of the interactive job
        loc   - location (node list) assigned to the job
        pgid  - process-group/session id to confirm with ALPS
                (on Cray this is apparently the session id)

    Exits the client (sys.exit(1)) if the ALPS reservation cannot be
    confirmed; otherwise returns None.
    '''
    if client_utils.component_call(SYSMGR, False, 'get_implementation', ()) == 'alps_system':
        # Cray is apparently using the session id for interactive jobs.
        spec = [{'user': user, 'jobid': jobid, 'pgid': pgid, 'location': loc}]
        # Pass the argument list as a one-element tuple, consistent with
        # every other component_call site in this file; bare (spec) was
        # just spec itself, not a tuple.
        if not client_utils.component_call(SYSMGR, False,
                'confirm_alps_reservation', (spec,)):
            client_utils.logger.error('Unable to confirm ALPS reservation. Exiting.')
            sys.exit(1)
    return
def exec_user_shell(user, jobid, loc):
    '''Execute shell for user for interactive jobs. Uses the user's defined
    SHELL. Will also send the pgid for cray systems so that aprun will work
    from within the shell.

    Args:
        user  - user name owning the interactive job
        jobid - Cobalt jobid of the interactive job
        loc   - location (node list) assigned to the job

    Wait until termination.
    '''
    # Parent pid of this client process -- presumably the session id that
    # ALPS expects for interactive jobs (see fetch_pgid); TODO confirm.
    pgid = os.getppid()
    # preexec_fn runs in the child between fork() and exec(), so the ALPS
    # reservation is confirmed with the pgid before the shell starts.
    proc = subprocess.Popen([os.environ['SHELL']], shell=True,
            preexec_fn=(lambda: fetch_pgid(user, jobid, loc, pgid=pgid)))
    # Block until the user's interactive shell exits.
    os.waitpid(proc.pid, 0)
def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
"""
This will create the shell or ssh session for user
......@@ -376,7 +400,6 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
# save whether we are running on a cluster system
impl = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
exit_on_interrupt()
deljob = True if impl == "cluster_system" else False
def start_session(loc, resid, nodes, procs):
......@@ -396,11 +419,14 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
client_utils.logger.info("Opening interactive session to %s", loc)
if deljob:
os.system("/usr/bin/ssh -o \"SendEnv COBALT_NODEFILE COBALT_JOBID\" %s" % (loc))
if impl == 'alps_system':
exec_user_shell(user, jobid, loc)
else:
os.system(os.environ['SHELL'])
# Wait for job to start
query = [{'tag':'job', 'jobid':jobid, 'location':'*', 'state':"*", 'resid':"*"}]
query = [{'tag':'job', 'jobid':jobid, 'location':'*', 'state':"*",
'resid':"*"}]
client_utils.logger.info("Wait for job %s to start...", str(jobid))
while True:
......
......@@ -88,20 +88,39 @@ def verify_locations(partitions):
if system_type in ['alps_system']:
# nodes come in as a compact list. expand this.
check_partitions = []
# if we're not a compact list, convert to a compact list. Get this,
# ideally, in one call
for num_list in partitions:
check_partitions.extend(expand_num_list(num_list))
for p in check_partitions:
test_parts = client_utils.component_call(SYSMGR, False,
'verify_locations', (check_partitions,))
if len(test_parts) != len(check_partitions):
missing = [p for p in check_partitions if p not in test_parts]
client_utils.logger.error("Missing partitions: %s" % (" ".join(missing)))
sys.exit(1)
test_parts = client_utils.component_call(SYSMGR, False,
'verify_locations', (check_partitions,))
# On Cray we will be a little looser to make setting reservations
# easier.
client_utils.logger.info('Found Nodes: %s', compact_num_list(test_parts))
missing_nodes = set(check_partitions) - set(test_parts)
if len(missing_nodes) != 0:
# TODO: relax this, we should allow for this to occur, but
# reservation-queue data amalgamation will need a fix to get
# this to work. --PMR
client_utils.logger.error("Missing partitions: %s" % (",".join([str(nid) for nid in missing_nodes])))
client_utils.logger.error("Aborting reservation setup.")
sys.exit(1)
#sys.exit(1)
else:
for p in check_partitions:
test_parts = client_utils.component_call(SYSMGR, False,
'verify_locations', (check_partitions,))
if len(test_parts) != len(check_partitions):
missing = [p for p in check_partitions if p not in test_parts]
client_utils.logger.error("Missing partitions: %s" % (" ".join(missing)))
sys.exit(1)
def validate_args(parser,spec,opt_count):
"""
Validate setres arguments. Will return true if we want to continue processing options.
"""
system_type = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
if parser.options.partitions != None:
parser.args += [part for part in parser.options.partitions.split(':')]
......@@ -120,7 +139,7 @@ def validate_args(parser,spec,opt_count):
if only_id_change:
# make the ID change and we are done with setres
if parser.options.res_id != None:
set_res_id(parser)
if parser.options.cycle_id != None:
......@@ -150,17 +169,26 @@ def validate_args(parser,spec,opt_count):
client_utils.logger.error("Cannot use -D while changing start or cycle time")
sys.exit(1)
if not parser.no_args():
verify_locations(parser.args)
# if we have command line arguments put them in spec
if not parser.no_args(): spec['partitions'] = ":".join(parser.args)
if system_type in ['alps_system']:
if not parser.no_args():
nodes = []
for arg in parser.args:
nodes.extend(expand_num_list(arg))
compact_nodes = compact_num_list(nodes)
verify_locations([compact_nodes])
spec['partitions'] = compact_nodes
else:
if not parser.no_args():
verify_locations(parser.args)
if not parser.no_args(): spec['partitions'] = ":".join(parser.args)
continue_processing_options = True # continue, setres is not done.
return continue_processing_options
def modify_reservation(parser):
"""
this will handle reservation modifications
......
......@@ -794,7 +794,7 @@ class BaseForker (Component):
break
elif exc.errno in [errno.EBADF, errno.EINVAL, errno.EPIPE]:
_logger.error("%s: Error reading stdout from child pipe.",
child.label)
child.label, excinfo=True)
break
elif exc.errno in [errno.ENITR]:
#Try again
......
......@@ -117,7 +117,7 @@ def confirm(alps_res_id, pg_id):
None Yet.
'''
params = {'pagg_id': pg_id,
'reservation': alps_res_id}
'reservation_id': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('CONFIRM',
params=params)))
return retval
......
......@@ -28,8 +28,34 @@ class CrayNode(ClusterNode):
self.ALPS_status = 'UNKNOWN' #Assume unknown state.
CrayNode.RESOURCE_STATUSES.append('alps-interactive')
def to_dict(self):
return self.__dict__
def to_dict(self, cooked=False, params=None):
    '''Return a dictionary representation of a node. Used to send data to
    clients/other components.

    Input:
        cooked - (default: False) If true, strip leading '_' characters from
                 variables. Useful for display applications (e.g. nodelist)
        params - optional list of field names; when provided together with
                 cooked, only those fields (case-insensitive) are returned.

    Returns:
        Dictionary representation of CrayNode fields.

    Notes:
        The output can be sent via XMLRPC without modification.
    '''
    if not cooked:
        # Raw form: hand back the instance dict itself.
        return self.__dict__
    # Cooked form: rebuild the dict with leading underscores stripped.
    display = {}
    for field, value in self.__dict__.items():
        display[field.lstrip()] = value if False else value  # placeholder
    display = {}
    for field, value in self.__dict__.items():
        if field.startswith('_'):
            display[field[1:]] = value
        else:
            display[field] = value
    if params is not None:
        # Case-insensitive filter down to the requested fields only.
        wanted = [p.lower() for p in params]
        display = dict((k, v) for k, v in display.items()
                       if k.lower() in wanted)
    return display
def __str__(self):
    '''Readable representation: the node rendered as its dict form.'''
    node_dict = self.to_dict()
    return str(node_dict)
......
This diff is collapsed.
......@@ -35,6 +35,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
compatible with the ProcessGroupDict class.
'''
self.pgroup_type = pgroup_type
self._common_init_restart()
......@@ -45,8 +46,12 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''
if state is None:
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
else:
self.process_groups = state['process_groups']
self.process_groups = state.get('process_groups',
ProcessGroupDict())
for pg in self.process_groups.values():
_logger.info('recovering pgroup %s, jobid %s', pg.id, pg.jobid)
self.process_groups.id_gen.set(int(state['next_pg_id']))
self.process_group_actions = {}
self.forkers = [] #list of forker identifiers to use with ComponentProxy
......@@ -97,7 +102,10 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''
signaled_pgs = []
for pgid in pgids:
if self.process_groups[pgid].signal(signame):
if self.process_groups[pgid].mode == 'interactive':
self.process_groups[pgid].interactive_complete = True
signaled_pgs.append(self.process_groups[pgid])
elif self.process_groups[pgid].signal(signame):
signaled_pgs.append(self.process_groups[pgid])
return signaled_pgs
......@@ -143,8 +151,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
for forker in self.forkers:
completed[forker] = []
try:
child_data = ComponentProxy(forker).get_children("process group",
None)
child_data = ComponentProxy(forker).get_children("process group", None)
except ComponentLookupError, e:
_logger.error("failed to contact the %s component to obtain a list of children", forker)
except:
......@@ -159,6 +166,14 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
pg_id = pg.id
child_uid = (pg.forker, pg.head_pid)
if child_uid not in children:
if pg.mode == 'interactive':
#interactive job, there is no child job
if pg.interactive_complete:
completed_pgs.append(pg)
#not really orphaned, but this causes the proper cleanup
#to occur
orphaned.append(pg_id)
continue
orphaned.append(pg_id)
_logger.warning('%s: orphaned job exited with unknown status', pg.jobid)
pg.exit_status = 1234567 #FIXME: what should this sentinel be?
......
......@@ -40,7 +40,10 @@ class Resource(object):
self.reserved_jobid = node.reserved_jobid
self.reserved_until = node.reserved_until
self.managed = node.managed
self.admin_down = node.admin_down
try:
self.admin_down = node.admin_down
except:
self.admin_down = False
def __hash__(self):
'''Hash is the hash of the string name for the resource.'''
......
......@@ -16,6 +16,7 @@ import ConfigParser
import re
import logging
import time
import json
import Cobalt.Util
from Cobalt.Proxy import ComponentProxy
......@@ -1276,22 +1277,42 @@ def cluster_display_node_info():
def print_node_list():
'''fetch and print a list of node information with default headers'''
nodes = component_call(SYSMGR, False, 'get_nodes',
(True,))
reservations = component_call(SCHMGR, False, 'get_reservations', ([{'queue':'*', 'partitions':'*', 'active':True}],))
def _extract_res_node_ranges(res_nodes):
    '''Flatten compact nid-range strings (e.g. "1-5,7") into one list of nids.'''
    return [nid for compact in res_nodes
            for nid in Cobalt.Util.expand_num_list(compact)]
def _setup_res_info():
    '''set up the reservation-to-node info so showres shows associated
    reservation queues when active.

    Returns:
        dict mapping node id (str) -> list of reservation queue names whose
        active reservations include that node.
    '''
    reservations = component_call(SCHMGR, False, 'get_reservations',
            ([{'queue':'*', 'partitions':'*', 'active':True}],))
    res_queues = {}
    for res in reservations:
        # Reservation partitions arrive as colon-joined compact nid lists.
        res_nodes = [str(nid) for nid in
                _extract_res_node_ranges(res['partitions'].split(':'))]
        for node in res_nodes:
            # setdefault replaces the manual "get(node, []) == []" grouping.
            res_queues.setdefault(node, []).append(res['queue'])
    return res_queues
def print_node_list():
'''fetch and print a list of node information with default headers'''
header = ['Node_id', 'Name', 'Queues', 'Status']
nodes = json.loads(component_call(SYSMGR, False, 'get_nodes',
(True, None, header, True)))
#TODO: Allow headers to be specified by the user in the future.
if 'queues' in [h.lower() for h in header]:
res_queues = _setup_res_info()
if len(nodes) > 0:
header = ['Node_id', 'Name', 'Queues', 'Status']
print_nodes = []
for node in nodes.values():
entry = []
......@@ -1311,7 +1332,11 @@ def print_node_list():
logger.info('System has no nodes defined')
def print_node_details(args):
'''fetch and print a detailed view of node information'''
'''fetch and print a detailed view of node information
args - list of nodes to fetch detailed information on.
'''
def gen_printable_value(value):
if isinstance(value, dict):
retval = ', '.join(['%s: %s'% (k, gen_printable_value(v)) for k, v in
......@@ -1324,15 +1349,7 @@ def print_node_details(args):
nodes = component_call(SYSMGR, False, 'get_nodes',
(True, expand_node_args(args)))
reservations = component_call(SCHMGR, False, 'get_reservations', ([{'queue':'*', 'partitions':'*', 'active':True}],))
res_queues = {}
for res in reservations:
res_nodes = res['partitions'].split(':')
for node in res_nodes:
if res_queues.get(node, []) == []:
res_queues[node] = [res['queue']]
else:
res_queues[node].append(res['queue'])
res_queues = _setup_res_info()
for node in nodes.values():
header_list = []
value_list = []
......
# Test Cray-specific utilities/calls.
from nose.tools import raises
from testsuite.TestCobalt.Utilities.assert_functions import assert_match, assert_not_match
from Cobalt.Components.system.CrayNode import CrayNode
import Cobalt.Exceptions
import time
from Cobalt.Components.system.CraySystem import CraySystem
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
import Cobalt.Components.system.AlpsBridge as AlpsBridge
from mock import MagicMock, Mock, patch
def is_match(a, b):
    '''Comparison callback for assert_match: true only when *a* and *b*
    are the very same object (identity, not equality).'''
    return a is b
class TestCrayNode(object):
    '''Tests for CrayNode construction, ALPS state mapping, and status
    tracking.'''

    def setup(self):
        # A fresh spec and node are built before every test.
        self.spec = {'name':'test', 'state': 'UP', 'node_id': 1, 'role':'batch',
                'architecture': 'XT', 'SocketArray':['foo', 'bar'],
                }
        self.base_node = CrayNode(self.spec)

    def teardown(self):
        del self.spec
        del self.base_node

    def test_init(self):
        '''CrayNode init test'''
        spec = {'name':'test', 'state': 'UP', 'node_id': 1, 'role':'batch',
                'architecture': 'XT', 'SocketArray':['foo', 'bar'],
                }
        node = CrayNode(spec)
        assert_match(node.status, 'idle', 'bad status')
        assert_match(node.node_id, 1, 'bad nodeid')
        assert_match(node.role, 'BATCH', 'bad role')
        # Identity comparison (is_match): attribute must be passed through,
        # not copied.
        assert_match(node.attributes['architecture'], 'XT',
                'bad architecture', is_match)
        assert_match(node.segment_details, ['foo', 'bar'],
                'bad segment')
        assert_match(node.ALPS_status, 'UNKNOWN',
                'bad default ALPS status')
        assert 'alps-interactive' in node.RESOURCE_STATUSES, 'alps-interactive not in resource statuses'

    def test_init_alps_states(self):
        '''CrayNode: init alps states'''
        # Every raw ALPS hardware state must map to a Cobalt status.
        cray_state_list = ['UP', 'DOWN', 'UNAVAILABLE', 'ROUTING', 'SUSPECT',
                           'ADMIN', 'UNKNOWN', 'UNAVAIL', 'SWDOWN', 'REBOOTQ',
                           'ADMINDOWN']
        correct_alps_states = {'UP': 'idle', 'DOWN':'down', 'UNAVAILABLE':'down',
                               'ROUTING':'down', 'SUSPECT':'down', 'ADMIN':'down',
                               'UNKNOWN':'down', 'UNAVAIL': 'down', 'SWDOWN': 'down',
                               'REBOOTQ':'down', 'ADMINDOWN':'down'}
        for state in cray_state_list:
            self.spec['state'] = state
            node = CrayNode(self.spec)
            assert node.status == correct_alps_states[state], "%s should map to %s" % (node.status, correct_alps_states[state])

    def test_non_cray_statuses(self):
        '''CrayNode: can set cobalt-tracking statuses.'''
        test_statuses = ['busy', 'cleanup-pending', 'allocated',
                'alps-interactive']
        for status in test_statuses:
            self.base_node.status = status
            assert_match(self.base_node.status, status, "failed validation")
class TestCraySystem(object):
    '''Tests for CraySystem queue-data assembly.  The component is built with
    its ALPS bridge and state-update machinery patched out, then populated
    with five managed nodes (ids '1'..'5') in the 'default' queue.'''

    @patch.object(AlpsBridge, 'init_bridge')
    @patch.object(CraySystem, '_init_nodes_and_reservations', return_value=None)
    @patch.object(CraySystem, '_run_update_state', return_value=None)
    def setup(self, *args, **kwargs):
        # Construct the system and hand-build its node inventory; a base
        # single-node job spec is used by every test.
        self.system = CraySystem()
        self.base_spec = {'name':'test', 'state': 'UP', 'node_id': '1', 'role':'batch',
                'architecture': 'XT', 'SocketArray':['foo', 'bar'],
                'queues':['default'],
                }
        for i in range(1,6):
            self.base_spec['name'] = "test%s" % i
            self.base_spec['node_id'] = str(i)
            node_dict=dict(self.base_spec)
            self.system.nodes[str(i)] = CrayNode(node_dict)
            self.system.node_name_to_id[node_dict['name']] = node_dict['node_id']
        for node in self.system.nodes.values():
            node.managed = True
        self.system._gen_node_to_queue()
        self.base_job = {'jobid':1, 'user':'crusher', 'attrs':{},
                'queue':'default', 'nodes': 1,
                }

    def teardown(self):
        del self.system
        del self.base_job

    def test_assemble_queue_data(self):
        '''CraySystem._assemble_queue_data: base functionality'''
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 5, 'expected 5 got %s' % nodecount
        assert sorted(nodelist) == ['1','2','3','4','5'], 'expected [1, 2, 3, 4, 5] got %s' % nodelist

    def test_assemble_queue_data_bad_queue(self):
        '''CraySystem._assemble_queue_data: return nothing if queue for job doesn't exist'''
        self.base_job['queue'] = 'foo'
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 0, 'Nonzero nodecount'
        assert nodelist == [], 'nonempty nodelist'

    def test_assemble_queue_data_multiple_queue(self):
        '''CraySystem._assemble_queue_data: return only proper queue nodes'''
        # Nodes 1 and 4 leave the default queue; only 2, 3, 5 remain.
        self.system.nodes['1'].queues = ['foo']
        self.system.nodes['4'].queues = ['bar']
        self.system._gen_node_to_queue()
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 3, 'Wrong nodecount'
        assert sorted(nodelist) == ['2','3','5'], 'Wrong nodelist'

    def test_assemble_queue_data_multiple_queue_overlap(self):
        '''CraySystem._assemble_queue_data: return only proper queue nodes in overlaping queues'''
        self.system.nodes['1'].queues = ['foo', 'default', 'bar']
        self.system.nodes['4'].queues = ['default','bar']
        self.system.nodes['5'].queues = ['baz']
        self.system._gen_node_to_queue()
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 4, 'Wrong nodecount'
        assert sorted(nodelist) == ['1','2','3','4'], 'Wrong nodelist'
        self.base_job['queue'] = 'foo'
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 1, 'Wrong nodecount'
        assert nodelist == ['1'], 'Wrong nodelist'
        self.base_job['queue'] = 'bar'
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 2, 'Wrong nodecount'
        assert sorted(nodelist) == ['1','4'], 'Wrong nodelist'
        self.base_job['queue'] = 'baz'
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 1, 'Wrong nodecount'
        assert nodelist == ['5'], 'Wrong nodelist'

    def test_assemble_queue_data_non_idle(self):
        '''CraySystem._assemble_queue_data: return only non-idle nodes'''
        self.system.nodes['1'].status = 'busy'
        self.system.nodes['4'].status = 'ADMINDOWN'
        self.system._gen_node_to_queue()
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 3, 'Wrong nodecount'
        assert sorted(nodelist) == ['2','3','5'], 'Wrong nodelist'

    def test_assemble_queue_data_attrs_location(self):
        '''CraySystem._assemble_queue_data: return only attr locaiton loc'''
        self.base_job['attrs'] = {'location':'3'}
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 1, 'Wrong nodecount'
        assert nodelist == ['3'], 'Wrong node in list %s' % nodelist

    def test_assemble_queue_data_attrs_location_repeats(self):
        '''CraySystem._assemble_queue_data: eliminate repeat location entries'''
        self.base_job['attrs'] = {'location':'1,1,2,3'}
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 3, 'Wrong nodecount got %s expected 3' % nodecount
        assert sorted(nodelist) == ['1', '2', '3'], 'Wrong node in list %s' % nodelist

    @raises(ValueError)
    def test_assemble_queue_data_attrs_bad_location(self):
        '''CraySystem._assemble_queue_data: raise error for location completely outside of
        queue'''
        # NOTE(review): the asserts below are unreachable if the expected
        # ValueError is raised by the call above them.
        self.base_job['attrs'] = {'location':'6'}
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 1, 'Wrong nodecount'
        assert nodelist == ['3'], 'Wrong node in list %s' % nodelist

    def test_assemble_queue_data_attrs_location_multi(self):
        '''CraySystem._assemble_queue_data: return only attr locaiton complex loc string'''
        self.base_job['attrs'] = {'location':'1-3,5'}
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 4, 'Wrong nodecount'
        assert sorted(nodelist) == ['1','2','3','5'], 'Wrong nodes in list %s' % nodelist

    def test_assemble_queue_data_forbidden_loc(self):
        '''CraySystem._assemble_queue_data: avoid reserved nodes'''
        self.base_job['forbidden'] = ['1-3','5']
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 1, 'Wrong nodecount %s' % nodecount
        assert sorted(nodelist) == ['4'], 'Wrong nodes in list %s' % nodelist

    def test_assemble_queue_data_forbidden_loc_attrs_loc(self):
        '''CraySystem._assemble_queue_data: avoid reserved nodes despite location being set'''
        self.base_job['forbidden'] = ['1-3']
        self.base_job['attrs'] = {'location':'1-4'}
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 1, 'Wrong nodecount %s' % nodecount
        assert sorted(nodelist) == ['4'], 'Wrong nodes in list %s' % nodelist

    def test_assemble_queue_data_forbidden_loc_attrs_loc_complete(self):
        '''CraySystem._assemble_queue_data: avoid reserved nodes block location if superset'''
        self.base_job['forbidden'] = ['1-3']
        self.base_job['attrs'] = {'location':'1-3'}
        nodecount, nodelist = self.system._assemble_queue_data(self.base_job,
                self.system._idle_nodes_by_queue())
        assert nodecount == 0, 'Wrong nodecount %s' % nodecount
        assert sorted(nodelist) == [], 'Wrong nodes in list %s' % nodelist