Commit e07c32ce authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch 'cray-merge-common' into 'develop'

Common Cobalt library file changes for Cray Port

See merge request !12
parents de7a1b85 7d8a7934
......@@ -4,12 +4,18 @@ __revision__ = "$Revision$"
import logging
import signal
from Cobalt.Data import Data, DataDict, IncrID
from Cobalt.Exceptions import DataCreationError
from Cobalt.Exceptions import DataCreationError, ProcessGroupStartupError
from Cobalt.Exceptions import ComponentLookupError
from Cobalt.Proxy import ComponentProxy
_logger = logging.getLogger()
#Get a list of valid signal strings
SIGNALS = [ s for s in signal.__dict__.keys()
if (s.startswith("SIG") and not s.startswith("SIG_"))]
class ProcessGroup(Data):
"""A job that runs on the system
......@@ -36,6 +42,7 @@ class ProcessGroup(Data):
stdout -- file to use for stdout of script
umask -- permissions to set
user -- the user the process group is running under
sigkill_time -- time at which to send a sigkill (seconds since epoch)
Only used by BlueGene/Q systems:
ion_kernel -- alternatve ION kernel to boot
......@@ -50,7 +57,7 @@ class ProcessGroup(Data):
"stdin", "stdout", "umask", "user", "starttime",
"walltime", "resid", "runid", "forker",
"subblock", "subblock_parent", "corner", "extents",
"attrs", "alps_res_id"]
required = Data.required + ["args", "cwd", "executable", "jobid",
"location", "size", "user"]
......@@ -92,6 +99,11 @@ class ProcessGroup(Data):
self.corner = spec.get("corner", None)
self.extents = spec.get("extents", None)
self.attrs = spec.get("attrs", {})
self.label = "%s/%s/%s" % (self.jobid, self.user,
self.sigkill_timeout = None
#TODO: extract into subclass
self.alps_res_id = spec.get('alps_res_id', None)
self.startup_timeout = spec.get("pgroup_startup_timeout", 0)
def __getstate__(self):
data = {}
......@@ -121,13 +133,44 @@ class ProcessGroup(Data):
self.head_pid = ComponentProxy(self.forker, retry=False).fork([self.executable] + self.args, self.tag,
"Job %s/%s/%s" %(self.jobid, self.user,, self.env, data, self.runid)
_logger.error("Job %s/%s/%s: problem forking; %s did not return a child id", self.jobid, self.user,,
err = "Job %s/%s/%s: problem forking; %s did not return a child id" % (self.jobid,
self.user,, self.forker)
raise ProcessGroupStartupError(err)
def signal(self, signame="SIGINT"):
'''Validate and send signal to ProcessGroup. Consult your system and
python documentation for valid signals to send.
signame - the string name of a signal to send. This must be a
signal supported by python's 'signal' library.
True if signal successfully sent. False otherwise
ValueError - The signame was set to an invalid value.
success = False
if signame not in SIGNALS:
raise ValueError("%s is not a valid signal on this system." % signame)
ComponentProxy(self.forker).signal(self.head_pid, signame)
except ComponentLookupError:
_logger.error("pg %s: Unable to reach forker to send signal %s",, signame)
except Exception:
_logger.error("Unexpected exception in ProcessGroup.signal:",
success = True
return success
def prefork (self):
"""This method is called before the fork, while it's still safe to
call object methods. It returns a dictionary, which can be passed to
"""This method is called before the fork, while it's still safe to
call object methods. It returns a dictionary, which can be passed to
a totally static function which handles the exec from inside the child
......@@ -138,6 +181,31 @@ class ProcessGroup(Data):
data[key] = value
return data
def update_data(self, child):
'''incorprate child termination data into the process group.
child - child data from a forker component.
if child['complete']:
if self.exit_status is None:
self.exit_status = child['exit_status']
if child['lost_child']:
self.exit_status = 256
_logger.warning('%s: child process reported lost from %s',
self.label, self.forker)
if child['signum'] == 0:"%s: job exited with status %s", self.label, self.exit_status)
if child["core_dump"]:
core_dump_str = ", core dumped"
core_dump_str = """%s: terminated with signal %s%s", self.label, child["signum"], core_dump_str)
class ProcessGroupDict(DataDict):
"""A container for holding process groups, keyed by id"""
......@@ -146,3 +146,30 @@ class RequiredLocationError(LookupError):
log = True
fault_code =
class ProcessGroupStartupError(Exception):
'''Error while starting a process group'''
log = True
fault_code =
# New system component exceptions.
class UnmanagedResourceError(Exception):
'''Raised if an invalid action is performed on a tracked, but
unmanaged resource.
log = True
fault_code =
class InvalidStatusError(ValueError):
'''Raised if a resource is set to a status not in its predefined resource
status list.
log = True
fault_code =
class UnschedulableNodeError(RuntimeError):
'''Raise if an action isn't valid on a node marked "unscheduled"'''
log = True
fault_code =
......@@ -85,7 +85,7 @@ def check_required_options(secopt_list):
def get_config_option(section, option, *args):
'''Get an option from the cobalt config file. Must be called after
A default value may be specified as the third argument. If the option
is not found and a default is specified, then the default will be
returned. If a default value is not specified, then a message will be
......@@ -280,6 +280,12 @@ def merge_nodelist(locations):
prefix_max_digits = {}
prefix_format_str = {}
#if this doesn't have a prefix, like a Cray nidlist, short circut
if len(locations) >= 1:
if reg.match(locations[0]) is None:
#this is cray-style
return compact_num_list(locations)
for name in locations:
prefix = reg.match(name).group(1)
nodenum = reg.match(name).group(2)
......@@ -1181,3 +1187,54 @@ def validate_geometry(geometry_str, nodecount):
raise JobValidationError("Geometry requires more nodes than specified for job.")
return True
def compact_num_list(num_list):
'''Given a list of integers return a compact string representation.
The entries are comma-separated. If a contiguous sequence of integers
exist, they are compacted into the form "a-b" where the range is a to b,
begin = None
end = begin
retcompact = []
working_list = [int(num) for num in num_list]
def append_run(begin, end):
'''convenience function for appending a value to the return list'''
if begin == end:
retcompact.append("%s-%s" % (begin, end))
for num in sorted(working_list):
if begin is None:
begin = num
end = num
elif end + 1 == num:
end = num
else: #run just ended. Set up for a new run and store current one.
append_run(begin, end)
begin = num
end = num
append_run(begin, end)
return ','.join(retcompact)
def expand_num_list(num_list):
'''Take a compact, comma-seperated string of integer values and ranges and
expand to a list of integers that is represented by that string. Ranges of
the form "a-b" will be expanded to the full sequience of integers from a to
b, inclusive.
retlist = []
elems = num_list.split(',')
for elem in elems:
if elem == '':
elif len(elem.split('-')) == 1:
nums = elem.split('-')
low = min(int(nums[0]), int(nums[1]))
high = max(int(nums[0]), int(nums[1])) + 1
retlist.extend(xrange(low, high))
return retlist
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment