Commit 7d144226 authored by Paul Rich

New system component for ALPS for Cray Port

parent cb0ce41e
"""Bridge for communicating and fetching system state in ALPS."""
#this requires forking off an apbasil process, sending XML to stdin and
#parsing XML from stdout. Synchronous; however, the system script
#forker isn't. These will be able to block for now.
import logging
import xml.etree.ElementTree
from cray_messaging import InvalidBasilMethodError, BasilRequest
from cray_messaging import parse_response, ALPSError
from Cobalt.Proxy import ComponentProxy
from Cobalt.Data import IncrID
from Cobalt.Util import sleep
from Cobalt.Util import init_cobalt_config, get_config_option
from Cobalt.Util import compact_num_list, expand_num_list
_logger = logging.getLogger()
init_cobalt_config()
FORKER = get_config_option('alps', 'forker', 'system_script_forker')
BASIL_PATH = get_config_option('alps', 'basil',
'/home/richp/alps-simulator/apbasil.sh')
_RUNID_GEN = IncrID()
CHILD_SLEEP_TIMEOUT = float(get_config_option('alps', 'child_sleep_timeout',
1.0))
DEFAULT_DEPTH = int(get_config_option('alps', 'default_depth', 72))
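# A minimal sketch of the matching cobalt.conf section, using the option names
# read above; the values shown are illustrative, not authoritative defaults:
#
#   [alps]
#   forker = system_script_forker
#   basil = /opt/cray/alps/default/bin/apbasil
#   child_sleep_timeout = 1.0
#   default_depth = 72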
class BridgeError(Exception):
'''Exception class so that we may easily recognize bridge-specific errors.'''
pass
def init_bridge():
'''Initialize the bridge. This includes purging all old bridge messages
from the system_script_forker. On restart or reinitialization, these old
children should be considered invalid and purged.
'''
forker = ComponentProxy(FORKER, defer=True)
try:
stale_children = forker.get_children('apbridge', None)
forker.cleanup_children([int(child['id']) for child in stale_children])
except Exception:
_logger.error('Unable to clear children from prior runs. Init failed.',
exc_info=True)
raise BridgeError('Bridge initialization failed.')
return
def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
'''reserve a set of nodes in ALPS'''
if attributes is None:
attributes = {}
params = {}
param_attrs = {}
params['user_name'] = user
params['batch_id'] = jobid
param_attrs['width'] = attributes.get('width', nodecount * DEFAULT_DEPTH)
param_attrs['depth'] = attributes.get('depth', None)
param_attrs['nppn'] = attributes.get('nppn', DEFAULT_DEPTH)
param_attrs['npps'] = attributes.get('npps', None)
param_attrs['nspn'] = attributes.get('nspn', None)
param_attrs['reservation_mode'] = attributes.get('reservation_mode',
'EXCLUSIVE')
param_attrs['nppcu'] = attributes.get('nppcu', None)
param_attrs['p-state'] = attributes.get('p-state', None)
param_attrs['p-governor'] = attributes.get('p-governor', None)
for key, val in param_attrs.items():
if val is not None:
params[key] = val
if node_id_list is not None:
params['node_list'] = [int(i) for i in node_id_list]
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RESERVE',
params=params)))
return retval
def release(alps_res_id):
'''release a set of nodes in an ALPS reservation. May be used on a
reservation with running jobs. If that occurs, the reservation will be
released when the jobs exit/are terminated.
Input:
alps_res_id - id of the ALPS reservation to release.
Returns:
True if release was successful
Side Effects:
ALPS reservation will be released. New aprun attempts against the
reservation will fail.
Exceptions:
None Yet
'''
params = {'reservation_id': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RELEASE',
params=params)))
return retval
def confirm(alps_res_id, pg_id):
'''confirm an ALPS reservation. Call this after we have the process group
id of the user job that we are reserving nodes for.
Input:
alps_res_id - The id of the reservation that is being confirmed.
pg_id - process group id to bind the reservation to.
Return:
True if the reservation is confirmed. False otherwise.
Side effects:
None
Exceptions:
None Yet.
'''
params = {'pagg_id': pg_id,
'reservation_id': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('CONFIRM',
params=params)))
return retval
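# Illustrative sketch (not called from this module): the expected
# reserve -> confirm -> release lifecycle. Assumes init_bridge() has already
# succeeded and that the parsed RESERVE response exposes a 'reservation_id'
# field; the argument names below are placeholders.
def _example_reservation_lifecycle(user, jobid, nodecount, pagg_id):
    '''Walk a single ALPS reservation through its lifecycle.'''
    res_info = reserve(user, jobid, nodecount)
    alps_res_id = res_info['reservation_id']
    # Binding the reservation to the job's process aggregate id keeps ALPS
    # from reclaiming an unconfirmed reservation.
    confirm(alps_res_id, pagg_id)
    # ...the user job runs and exits...
    release(alps_res_id)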
def system():
'''fetch system information using the SYSTEM query. Provides memory
information'''
params = {}
req = BasilRequest('QUERY', 'SYSTEM', params)
return _call_sys_forker(BASIL_PATH, str(req))
def fetch_inventory(changecount=None, resinfo=False):
'''fetch the inventory for the machine
changecount - changecount to send if we only want deltas past a certain
point
resinfo - also fetch information on current reservations
return:
dictionary of machine information parsed from XML response
'''
params = {}
if changecount is not None:
params['changecount'] = changecount
if resinfo:
params['resinfo'] = True
#TODO: add a flag for systems with version <=1.4 of ALPS
req = BasilRequest('QUERY', 'INVENTORY', params)
#print str(req)
return _call_sys_forker(BASIL_PATH, str(req))
def fetch_reservations():
'''fetch reservation data. This includes reservation metadata but not the
reserved nodes.
'''
params = {'resinfo': True, 'nonodes' : True}
req = BasilRequest('QUERY', 'INVENTORY', params)
return _call_sys_forker(BASIL_PATH, str(req))
def reserved_nodes():
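'''fetch data on currently reserved nodes via the RESERVEDNODES query.'''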
params = {}
req = BasilRequest('QUERY', 'RESERVEDNODES', params)
return _call_sys_forker(BASIL_PATH, str(req))
def fetch_aggretate_reservation_data():
'''correlate node and reservation data to get which nodes are in which
reservation.
'''
pass
def extract_system_node_data(node_data):
ret_nodeinfo = {}
for node_info in node_data['nodes']:
#extract nodeids, construct from bulk data block...
for node_id in expand_num_list(node_info['node_ids']):
node = {}
node['node_id'] = node_id
node['state'] = node_info['state']
node['role'] = node_info['role']
node['attrs'] = node_info
ret_nodeinfo[str(node_id)] = node
del node_info['state']
del node_info['node_ids']
del node_info['role']
return ret_nodeinfo
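# Sketch of the transformation above on hand-built input; the exact SYSTEM
# response layout is an assumption, but the field names are the ones consumed
# by extract_system_node_data.
def _example_extract_system_node_data():
    '''Expand a compact node_ids range string into per-node records.'''
    node_data = {'nodes': [{'node_ids': '20-21,24', 'state': 'UP',
                            'role': 'BATCH'}]}
    info = extract_system_node_data(node_data)
    # info now has keys '20', '21' and '24'; each value carries 'node_id',
    # 'state', 'role' and any remaining bulk attributes under 'attrs'.
    return info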
def _call_sys_forker(basil_path, in_str):
'''take a parameter dictionary and make the appropriate call through to BASIL.
Wait until we get output and clean up child info.'''
runid = None #_RUNID_GEN.next()
resp = None
try:
child = ComponentProxy(FORKER).fork([basil_path], 'apbridge',
'alps', None, None, runid, in_str, True)
runid = child
except Exception:
_logger.critical("error communicating with bridge", exc_info=True)
raise
while True:
#Is a timeout needed here?
children = ComponentProxy(FORKER).get_children('apbridge', [runid])
complete = False
for child in children:
if child['complete']:
if child['exit_status'] != 0:
_logger.error("BASIL returned a status of %s",
child['exit_status'])
resp = child['stdout_string']
ComponentProxy(FORKER).cleanup_children([runid])
complete = True
if complete:
break
sleep(CHILD_SLEEP_TIMEOUT)
parsed_resp = {}
try:
parsed_resp = parse_response(resp)
except xml.etree.ElementTree.ParseError as exc:
_logger.error('Error parsing response "%s"', resp)
raise exc
return parsed_resp
def print_node_names(spec):
'''Debugging utility to print nodes returned by ALPS'''
print spec['reservations']
print spec['nodes'][0]
for node in spec['nodes']:
print node['name']
if __name__ == '__main__':
#print_node_names(fetch_inventory(resinfo=True))
# print fetch_inventory(changecount=0)
# print extract_system_node_data(system())
# print reserved_nodes()
# print fetch_inventory(resinfo=True)
print fetch_reservations()
"""General cluster node class. This contains additional information that nodes
require that may not be needed for indirectly allocated resources like wires.
"""
from Cobalt.Components.system.resource import Resource
from Cobalt.Exceptions import UnschedulableNodeError
import time
import logging
_logger = logging.getLogger()
class ClusterNode(Resource):
'''Cluster nodes have a few extra fields beyond the default resource
fields:
schedulable - should the node be scheduled. If false this will cause a
node to be invisible in the user-level nodelist command.
draining - If the node has a drain time set, this is True.
drain_until - If the node is supposed to be drained, this is the time
the draining job is supposed to end, in seconds from
epoch.
drain_jobid - The jobid that is setting the drain time.
backfill_window - the time available for jobs that can be backfilled
onto the draining resource.
backfill_epsilon - the time to subtract from the backfill window.
Default is 120 seconds.
'''
def __init__(self, spec):
'''Initialize a ClusterNode object.'''
super(ClusterNode, self).__init__(spec)
self.queues = spec.get('queues', ['default']) #list of queues
self.schedulable = spec.get('schedulable', True)
self._drain_until = spec.get('drain_until', None)
self._drain_jobid = spec.get('drain_jobid', None)
self._backfill_epsilon = None
self.backfill_epsilon = int(spec.get('backfill_epsilon', 120))
def reset_info(self, node):
'''reset node information on restart from a stored node object'''
super(ClusterNode, self).reset_info(node)
self.queues = node.queues
self.schedulable = node.schedulable
self._drain_until = node.drain_until
self._drain_jobid = node.drain_jobid
@property
def drain_until(self):
'''Time, in seconds from epoch, until which the node will drain.'''
return self._drain_until
@property
def drain_jobid(self):
'''Jobid that the node is waiting for.'''
return self._drain_jobid
@property
def draining(self):
'''Return whether a node is draining. True if drain_until is set.'''
return self.drain_until is not None
@property
def backfill_window(self, when=None):
'''The time remaining on this node for backfilling in integer
seconds. This incorporates the backfill_epsilon for this node
Inputs:
when - The time to use to consider this backfill window.
Defaults to the value returned by time.time() at the time
of call.
Output:
None if node isn't being drained. The time remaining in the
drain minus the backfill_epsilon otherwise.
'''
now = time.time() if when is None else when
backfill_time = None
if self.drain_until is not None:
backfill_time = int(
max(self.drain_until - now - self._backfill_epsilon, 0))
return backfill_time
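# Worked example (illustrative numbers): with drain_until = now + 3600 and the
# default backfill_epsilon of 120, backfill_window is 3600 - 120 = 3480
# seconds, i.e. a backfilled job must fit inside the drain window less the
# epsilon safety margin.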
def set_drain(self, drain_until, jobid):
'''Set a node to draining and mark with the jobid that caused it.
A non-schedulable node cannot be marked as draining.
Inputs:
drain_until - time in seconds from epoch that the job is scheduled
to drain until.
jobid - the cobalt jobid of the job that we are waiting to end.
Returns: None
'''
if not self.schedulable or self.status == 'down':
err = '%s: Attempted to drain an unschedulable or down node.' % self.name
_logger.warning(err)
raise UnschedulableNodeError(err)
self._drain_until = int(drain_until)
self._drain_jobid = int(jobid)
def clear_drain(self):
'''Clear the draining data from a node.'''
self._drain_until = None
self._drain_jobid = None
@property
def backfill_epsilon(self):
'''The time to subtract from the backfill window to most efficiently
start the draining job. Must be a nonnegative value. Time is in integer
seconds.
'''
return self._backfill_epsilon
@backfill_epsilon.setter
def backfill_epsilon(self, epsilon):
'''set the backfill epsilon. Must be nonnegative'''
if epsilon < 0:
raise ValueError("epsilon must be a non-negative value")
self._backfill_epsilon = int(epsilon)
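# Illustrative sketch (not used elsewhere): typical drain bookkeeping on a
# ClusterNode. This assumes the base Resource class accepts a 'name' key in
# its spec and starts the node in a schedulable, non-down status.
def _example_drain_cycle():
    '''Mark a node as draining for a job, read the window, then clear it.'''
    node = ClusterNode({'name': 'node01'})
    node.set_drain(time.time() + 3600, 1234)
    window = node.backfill_window   # roughly 3480s with the default epsilon
    node.clear_drain()
    return window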
"""Cray-specific node information"""
import logging
from Cobalt.Components.system.ClusterNode import ClusterNode
_logger = logging.getLogger(__name__)
class CrayNode(ClusterNode):
'''Extension of ClusterNode for Cray-specific systems. The first systems
targeted for this node type are Cray XC40 KNL nodes.
Intended for use with ALPS, not native mode.
'''
CRAY_STATE_MAP = {'UP': 'idle', 'DOWN':'down', 'UNAVAILABLE':'down',
'ROUTING':'down', 'SUSPECT':'down', 'ADMIN':'down',
'UNKNOWN':'down', 'UNAVAIL': 'down', 'SWDOWN': 'down',
'REBOOTQ':'down', 'ADMINDOWN':'down'}
DOWN_STATUSES = ['down', 'alps-interactive']
def __init__(self, spec):
super(CrayNode, self).__init__(spec)
self._status = self.CRAY_STATE_MAP[spec['state'].upper()]
self.node_id = spec['node_id']
self.role = spec['role'].upper()
self.attributes['architecture'] = spec['architecture']
self.segment_details = spec['SocketArray']
self.ALPS_status = 'UNKNOWN' #Assume unknown state.
if 'alps-interactive' not in CrayNode.RESOURCE_STATUSES:
CrayNode.RESOURCE_STATUSES.append('alps-interactive')
def to_dict(self, cooked=False, params=None):
'''return a dictionary representation of a node. Used to send data to
clients/other components.
Input:
cooked - (default: False) If true, strip leading '_' characters from
variables. Useful for display applications (e.g. nodelist)
Returns:
Dictionary representation of CrayNode fields.
Notes:
The output can be sent via XMLRPC without modification
'''
ret_node = self.__dict__
if cooked:
cooked_node = {}
for key, val in self.__dict__.items():
if key.startswith('_'):
cooked_node[key[1:]] = val
else:
cooked_node[key] = val
ret_node = cooked_node
if params is not None and cooked:
params = [p.lower() for p in params]
ret_node = {k:v for k, v in ret_node.items() if k.lower() in params}
return ret_node
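# For example (illustrative field set), node.to_dict(cooked=True,
# params=['name', 'status', 'node_id']) would return something like
# {'name': 'c0-0c0s0n1', 'status': 'idle', 'node_id': '1'}, with the leading
# underscore stripped from private attributes such as _status.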
def __str__(self):
return str(self.to_dict())
def reset_info(self, node):
'''reset node information on restart from a stored node object'''
super(CrayNode, self).reset_info(node)
self.status = node.status
@property
def status(self):
return super(CrayNode, self).status
@status.setter
def status(self, new_status):
'''set status using cray states, as well as internal state.
also, coerce to allocated if we are used by something, but still marked
idle.
'''
#admin down wins. If an admin says it's down, it's down.
if self.admin_down:
self._status = 'down'
return
if new_status.upper() in self.CRAY_STATE_MAP.keys():
self._status = self.CRAY_STATE_MAP[new_status.upper()]
self.ALPS_status = new_status
elif new_status in CrayNode.RESOURCE_STATUSES:
self._status = new_status
else:
raise KeyError('%s is not a valid state for Cray Nodes.' % new_status)
if self._status == 'idle' and self.reserved:
self._status = 'allocated'
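# Example of the mapping above (illustrative; assumes 'allocated' is among the
# inherited RESOURCE_STATUSES):
#
#   node.status = 'SUSPECT'      # ALPS state, mapped to internal 'down'
#   node.status = 'UP'           # mapped to 'idle' ('allocated' if reserved)
#   node.status = 'allocated'    # internal statuses pass through unchanged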
"""Base process group management classes for Cobalt system components.
"""
import logging
import time
import Queue
import re
from threading import RLock
from Cobalt.Proxy import ComponentProxy
from Cobalt.DataTypes.ProcessGroup import ProcessGroup, ProcessGroupDict
from Cobalt.Exceptions import ProcessGroupStartupError, ComponentLookupError
from Cobalt.Util import init_cobalt_config, get_config_option
from Cobalt.Data import IncrID
_logger = logging.getLogger()
init_cobalt_config()
FORKER_RE = re.compile('forker')
class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''Manager for process groups. These are tasks that Cobalt runs on behalf of
the user. Typically these are scripts submitted via qsub.'''
SIGKILL_TIMEOUT = int(get_config_option('system', 'sigkill_timeout', 300))
def __init__(self, pgroup_type=ProcessGroup):
'''Initialize process group manager.
Input:
pgroup_type: [optional] type of process group class to use. Must be
compatible with the ProcessGroupDict class.
'''
self.pgroup_type = pgroup_type
self._common_init_restart()
def _common_init_restart(self, state=None):
'''common initialization code for both cold initialization and
reinitialization.
'''
if state is None:
self.process_groups = ProcessGroupDict()
self.process_groups.item_cls = self.pgroup_type
else:
self.process_groups = state.get('process_groups',
ProcessGroupDict())
for pg in self.process_groups.values():
_logger.info('recovering pgroup %s, jobid %s', pg.id, pg.jobid)
self.process_groups.id_gen.set(int(state['next_pg_id']))
self.process_group_actions = {}
self.forkers = [] #list of forker identifiers to use with ComponentProxy
self.forker_taskcounts = {} # dict of forkers and counts of pgs attached
self.process_groups_lock = RLock()
self.update_launchers()
def __getstate__(self):
state = {}
state['process_groups'] = self.process_groups
state['next_pg_id'] = self.process_groups.id_gen.idnum + 1
return state
def __setstate__(self, state):
self._common_init_restart(state)
return self
def init_groups(self, specs):
'''Add a set of process groups from specs. Generate a unique id.
Input:
specs - a list of dictionaries that specify process groups for a
given system
Returns:
list of process groups that were just added.
'''
# modify the forker in specs to force the job to round-robin forkers
for spec in specs:
ordered_forkers = [f[0] for f in
sorted(self.forker_taskcounts.items(), key=lambda x:x[1])]
if len(ordered_forkers) < 1:
raise RuntimeError("No forkers registered!")
else:
spec['forker'] = ordered_forkers[0] #name of the least-loaded forker
self.forker_taskcounts[spec['forker']] += 1
_logger.info("Job %s using forker %s", spec['jobid'], spec['forker'])
return self.process_groups.q_add(specs)
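# Sketch of the selection rule above with an illustrative taskcount table:
# given forker_taskcounts == {'forker_a': 3, 'forker_b': 1}, sorting the items
# by count puts 'forker_b' first, so the next process group is handed to the
# least-loaded forker and its count is incremented.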
def signal_groups(self, pgids, signame="SIGINT"):
'''Send signal with signame to a list of process groups.
Returns:
List of signaled process groups
'''
signaled_pgs = []
for pgid in pgids:
if self.process_groups[pgid].mode == 'interactive':
self.process_groups[pgid].interactive_complete = True
signaled_pgs.append(self.process_groups[pgid])
elif self.process_groups[pgid].signal(signame):
signaled_pgs.append(self.process_groups[pgid])
return signaled_pgs
def terminate_groups(self, pgids):
'''Send SIGINTs to process groups to allow them to terminate gracefully.
Set the time at which a SIGKILL will be sent if the process group has
not completed.
'''
now = int(time.time())
self.signal_groups(pgids)
for pg_id in pgids:
self.process_groups[pg_id].sigkill_timeout = int(now + self.SIGKILL_TIMEOUT)
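# e.g. with the default sigkill_timeout of 300 seconds, a group SIGINTed at
# time t becomes eligible for SIGKILL at t + 300; the escalation itself is
# expected to be driven by whatever periodically checks sigkill_timeout.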
def start_groups(self, pgids):
'''Start process groups. Return groups that succeeded startup.
'''
started = []
for pg_id in pgids:
try:
self.process_groups[pg_id].start()
except ProcessGroupStartupError:
_logger.error("%s: Unable to start process group.",
self.process_groups[pg_id].label)
else:
started.append(pg_id)
self.process_groups[pg_id].startup_timeout = 0