Commit c5ea709a authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch '38-large-message-hang' into 'master'

Fix for hang during large message fetch from Cray systems

In the end the buffer size had to be increased to avoid timing issues.
Added in further try-except safety checks to prevent system component
issues if this runs long again.
Closes #38

See merge request !25
parents 71952df1 d173b47b
......@@ -76,6 +76,12 @@ any non-root simulation mode. If this is set and the forker components are not
running as root, this will cause any job to be run as the user that the
forker component is running as.
.TP
.B pipe_buffsize
The size in bytes of the buffer to use for reading from a pipe connected to a
child process's standard output. Increase this if you are getting particularly
large messages on standard output. This is most likely to occur with large Cray
systems using the BASIL interface to ALPS. Default 16777216 bytes.
.TP
.B save_me_interval
The minimum interval that Cobalt will wait between saving statefiles for this
component, in seconds. By default the interval is 10.0 seconds. Under periods
......
......@@ -18,8 +18,6 @@ import signal
import sys
import ConfigParser
import time
import errno
import select
import Cobalt
......@@ -35,6 +33,8 @@ import Cobalt.Util
sleep = Cobalt.Util.sleep
Timer = Cobalt.Util.Timer
from Cobalt.Util import init_cobalt_config, get_config_option
__all__ = [
"BaseForker",
"BaseChild"
......@@ -44,9 +44,11 @@ _logger = logging.getLogger(__name__.split('.')[-1])
config = ConfigParser.ConfigParser()
config.read(Cobalt.CONFIG_FILES)
init_cobalt_config()
PIPE_BUFSIZE = 4096
# Number of bytes to attempt to read at once from stdout. Expect some large
# values due to large Cray system states/statuses.
# The option name must match the documented 'pipe_buffsize' key exactly (no
# stray whitespace), otherwise a value set in the config would not be looked
# up under the right name.
PIPE_BUFSIZE = int(get_config_option('forker', 'pipe_buffsize', 16777216))
def get_forker_config(option, default):
try:
......@@ -120,6 +122,7 @@ class BaseChild (object):
self.stdin_string = kwargs.get('stdin_string', None) #string to send
self.use_stdout_string = kwargs.get('stdout_string', False)
self.stdout_string = ''
self.stdout_fd_gone = False
self.complete = False
self.lost_child = False
......@@ -616,6 +619,8 @@ class BaseForker (Component):
self.marked_for_death = state['marked_for_death']
else:
self.marked_for_death = {}
for child in self.children.values():
_logger.debug("Child found: %s", child.id)
def __save_me(self):
'''Periodically save off a statefile.'''
......@@ -779,18 +784,13 @@ class BaseForker (Component):
"""
#read any connected stdout to string pipes.
for child in self.children.itervalues():
if child.use_stdout_string:
if child.use_stdout_string and child.exit_status is None:
while True:
#(rd_list, wr_list, exc_list) = select.select([child.pipe_read],[],[],0)
#if len(rd_list) == 0:
# break
try:
child_str = os.read(child.pipe_read, PIPE_BUFSIZE)
except (OSError, IOError) as exc:
if exc.errno in [errno.EAGAIN, errno.EWOULDBLOCK]:
#read would block. Don't block and continue.
_logger.debug("%s: Read from child would block.",
child.label)
break
elif exc.errno in [errno.EBADF, errno.EINVAL, errno.EPIPE]:
_logger.error("%s: Error reading stdout from child pipe.",
......@@ -898,7 +898,7 @@ class BaseForker (Component):
else:
child.death_timer.max_time = child.death_timer.elapsed_time + self.DEATH_TIMEOUT
_wait = automatic(_wait, float(get_forker_config('wait_interval', 10)))
_wait = automatic(_wait, float(get_forker_config('wait_interval', 10.0)))
if __name__ == "__main__":
......
......@@ -6,6 +6,7 @@
import logging
import xml.etree
import xmlrpclib
from cray_messaging import InvalidBasilMethodError, BasilRequest
from cray_messaging import parse_response, ALPSError
from Cobalt.Proxy import ComponentProxy
......@@ -203,7 +204,15 @@ def _call_sys_forker(basil_path, in_str):
while True:
#Is a timeout needed here?
children = ComponentProxy(FORKER).get_children('apbridge', [runid])
try:
children = ComponentProxy(FORKER).get_children('apbridge', [runid])
except xmlrpclib.Fault as fault:
_logger.error('XMLRPC Fault recieved while fetching child %s status:', runid)
_logger.error('Child %s: Fault code: %s', runid, fault.faultCode)
_logger.error('Child %s: Fault string: %s', runid,
fault.faultString)
_logger.debug('Traceback information: for runid %s',runid,
exc_info=True)
complete = False
for child in children:
if child['complete']:
......@@ -211,7 +220,15 @@ def _call_sys_forker(basil_path, in_str):
_logger.error("BASIL returned a status of %s",
child['exit_status'])
resp = child['stdout_string']
ComponentProxy(FORKER).cleanup_children([runid])
try:
ComponentProxy(FORKER).cleanup_children([runid])
except xmlrpclib.Fault as fault:
_logger.error('XMLRPC Fault recieved while fetching child %s status:', runid)
_logger.error('Child %s: Fault code: %s', runid, fault.faultCode)
_logger.error('Child %s: Fault string: %s', runid,
fault.faultString)
_logger.debug('Traceback information: for runid %s',runid,
exc_info=True)
complete = True
if complete:
break
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment