Commit 37de4f0e authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch '31-fix-startup-race' into 'master'

This should effectively end the startup race condition

This should get rid of the bulk of the 1234567 exit statuses. Forces a
timeout.  The timeout goes away when the job is started.  This should
fix the process group initialization/start gap.
Closes #31

See merge request !16
parents f456d0ed 33fa5647
......@@ -32,14 +32,19 @@ SAVE_ME_INTERVAL = float(get_config_option('alpsssytem', 'save_me_interval', 10.
PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem',
'pending_startup_timeout', 1200)) #default 20 minutes to account for boot.
APKILL_CMD = get_config_option('alps', 'apkill', '/opt/cray/alps/default/bin/apkill')
PGROUP_STARTUP_TIMEOUT = float(get_config_option('alpssystem', 'pgroup_startup_timeout', 120.0))
class ALPSProcessGroup(ProcessGroup):
'''ALPS-specific PocessGroup modifications.'''
def __init__(self, spec):
super(ALPSProcessGroup, self).__init__(spec)
self.alps_res_id = spec['alps_res_id']
self.alps_res_id = spec.get('alps_res_id', None)
self.interactive_complete = False
now = time.time()
self.startup_timeout = int(spec.get("pgroup_startup_timeout",
now + PGROUP_STARTUP_TIMEOUT))
#inherit generic getstate and setstate methods from parent
......@@ -983,7 +988,6 @@ class CraySystem(BaseSystem):
if alps_res is not None:
spec['alps_res_id'] = alps_res.alps_res_id
new_pgroups = self.process_manager.init_groups(specs)
for pgroup in new_pgroups:
_logger.info('%s: process group %s created to track job status',
pgroup.label, pgroup.id)
......
......@@ -70,7 +70,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
return self
def init_groups(self, specs):
'''Add a set of process groups from specs. Generate a unique id.]
'''Add a set of process groups from specs. Generate a unique id.
Input:
specs - a list of dictionaries that specify process groups for a
......@@ -133,6 +133,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
self.process_groups[pg_id].label)
else:
started.append(pg_id)
self.process_groups[pg_id].startup_timeout = 0
return started
#make automatic get final status of process group
......@@ -163,6 +164,9 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
#clean up orphaned process groups
for pg in self.process_groups.values():
if now < pg.startup_timeout:
#wait for startup timeout. We don't want any hasty kills
continue
pg_id = pg.id
child_uid = (pg.forker, pg.head_pid)
if child_uid not in children:
......@@ -176,7 +180,7 @@ class ProcessGroupManager(object): #degenerate with ProcessMonitor.
continue
orphaned.append(pg_id)
_logger.warning('%s: orphaned job exited with unknown status', pg.jobid)
pg.exit_status = 1234567 #FIXME: what should this sentinel be?
pg.exit_status = 1234567
completed_pgs.append(pg)
else:
children[child_uid]['found'] = True
......
......@@ -103,6 +103,7 @@ class ProcessGroup(Data):
self.sigkill_timeout = None
#TODO: extract into subclass
self.alps_res_id = spec.get('alps_res_id', None)
self.startup_timeout = spec.get("pgroup_startup_timeout", 0)
def __getstate__(self):
data = {}
......@@ -203,11 +204,6 @@ class ProcessGroup(Data):
else:
core_dump_str = ""
_logger.info("%s: terminated with signal %s%s", self.label, child["signum"], core_dump_str)
# else:
# if self.exit_status is None:
# # the forker has lost the child for our process group
# _logger.info("%s: job exited with unknown status", self.label)
# self.exit_status = 1234567 #FIXME: What should this sentinel be?
......
'''Process Manager for cluster/cray systems tests'''
import time
import logging
import sys
from mock import Mock, MagicMock, patch
import Cobalt.Proxy
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
default_child_data = [{'id': 1}]
def fake_forker(*args, **kwargs):
    '''Forker stand-in that echoes its arguments to stdout and then fails,
    so callers' error-handling paths can be exercised.

    Raises:
        RuntimeError: always, with the message 'boom'.
    '''
    # Parenthesized print of a single object behaves identically under
    # Python 2 and Python 3, keeping this helper importable from either.
    print(args)
    print(kwargs)
    raise RuntimeError('boom')
class InspectMock(MagicMock):
    '''allow us to inspect what is going on within a proxy call'''

    def __getattr__(self, attr):
        # canned proxy responses: each access hands back a fresh mock
        # preloaded with the expected return value
        canned = {'get_children': [{'id': 1}],
                  'fork': 1,
                 }
        if attr in canned:
            return MagicMock(return_value=canned[attr])
        # anything else falls through to normal MagicMock behavior
        return super(InspectMock, self).__getattr__(attr)
class TestProcessManager(object):
    '''tests for the base process group manager'''

    def setup(self):
        '''common setup for process group tests'''
        # minimal spec containing every field a process group requires
        self.base_spec = {'args': ['arg1', 'arg2'], 'user': 'frodo',
                          'jobid': 1, 'executable': 'job.exe', 'size': 2,
                          'cwd': '/home/frodo', 'location': 'loc1'
                         }
        self.process_manager = ProcessGroupManager()
        self.process_manager.forkers = ['forker1']
        self.process_manager.forker_taskcounts = {'forker1': 0}

    def teardown(self):
        '''common teardown for process group tests'''
        del self.base_spec
        del self.process_manager

    def test_process_manager_init_groups_single(self):
        '''ProcessGroupManager.init_groups: create a process group and add to process manager'''
        specs = [self.base_spec]
        self.process_manager.init_groups(specs)
        assert self.process_manager.process_groups.get(1, None) is not None, \
                "process group not created"
        assert self.process_manager.process_groups[1].forker == 'forker1', \
                "forker not set"

    @patch.object(Cobalt.Proxy.DeferredProxyMethod, '__call__', return_value=1)
    def test_process_manager_start_groups_single(self, *args, **kwargs):
        '''ProcessGroupManager.start_groups: start up a single process group'''
        # ProcessGroup reads its timeout from the 'pgroup_startup_timeout'
        # spec key; the old 'startup_timeout' key was silently ignored, so
        # the reset assertion below passed vacuously (timeout was already 0).
        self.base_spec['pgroup_startup_timeout'] = 120
        self.process_manager.init_groups([self.base_spec])
        started = self.process_manager.start_groups([1])
        assert len(started) == 1, "started %s groups, should have started 1" % len(started)
        assert sorted(started) == [1], "wrong groups started."
        # a successful start must disarm the startup timeout
        assert self.process_manager.process_groups[1].startup_timeout == 0, (
                "startup_timeout not reset")

    @patch('Cobalt.Proxy.DeferredProxy', side_effect=InspectMock)
    def test_process_manager_update_groups_timeout(self, *args, **kwargs):
        '''ProcessGroupManager.update_groups: startup timeout respected.'''
        now = int(time.time())
        pgroups = self.process_manager.process_groups
        self.process_manager.init_groups([self.base_spec])
        # timeout still in the future: update must leave the group alone
        pgroups[1].startup_timeout = 120 + now
        self.process_manager.update_groups()
        pgroups = self.process_manager.process_groups
        assert len(pgroups) == 1, "%s groups, should have 1" % len(pgroups)
        assert sorted(pgroups.keys()) == [1], "wrong groups."
        assert pgroups[1].startup_timeout == now + 120, (
                "bad startup timeout: %s" % pgroups[1].startup_timeout)

    @patch('Cobalt.Proxy.DeferredProxy', side_effect=InspectMock)
    def test_process_manager_update_groups_timeout_exceeded(self, *args, **kwargs):
        '''ProcessGroupManager.update_groups: startup timeout exceeded.'''
        now = int(time.time())
        pgroups = self.process_manager.process_groups
        self.process_manager.init_groups([self.base_spec])
        # timeout already expired and the group never started: update must
        # clean it up as an orphan
        pgroups[1].startup_timeout = now - 120
        self.process_manager.update_groups()
        pgroups = self.process_manager.process_groups
        assert len(pgroups) == 0, "%s groups, should have 0" % len(pgroups)
        assert sorted(pgroups.keys()) == [], "groups should be empty"
'''Tests for base ProcessGroup class and actions'''
from nose.tools import raises
from mock import Mock, MagicMock, patch
import Cobalt.Exceptions
from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Proxy import ComponentProxy
mock_proxy = MagicMock()
class TestProcessGroup(object):
    '''Group together process group tests, and apply common setup'''
    def setup(self):
        '''common setup for process group tests'''
        # minimal spec containing every field ProcessGroup requires at init
        self.base_spec = {'args':['arg1', 'arg2'], 'user':'frodo',
                'jobid': 1, 'executable': 'job.exe', 'size': 2,
                'cwd': '/home/frodo', 'location': 'loc1'
                }
    def teardown(self):
        '''common teardown for process group tests'''
        del self.base_spec
    def test_process_group_init(self):
        '''ProcessGroup.__init__: basic initialization'''
        pgroup = ProcessGroup(self.base_spec)
        assert pgroup is not None, "process group creation failed"
    @raises(Cobalt.Exceptions.DataCreationError)
    def test_process_group_init_missing_fields(self):
        '''ProcessGroup.__init__: exception on bad spec'''
        # an empty spec lacks required fields; construction must raise
        pgroup = ProcessGroup({})
        assert False, "Should raise exception"
    # NOTE(review): this decorator accesses Cobalt.Proxy.DeferredProxyMethod,
    # but the module only imports ComponentProxy from Cobalt.Proxy above --
    # confirm Cobalt.Proxy is reachable as a package attribute here.
    @patch.object(Cobalt.Proxy.DeferredProxyMethod, '__call__', return_value=1)
    def test_process_group_start_base(self, proxy):
        '''basic process group startup'''
        pgroup = ProcessGroup(self.base_spec)
        data = pgroup.prefork()
        pgroup.start()
        # start() must hand the forker the full command line along with the
        # prefork data and the group's runid
        proxy.assert_called_with([pgroup.executable] + pgroup.args, pgroup.tag,
            "Job %s/%s/%s" %(pgroup.jobid, pgroup.user, pgroup.id), pgroup.env,
            data, pgroup.runid)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment