Commit 2da8f6d4 authored by Paul Rich

nodeadm -l and --queue now supported

This also necessitated adding queues at a basic level.  Multiple
queues are now supported.  Orthogonal queues appear to be working
correctly.  Nodeadm now lists nodes and can set queues on a list of
nids.
parent b9298214
@@ -2,7 +2,7 @@
"""
nodeadm - Nodeadm is the administrative interface for cluster systems
Usage: %prog [-l] [--down part1 part2] [--up part1 part2]"
Usage: %prog [flags] [part1 part2...]"
version: "%prog " + __revision__ + , Cobalt + __version__
OPTIONS DEFINITIONS:
@@ -12,6 +12,7 @@ OPTIONS DEFINITIONS:
'--up',dest='up',help='mark nodes as up (even if allocated)',action='store_true'
'--queue',action='store', dest='queue', help='set queue associations'
'-l','--list_nstates',action='store_true', dest='list_nstates', help='list the node states'
'-b','--list_details',action='store_true', dest='list_details', help='list detailed information for specified nodes'
"""
import logging
@@ -20,7 +21,8 @@ from Cobalt import client_utils
from Cobalt.client_utils import cb_debug
import time
from Cobalt.arg_parser import ArgParse
from Cobalt.Util import compact_num_list, expand_num_list
import itertools
__revision__ = ''
__version__ = ''
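For reference, the two Cobalt.Util helpers imported here convert between compact nid strings and integer lists. A minimal stand-in sketch of their assumed semantics (the real implementations live in Cobalt.Util):

```python
def expand_num_list(num_list):
    """Expand a compact nid string such as '1-3,5' into [1, 2, 3, 5]."""
    nids = []
    for chunk in num_list.split(','):
        if '-' in chunk:
            low, high = chunk.split('-', 1)
            nids.extend(range(int(low), int(high) + 1))
        else:
            nids.append(int(chunk))
    return nids

def compact_num_list(nids):
    """Compress an iterable of nids such as [1, 2, 3, 5] into '1-3,5'."""
    nids = sorted(int(n) for n in nids)
    if not nids:
        return ''
    ranges = []
    start = prev = nids[0]
    for nid in nids[1:]:
        if nid == prev + 1:
            prev = nid
            continue
        ranges.append((start, prev))
        start = prev = nid
    ranges.append((start, prev))
    return ','.join(str(a) if a == b else '%s-%s' % (a, b)
                    for a, b in ranges)
```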
@@ -43,12 +45,13 @@ def validate_args(parser):
impl = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
# make sure we're on a cluster-system
if "cluster_system" != impl:
if impl not in ['cluster_system', 'alps_system']:
client_utils.logger.error("nodeadm is only supported on cluster systems. Try partlist instead.")
sys.exit(0)
# Check mutually exclusive options
mutually_exclusive_option_lists = [['down', 'up', 'list_nstates', 'queue']]
mutually_exclusive_option_lists = [['down', 'up', 'list_nstates',
'list_details', 'queue']]
if opt_count > 1:
client_utils.validate_conflicting_options(parser, mutually_exclusive_option_lists)
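client_utils.validate_conflicting_options is Cobalt's own helper; a simplified stand-in showing the check this hunk extends (the signature and error text here are illustrative, not the real API):

```python
def validate_conflicting_options(options, exclusive_names):
    """Exit if more than one mutually exclusive flag was set."""
    set_flags = [name for name in exclusive_names
                 if getattr(options, name, None)]
    if len(set_flags) > 1:
        raise SystemExit('conflicting options: %s' % ', '.join(set_flags))
```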
@@ -79,36 +82,84 @@ def main():
opt = parser.options
args = parser.args
if opt.down:
delta = client_utils.component_call(SYSMGR, False, 'nodes_down', (args, whoami))
client_utils.logger.info("nodes marked down:")
for d in delta:
client_utils.logger.info(" %s" % d)
client_utils.logger.info("")
client_utils.logger.info("unknown nodes:")
for a in args:
if a not in delta:
client_utils.logger.info(" %s" % a)
elif opt.up:
delta = client_utils.component_call(SYSMGR, False, 'nodes_up', (args, whoami))
client_utils.logger.info("nodes marked up:")
for d in delta:
client_utils.logger.info(" %s" % d)
client_utils.logger.info('')
client_utils.logger.info("nodes that weren't in the down list:")
for a in args:
if a not in delta:
client_utils.logger.info(" %s" %a)
elif opt.list_nstates:
header, output = client_utils.cluster_display_node_info()
client_utils.printTabular(header + output)
elif opt.queue:
data = client_utils.component_call(SYSMGR, False, 'set_queue_assignments', (opt.queue, args, whoami))
client_utils.logger.info(data)
#get type of system, Cray systems handled differently
impl = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
if impl in ['alps_system']:
updates = {}
if opt.list_nstates:
nodes = client_utils.component_call(SYSMGR, False, 'get_nodes',
(True,))
if len(nodes) > 0:
header = ['Node_id', 'Name', 'Queues', 'Status']
print_nodes = []
for node in nodes.values():
entry = []
for key in header:
entry.append(node[key.lower()])
print_nodes.append(entry)
client_utils.printTabular([header] + print_nodes)
else:
client_utils.logger.info('System has no nodes defined')
return
if opt.list_details:
#list details and bail
return
update_type = 'ERROR'
if opt.down:
update_type = 'node down'
updates['down'] = True
elif opt.up:
update_type = 'node up'
updates['up'] = True
elif opt.queue:
update_type = 'queue'
updates['queues'] = opt.queue
print args
#expand arguments
exp_arg_list = []
for arg in args:
exp_arg_list.extend(expand_num_list(arg))
print exp_arg_list
mod_nodes = client_utils.component_call(SYSMGR, False, 'update_nodes',
(updates, exp_arg_list, whoami))
client_utils.logger.info('Update %s applied to nodes %s', update_type,
compact_num_list(mod_nodes))
else:
if opt.down:
delta = client_utils.component_call(SYSMGR, False, 'nodes_down', (args, whoami))
client_utils.logger.info("nodes marked down:")
for d in delta:
client_utils.logger.info(" %s" % d)
client_utils.logger.info("")
client_utils.logger.info("unknown nodes:")
for a in args:
if a not in delta:
client_utils.logger.info(" %s" % a)
elif opt.up:
delta = client_utils.component_call(SYSMGR, False, 'nodes_up', (args, whoami))
client_utils.logger.info("nodes marked up:")
for d in delta:
client_utils.logger.info(" %s" % d)
client_utils.logger.info('')
client_utils.logger.info("nodes that weren't in the down list:")
for a in args:
if a not in delta:
client_utils.logger.info(" %s" %a)
elif opt.list_nstates:
header, output = client_utils.cluster_display_node_info()
client_utils.printTabular(header + output)
elif opt.queue:
data = client_utils.component_call(SYSMGR, False, 'set_queue_assignments', (opt.queue, args, whoami))
client_utils.logger.info(data)
if __name__ == '__main__':
try:
main()
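On ALPS systems every administrative flag above funnels into one updates dict plus an expanded nid list before the single update_nodes component call; a hypothetical trace for `nodeadm --queue debug 1-4,6` (expand_num_list as sketched earlier):

```python
updates = {'queues': 'debug'}   # --down gives {'down': True}; --up gives {'up': True}
exp_arg_list = [1, 2, 3, 4, 6]  # expand_num_list('1-4,6')
# The component call then reduces to:
#   mod_nodes = update_nodes(updates, exp_arg_list, whoami)
# and on success logs: Update queue applied to nodes 1-4,6
```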
@@ -36,7 +36,7 @@ def init_bridge():
children should be considered invalid and purged.
'''
forker = ComponentProxy(FORKER, defer=False)
forker = ComponentProxy(FORKER, defer=True)
try:
stale_children = forker.get_children('apbridge', None)
forker.cleanup_children([int(child['id']) for child in stale_children])
@@ -70,8 +70,7 @@ def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
if val is not None:
params[key] = val
if node_id_list is not None:
params['node_id_list'] = compact_num_list(node_id_list)
print str(BasilRequest('RESERVE', params=params))
params['node_list'] = [int(i) for i in node_id_list]
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RESERVE',
params=params)))
print str(retval)
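The reserve() change replaces the compacted nid string with a plain list of ints in the BASIL RESERVE parameters; a minimal before/after sketch:

```python
node_id_list = ['20', '21', '22']
# before: params['node_id_list'] = compact_num_list(node_id_list)   # '20-22'
params = {'node_list': [int(i) for i in node_id_list]}              # [20, 21, 22]
```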
@@ -18,6 +18,8 @@ from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Util import compact_num_list, expand_num_list
from Cobalt.Util import init_cobalt_config, get_config_option
_logger = logging.getLogger(__name__)
init_cobalt_config()
@@ -53,6 +55,17 @@ class CraySystem(BaseSystem):
start_time = time.time()
super(CraySystem, self).__init__(*args, **kwargs)
_logger.info('BASE SYSTEM INITIALIZED')
self._common_init_restart()
_logger.info('ALPS SYSTEM COMPONENT READY TO RUN')
_logger.info('Initialization complete in %s sec.', (time.time() -
start_time))
def _common_init_restart(self, spec=None):
'''Common routine for cold and restart initialization of the system
component.
'''
#initialize bridge.
bridge_pending = True
while bridge_pending:
# purge stale children from prior run. Also ensure the
@@ -61,19 +74,13 @@ class CraySystem(BaseSystem):
ALPSBridge.init_bridge()
except ALPSBridge.BridgeError:
_logger.error('Bridge Initialization failed. Retrying.')
Cobalt.Util.sleep(10)
except ComponentLookupError:
_logger.warning('Error reaching forker. Retrying.')
Cobalt.Util.sleep(10)
else:
bridge_pending = False
_logger.info('BRIDGE INITIALIZED')
self._common_init_restart()
_logger.info('ALPS SYSTEM COMPONENT READY TO RUN')
_logger.info('Initialization complete in %s sec.', (time.time() -
start_time))
def _common_init_restart(self, spec=None):
'''Common routine for cold and restart initialization of the system
component.
'''
#process manager setup
if spec is None:
self.process_manager = ProcessGroupManager()
@@ -88,6 +95,10 @@
if spec is not None:
self.alps_reservations = spec['alps_reservations']
self._init_nodes_and_reservations()
if spec is not None:
node_info = spec.get('node_info', {})
for nid, node in node_info.items():
self.nodes[nid].queues = node.queues
self.nodes_by_queue = {} #queue:[node_ids]
#populate initial state
#state update thread and lock
@@ -107,23 +118,13 @@
state['alps_system_statefile_version'] = 1
state['process_manager'] = self.process_manager.__getstate__()
state['alps_reservations'] = self.alps_reservations
state['node_info'] = self.nodes
return state
def __setstate__(self, state):
start_time = time.time()
super(CraySystem, self).__setstate__(state)
_logger.info('BASE SYSTEM INITIALIZED')
bridge_pending = True
while bridge_pending:
# purge stale children from prior run. Also ensure the
# system_script_forker is currently up.
try:
ALPSBridge.init_bridge()
except ALPSBridge.BridgeError:
_logger.error('Bridge Initialization failed. Retrying.')
else:
bridge_pending = False
_logger.info('BRIDGE INITIALIZED')
self._common_init_restart(state)
_logger.info('ALPS SYSTEM COMPONENT READY TO RUN')
_logger.info('Reinitialization complete in %s sec.', (time.time() -
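Persisting self.nodes as node_info in __getstate__ is what allows _common_init_restart above to re-apply administrative queue edits after a restart. A toy round trip illustrating the assumption:

```python
import pickle

class ToyNode(object):
    def __init__(self, queues):
        self.queues = queues

state = {'node_info': {'1': ToyNode(['default', 'debug'])}}  # as in __getstate__
restored = pickle.loads(pickle.dumps(state))                 # statefile round trip
print(restored['node_info']['1'].queues)  # ['default', 'debug'] survives restart
```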
@@ -139,7 +140,18 @@ class CraySystem(BaseSystem):
'''Initialize nodes from ALPS bridge data'''
retnodes = {}
inventory = ALPSBridge.fetch_inventory(resinfo=True)
pending = True
while pending:
try:
inventory = ALPSBridge.fetch_inventory(resinfo=True)
except Exception:
#don't crash out here. That may trash a statefile.
_logger.error('Possible transient error encountered during initialization. Retrying.',
exc_info=True)
Cobalt.Util.sleep(10)
else:
pending = False
for nodespec in inventory['nodes']:
node = CrayNode(nodespec)
node.managed = True
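This is the second place the component now blocks and retries rather than crash (the first is bridge initialization above). The shared idiom, extracted into a generic helper for illustration, assuming time.sleep can stand in for Cobalt.Util.sleep:

```python
import time
import logging

def retry_until_success(operation, delay=10, log=logging.getLogger(__name__)):
    """Call operation() until it stops raising, sleeping between attempts."""
    while True:
        try:
            return operation()
        except Exception:
            log.error('Possible transient error. Retrying.', exc_info=True)
            time.sleep(delay)

# usage sketch:
#   inventory = retry_until_success(lambda: ALPSBridge.fetch_inventory(resinfo=True))
```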
@@ -156,14 +168,15 @@
#help here.
def _gen_node_to_queue(self):
'''Generate a mapping for fast lookup of node-id's to queues.'''
'''(Re)Generate a mapping for fast lookup of node-id's to queues.'''
with self._node_lock:
self.nodes_by_queue = {}
for node in self.nodes.values():
for queue in node.queues:
if queue in self.nodes_by_queue.keys():
self.nodes_by_queue[queue].append(node.node_id)
self.nodes_by_queue[queue].add(node.node_id)
else:
self.nodes_by_queue[queue] = [node.node_id]
self.nodes_by_queue[queue] = set([node.node_id])
@exposed
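Moving the per-queue containers from lists to sets makes regeneration idempotent: re-adding a nid after an administrative queue change is a no-op, so duplicates cannot accumulate. The mapping, distilled with hypothetical node records:

```python
nodes = {
    '1': {'node_id': '1', 'queues': ['default']},
    '2': {'node_id': '2', 'queues': ['default', 'debug']},
}
nodes_by_queue = {}
for node in nodes.values():
    for queue in node['queues']:
        nodes_by_queue.setdefault(queue, set()).add(node['node_id'])
# nodes_by_queue == {'default': {'1', '2'}, 'debug': {'2'}}
```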
@@ -212,8 +225,13 @@
#nodes. If there is no resource reservation and the node is not in
#current alps reservations, the node is ready to schedule again.
with self._node_lock:
inventory = ALPSBridge.fetch_inventory(resinfo=True) #This is a full-refresh,
#determine if summary may be used under normal operation
try:
inventory = ALPSBridge.fetch_inventory(resinfo=True) #This is a full-refresh,
#determine if summary may be used under normal operation
except (ALPSBridge.ALPSError, ComponentLookupError):
_logger.warning('Error contacting ALPS for state update. Aborting this update',
exc_info=True)
return
inven_nodes = inventory['nodes']
inven_reservations = inventory['reservations']
start_time = time.time()
@@ -318,7 +336,6 @@
equiv = []
node_active_queues = []
self.current_equivalence_classes = []
self.nodes_by_queues = {}
#reverse mapping of queues to nodes
for node in self.nodes.values():
if node.managed and node.schedulable:
@@ -407,8 +424,8 @@
in this case post job startup.
'''
#TODO: add reservation handling
#TODO: make not first-fit
#TODO: add queue constraints.
now = time.time()
resource_until_time = now + TEMP_RESERVATION_TIME
with self._node_lock:
@@ -420,7 +437,11 @@
best_match = {} #jobid: list(locations)
current_idle_nodecount = idle_nodecount
for job in arg_list:
node_id_list = self.nodes_by_queue[job['queue']]
if not job['queue'] in self.nodes_by_queue.keys():
# Either a new queue with no resources, or possibly a
# reservation; reservations need extra handling here.
continue
node_id_list = list(self.nodes_by_queue[job['queue']])
if 'location' in job['attrs'].keys():
job_set = set([int(nid) for nid in
expand_num_list(job['attrs']['location'])])
@@ -430,6 +451,11 @@
else:
_logger.warning('Job %s: requesting locations that are not in queue for that job.',
job['jobid'])
continue
if int(job['nodes']) > len(node_id_list):
_logger.warning('Job %s: requested nodecount of %s exceeds the %s nodes in its queue',
job['jobid'], job['nodes'], len(node_id_list))
continue
if idle_nodecount == 0:
break
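Both new guards in the first-fit pass are cheap set and length checks; distilled with made-up nids (the real code draws these from nodes_by_queue and job['attrs']):

```python
queue_nodes = set([101, 102, 103, 104])  # nodes in the job's queue
requested = set([103, 104, 105])         # expand_num_list of attrs['location']
nodecount = 2

usable = queue_nodes & requested
if not usable:
    print('requested locations are not in the queue; skip this job')
elif nodecount > len(queue_nodes):
    print('nodecount exceeds the nodes in the queue; skip this job')
```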
@@ -471,8 +497,13 @@
'''
attrs = job['attrs']
res_info = ALPSBridge.reserve(job['user'], job['jobid'],
try:
res_info = ALPSBridge.reserve(job['user'], job['jobid'],
int(job['nodes']), job['attrs'], node_id_list)
except ALPSBridge.ALPSError as exc:
_logger.warning('unable to reserve resources from ALPS: %s',
exc.message)
return None
new_alps_res = None
if res_info is not None:
new_alps_res = ALPSReservation(job, res_info, self.nodes)
@@ -690,6 +721,42 @@
#specification looks like.
return spec
@exposed
def update_nodes(self, updates, node_list, user):
'''Apply updates to node state from an external client.
Updates apply to all nodes in node_list. User is for logging purposes.
node_list should be a list of node ids from the Cray system.
Hold the node lock while doing this.
Force a status update while doing this operation.
'''
mod_nodes = []
with self._node_lock:
for node_id in node_list:
node = self.nodes[str(node_id)]
try:
if updates.get('down', False):
node.status = 'down'
elif updates.get('up', False):
node.status = 'idle'
elif updates.get('queues', None):
node.queues = list(updates['queues'].split(':'))
except Exception:
_logger.error("Unexpected exception encountered!", exc_info=True)
else:
mod_nodes.append(node_id)
if updates.get('queues', False):
self._gen_node_to_queue()
if mod_nodes != []:
self.update_node_state()
_logger.info('Updates %s applied to nodes %s by %s', updates,
compact_num_list(mod_nodes), user)
return mod_nodes
class ALPSReservation(object):
'''Container for ALPS Reservation information. Can be used to update
reservations and also internally releases reservations.
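update_nodes consumes exactly the dict nodeadm builds on the client side; queue assignments travel as a single colon-delimited string. The payload-to-mutation mapping, under the assumptions in the method above:

```python
payloads = [
    {'down': True},              # -> node.status = 'down'
    {'up': True},                # -> node.status = 'idle'
    {'queues': 'default:debug'}, # -> node.queues = ['default', 'debug']
]
for updates in payloads:
    if updates.get('queues'):
        print(updates['queues'].split(':'))  # ['default', 'debug']
```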
@@ -87,7 +87,7 @@ class BaseSystem(Component):
raise NotImplementedError
@exposed
def update_nodes(self, updates):
def update_nodes(self, updates, node_list, user):
raise NotImplementedError
class ResourceManager(object):