Commit 47062f4a authored by Paul Rich

Merge branch 'multiple-forkers' into 'master'

Multiple forkers

Support for multiple Forkers (i.e. multiple script hosts) for Cray systems.

See merge request !9
parents 8d520a31 a07733a2
#!/usr/bin/env python
'''Launcher for alps_script_forkers. If we are running multiple forkers on a
node, this will spawn the actual forkers as subprocesses.
'''
__revision__ = '$Revision: $'
import sys
from Cobalt.Components.alps_script_forker import ALPSScriptForker
from Cobalt.Components.base import run_component
#from Cobalt.Util import init_cobalt_config, get_config_option
if __name__ == "__main__":
# state_name = 'alps_script_forker'
# if '--name' in sys.argv:
# state_name = sys.argv[sys.argv.index('--name') + 1]
seq_num = 0
if '--seq' in sys.argv:
seq_num = sys.argv[sys.argv.index('--seq') + 1]
sys.argv.remove('--seq')
sys.argv.remove(seq_num)
seq_num = int(seq_num)
try:
run_component(ALPSScriptForker, single_threaded=True,
aug_comp_name=True, state_name_match_component=True,
seq_num=seq_num)
except KeyboardInterrupt:
sys.exit(1)
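# Hypothetical usage (the installed script name and init integration are
# assumptions, not shown in this commit): start two forker instances on one
# node, each with its own sequence number so that each registers with the
# service locator under a unique component name:
#   alps_script_forker.py --seq 0 &
#   alps_script_forker.py --seq 1 &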
@@ -15,6 +15,7 @@ import logging
import time
import threading
import xmlrpclib
import socket
import Cobalt
import Cobalt.Proxy
@@ -42,7 +43,7 @@ except Exception, e:
def state_file_location():
'''Grab the location of the Cobalt statefiles.
default: /var/spool/cobalt
@@ -51,8 +52,9 @@ def state_file_location():
def run_component (component_cls, argv=None, register=True, state_name=False,
cls_kwargs={}, extra_getopt='', time_out=10,
single_threaded=False, seq_num=0, aug_comp_name=False,
state_name_match_component=False):
'''Run the Cobalt component.
arguments:
@@ -64,6 +66,8 @@ def run_component (component_cls, argv=None, register=True, state_name=False,
extra_getopt
time_out
single_threaded
seq_num
aug_comp_name
state_name_match_component
This will run until the component is terminated.
@@ -91,6 +95,16 @@ def run_component (component_cls, argv=None, register=True, state_name=False,
elif item[0] == '-d':
level = logging.DEBUG
# Form the new component name used when running multiple instances per host;
# this overrides the name on the component_cls passed in.
if aug_comp_name:
component_cls.name = '_'.join([component_cls.name, socket.gethostname(),
str(seq_num)])
if state_name_match_component:
# Request that the statefile name match the augmented component name
if not state_name:
state_name = component_cls.name
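# For example (hostname and sequence number are illustrative): on host
# 'nid00042' with seq_num=1 the component name becomes
# 'alps_script_forker_nid00042_1', and with state_name_match_component set
# the statefile name follows suit.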
logging.getLogger().setLevel(level)
Cobalt.Logging.setup_logging(component_cls.implementation, console_timestamp=True)
@@ -46,9 +46,9 @@ __all__ = [
class Service (Data):
fields = Data.fields + ["tag", "name", "location"]
def __init__ (self, spec):
Data.__init__(self, spec)
spec = spec.copy()
@@ -63,50 +63,50 @@ class Service (Data):
class ServiceDict (DataDict):
item_cls = Service
key = "name"
class ServiceLocator (Component):
"""Generic implementation of the service-location component.
Methods:
register -- register a service (exposed)
unregister -- remove a service from the registry (exposed)
locate -- retrieve the location of a service (exposed)
get_services -- part of the query interface from DataSet (exposed)
"""
name = "service-location"
# A default logger for the class is placed here.
# Assigning an instance-level logger is supported,
# and expected in the case of multiple instances.
logger = logging.getLogger("Cobalt.Components.ServiceLocator")
def __init__ (self, *args, **kwargs):
"""Initialize a new ServiceLocator.
All arguments are passed to the component constructor.
"""
Component.__init__(self, *args, **kwargs)
self.services = ServiceDict()
def __getstate__(self):
state = {}
state.update(Component.__getstate__(self))
state.update({
'slp_version': 1})
return state
def __setstate__(self, state):
Component.__setstate__(self, state)
def register (self, service_name, location):
"""Register the availability of a service.
Arguments:
service_name -- name of the service to register
location -- location of the service
@@ -121,10 +121,10 @@ class ServiceLocator (Component):
service.location = location
service.touch()
register = exposed(register)
def unregister (self, service_name):
"""Remove a service from the registry.
Arguments:
service_name -- name of the service to remove
"""
@@ -135,10 +135,10 @@ class ServiceLocator (Component):
else:
self.logger.info("unregister(%r)" % (service_name))
unregister = exposed(unregister)
def locate (self, service_name):
"""Retrieve the location for a service.
Arguments:
service_name -- name of the service to look up
"""
@@ -149,7 +149,7 @@ class ServiceLocator (Component):
return ""
return service.location
locate = exposed(locate)
def get_services (self, specs):
"""Query interface "Get" method."""
return self.services.q_get(specs)
@@ -157,20 +157,20 @@ class ServiceLocator (Component):
class PollingServiceLocator (ServiceLocator):
"""ServiceLocator with active expiration.
Methods:
check_services -- ping services (automatic)
"""
implementation = "polling"
logger = logging.getLogger("Cobalt.Components.PollingServiceLocator")
def check_services (self):
"""Ping each service to check its availability.
Unregister unresponsive services.
"""
for service in self.services.values():
@@ -186,34 +186,34 @@ class PollingServiceLocator (ServiceLocator):
class TimingServiceLocator (ServiceLocator):
"""ServiceLocator with passive expiration.
Attributes:
expire -- number of seconds to expire a service
Methods:
expire_services -- check service timestamps (automatic)
"""
implementation = "slp"
logger = logging.getLogger("Cobalt.Components.TimingServiceLocator")
def __init__ (self, expire=180, *args, **kwargs):
"""Initialize a TimingServiceLocator.
Keyword arguments:
expire -- Number of seconds when services expire.
Additional arguments are passed to ServiceLocator.
"""
ServiceLocator.__init__(self, *args, **kwargs)
self.expire = expire
def expire_services (self):
"""Check each service timestamp.
Unregister expired services.
"""
now = time.time()
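For context, the register, unregister, and locate methods above are what the process-group manager further down relies on to discover forkers at runtime. A minimal sketch of that interaction, assuming a running service-location component; the forker name and location URL are illustrative, not taken from this commit:

from Cobalt.Proxy import ComponentProxy

# Register a forker instance with the service locator, then look it up by
# name; get_services with a wildcard spec returns every current registration.
slp = ComponentProxy('service-location')
slp.register('alps_script_forker_nid00042_0', 'https://nid00042:8888')
print slp.locate('alps_script_forker_nid00042_0')
print [s['name'] for s in slp.get_services([{'name': '*'}])]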
@@ -86,7 +86,8 @@ class CraySystem(BaseSystem):
self.process_manager = ProcessGroupManager()
else:
self.process_manager = ProcessGroupManager().__setstate__(spec['process_manager'])
#self.process_manager.forkers.append('alps_script_forker')
self.process_manager.update_launchers()
_logger.info('PROCESS MANAGER INITIALIZED')
#resource management setup
self.nodes = {} #cray node_id: CrayNode
@@ -238,9 +239,15 @@ class CraySystem(BaseSystem):
def _run_update_state(self):
'''automated node update functions on the update timer go here.'''
while True:
try:
self.process_manager.update_launchers()
self.update_node_state()
self._get_exit_status()
except Exception:
# prevent the update thread from dying
_logger.critical('Error in _run_update_state', exc_info=True)
finally:
Cobalt.Util.sleep(UPDATE_THREAD_TIMEOUT)
@exposed
def update_node_state(self):
@@ -738,7 +745,7 @@ class CraySystem(BaseSystem):
start_apg_timer = int(time.time())
for spec in specs:
spec['forker'] = None
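# The forker is no longer hard-coded here; ProcessGroupManager.add_groups now
# assigns one per job, round-robin over the forkers registered with SLP.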
alps_res = self.alps_reservations.get(str(spec['jobid']), None)
if alps_res is not None:
spec['alps_res_id'] = alps_res.alps_res_id
@@ -6,6 +6,7 @@
import logging
import time
import Queue
import re
from threading import RLock
from Cobalt.Proxy import ComponentProxy
from Cobalt.DataTypes.ProcessGroup import ProcessGroup, ProcessGroupDict
@@ -17,6 +18,8 @@ _logger = logging.getLogger()
init_cobalt_config()
FORKER_RE = re.compile('forker')
class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''Manager for process groups. These are tasks that Cobalt runs on behalf of
the user. Typically these are scripts submitted via qsub.'''
@@ -47,7 +50,9 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
self.process_groups.id_gen.set(int(state['next_pg_id']))
self.process_group_actions = {}
self.forkers = [] #list of forker identifiers to use with ComponentProxy
self.forker_taskcounts = {} # dict of forkers and counts of pgs attached
self.process_groups_lock = RLock()
self.update_launchers()
def __getstate__(self):
state = {}
@@ -70,6 +75,17 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
list of process groups that were just added.
'''
# Modify the forker in specs to round-robin jobs across the registered forkers.
for spec in specs:
ordered_forkers = [f[0] for f in
sorted(self.forker_taskcounts.items(), key=lambda x:x[1])]
if len(ordered_forkers) < 1:
raise RuntimeError("No forkers registered!")
else:
spec['forker'] = ordered_forkers[0] #forker name with the fewest attached tasks
self.forker_taskcounts[spec['forker']] += 1
_logger.info("Job %s using forker %s", spec['jobid'], spec['forker'])
return self.process_groups.q_add(specs)
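# Illustrative example (counts hypothetical): if self.forker_taskcounts is
#   {'alps_script_forker_nid1_0': 3, 'alps_script_forker_nid1_1': 1}
# then sorting by count puts 'alps_script_forker_nid1_1' first, so the next
# process group is assigned to it and its count is bumped to 2.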
def signal_groups(self, pgids, signame="SIGINT"):
@@ -194,6 +210,38 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
for pg_id in pgids:
pg = self.process_groups[pg_id]
cleaned_groups.append(pg)
self.forker_taskcounts[pg.forker] -= 1
del self.process_groups[pg_id]
_logger.info('%s Process Group deleted', pg.label)
return cleaned_groups
def update_launchers(self):
'''Update the list of task launchers. For now this only handles
alps_script_forkers: entries that the service locator (SLP) no longer knows
about are dropped, and newly registered ones are added.
This is intended to run in the normal update loop. If no launchers are
registered, jobs should be prevented from starting.
Resets the internal forker list based on the SLP registry.
Returns None.
'''
# TODO: Move this to something Cray-specific later
updated_forker_list = []
try:
services = ComponentProxy('service-location').get_services([{'name':'*'}])
except Exception:
_logger.critical('Unable to reach service-location', exc_info=True)
return
asf_re = re.compile('alps_script_forker')
for service in services:
if asf_re.match(service['name']):
updated_forker_list.append(service['name'])
if service['name'] not in self.forker_taskcounts:
self.forker_taskcounts[service['name']] = 0
# Get currently running tasks from forkers. Different loop?
self.forkers = updated_forker_list
return
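As a rough, standalone illustration of the filtering and count-seeding behavior described in update_launchers above (not part of the commit; the service names are hypothetical):

import re

def select_forkers(services, taskcounts):
    '''Mirror update_launchers: keep alps_script_forker registrations and seed
    unseen names in taskcounts with a zero task count.'''
    asf_re = re.compile('alps_script_forker')
    forkers = []
    for service in services:
        if asf_re.match(service['name']):
            forkers.append(service['name'])
            taskcounts.setdefault(service['name'], 0)
    return forkers

services = [{'name': 'system'},
            {'name': 'alps_script_forker_nid00042_0'},
            {'name': 'alps_script_forker_nid00042_1'}]
counts = {'alps_script_forker_nid00042_0': 2}
print select_forkers(services, counts)  # both forker names survive the filter
print counts                            # the new forker is seeded with a count of 0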