Commit e9d8fb39 authored by Paul Rich's avatar Paul Rich
Browse files

Update behavior now in alpssystem. Alpssystem now a full component.

alpssystem can now be invoked as a system component.  It will initialize
query and update nodes.  This adds in the udpate behavior for nodes.
Persistence isn't supported yet and will be coming soon.  This is
currently using a base process manager.
parent 4ef80dcb
#!/usr/bin/env python
# $Id$
from Cobalt.Components.system.CraySystem import CraySystem
from Cobalt.Components.base import run_component
if __name__ == "__main__":
run_component(CraySystem, register=True, state_name="alpssystem")
except KeyboardInterrupt:
......@@ -25,21 +25,60 @@ CHILD_SLEEP_TIMEOUT = 1.0
class BridgeError(Exception):
def reserve():
def reserve(node_id_list=None):
'''reserve a set of nodes in ALPS'''
raise NotImplementedError
def release():
'''release a set of nodes in an ALPS reservation'''
def release(res_id):
'''release a set of nodes in an ALPS reservation. May be used on a
reservation with running jobs. If that occurs, the reservation will be
released when the jobs exit/are terminated.
def confirm():
'''confirm an ALPS reservation'''
alps_res_id - id of the ALPS reservation to release.
True if relese was successful
Side Effects:
ALPS reservation will be released. New aprun attempts agianst
reservation will fail.
None Yet
params = { 'reservation': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RELEASE',
return retval
def confirm(alps_res_id, pg_id):
'''confirm an ALPS reservation. Call this after we have the process group
id of the user job that we are reserving nodes for.
alps_res_id - The id of the reservation that is being confirmed.
pg_id - process group id to bind the reservation to.
True if the reservation is confirmed. False otherwise.
Side effects:
None Yet.
params = {'pagg_id': pg_id,
'reservation': alps_res_id}
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('CONFIRM',
return retval
def fetch_inventory(changecount=None, resinfo=False):
'''fetch the inventory for the machine
......@@ -88,6 +127,7 @@ def _call_sys_forker(basil_path, in_str):
if complete:
return parse_response(resp)
def print_node_names(spec):
......@@ -98,3 +138,5 @@ def print_node_names(spec):
if __name__ == '__main__':
print fetch_inventory(changecount=0)
......@@ -10,8 +10,7 @@ class CrayNode(ClusterNode):
def __init__(self, spec):
super(CrayNode, self).__init__(spec)
print spec
self.state = self.CRAY_STATE_MAP[spec['state'].upper()]
self._status = self.CRAY_STATE_MAP[spec['state'].upper()]
self.node_id = spec['node_id']
self.role = spec['role']
self.attributes['architecture'] = spec['architecture']
......@@ -21,4 +20,25 @@ class CrayNode(ClusterNode):
return self.__dict__
def __str__(self):
return str(to_dict)
return str(self.to_dict())
def status(self):
return super(CrayNode, self).status()
def status(self, new_status):
'''set status using cray states, as well as internal state.
also, coerce to allocated if we are used by something, but still marked
if new_status.upper() in self.CRAY_STATE_MAP.keys():
self._status = self.CRAY_STATE_MAP[new_status.upper()]
elif new_status in self.RESOURCE_STATUSES:
self._status = new_status
raise KeyError('%s is not a valid state for Cray Nodes.', new_status)
if self._status == 'idle' and self.reserved:
self.status == 'allocated'
......@@ -2,39 +2,61 @@
from Cobalt.Components.base import Component, exposed
from Cobalt.Components.base import Component, exposed, automatic
from Cobalt.Components.system.base_system import BaseSystem
import Cobalt.Components.system.AlpsBridge as AlpsBridge
from Cobalt.Components.system.CrayNode import CrayNode
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
import Cobalt.Util
import logging
import threading
import thread
import copy
logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)
UPDATE_THREAD_TIMEOUT = 10 #TODO: Time in seconds, make setable
class CraySystem(BaseSystem):
name = "system"
implementation = "alps_system"
logger = _logger
def __init__(self):
def __init__(self, *args, **kwargs):
'''Initialize system. Read initial states from bridge.
Get current state
super(CraySystem, self).__init__(*args, **kwargs)
#process manager setup
self.process_manager = ProcessGroupManager()
self.process_manager.forkers.append('user_script_forker')'PROCESS MANAGER INTIALIZED')
#resource management setup
self.nodes = {}
self.alps_reservations = {}
#state update thread and lock
self._node_lock = threading.Lock()
self.node_update_thread = thread.start_new_thread(self._run_update_state,
def _init_nodes_and_reservations(self):
'''Initialize nodes from ALPS bridge data'''
retnodes = {}
inventory = AlpsBridge.fetch_inventory()
inventory = AlpsBridge.fetch_inventory(resinfo=True)
for nodespec in inventory['nodes']:
node = CrayNode(nodespec)
node.managed = True
retnodes[] = node
self.nodes = retnodes'NODE INFORMATION INITIALIZED')'ALPS REPORTS %s NODES', len(self.nodes))
print [str(node) for node in self.nodes.values()]
for resspec in inventory['reservations']:
self.alps_reservations[resspec['reservation_id']] = ALPSReservation(resspec)
......@@ -53,13 +75,55 @@ class CraySystem(BaseSystem):
if as_dict:
retdict = {}
for node in self.nodes.values():
retdict[] = node.to_dict()
raw_node = node.to_dict()
cooked_node = {}
for key, val in raw_node.items():
if key.startswith('_'):
cooked_node[key[1:]] = val
cooked_node[key] = val
retdict[] = cooked_node
return retdict
return self.nodes
raise NotImplementedError
def _run_update_state(self):
'''automated node update functions on the update timer go here.'''
while True:
def update_node_state(self):
'''update the state of cray nodes. Check reservation status and system
stateus as reported by ALPS
with self._node_lock:
original_nodes = copy.deepcopy(self.nodes)
updates = {} #node_id and node to update
inventory = AlpsBridge.fetch_inventory(resinfo=True) #This is a full-refresh,
#summary should be used otherwise
inven_nodes = inventory['nodes']
inven_reservations = inventory['reservations']
#find hardware status
with self._node_lock:
for inven_node in inven_nodes:
if self.nodes.has_key(inven_node['name']):
self.nodes[inven_node['name']].status = inven_node['state'].upper()
# Cannot add nodes on the fly. Or at lesat we shouldn't be
# able to.
_logger.error('UNS: ALPS reports node %s but not in our node list.',
#check/update reservation information and currently running locations?
#fetch info from process group manager for currently running jobs.
#should down win over running in terms of display?
class ALPSReservation(object):
......@@ -86,3 +150,4 @@ if __name__ == '__main__':
print "Resid: %s" % res_id
for key, val in reservation.__dict__.items():
print " %s: %s" % (key, val)
......@@ -16,6 +16,7 @@ Cluster-based equivalence classes
import logging
from Cobalt.Components.base import exposed, automatic, query, locking
from Cobalt.Components.base import Component
from Cobalt.Util import init_cobalt_config, get_config_option
_logger = logging.getLogger()
......@@ -24,9 +25,11 @@ init_cobalt_config()
class BaseSystem(object):
class BaseSystem(Component):
def __init__(self, *args, **kwargs):
super(BaseSystem, self).__init__(*args, **kwargs)
self.process_manager = ProcessManager()
self.resource_manager = ResourceManager()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment