Commit 9dc323cc authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch 'cray-merge-clients' into 'develop'

Client changes and component invoker additions.

Client changes and updates for Cray support and display.  There are also performance improvements to nodelist/nodeadm -l for Cray systems and a major performance improvement to setres.

See merge request !15
parents 29e33e40 b6824f8d
......@@ -68,6 +68,7 @@ from Cobalt.arg_parser import ArgParse
from Cobalt.Util import get_config_option, init_cobalt_config, sleep
from Cobalt.Proxy import ComponentProxy
import xmlrpclib
import subprocess
__revision__ = '$Revision: 559 $'
__version__ = '$Version$'
......@@ -83,7 +84,7 @@ def on_interrupt(sig, func=None):
"""
Interrupt Handler to cleanup the interactive job if the user interrupts
Will contain two static variables: count and exit.
'count' will keep track how many interruptions happened and
'count' will keep track how many interruptions happened and
'exit' flags whether we completely exit once the interrupt occurs.
"""
on_interrupt.count += 1
......@@ -368,6 +369,29 @@ def parse_options(parser, spec, opts, opt2spec, def_spec):
opts['disable_preboot'] = not spec['script_preboot']
return opt_count
def fetch_pgid(user, jobid, loc, pgid=None):
    '''Fetch and set pgid for user shell. Needed for Cray systems.

    user  -- username owning the interactive job
    jobid -- Cobalt jobid of the interactive job
    loc   -- location (node list) the job is running on
    pgid  -- process group id of the user's shell; Cray uses the session
             id for interactive jobs

    No-op on non-ALPS systems.  On ALPS systems, exits the client with
    status 1 if the ALPS reservation cannot be confirmed.
    '''
    if client_utils.component_call(SYSMGR, False, 'get_implementation', ()) == 'alps_system':
        #Cray is apparently using the session id for interactive jobs.
        spec = [{'user': user, 'jobid': jobid, 'pgid': pgid, 'location':loc}]
        # NOTE(review): '(spec)' is not a tuple -- it passes the bare list,
        # unlike the '(args,)' form used at other component_call sites.
        # Confirm whether confirm_alps_reservation expects the list itself
        # or a 1-tuple before changing this.
        if not client_utils.component_call(SYSMGR, False, 'confirm_alps_reservation', (spec)):
            client_utils.logger.error('Unable to confirm ALPS reservation. Exiting.')
            sys.exit(1)
    return
def exec_user_shell(user, jobid, loc):
    '''Run the user's login shell for an interactive job.

    Spawns the shell named by $SHELL.  The parent pid (used as the pgid)
    is registered via fetch_pgid from the child process, pre-exec, so that
    aprun works from within the shell on Cray systems.  Blocks until the
    shell terminates.
    '''
    parent_pgid = os.getppid()

    def _register_pgid():
        # Runs in the child between fork and exec.
        fetch_pgid(user, jobid, loc, pgid=parent_pgid)

    shell_proc = subprocess.Popen([os.environ['SHELL']], shell=True,
                                  preexec_fn=_register_pgid)
    os.waitpid(shell_proc.pid, 0)
def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
"""
This will create the shell or ssh session for user
......@@ -376,7 +400,6 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
# save whether we are running on a cluster system
impl = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
exit_on_interrupt()
deljob = True if impl == "cluster_system" else False
def start_session(loc, resid, nodes, procs):
......@@ -396,11 +419,14 @@ def run_interactive_job(jobid, user, disable_preboot, nodes, procs):
client_utils.logger.info("Opening interactive session to %s", loc)
if deljob:
os.system("/usr/bin/ssh -o \"SendEnv COBALT_NODEFILE COBALT_JOBID\" %s" % (loc))
if impl == 'alps_system':
exec_user_shell(user, jobid, loc)
else:
os.system(os.environ['SHELL'])
# Wait for job to start
query = [{'tag':'job', 'jobid':jobid, 'location':'*', 'state':"*", 'resid':"*"}]
query = [{'tag':'job', 'jobid':jobid, 'location':'*', 'state':"*",
'resid':"*"}]
client_utils.logger.info("Wait for job %s to start...", str(jobid))
while True:
......
......@@ -156,12 +156,12 @@ def getq(info):
if que['mintime'] is not None:
que['mintime'] = "%02d:%02d:00" % (divmod(int(que.get('mintime')), 60))
header = [('Queue', 'Users', 'Groups', 'MinTime', 'MaxTime', 'MaxRunning',
'MaxQueued', 'MaxUserNodes', 'MaxNodeHours', 'TotalNodes',
'AdminEmail', 'State', 'Cron', 'Policy', 'Priority')]
'MaxTotalJobs', 'MaxQueued', 'MaxUserNodes', 'MaxNodeHours',
'TotalNodes', 'AdminEmail', 'State', 'Cron', 'Policy', 'Priority')]
datatoprint = [(que['name'], que['users'], que['groups'],
que['mintime'], que['maxtime'],
que['maxrunning'], que['maxqueued'],
que['maxusernodes'], que['maxnodehours'],
que['maxrunning'], que['maxtotaljobs'], que['maxqueued'],
que['maxusernodes'], que['maxnodehours'],
que['totalnodes'],
que['adminemail'], que['state'],
que['cron'], que['policy'], que['priority'])
......@@ -192,7 +192,7 @@ def process_cqadm_options(jobs, parser, spec, user):
force = parser.options.force # force flag.
info = [{'tag':'queue', 'name':'*', 'state':'*', 'users':'*', 'groups':'*', 'maxtime':'*', 'mintime':'*', 'maxuserjobs':'*',
'maxusernodes':'*', 'maxqueued':'*', 'maxrunning':'*', 'maxnodehours':'*', 'adminemail':'*',
'maxusernodes':'*', 'maxqueued':'*', 'maxrunning':'*','maxtotaljobs':'*', 'maxnodehours':'*', 'adminemail':'*',
'totalnodes':'*', 'cron':'*', 'policy':'*', 'priority':'*'}]
response = []
......
......@@ -2,7 +2,7 @@
"""
nodeadm - Nodeadm is the administrative interface for cluster systems
Usage: %prog [-l] [--down part1 part2] [--up part1 part2]"
Usage: %prog [flags] [part1 part2...]"
version: "%prog " + __revision__ + , Cobalt + __version__
OPTIONS DEFINITIONS:
......@@ -12,6 +12,7 @@ OPTIONS DEFINITIONS:
'--up',dest='up',help='mark nodes as up (even if allocated)',action='store_true'
'--queue',action='store', dest='queue', help='set queue associations'
'-l','--list_nstates',action='store_true', dest='list_nstates', help='list the node states'
'-b','--list_details',action='store_true', dest='list_details', help='list detailed information for specified nodes'
"""
import logging
......@@ -20,7 +21,8 @@ from Cobalt import client_utils
from Cobalt.client_utils import cb_debug
import time
from Cobalt.arg_parser import ArgParse
from Cobalt.Util import compact_num_list, expand_num_list
import itertools
__revision__ = ''
__version__ = ''
......@@ -43,12 +45,13 @@ def validate_args(parser):
impl = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
# make sure we're on a cluster-system
if "cluster_system" != impl:
if impl not in ['cluster_system', 'alps_system']:
client_utils.logger.error("nodeadm is only supported on cluster systems. Try partlist instead.")
sys.exit(0)
# Check mutually exclusive options
mutually_exclusive_option_lists = [['down', 'up', 'list_nstates', 'queue']]
mutually_exclusive_option_lists = [['down', 'up', 'list_nstates',
'list_details', 'queue']]
if opt_count > 1:
client_utils.validate_conflicting_options(parser, mutually_exclusive_option_lists)
......@@ -79,35 +82,71 @@ def main():
opt = parser.options
args = parser.args
if opt.down:
delta = client_utils.component_call(SYSMGR, False, 'nodes_down', (args, whoami))
client_utils.logger.info("nodes marked down:")
for d in delta:
client_utils.logger.info(" %s" % d)
client_utils.logger.info("")
client_utils.logger.info("unknown nodes:")
for a in args:
if a not in delta:
client_utils.logger.info(" %s" % a)
elif opt.up:
delta = client_utils.component_call(SYSMGR, False, 'nodes_up', (args, whoami))
client_utils.logger.info("nodes marked up:")
for d in delta:
client_utils.logger.info(" %s" % d)
client_utils.logger.info('')
client_utils.logger.info("nodes that weren't in the down list:")
for a in args:
if a not in delta:
client_utils.logger.info(" %s" %a)
elif opt.list_nstates:
header, output = client_utils.cluster_display_node_info()
client_utils.printTabular(header + output)
elif opt.queue:
data = client_utils.component_call(SYSMGR, False, 'set_queue_assignments', (opt.queue, args, whoami))
client_utils.logger.info(data)
#get type of system, Cray systems handled differently
impl = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
if impl in ['alps_system']:
updates = {}
if opt.list_nstates:
client_utils.print_node_list()
return
if opt.list_details:
# list details and bail
# get list from arguments. Currently assuming a comma separated,
# hyphen-condensed nodelist
client_utils.print_node_details(args)
return
update_type = 'ERROR'
if opt.down:
update_type = 'node down'
updates['down'] = True
elif opt.up:
update_type = 'node up'
updates['up'] = True
elif opt.queue:
update_type = 'queue'
updates['queues'] = opt.queue
mod_nodes = client_utils.component_call(SYSMGR, False, 'update_nodes',
(updates, client_utils.expand_node_args(args), whoami))
client_utils.logger.info('Update %s applied to nodes %s', update_type,
compact_num_list(mod_nodes))
else:
if opt.down:
delta = client_utils.component_call(SYSMGR, False, 'nodes_down', (args, whoami))
client_utils.logger.info("nodes marked down:")
for d in delta:
client_utils.logger.info(" %s" % d)
client_utils.logger.info("")
client_utils.logger.info("unknown nodes:")
for a in args:
if a not in delta:
client_utils.logger.info(" %s" % a)
elif opt.up:
delta = client_utils.component_call(SYSMGR, False, 'nodes_up', (args, whoami))
client_utils.logger.info("nodes marked up:")
for d in delta:
client_utils.logger.info(" %s" % d)
client_utils.logger.info('')
client_utils.logger.info("nodes that weren't in the down list:")
for a in args:
if a not in delta:
client_utils.logger.info(" %s" %a)
elif opt.list_nstates:
header, output = client_utils.cluster_display_node_info()
client_utils.printTabular(header + output)
elif opt.queue:
data = client_utils.component_call(SYSMGR, False, 'set_queue_assignments', (opt.queue, args, whoami))
client_utils.logger.info(data)
if __name__ == '__main__':
try:
......
......@@ -8,6 +8,7 @@ version: "%prog " + __revision__ + , Cobalt + __version__
OPTIONS DEFINITIONS: None
'-d','--debug',dest='debug',help='turn on communication debugging',callback=cb_debug
'-b','--list_details',action='store_true', dest='list_details', help='list detailed information for specified nodes'
'--noheader',dest='noheader',action='store_true',help='disable display of header information'
"""
......@@ -24,9 +25,6 @@ __version__ = 'TBD'
SYSMGR = client_utils.SYSMGR
def main():
"""
qmove main
"""
# setup logging for client. The clients should call this before doing anything else.
client_utils.setup_logging(logging.INFO)
......@@ -44,17 +42,29 @@ def main():
# Set required default values: None
parser.parse_it() # parse the command line
opt = parser.options
args = parser.args
if not parser.no_args():
client_utils.logger.error("No arguments needed")
#if not parser.no_args():
# client_utils.logger.error("No arguments needed")
impl = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
# make sure we're on a cluster-system
if "cluster_system" != impl:
if impl not in ['cluster_system', 'alps_system']:
client_utils.logger.error("nodelist is only supported on cluster systems. Try partlist instead.")
sys.exit(0)
if impl == 'alps_system':
if opt.list_details:
# get list from arguments. Currently assuming a comma separated,
# hyphen-condensed nodelist
client_utils.print_node_details(args)
else:
client_utils.print_node_list()
return
header, output = client_utils.cluster_display_node_info()
if parser.options.noheader is not None:
......
......@@ -36,7 +36,7 @@ import sys
import time
from Cobalt import client_utils
from Cobalt.client_utils import cb_debug, cb_time, cb_date, cb_passthrough, cb_res_users
from Cobalt.Util import expand_num_list, compact_num_list
from Cobalt.arg_parser import ArgParse
__revision__ = '$Id: setres.py 2154 2011-05-25 00:22:56Z richp $'
......@@ -82,17 +82,45 @@ def verify_locations(partitions):
"""
verify that partitions are valid
"""
for p in partitions:
test_parts = client_utils.component_call(SYSMGR, False, 'verify_locations', (partitions,))
if len(test_parts) != len(partitions):
missing = [p for p in partitions if p not in test_parts]
client_utils.logger.error("Missing partitions: %s" % (" ".join(missing)))
sys.exit(1)
check_partitions = partitions
system_type = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
# if we have args then verify the args (partitions)
if system_type in ['alps_system']:
# nodes come in as a compact list. expand this.
check_partitions = []
# if we're not a compact list, convert to a compact list. Get this,
# ideally, in one call
for num_list in partitions:
check_partitions.extend(expand_num_list(num_list))
test_parts = client_utils.component_call(SYSMGR, False,
'verify_locations', (check_partitions,))
# On Cray we will be a little looser to make setting reservations
# easier.
client_utils.logger.info('Found Nodes: %s', compact_num_list(test_parts))
missing_nodes = set(check_partitions) - set(test_parts)
if len(missing_nodes) != 0:
# TODO: relax this, we should allow for this to occur, but
# reservation-queue data amalgamation will need a fix to get
# this to work. --PMR
client_utils.logger.error("Missing partitions: %s" % (",".join([str(nid) for nid in missing_nodes])))
client_utils.logger.error("Aborting reservation setup.")
sys.exit(1)
#sys.exit(1)
else:
for p in check_partitions:
test_parts = client_utils.component_call(SYSMGR, False,
'verify_locations', (check_partitions,))
if len(test_parts) != len(check_partitions):
missing = [p for p in check_partitions if p not in test_parts]
client_utils.logger.error("Missing partitions: %s" % (" ".join(missing)))
sys.exit(1)
def validate_args(parser,spec,opt_count):
"""
Validate setres arguments. Will return true if we want to continue processing options.
"""
system_type = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
if parser.options.partitions != None:
parser.args += [part for part in parser.options.partitions.split(':')]
......@@ -111,7 +139,7 @@ def validate_args(parser,spec,opt_count):
if only_id_change:
# make the ID change and we are done with setres
if parser.options.res_id != None:
set_res_id(parser)
if parser.options.cycle_id != None:
......@@ -141,18 +169,26 @@ def validate_args(parser,spec,opt_count):
client_utils.logger.error("Cannot use -D while changing start or cycle time")
sys.exit(1)
# if we have args then verify the args (partitions)
if not parser.no_args():
verify_locations(parser.args)
# if we have command line arguments put them in spec
if not parser.no_args(): spec['partitions'] = ":".join(parser.args)
if system_type in ['alps_system']:
if not parser.no_args():
nodes = []
for arg in parser.args:
nodes.extend(expand_num_list(arg))
compact_nodes = compact_num_list(nodes)
verify_locations([compact_nodes])
spec['partitions'] = compact_nodes
else:
if not parser.no_args():
verify_locations(parser.args)
if not parser.no_args(): spec['partitions'] = ":".join(parser.args)
continue_processing_options = True # continue, setres is not done.
return continue_processing_options
def modify_reservation(parser):
"""
this will handle reservation modifications
......
#!/usr/bin/env python
'''Launcher for alps_script_forkers.  If we are running multiple forkers on
a node, this will spawn the actual forkers as subprocesses.

An optional "--seq <n>" argument pair selects an integer sequence number
used to distinguish multiple forker instances; the pair is stripped from
sys.argv before the component's own argument handling sees it.
'''
__revision__ = '$Revision: $'
import sys

from Cobalt.Components.alps_script_forker import ALPSScriptForker
from Cobalt.Components.base import run_component

if __name__ == "__main__":
    # Default to instance 0 when no --seq flag is given.
    seq_num = 0
    if '--seq' in sys.argv:
        try:
            raw_seq = sys.argv[sys.argv.index('--seq') + 1]
            seq_num = int(raw_seq)
        except (IndexError, ValueError):
            # Previously this tracebacked; fail with a clear message instead.
            sys.stderr.write("--seq requires an integer argument\n")
            sys.exit(1)
        # Strip the flag and its value so run_component doesn't see them.
        sys.argv.remove('--seq')
        sys.argv.remove(raw_seq)
    try:
        run_component(ALPSScriptForker, single_threaded=True,
                      aug_comp_name=True, state_name_match_component=True,
                      seq_num=seq_num)
    except KeyboardInterrupt:
        sys.exit(1)
#!/usr/bin/env python
# $Id$
'''Launcher for the Cray (ALPS) system component.'''
from Cobalt.Components.system.CraySystem import CraySystem
from Cobalt.Components.base import run_component

if __name__ == "__main__":
    try:
        # Register with the service locator under the name "alpssystem".
        run_component(CraySystem, register=True, state_name="alpssystem")
    except KeyboardInterrupt:
        # Silent, clean shutdown on Ctrl-C.
        pass
......@@ -16,6 +16,7 @@ import ConfigParser
import re
import logging
import time
import json
import Cobalt.Util
from Cobalt.Proxy import ComponentProxy
......@@ -1273,3 +1274,132 @@ def cluster_display_node_info():
output.append([host_name, ":".join(queues), status, backfill_time])
return header, output
def _extract_res_node_ranges(res_nodes):
    '''Flatten hyphen-condensed nid range strings into a flat list of
    individual node ids.

    res_nodes -- iterable of condensed nid-list strings
    '''
    return [nid for nidlist in res_nodes
            for nid in Cobalt.Util.expand_num_list(nidlist)]
def _setup_res_info():
    '''Build a map from node id (string) to the list of queues of the
    active reservations covering that node, so showres shows associated
    reservation queues when active.
    '''
    active_res = component_call(SCHMGR, False, 'get_reservations',
            ([{'queue':'*', 'partitions':'*', 'active':True}],))
    queues_by_node = {}
    for res in active_res:
        nids = _extract_res_node_ranges(res['partitions'].split(':'))
        for nid in nids:
            queues_by_node.setdefault(str(nid), []).append(res['queue'])
    return queues_by_node
def sec_to_human_time(raw_time):
    '''Convert raw seconds into a D:HH:MM:SS format for pretty printing.

    Non-positive inputs render as "00:00".  Leading zero fields (days,
    then hours) are omitted entirely.
    '''
    if raw_time <= 0:
        return "00:00"
    minutes, seconds = divmod(raw_time, 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)
    if days:
        return '%d:%02d:%02d:%02d' % (days, hours, minutes, seconds)
    if hours:
        return '%d:%02d:%02d' % (hours, minutes, seconds)
    return '%d:%02d' % (minutes, seconds)
def print_node_list():
    '''Fetch and print a tabular list of nodes with the default headers.

    Columns: Node_id, Name, Queues, Status, Backfill.  "Backfill" is an
    alias for the system component's "drain_until" field; remaining drain
    time is rendered in D:HH:MM:SS form.  Queues of currently active
    reservations are appended to each node's queue list.
    '''
    header = ['Node_id', 'Name', 'Queues', 'Status', 'Backfill']
    # Display-name -> component-field aliases for the get_nodes query.
    header_aliases = {'Backfill': 'drain_until'}
    fetch_header = [header_aliases.get(h, h) for h in header]
    # Removed stray debug "print fetch_header" that leaked the raw header
    # list into user-facing output.
    nodes = json.loads(component_call(SYSMGR, False, 'get_nodes',
        (True, None, fetch_header, True)))
    #TODO: Allow headers to be specified by the user in the future.
    if 'queues' in [h.lower() for h in header]:
        res_queues = _setup_res_info()
    now = int(time.time())
    if len(nodes) > 0:
        print_nodes = []
        for node in nodes.values():
            entry = []
            for key in fetch_header:
                if key.lower() == 'node_id':
                    entry.append(int(node[key.lower()]))
                elif key.lower() in ['drain_until']:
                    # Show time remaining until drain, or '-' if not draining.
                    if node[key.lower()] is not None:
                        entry.append(sec_to_human_time(node[key.lower()] - now))
                    else:
                        entry.append('-')
                elif key.lower() == 'queues':
                    queues = node[key.lower()]
                    # Fold in queues from active reservations on this node.
                    if res_queues.get(str(node['node_id']), False):
                        queues.extend(res_queues[str(node['node_id'])])
                    entry.append(':'.join(queues))
                else:
                    entry.append(node[key.lower()])
            print_nodes.append(entry)
        printTabular([header] + sorted(print_nodes))
    else:
        logger.info('System has no nodes defined')
def print_node_details(args):
    '''Fetch and print a detailed (vertical) view of node information.

    args - list of node arguments (comma-separated, hyphen-condensed nid
           lists) selecting which nodes to report on.
    '''
    def _fmt(val):
        # Render nested dicts/lists as comma-joined "key: value" text;
        # everything else via str().
        if isinstance(val, dict):
            return ', '.join(['%s: %s' % (k, _fmt(v))
                              for k, v in val.iteritems()])
        if isinstance(val, list):
            return ', '.join([_fmt(v) for v in val])
        return str(val)

    nodes = component_call(SYSMGR, False, 'get_nodes',
            (True, expand_node_args(args)))
    res_queues = _setup_res_info()
    for node in nodes.values():
        headers = ['node_id']
        values = [node['node_id']]
        for key, value in node.iteritems():
            if key == 'node_id':
                continue  # already placed first in the listing
            headers.append(key)
            if key == 'queues':
                queues = node[key.lower()]
                # Fold in queues from active reservations on this node.
                if res_queues.get(str(node['node_id']), False):
                    queues.extend(res_queues[str(node['node_id'])])
                values.append(':'.join(queues))
            else:
                values.append(_fmt(value))
        print_vertical([headers, values])
    return
def expand_node_args(arg_list):
    '''Expand a comma-separated, hyphen-condensed list of nid strings into
    one flat list of node ids.
    '''
    return [nid for arg in arg_list
            for nid in Cobalt.Util.expand_num_list(arg)]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment