Commit a9388bd8 authored by Paul Rich
Browse files

Adding in modified forker for cray script launch.

A modified version of the user script forker is needed so that we can
confirm the ALPS reservation and set the pg_id from the child.  This
will let apruns from a user script run.
parent 82331921
import logging
import os
import pwd
import signal
import subprocess
import sys
import tempfile
# Cobalt framework imports.  The process-group forker module supplies the
# PGChild / PGForker base classes that this module specializes.
import Cobalt
import Cobalt.Components.pg_forker
PGChild = Cobalt.Components.pg_forker.PGChild
PGForker = Cobalt.Components.pg_forker.PGForker
import Cobalt.Util
# BASIL request construction and response parsing helpers.
from cray_messaging import BasilRequest
from cray_messaging import parse_response, ALPSError
# NOTE(review): Cobalt.Components.base is never imported explicitly in this
# module; presumably it is loaded as a side effect of importing
# Cobalt.Components.pg_forker -- confirm.
exposed = Cobalt.Components.base.exposed
convert_argv_to_quoted_command_string = Cobalt.Util.convert_argv_to_quoted_command_string
# Logger named after the last component of this module's dotted name.
_logger = logging.getLogger(__name__.split('.')[-1])
#CONFIG POINT TO ALPS
# Executable that is spawned with a BASIL request on its stdin (see the
# subprocess.Popen calls below).  Currently hard-coded to a simulator script
# in a developer home directory.
BASIL_PATH = '/home/richp/alps_simulator/apbasil.sh' #fetch this from config
class UserScriptChild (PGChild):
    """Child used to launch a user script under an ALPS reservation.

    Specializes PGChild: ``preexec_first`` builds the environment for the
    job's user, confirms the ALPS reservation against this process's process
    group id (attempting one re-reservation if the confirm fails), and then
    rewrites the command so it is exec'd through the user's login shell.
    """

    def __init__(self, id = None, **kwargs):
        """Initialize the child and record partition and node count.

        Arguments:
        id -- child id, passed through to PGChild
        kwargs -- passed through to PGChild; must contain a 'data' mapping.
                  If 'data' has a 'nodect' entry it becomes self.pg.nodect,
                  otherwise self.pg.size is used.

        Raises IndexError if self.pg.location is empty and KeyError if no
        'data' keyword argument was supplied.
        """
        PGChild.__init__(self, id=id, **kwargs)
        try:
            self.bg_partition = self.pg.location[0]
        except IndexError:
            _logger.error("%s: no partition was specified", self.label)
            raise
        data = kwargs['data']
        if 'nodect' in data:
            self.pg.nodect = data['nodect']
        else:
            self.pg.nodect = self.pg.size

    def __getstate__(self):
        """Return a copy of the state produced by PGChild.__getstate__."""
        state = {}
        state.update(PGChild.__getstate__(self))
        return state

    def __setstate__(self, state):
        """Restore state by delegating to PGChild.__setstate__."""
        PGChild.__setstate__(self, state)

    def preexec_first(self):
        """Prepare environment, confirm the ALPS reservation, wrap the command.

        Looks up the job user's shell and home directory, fills self.env,
        confirms (or re-creates) the ALPS reservation and finally replaces
        self.exe / self.args so the quoted command runs via the user's shell.

        Re-raises any error from the account lookup after logging it.  Exits
        the process with status 1 if the ALPS reservation cannot be confirmed.
        """
        PGChild.preexec_first(self)

        try:
            user_info = pwd.getpwnam(self.pg.user)
            shell = user_info.pw_shell
            homedir = user_info.pw_dir
        except Exception:
            _logger.error("%s: unable to obtain account information for user %s", self.label, self.pg.user)
            self.print_clf_error("unable to obtain account information for user %s", self.pg.user)
            raise

        if not self.cwd:
            self.cwd = homedir

        self.env = {}
        self.env.update(self.pg.env)
        self.env['HOME'] = homedir
        self.env['USER'] = self.pg.user
        self.env['LOGNAME'] = self.pg.user
        self.env['SHELL'] = shell
        self.env["COBALT_PARTNAME"] = self.bg_partition
        self.env["COBALT_PARTSIZE"] = str(self.pg.nodect)
        self.env["COBALT_JOBSIZE"] = str(self.pg.size)

        #used for "simulation modes": forward these from our own environment
        #when they are set.
        for var in ('COBALT_CONFIG_FILES', 'COBALT_SOURCE_DIR',
                'COBALT_RUNTIME_DIR'):
            if var in os.environ:
                self.env[var] = os.environ[var]

        #Confirm the ALPS reservation -- may need to regenerate the reservation.
        if not self._confirm_alps_reservation():
            _logger.error('%s: Unable to confirm ALPS reservation.  Terminating.',
                    self.pg.label)
            sys.exit(1)

        # One last bit of mangling to prevent premature splitting of args
        # quote the argument strings so the shell doesn't eat them.
        self.cmd_string = convert_argv_to_quoted_command_string(self.args)
        self.exe = shell
        self.args = ["-", "-c", "exec " + self.cmd_string]

    def _call_basil(self, request):
        """Run the BASIL executable with str(request) on its stdin.

        Waits for the process to finish and returns a
        (returncode, stdout, stderr) tuple.
        """
        basil = subprocess.Popen(BASIL_PATH, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = basil.communicate(str(request))
        return basil.returncode, stdout, stderr

    def _confirm_alps_reservation(self):
        '''Confirm the ALPS reservation.  If needed, replace the ALPS
        reservation.  Cobalt's already holding these resources.

        If resources can't be confirmed, treat as a "boot failure"; the
        caller terminates the child.

        Returns True if the reservation (original or replacement) was
        confirmed, False otherwise.
        '''
        if self._send_confirm():
            _logger.info('%s: ALPS reservation %s confirmed', self.pg.label,
                    self.pg.alps_res_id)
            return True

        #Confirm failed: re-reserve.  Use child data to reassociate the
        #reservation with hardware in the system component.
        # NOTE(review): reserve_request is not defined or imported in the
        # visible part of this module; presumably it should return the BASIL
        # RESERVE request to send -- confirm and import it.
        reserve_req = reserve_request(self.pg.user, self.pg.jobid,
                self.pg.nodect, self.pg.node_ids)
        returncode, stdout, stderr = self._call_basil(reserve_req)
        if returncode != 0:
            #a nonzero exit means the request never produced a usable reply
            _logger.error('%s: ALPS re-reservation request exited with '
                    'status %s: %s', self.pg.label, returncode, stderr)
            return False
        try:
            response = parse_response(stdout)
        except ALPSError:
            _logger.warning('%s: unable to reserve nodes in ALPS: %s',
                    self.pg.label, self.pg.location)
            return False

        #Confirm again, this time against the replacement reservation.
        self.pg.alps_res_id = response['reservation_id']
        if self._send_confirm():
            return True
        #Can't re-reserve.  We're dead at this point.  The caller exits the
        #child process.
        _logger.error('%s re-resevation failed.', self.pg.label)
        return False

    def _send_confirm(self):
        """Send a BASIL CONFIRM for self.pg.alps_res_id.

        The request carries this process's process group id as 'pagg_id'.
        Returns True only if the BASIL executable exits with status 0 and
        its output parses without raising ALPSError.
        """
        #os.getpgid requires a pid; 0 means the calling process.
        params = {'alps_res_id': self.pg.alps_res_id,
                  'pagg_id': os.getpgid(0)}
        confirm_request = BasilRequest('CONFIRM', params)
        returncode, stdout, stderr = self._call_basil(confirm_request)
        if returncode != 0:
            #couldn't confirm, log message and let failure happen
            _logger.error('%s: Child exited with stderr: %s', self.pg.label,
                    stderr)
            return False
        try:
            #won't need the response itself beyond that we were successful.
            parse_response(stdout)
        except ALPSError:
            _logger.warning('%s: unable to confirm ALPS reservation %s',
                    self.pg.label, self.pg.alps_res_id)
            return False
        return True

    def preexec_last(self):
        """Delegate to PGChild.preexec_last."""
        PGChild.preexec_last(self)

    def signal(self, signum, pg=True):
        """Signal the child; the pg argument is ignored and forced to True."""
        #due to how cleanup happens, pg must always be true.
        PGChild.signal(self, signum, True)
class UserScriptForker (PGForker):
    """Component for starting script jobs"""

    name = __name__.split('.')[-1]
    implementation = name
    child_cls = UserScriptChild
    logger = _logger

    def __init__ (self, *args, **kwargs):
        """Initialize a new user script forker.

        All arguments are passed to the base forker constructor.
        """
        PGForker.__init__(self, *args, **kwargs)

    def __getstate__(self):
        """Return the state produced by PGForker.__getstate__."""
        return PGForker.__getstate__(self)

    def __setstate__(self, state):
        """Restore state by delegating to PGForker.__setstate__."""
        PGForker.__setstate__(self, state)

    def signal(self, child_id, signame):
        """
        Signal a child process.

        Arguments:
        child_id -- id of the child to signal
        signame -- signal name

        Logs and returns without signaling if child_id is unknown.  Raises
        AttributeError (after logging) if signame is not an attribute of the
        signal module.
        """
        _logger.debug("Using overridden signal method.")
        if child_id not in self.children:
            _logger.error("Child %s: child not found; unable to signal", child_id)
            return
        #bind the child up front so the error path below can use its label
        child = self.children[child_id]

        try:
            #inside the method body 'signal' resolves to the signal module,
            #not to this method (class attributes are not in method scope)
            signum = getattr(signal, signame)
        except AttributeError:
            _logger.error("%s: %s is not a valid signal name; child not signaled", child.label, signame)
            raise

        #signal the whole process group unless the job asked for 'nopgkill'
        pg = 'nopgkill' not in child.pg.attrs
        #start the method lookup after UserScriptChild so its override, which
        #forces pg=True, is skipped and the computed pg value is honored
        super(UserScriptChild, child).signal(signum, pg=pg)
    signal = exposed(signal)
......@@ -36,6 +36,8 @@ def reserve(user, jobid, nodecount, node_id_list=None):
params['batch_id'] = jobid
params['width'] = nodecount
params['depth'] = 1 #FIXME fix this. Pass this in from qsub. FIXME
if node_id_list is not None:
params['node_id_list'] = node_id_list
print str(BasilRequest('RESERVE', params=params))
retval = _call_sys_forker(BASIL_PATH, str(BasilRequest('RESERVE',
params=params)))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment