Commit e6f594df authored by Paul Rich's avatar Paul Rich
Browse files

Forker automatic acquisiton and dispatch working: single forker case

Well, we're at least back at original functionality.  Forkers are
automatically acquired and dispatch to at least the single forker case
appears to work.
parent dd612322
......@@ -86,7 +86,8 @@ class CraySystem(BaseSystem):
self.process_manager = ProcessGroupManager()
else:
self.process_manager = ProcessGroupManager().__setstate__(spec['process_manager'])
self.process_manager.forkers.append('alps_script_forker')
#self.process_manager.forkers.append('alps_script_forker')
self.process_manager.update_launchers()
_logger.info('PROCESS MANAGER INTIALIZED')
#resource management setup
self.nodes = {} #cray node_id: CrayNode
......@@ -238,9 +239,15 @@ class CraySystem(BaseSystem):
def _run_update_state(self):
'''automated node update functions on the update timer go here.'''
while True:
self.update_node_state()
self._get_exit_status()
Cobalt.Util.sleep(UPDATE_THREAD_TIMEOUT)
try:
self.process_manager.update_launchers()
self.update_node_state()
self._get_exit_status()
except Exception:
# prevent the update thread from dying
_logger.critical('Error in _run_update_state', exc_info=True)
finally:
Cobalt.Util.sleep(UPDATE_THREAD_TIMEOUT)
@exposed
def update_node_state(self):
......@@ -738,7 +745,7 @@ class CraySystem(BaseSystem):
start_apg_timer = int(time.time())
for spec in specs:
spec['forker'] = 'alps_script_forker'
spec['forker'] = None
alps_res = self.alps_reservations.get(str(spec['jobid']), None)
if alps_res is not None:
spec['alps_res_id'] = alps_res.alps_res_id
......
......@@ -6,6 +6,7 @@
import logging
import time
import Queue
import re
from threading import RLock
from Cobalt.Proxy import ComponentProxy
from Cobalt.DataTypes.ProcessGroup import ProcessGroup, ProcessGroupDict
......@@ -17,6 +18,8 @@ _logger = logging.getLogger()
init_cobalt_config()
FORKER_RE = re.compile('forker')
class ProcessGroupManager(object): #degenerate with ProcessMonitor.
'''Manager for process groups. These are tasks that Cobalt run on behalf of
the user. Typically these are scripts submitted via qsub.'''
......@@ -47,7 +50,9 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
self.process_groups.id_gen.set(int(state['next_pg_id']))
self.process_group_actions = {}
self.forkers = [] #list of forker identifiers to use with ComponentProxy
self.forker_taskcounts = {} # dict of forkers and counts of pgs attached
self.process_groups_lock = RLock()
self.update_launchers()
def __getstate__(self):
state = {}
......@@ -70,6 +75,15 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
list of process groups that were just added.
'''
# modify the forker in specs to force the job to round-robbin forkers
for spec in specs:
ordered_forkers = sorted(self.forker_taskcounts, key=lambda x:x[1])
if len(ordered_forkers) < 0:
raise RuntimeError("No forkers registered!")
else:
spec['forker'] = ordered_forkers[0]
self.forker_taskcounts[spec['forker']] += 1
return self.process_groups.q_add(specs)
def signal_groups(self, pgids, signame="SIGINT"):
......@@ -194,6 +208,38 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
for pg_id in pgids:
pg = self.process_groups[pg_id]
cleaned_groups.append(pg)
self.forker_taskcounts[pg.forker] -= 1
del self.process_groups[pg_id]
_logger.info('%s Process Group deleted', pg.label)
return cleaned_groups
def update_launchers(self):
'''Update the list of task launchers. This right now works for
alps_forkers. Drop entries that slp doesn't know about and add in ones
that it does.
Will want to run this in the normal update loop
If we have no launchers, we should prevent jobs from starting.
resets the internal forker list to an updated list based on SLP registry
return is void
'''
# TODO: Move this to something Cray-specific later
updated_forker_list = []
try:
services = ComponentProxy('service-location').get_services([{'name':'*'}])
except Exception:
_logger.critical('Unable to reach service-location', exc_info=True)
return
for service in services:
asf_re = re.compile('alps_script_forker')
if re.match(asf_re, service['name']):
updated_forker_list.append(service['name'])
if service['name'] not in self.forker_taskcounts.keys():
self.forker_taskcounts[service['name']] = 0
# Get currently running tasks from forkers. Different loop?
self.forkers = updated_forker_list
return
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment