Commit 8ae62195 authored by Paul Rich

Merge branch '1.0.8-crayport-merge' into 'develop'

1.0.8 crayport merge

Final merge to synchronize cray-port and production repos.

See merge request !17
parents fd87b5fe 89eae311
= Changes from previous Cobalt Versions =
== Changes to 1.0.8 ==
* Fixed a bug where a node going down during a running job could allow that
node to be selected for draining.
* Expanded a buffer used by the forkers. This should significantly improve the
speed of reading large ALPS responses and no longer interfere with node
reacquisition.
* Cleaned up stray print statements in the node information output.
* Fixed an error where a job's score could be set to a string rather than a
float via the filter scripts.
* Fixed an error where the queue-manager statefile could be lost if Cobalt was
brought down with a job in the "Terminal" state.
== Changes to 1.0.6 ==
* Backfill now supported on Cray systems.
* Major performance improvement to nodelist and nodeadm -l.
......
......@@ -76,6 +76,12 @@ any non-root simulation mode. If this is set and the forker components are not
running as root, this will cause any job to be run as the user that the
forker component is running as.
.TP
.B pipe_buffsize
The size in bytes of the buffer used for reading from a pipe connected to a
child process's standard output. Increase this if you are getting particularly
large messages on standard output. This is most likely to occur on large Cray
systems using the BASIL interface to ALPS. Default: 16777216 bytes.
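For example, the option might be set in the [forker] section of cobalt.conf (a sketch only; the section name is inferred from the get_config_option('forker', 'pipe_buffsize', ...) lookup added elsewhere in this merge, and the value shown is the documented default):

    [forker]
    pipe_buffsize = 16777216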
.TP
.B save_me_interval
The minimum interval that Cobalt will wait between saving statefiles for this
component, in seconds. By default the interval is 10.0 seconds. Under periods
......
#!/bin/sh
#
# Cobalt
#
# chkconfig: 2345 18 09
# description: Cobalt resource manager/scheduler
# modified for use on Cray systems
prefix=/usr
exec_prefix=/usr/bin
CONFIGPATH=/etc
CONFIGFILE="${CONFIGPATH}/cobalt.conf"
DAEMONDIR=/usr/sbin
PIDPATH=/var/run/
HOSTNAME=`hostname`
# Configure DB2 environment
# . /discovery/db.src
umask 022
COMPONENT_LIST="slp system_script_forker alps_script_forker cqm bgsched alpssystem"
if [ -e /etc/sysconfig/cobalt ]
then
    . /etc/sysconfig/cobalt
fi
# ANL Local:
#MPIRUN_VERBOSE=1
#export MPIRUN_VERBOSE
#LD_LIBRARY_PATH=/bgsys/drivers/ppcfloor/lib64:$LD_LIBRARY_PATH
#export LD_LIBRARY_PATH
# Starts the component specified as the first argument.
# Optional second argument used for extra arguments to
# pass to the component.
start_component () {
    component=$1
    if [ -n "$2" ] ; then
        component_args=$2
    else
        component_args=''
    fi
    pid=`pgrep -f "${component}.py"`
    echo -n "Starting ${component}: "
    if [ -z "$pid" ]
    then
        (
            date=`/bin/date`
            if [ "$component" = "alps_script_forker" ]
            then
                echo "--- $date: START ${component} ---" >>/var/log/cobalt/${component}_${HOSTNAME}.out
                nohup "${DAEMONDIR}/${component}.py" $component_args --config-files "${CONFIGFILE}" >>/var/log/cobalt/${component}_${HOSTNAME}.out 2>&1 &
            else
                echo "--- $date: START ${component} ---" >>/var/log/cobalt/${component}.out
                nohup "${DAEMONDIR}/${component}.py" $component_args --config-files "${CONFIGFILE}" >>/var/log/cobalt/${component}.out 2>&1 &
            fi
        )
        echo "done"
        return 0
    else
        echo "failed -- $component already running (pid $pid)"
        return 1
    fi
}
# Stops the component specified as the first argument.
stop_component () {
    component=$1
    # if [ -f "${PIDPATH}/${component}.pid" ] ; then
    echo -n "Stopping ${component}: "
    # kill -INT `cat ${PIDPATH}/${component}.pid`
    pkill -f "${DAEMONDIR}/${component}.py"
    echo "done"
    # rm -f "${PIDPATH}/${component}.pid"
    # else
    #     echo "${component} not running"
    # fi
    return 0
}
stat_component () {
    component=$1
    pid=`pgrep -f "${component}.py"`
    echo -n "Component: $component: "
    if [ -z "$pid" ]
    then
        echo STOPPED
        return 3
    else
        echo RUNNING
        return 0
    fi
}
case "$1" in
start)
# ANL Localization: No scriptm on surveyor
if [ -e /var/lock/subsys/cobalt ]
then
echo Cobalt already started
exit 0
fi
for component in $COMPONENT_LIST; do
#if [ "$component" == "bgqsystem" ] ; then
# source ~bgqsysdb/sqllib/db2profile
#else
# source ~db2cat/sqllib/db2profile
#fi
start_component $component
done
touch /var/lock/subsys/cobalt
;;
start-component )
for component in $*
do
if [ $component != "start-component" ]
then
start_component $component
fi
done
touch /var/lock/subsys/cobalt
;;
stop-component )
for component in $*
do
if [ $component != "stop-component" ]
then
stop_component $component
fi
done
;;
start-debug)
# ANL Localization: No scriptm on surveyor
# for component in slp brooklyn cqm bgsched bgforker bgsystem scriptm cdbwriter; do
for component in $COMPONENT_LIST; do
start_component $component
done
touch /var/lock/subsys/cobalt
;;
stop)
# ANL Localization: No scriptm on surveyor
# for component in slp cqm bgsched bgforker bgsystem scriptm cdbwriter; do
for component in $COMPONENT_LIST; do
stop_component $component
done
rm -f /var/lock/subsys/cobalt
;;
stop-debug)
# ANL Localization: No scriptm on surveyor
# for component in slp brooklyn cqm bgsched bgforker bgsystem scriptm cdbwriter; do
for component in $COMPONENT_LIST; do
stop_component $component
done
rm -f /var/lock/subsys/cobalt
;;
restart)
if [ $# -gt 1 ] ; then #specific components to be restarted
shift
for component in $@; do
case "$component" in
cqm|bgsched|alpssystem)
stop_component $component
start_component $component
;;
slp|bg_runjob_forker|user_script_forker|system_script_forker|alps_script_forker)
echo "${component} cannot be restarted with jobs running"
;;
esac
done
else #restart all components
$0 stop
$0 start
fi
;;
status)
count=0
running=0
failed=0
for component in $COMPONENT_LIST; do
count=$(($count + 1))
stat_component $component
if [ 0 -ne $? ]
then
failed=$(($failed + 1))
else
running=$(($running + 1))
fi
done
if [ $running -eq $count ]
then
echo All Components Running
exit 0
elif [ $failed -eq $count ]
then
echo All Components Stopped
exit 3
else
exit 4
fi
;;
force-reload)
$0 stop
$0 start
;;
*)
echo "Usage: $0 {start|stop|restart|force-reload|start-component <component>|stop-component <component>}"
echo "Default daemons are: $COMPONENT_LIST"
exit 1
esac
exit 0
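Illustrative invocations, assuming the script is installed as /etc/init.d/cobalt (implied by the chkconfig header above; site installations may differ):

    /etc/init.d/cobalt start                # start everything in COMPONENT_LIST
    /etc/init.d/cobalt restart cqm bgsched  # per-component restart (safe components only)
    /etc/init.d/cobalt status               # exit codes: 0 all running, 3 all stopped, 4 mixed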
......@@ -18,8 +18,6 @@ import signal
import sys
import ConfigParser
import time
import errno
import select
import Cobalt
......@@ -35,6 +33,8 @@ import Cobalt.Util
sleep = Cobalt.Util.sleep
Timer = Cobalt.Util.Timer
from Cobalt.Util import init_cobalt_config, get_config_option
__all__ = [
"BaseForker",
"BaseChild"
......@@ -44,9 +44,11 @@ _logger = logging.getLogger(__name__.split('.')[-1])
config = ConfigParser.ConfigParser()
config.read(Cobalt.CONFIG_FILES)
init_cobalt_config()
PIPE_BUFSIZE = 4096
# Number of bytes to attempt to read at once from stdout. Expect some large
# values due to large Cray system states/statuses.
PIPE_BUFSIZE = int(get_config_option('forker', 'pipe_buffsize', 16777216))
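# A minimal standalone sketch (not the forker's actual reader) of draining a
# nonblocking pipe with a buffer this size; the errno handling mirrors the
# _wait hunk later in this merge. drain_pipe is hypothetical, and fd is
# assumed to have O_NONBLOCK set.
def drain_pipe(fd, bufsize=16777216):
    '''Read whatever is currently available from a nonblocking pipe fd.'''
    import errno
    import os
    chunks = []
    while True:
        try:
            data = os.read(fd, bufsize)
        except (OSError, IOError) as exc:
            if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                break  # read would block; return what we have so far
            raise  # EBADF/EINVAL/EPIPE indicate a real problem
        if not data:
            break  # zero-byte read: the child closed its end (EOF)
        chunks.append(data)
    return ''.join(chunks)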
def get_forker_config(option, default):
try:
......@@ -120,6 +122,7 @@ class BaseChild (object):
self.stdin_string = kwargs.get('stdin_string', None) #string to send
self.use_stdout_string = kwargs.get('stdout_string', False)
self.stdout_string = ''
self.stdout_fd_gone = False
self.complete = False
self.lost_child = False
......@@ -616,6 +619,8 @@ class BaseForker (Component):
self.marked_for_death = state['marked_for_death']
else:
self.marked_for_death = {}
for child in self.children.values():
_logger.debug("Child found: %s", child.id)
def __save_me(self):
'''Periodically save off a statefile.'''
......@@ -779,18 +784,13 @@ class BaseForker (Component):
"""
#read any connected stdout to string pipes.
for child in self.children.itervalues():
if child.use_stdout_string:
if child.use_stdout_string and child.exit_status is None:
while True:
#(rd_list, wr_list, exc_list) = select.select([child.pipe_read],[],[],0)
#if len(rd_list) == 0:
# break
try:
child_str = os.read(child.pipe_read, PIPE_BUFSIZE)
except (OSError, IOError) as exc:
if exc.errno in [errno.EAGAIN, errno.EWOULDBLOCK]:
#read would block. Don't block and continue.
_logger.debug("%s: Read from child would block.",
child.label)
break
elif exc.errno in [errno.EBADF, errno.EINVAL, errno.EPIPE]:
_logger.error("%s: Error reading stdout from child pipe.",
......@@ -898,7 +898,7 @@ class BaseForker (Component):
else:
child.death_timer.max_time = child.death_timer.elapsed_time + self.DEATH_TIMEOUT
_wait = automatic(_wait, float(get_forker_config('wait_interval', 10)))
_wait = automatic(_wait, float(get_forker_config('wait_interval', 10.0)))
if __name__ == "__main__":
......
......@@ -2,7 +2,6 @@
# $Id$
'''Cobalt Queue Manager'''
__revision__ = '$Revision$'
#
# TODO:
......@@ -68,7 +67,6 @@ __revision__ = '$Revision$'
# to be non-zero and thus would be a valid exit status if the task was terminated.
#
DEFAULT_FORCE_KILL_DELAY = 5 # (in minutes)
import errno
import logging
......@@ -103,8 +101,10 @@ from Cobalt import accounting
from Cobalt.Statistics import Statistics
from Cobalt.Util import get_config_option, init_cobalt_config
__revision__ = '$Revision$'
init_cobalt_config()
DEFAULT_FORCE_KILL_DELAY = 5 # (in minutes)
CLOB_SIZE = 4096
logger = logging.getLogger(__name__.split('.')[-1])
......@@ -788,7 +788,6 @@ class Job (StateMachine):
self.runid = state.get("runid", None)
#for old statefiles, make sure to update the dependency state on restart:
self.__dep_hold = False
self.update_dep_state()
self.initializing = False
def __task_signal(self, retry = True):
......@@ -3584,6 +3583,18 @@ class QueueManager(Component):
Component.__setstate__(self, state)
self.Queues = state['Queues']
# jobs are reloaded after queues. Update dependencies on jobs now. In
# the event of a terminal job, don't try to transition it.
# FIXME: migrate change here.
for queue in self.Queues.values():
for job in queue.jobs:
try:
job.update_dep_state()
except StateMachineIllegalEventError:
if job.state == 'done':
logger.warning('Job %s/%s: Job in Terminal state found.', job.jobid, job.user)
else:
raise
use_db_jobid_generator = get_cqm_config("use_db_jobid_generator", "False").lower() in Cobalt.Util.config_true_values
self.id_gen = IncrID(use_database = use_db_jobid_generator)
self.id_gen.set(state['next_job_id'], override = True)
......@@ -3625,7 +3636,6 @@ class QueueManager(Component):
if state.has_key('overflow') and (dbwriter.max_queued != None):
dbwriter.overflow = state['overflow']
def __save_me(self):
Component.save(self)
__save_me = automatic(__save_me, float(get_cqm_config('save_me_interval', 10)))
......@@ -3807,7 +3817,7 @@ class QueueManager(Component):
failure_msg = 'No Max Walltime default or for queue "%s" defined. Please contact system administrator' % spec['queue']
logger.error(failure_msg)
raise QueueError, failure_msg
spec.update({'adminemail':self.Queues[spec['queue']].adminemail})
if walltime_prediction_enabled:
spec['walltime_p'] = self.get_walltime_p(spec) #*AdjEst*
......@@ -3838,13 +3848,20 @@ class QueueManager(Component):
if updates.has_key("queue"):
new_q_name = updates["queue"]
if new_q_name not in self.Queues:
logger.error("attempted to move a job to non-existent queue '%s'" % new_q_name)
raise QueueError, "Error: queue '%s' does not exist" % new_q_name
logger.error("attempted to move a job to non-existent queue '%s'",
new_q_name)
raise QueueError("Error: queue '%s' does not exist" % new_q_name)
for job in joblist:
if job.is_active or job.has_completed:
raise QueueError, "job %d is running; it cannot be moved" % job.jobid
raise QueueError("job %d is running; it cannot be moved" % job.jobid)
if updates.has_key("score"):
try:
updates['score'] = float(updates['score'])
except ValueError:
logger.error('new score of %s cannot be converted to a floating point value',
updates['score'])
raise QueueError("Bad new score value.")
for job in joblist:
......
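A standalone sketch of the cast semantics introduced above (illustrative only; QueueError here is a local stand-in for Cobalt.Exceptions.QueueError, and coerce_score is not a cqm function):

class QueueError(Exception):
    '''Stand-in for Cobalt.Exceptions.QueueError.'''

def coerce_score(value):
    # Accept ints, floats, and numeric strings; anything else fails the
    # float() cast with ValueError, exactly as in set_jobs above.
    try:
        return float(value)
    except ValueError:
        raise QueueError("Bad new score value.")

assert coerce_score(1) == 1.0        # int widened to float
assert coerce_score("0.1") == 0.1    # numeric string cast to float
# coerce_score("NotANumber") raises QueueError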
......@@ -6,6 +6,7 @@
import logging
import xml.etree
import xmlrpclib
from cray_messaging import InvalidBasilMethodError, BasilRequest
from cray_messaging import parse_response, ALPSError
from Cobalt.Proxy import ComponentProxy
......@@ -203,7 +204,15 @@ def _call_sys_forker(basil_path, in_str):
while True:
#Is a timeout needed here?
children = ComponentProxy(FORKER).get_children('apbridge', [runid])
try:
children = ComponentProxy(FORKER).get_children('apbridge', [runid])
except xmlrpclib.Fault as fault:
_logger.error('XMLRPC Fault received while fetching child %s status:', runid)
_logger.error('Child %s: Fault code: %s', runid, fault.faultCode)
_logger.error('Child %s: Fault string: %s', runid,
fault.faultString)
_logger.debug('Traceback information for runid %s', runid,
exc_info=True)
complete = False
for child in children:
if child['complete']:
......@@ -211,7 +220,15 @@ def _call_sys_forker(basil_path, in_str):
_logger.error("BASIL returned a status of %s",
child['exit_status'])
resp = child['stdout_string']
ComponentProxy(FORKER).cleanup_children([runid])
try:
ComponentProxy(FORKER).cleanup_children([runid])
except xmlrpclib.Fault as fault:
_logger.error('XMLRPC Fault received while cleaning up child %s:', runid)
_logger.error('Child %s: Fault code: %s', runid, fault.faultCode)
_logger.error('Child %s: Fault string: %s', runid,
fault.faultString)
_logger.debug('Traceback information for runid %s', runid,
exc_info=True)
complete = True
if complete:
break
......
......@@ -754,7 +754,6 @@ class CraySystem(BaseSystem):
if self.nodes[str(node_id)].status in
self.nodes[str(node_id)].DOWN_STATUSES]
if drain_time is not None:
print drain_time, BACKFILL_EPSILON, drain_time - BACKFILL_EPSILON
unavailable_nodes.extend([node_id for node_id in node_id_list
if (self.nodes[str(node_id)].draining and
(self.nodes[str(node_id)].drain_until - BACKFILL_EPSILON) < int(drain_time))])
......@@ -1002,8 +1001,14 @@ class CraySystem(BaseSystem):
nid in required) and
not self.nodes[str(nid)].draining)]
for nid in running_nodes:
self.nodes[str(nid)].set_drain(loc_time[1], job['jobid'])
candidate_list.extend(running_nodes)
# We set a drain on all running nodes for use in a later pass
# so that we can "favor" draining on the longest-running set
# of nodes.
if (self.nodes[str(nid)].status != 'down' and
self.nodes[str(nid)].managed):
self.nodes[str(nid)].set_drain(loc_time[1], job['jobid'])
candidate_list.extend([nid for nid in running_nodes if
self.nodes[str(nid)].draining])
candidate_drain_time = int(loc_time[1])
if len(candidate_list) >= int(job['nodes']):
# Enough nodes have been found to drain for this job
......
......@@ -1325,7 +1325,6 @@ def print_node_list():
for heading in fetch_header:
if heading in header_aliases.keys():
fetch_header[fetch_header.index(heading)] = header_aliases[heading]
print fetch_header
nodes = json.loads(component_call(SYSMGR, False, 'get_nodes',
(True, None, fetch_header, True)))
......
......@@ -29,6 +29,7 @@ config_fp.close()
import ConfigParser
from nose.tools import timed, TimeExpired
from nose.tools import raises
import os
import os.path
import pwd
......@@ -332,7 +333,8 @@ class TestCQMJobManagement (TestCQMComponent):
group_name = grp.getgrgid(os.getegid()).gr_name #group we're running under for verification check.
self.cqm.add_queues([{'name':"default"}])
self.cqm.add_queues([{'name':"restricted-group"}])
self.cqm.set_queues([{'name':"restricted-group"}], {'groups':'bar'})
self.cqm.set_queues([{'name':"restricted-group"}],
{'groups':'ThisGroupDefinitelyDoesntExist'})
self.cqm.add_jobs([{'queue':"default", 'jobname':"hello", 'user':pwd.getpwuid(os.getuid()).pw_name}])
try:
......@@ -349,6 +351,37 @@ class TestCQMJobManagement (TestCQMComponent):
#assert len(r) == 1
#assert r[0].queue == "restricted-group"
def test_set_job_score_type_from_float(self):
# Ensure that job scores are set to a float and are appropriately cast
self.cqm.add_queues([{'tag':"queue", 'name':"default"}])
self.cqm.add_jobs([{'tag':"job", 'queue':"default"}])
self.cqm.set_jobs([{'tag':"job", 'queue':"*"}], {'score':0.1})
job = self.cqm.get_jobs([{'tag':"job", 'queue':"default"}])[0]
assert type(job.score) == type(0.0), "Job score not float type when set from float"
def test_set_job_score_type_from_int(self):
# Ensure that job scores are set to a float and are appropriately cast
self.cqm.add_queues([{'tag':"queue", 'name':"default"}])
self.cqm.add_jobs([{'tag':"job", 'queue':"default"}])
self.cqm.set_jobs([{'tag':"job", 'queue':"*"}], {'score':1})
job = self.cqm.get_jobs([{'tag':"job", 'queue':"default"}])[0]
assert type(job.score) == type(0.0), "Job score not float type when set from int"
def test_set_job_score_type_from_string(self):
# Ensure that job scores are set to a float and are appropriately cast
self.cqm.add_queues([{'tag':"queue", 'name':"default"}])
self.cqm.add_jobs([{'tag':"job", 'queue':"default"}])
self.cqm.set_jobs([{'tag':"job", 'queue':"*"}], {'score': "0.1"})
job = self.cqm.get_jobs([{'tag':"job", 'queue':"default"}])[0]
assert type(job.score) == type(0.0), "Job score not float type when set from string"
@raises(Cobalt.Exceptions.QueueError)
def test_set_job_score_bad_string(self):
# Refuse to queue if score cannot be cast to a float.
self.cqm.add_queues([{'tag':"queue", 'name':"default"}])
self.cqm.add_jobs([{'tag':"job", 'queue':"default"}])
self.cqm.set_jobs([{'tag':"job", 'queue':"*"}], {'score': "NotANumber"})
class Task (Data):
required_fields = ['jobid', 'location', 'user', 'cwd', 'executable', 'args', ]
fields = Data.fields + ["id", "jobid", "location", "size", "mode", "user", "executable", "args", "env", "cwd", "umask",
......
......@@ -584,6 +584,24 @@ class TestCraySystem(object):
assert_match(self.system.nodes[str(i)].drain_jobid, 1, "Bad drain job")
assert_match(self.system.nodes[str(i)].drain_until, now + 300, "Bad drain time")
def test_select_nodes_for_draining_running_but_down(self):
'''CraySystem._select_nodes_for_draining: do not drain down node if job still "running"'''
# If a node dies while a job is running, it will still show up in the
# end-times range until termination of that job is complete.
end_times = [[['1-4'], 100.0]]
self.system.nodes['2'].status = 'down'
self.base_job['nodes'] = 4
drain_nodes = self.system._select_nodes_for_draining(self.base_job,
end_times)
assert_match(sorted(drain_nodes), ['1', '3', '4', '5'], "Bad Selection")
assert_match(self.system.nodes['2'].draining, False, "Draining set")
assert_match(self.system.nodes['2'].drain_jobid, None, "Should not have drain_jobid", is_match)
assert_match(self.system.nodes['2'].drain_until, None, "Should not have drain_until", is_match)
for i in ['1', '3', '4', '5']:
assert_match(self.system.nodes[str(i)].draining, True, "Draining not set")
assert_match(self.system.nodes[str(i)].drain_jobid, 1, "Bad drain job")
assert_match(self.system.nodes[str(i)].drain_until, 100.0, "Bad drain time")
# common checks for find_job_location
def assert_draining(self, nid, until, drain_jobid):
assert self.system.nodes[str(nid)].draining, "Node %s should be draining" % nid
......