Commit 87c014eb authored by Paul Rich
Browse files

Merge branch '45-message-race' into 'develop'

Fixing possible race condition, reported in ticket #45.

Adding a final "read the stdout redirect" after the exit status is collected.
This addresses a potential issue spotted in the CRAY port work merge.

Closes #45

See merge request !20
parents 964169a0 5189c42a
......@@ -779,11 +779,29 @@ class BaseForker (Component):
cleanup_children = exposed(cleanup_children)
def _wait(self):
"""Call os.waitpid to collect the status of dead processes.
"""
#read any connected stdout to string pipes.
for child in self.children.itervalues():
def _read_stdout_pipe(self, child_pids=None):
'''Read messages from our children that are using redirected stdout to
string pipes.
Args:
child_pids: An optional list of child_pids to fetch information for.
Default: None.
Returns:
None
Side Effects:
Updates the output string buffer for running children from the child
process' stdout.
'''
if child_pids is None:
children = self.children.itervalues()
else:
children = [child for child in self.children.itervalues()
if child.pid in child_pids]
for child in children:
if child.use_stdout_string and child.exit_status is None:
while True:
try:
......@@ -799,10 +817,20 @@ class BaseForker (Component):
elif exc.errno in [errno.EINTR]:
#Try again
continue
else:
_logger.error("%s: Error reading stdout from child pipe: [%s] %s",
child.label, errno.errorcode[exc.errno],
exc.strerror, exc_info=True)
else:
if child_str == '':
break #we're done
child.stdout_string += child_str
def _wait(self):
"""Call os.waitpid to collect the status of dead processes.
"""
self._read_stdout_pipe()
while True:
try:
pid, status = os.waitpid(-1, os.WNOHANG)
......@@ -837,6 +865,11 @@ class BaseForker (Component):
if child.pid == pid:
_logger.info("task %s: dead pid %s matches child %s", child.label, pid, child.id)
if child.use_stdout_string:
# need to do a final read for anything remaining
# post-exit, then close the fd.
self._read_stdout_pipe([child.pid])
child.close_read_pipe()
child.exit_status = exit_status
child.core_dump = core_dump
child.signum = signum
......@@ -844,12 +877,6 @@ class BaseForker (Component):
child.complete = True
if self.marked_for_death.has_key(child.id):
del self.marked_for_death[child.id]
if child.use_stdout_string:
#need to close pipe. Have already read above and if
#we're here, the child's already dead anyway and the
#last of the data in the pipe would have been read by
#now. Write pipe was closed a long time ago.
child.close_read_pipe()
if child.return_output:
try:
if child.stdout_file:
......@@ -901,29 +928,29 @@ class BaseForker (Component):
_wait = automatic(_wait, float(get_forker_config('wait_interval', 10.0)))
if __name__ == "__main__":
import time
print "Initiating forker unit tests"
test_count = IncrID()
with open("CHANGES") as test_in:
test_str = "#".join(test_in)
forker = BaseForker()
forker.child_cls = BaseChild
child_id = forker.fork(['/usr/bin/grep', '.*'], stdin_string=test_str,
stdout_string=True)
complete = False
child = None
while(not complete):
forker._wait()
children = forker.get_children(child_ids=[child_id])
child = children[0]
if child['complete']:
print child['stdout_string']
complete = child['complete']
time.sleep(1)
forker.cleanup_children([child_id])
# if __name__ == "__main__":
# import time
# print "Initiating forker unit tests"
# test_count = IncrID()
# with open("CHANGES") as test_in:
# test_str = "#".join(test_in)
# forker = BaseForker()
# forker.child_cls = BaseChild
# child_id = forker.fork(['/usr/bin/grep', '.*'], stdin_string=test_str,
# stdout_string=True)
# complete = False
# child = None
# while(not complete):
# forker._wait()
# children = forker.get_children(child_ids=[child_id])
# child = children[0]
# if child['complete']:
# print child['stdout_string']
# complete = child['complete']
# time.sleep(1)
# forker.cleanup_children([child_id])
# init_pid = forker.fork("/bin/ls", runid=1)
# print test_count.next(),":", "forked process with pid %s" % init_pid
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment