Commit b9de5d5e authored by Eric Pershey, committed by Eric Pershey

Added instrumented logging to many of the components.

Wrapped Proxy errors with sanatize_password.
Cleaned up logging spelling mistakes.
Added extract_traceback to format exception tracebacks.
Added get_current_thread_identifier to help identify threads.
Added get_caller to help identify the caller.
Added cray_messaging to aid in debugging and allow importing.
Fixed a bug that would allow retrying of a fork.
Added the debugging tools slp_hammer.py and slp_nail.py.
Updated the simulator to simulate background tasks.
parent c3213b52
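The helpers named above are imported from Cobalt.Util elsewhere in this diff (extract_traceback, sanatize_password, get_current_thread_identifier), but their implementations are not shown in this commit view. A minimal sketch of what the two identification helpers might look like, assuming only the usage visible below (a list of traceback lines that is joined and passed through sanatize_password before logging):

import sys
import threading
import traceback

def get_current_thread_identifier():
    # Illustrative only; the real implementation lives in Cobalt.Util.
    current = threading.current_thread()
    return "%s:%s" % (current.name, current.ident)

def extract_traceback():
    # Return the current exception's traceback as a list of formatted lines,
    # suitable for '\n'.join(...) and sanatize_password() as used in _run_and_wrap below.
    exc_type, exc_value, exc_tb = sys.exc_info()
    return traceback.format_exception(exc_type, exc_value, exc_tb)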
......@@ -9,6 +9,7 @@ from Cobalt.Components.base import run_component
if __name__ == "__main__":
try:
run_component(TimingServiceLocator, register=False)
#FIXME: we cannot register slp because of a problem with _register located in the property register.
run_component(TimingServiceLocator, register=False, time_out=10.0)
except KeyboardInterrupt:
sys.exit(1)
../tools/cray_messaging.py
\ No newline at end of file
......@@ -53,7 +53,7 @@ def state_file_location():
def run_component (component_cls, argv=None, register=True, state_name=False,
cls_kwargs={}, extra_getopt='', time_out=10,
single_threaded=False, seq_num=0, aug_comp_name=False,
state_name_match_component=False):
state_name_match_component=False, sleeptime=10.0):
'''Run the specified Cobalt component until receiving a signal to terminate.
Args::
......@@ -169,11 +169,12 @@ def run_component (component_cls, argv=None, register=True, state_name=False,
capath = None
if single_threaded:
# sleeptime is not used due to differences in api.
server = BaseXMLRPCServer(location, keyfile=keypath, certfile=certpath,
cafile=capath, register=register, timeout=time_out)
else:
server = XMLRPCServer(location, keyfile=keypath, certfile=certpath,
cafile=capath, register=register, timeout=time_out)
cafile=capath, register=register, timeout=time_out, sleeptime=sleeptime)
#Two components of the same type cannot be allowed to run at the same time.
if component.name != 'service-location':
......
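For reference, run_component now forwards the new sleeptime keyword only to the threaded XMLRPCServer; the single-threaded BaseXMLRPCServer ignores it, as the comment above notes. A hypothetical component launcher using the new argument (MySystemComponent and the values are placeholders, not from this commit):

import sys
from Cobalt.Components.base import run_component

if __name__ == "__main__":
    try:
        # MySystemComponent is a placeholder component class.
        # sleeptime is forwarded to XMLRPCServer in the threaded case; time_out is unchanged.
        run_component(MySystemComponent, register=True, time_out=10.0, sleeptime=5.0)
    except KeyboardInterrupt:
        sys.exit(1)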
......@@ -538,6 +538,12 @@ class BaseChild (object):
if self.pipe_write is not None:
self._close_pipe_and_check(self.pipe_write)
def _handle_preexec_error_and_exit(self, exc_info):
_logger.error("%s: pre-exec function failed; terminating job", self.label, exc_info=exc_info)
self.print_clf_error("forker's pre-exec methods failed; terminating job")
os._exit(255)
def start(self):
if self.stdin_string is not None:
......@@ -590,10 +596,10 @@ class BaseChild (object):
try:
self.preexec_first()
self.preexec_last()
except SystemExit:
self._handle_preexec_error_and_exit(exc_info=False)
except:
_logger.error("%s: pre-exec function failed; terminating job", self.label, exc_info = True)
self.print_clf_error("forker's pre-exec methods failed; terminating job")
os._exit(255)
self._handle_preexec_error_and_exit(exc_info=True)
if not self.exe:
self.exe = self.args[0]
......
......@@ -1527,8 +1527,7 @@ class Job (StateMachine):
count = 0
for local_id in self.job_prescript_ids:
if local_id == None:
logger.error("Job %s/%s: Script: %s failed to run.",
self.user, self.jobid, script[count])
logger.error("Job %s/%s: Script: %s failed to run.", self.user, self.jobid, script[count])
break
count += 1
dbwriter.log_to_db(None, "job_prologue_failed",
......@@ -1555,20 +1554,17 @@ class Job (StateMachine):
for script in scripts:
try:
retval = ComponentProxy("system_script_forker").fork(
script, tag, label, None)
if retval != None:
script_ids.append(retval)
else:
#job failed to run
script_ids = [None]
retval = ComponentProxy("system_script_forker", retry=False).fork(script, tag, label, None)
if retval != None:
script_ids.append(retval)
else:
#job failed to run
script_ids = [None]
except ComponentLookupError:
logger.error("%s: Error connecting to forker. Retrying",
label)
logger.error("%s: Error connecting to forker. Retrying", label)
raise ComponentLookupError
except xmlrpclib.Fault:
logger.error("%s: Failure in exectuing script: %s",
label, script)
logger.error("%s: Failure in exectuing script: %s", label, script)
script_ids.append(None)
return script_ids
......@@ -1827,8 +1823,7 @@ class Job (StateMachine):
else:
self._sm_state = 'Job_Prologue_Retry_Release'
else:
logger.info("Job %s/%s: Job Prologue scripts completed "
"successfuly.", self.jobid, self.user)
logger.info("Job %s/%s: Job Prologue scripts completed successfuly.", self.jobid, self.user)
#if we have received a kill, we shouldn't bother running any further
#scripts and should invoke cleanup.
if (has_private_attr(self, '__signaling_info') and
......@@ -2077,7 +2072,7 @@ class Job (StateMachine):
elif rc != Job.__rc_retry:
# if the task failed to run, then proceed with job termination by
# starting the resource prologue scripts
self._sm_log_error("execution failure; initiating job cleanup and "
self._sm_log_error("execution l; initiating job cleanup and "
"removal", cobalt_log = True)
dbwriter.log_to_db(None, "failed", "job_prog", JobProgMsg(self))
self._sm_start_resource_epilogue_scripts()
......@@ -3017,7 +3012,7 @@ class Job (StateMachine):
try:
self.trigger_event('Progress')
except:
self._sm_log_exception(None, "an exception occurred during a progress event")
self._sm_log_exception("an exception occurred during a progress event", cobalt_log=True)
def run(self, nodelist, user = None):
'''cause the job to go from queued to starting.
......@@ -3031,7 +3026,7 @@ class Job (StateMachine):
raise JobRunError("Jobs in the '%s' state may not be started." % (self.state,), self.jobid,
self.state, self._sm_state)
except:
self._sm_log_exception(None, "an unexpected exception occurred while attempting to start the task")
self._sm_log_exception("an unexpected exception occurred while attempting to start the task", cobalt_log=True)
raise JobRunError("An unexpected exception occurred while attempting to start the job. See log for details.",
self.jobid, self.state, self._sm_state)
finally:
......@@ -3078,7 +3073,7 @@ class Job (StateMachine):
except StateMachineIllegalEventError:
raise JobPreemptionError("Jobs in the '%s' state may not be preempted." % (self.state,), self.jobid, user, force)
except:
self._sm_log_exception(None, "an unexpected exception occurred while attempting to preempt the task")
self._sm_log_exception("an unexpected exception occurred while attempting to preempt the task", cobalt_log=True)
raise JobPreemptionError("An unexpected exception occurred while attempting to preempt the job. See log for details.",
self.jobid, user, force)
......@@ -3099,8 +3094,7 @@ class Job (StateMachine):
self.trigger_event('Kill', {'user' : user,
'signal' : signame})
except:
self._sm_log_exception(None, "an unexpected exception occurred"
" while attempting to kill the task")
self._sm_log_exception("an unexpected exception occurred while attempting to kill the task", cobalt_log=True)
raise JobDeleteError("An unexpected exception occurred while "
"attempting to delete the job. See log for details.",
self.jobid, user, force, self.state, self._sm_state)
......@@ -3115,8 +3109,7 @@ class Job (StateMachine):
if self.taskid != None:
self.__task_signal(retry = False)
except:
self._sm_log_exception(None, "an exception occurred while "
"attempting to forcibly kill the task")
self._sm_log_exception("an exception occurred while attempting to forcibly kill the task", cobalt_log=True)
raise JobDeleteError(("An error occurred while forcibly "
"killing the job. The job has been removed from the "
"queue; however, resouces may not have been released. "
......@@ -4111,7 +4104,7 @@ class QueueManager(Component):
try:
f = open(filename)
except:
self.logger.error("Can't read utility function definitions from file %s" % get_bgsched_config("utility_file", ""))
self.logger.error("Can't read utility function definitions from file %s" % filename)
return
str = f.read()
......
......@@ -4,36 +4,26 @@ Classes:
BGSimProcessGroup -- virtual process group running on the system
Simulator -- simulated system component
"""
import pwd
import logging
import sys
import os
import operator
import random
import time
import thread
import uuid
import threading
import xmlrpclib
from datetime import datetime
from Queue import Queue
try:
from elementtree import ElementTree
except ImportError:
from xml.etree import ElementTree
import Cobalt
import Cobalt.Data
import Cobalt.Util
get_config_option = Cobalt.Util.get_config_option
from Cobalt.Components import bg_base_system
from Cobalt.Data import Data, DataDict, IncrID
from Cobalt.Components.base import Component, exposed, automatic, query
from Cobalt.Components.bg_base_system import NodeCard, Partition, PartitionDict, BGProcessGroupDict, BGBaseSystem
from Cobalt.Exceptions import ProcessGroupCreationError, ComponentLookupError
from Cobalt.Proxy import ComponentProxy
from Cobalt.Statistics import Statistics
from Cobalt.Components.base import Component, exposed, automatic, locking
from Cobalt.Components.bg_base_system import NodeCard, PartitionDict, BGBaseSystem
from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Util import sanatize_password, extract_traceback, get_current_thread_identifier
get_config_option = Cobalt.Util.get_config_option
__all__ = [
"BGSimProcessGroup",
......@@ -41,6 +31,7 @@ __all__ = [
]
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class BGSimProcessGroup(ProcessGroup):
......@@ -74,16 +65,145 @@ class Simulator (BGBaseSystem):
def __init__ (self, *args, **kwargs):
BGBaseSystem.__init__(self, *args, **kwargs)
sys.setrecursionlimit(5000) #why this magic number?
self.config_file = kwargs.get("config_file", get_config_option('bgsystem', 'system_def_file', None))
self._common_init_restart()
def do_lookup(self):
system_script_forker = Cobalt.Proxy.ComponentProxy('system_script_forker')
current_killing_jobs = system_script_forker.get_children(None, [])
pass
def do_something_intense(self, worktime=20000):
"""This will create some synthetic work for python and the lst.sort() will not release the GIL.
This can be used to find timeouts due to locking in the GIL."""
# worktime can be increased to find race conditions.
# worktime = worktime * 1000
lst = list([random.random() for x in range(worktime)])
lst.sort()
def do_something_intense_lock_a(self):
"""Do some work and grab a lock"""
self.logger.info('do_something_intense_lock_a, tid:%s, lock_a try', get_current_thread_identifier())
with self._lock_a:
self.logger.info('do_something_intense_lock_a, tid:%s, lock_a acq', get_current_thread_identifier())
self.do_lookup()
self.do_something_intense(worktime=4000)
self.logger.info('do_something_intense_lock_a, tid:%s, lock_a rel', get_current_thread_identifier())
def do_something_intense_lock_b(self):
"""Do some work and grab b lock"""
self.logger.info('do_something_intense_lock_b, tid:%s, lock_b try', get_current_thread_identifier())
with self._lock_b:
self.logger.info('do_something_intense_lock_b, tid:%s, lock_b acq', get_current_thread_identifier())
self.do_lookup()
self.do_something_intense(worktime=4000)
self.logger.info('do_something_intense_lock_b, tid:%s, lock_b rel', get_current_thread_identifier())
def do_something_intense_with_lock(self):
"""Do some work and grab b lock, then a lock"""
self.logger.info('do_something_intense_with_lock, tid:%s, lock_b try', get_current_thread_identifier())
with self._lock_b:
self.logger.info('do_something_intense_with_lock, tid:%s, lock_b acq', get_current_thread_identifier())
self.logger.info('do_something_intense_with_lock, tid:%s, lock_a try', get_current_thread_identifier())
with self._lock_a:
self.logger.info('do_something_intense_with_lock, tid:%s, lock_a acq', get_current_thread_identifier())
self.do_lookup()
self.do_something_intense(worktime=4000)
self.logger.info('do_something_intense_with_lock, tid:%s, lock_a rel', get_current_thread_identifier())
self.logger.info('do_something_intense_with_lock, tid:%s, lock_b rel', get_current_thread_identifier())
def _run_and_wrap(self, update_func):
"""same code pulled from CraySystem.py"""
self.logger.info('_run_and_wrap %s, tid:%s', update_func, get_current_thread_identifier())
update_func_name = update_func.__name__
ts = time.time()
try:
update_func()
except Exception:
te = time.time()
tb_str = sanatize_password('\n'.join(extract_traceback()))
td = te - ts
self.logger.error('_run_and_wrap(%s): td:%s error:%s' % (update_func_name, td, tb_str))
bool_error = True
else:
te = time.time()
td = te - ts
bool_error = False
return update_func_name, bool_error, td
def _run_update_state(self):
'''automated node update functions on the update timer go here.'''
try:
self.logger.info('_run_update_state starting, tid:%s, queue:%s',
get_current_thread_identifier(), self.node_update_thread_kill_queue.qsize())
while self.node_update_thread_kill_queue.empty() is True:
self.logger.info('_run_update_state running, tid:%s', get_current_thread_identifier())
# Each of these is wrapped in its own log-and-preserve block.
# The outer try is there to ensure the thread update timeout happens.
metadata_lst = []
metadata_lst.append(self._run_and_wrap(self.do_something_intense_lock_a))
metadata_lst.append(self._run_and_wrap(self.do_something_intense_lock_b))
metadata_lst.append(self._run_and_wrap(self.do_something_intense_with_lock))
any_error = False
for func_name, error, td in metadata_lst:
if error is True:
any_error = True
if any_error is True:
self.logger.critical("_run_update_state: error occurred, timings below.")
for func_name, error, td in metadata_lst:
self.logger.critical("%s: %s", func_name, td)
self.logger.info('_run_update_state sleeping for %s, tid:%s', self.update_thread_timeout,
get_current_thread_identifier())
Cobalt.Util.sleep(self.update_thread_timeout)
self.logger.critical('_run_update_state exiting, tid:%s', get_current_thread_identifier())
self.node_update_thread_kill_queue.get(timeout=1.0)
self.node_update_thread_dead = True
finally:
self.node_update_thread_dead = True
self.logger.critical('_run_update_state dead, tid:%s', get_current_thread_identifier())
return
def _common_init_restart(self, spec=None):
"""this is the common code that must be called when instanciating the class or
bringing it back from a state file."""
self.logger.info("init: Brooklyn starting.")
self._lock_a = threading.RLock()
self._lock_b = threading.RLock()
self.process_groups.item_cls = BGSimProcessGroup
self.node_card_cache = dict()
self.failed_components = set()
self.config_file = kwargs.get("config_file", get_config_option('bgsystem', 'system_def_file', None))
self.update_thread_timeout = 10.0
if spec is None:
operation = 'start'
self.failed_components = set()
else:
operation = 'restart'
try:
self.failed_components = spec['failed_components']
except KeyError:
self.failed_components = set()
try:
self.config_file = spec['config_file']
except KeyError:
self.config_file = os.path.expandvars(get_config_option('system', 'def_file', ""))
if self.config_file is not None:
self.logger.log(1, "init: loading machine configuration")
self.logger.log(1, "%s: loading machine configuration", operation)
self.configure(self.config_file)
self.logger.log(1, "init: recomputing partition state")
self.logger.log(1, "%s: restoring partition state", operation)
if spec is not None:
self._restore_partition_state(spec)
self.logger.log(1, "%s: recomputing partition state", operation)
self._recompute_partition_state()
self.node_update_thread_kill_queue = Queue()
self.node_update_thread_dead = False
self.logger.info("_run_update_state thread starting.")
self.node_update_thread = threading.Thread(target=self._run_update_state)
self.node_update_thread.start()
self.logger.info("_run_update_state thread started:%s", self.node_update_thread)
self.logger.info("init: Brooklyn ready.")
def __getstate__(self):
state = {}
state.update(BGBaseSystem.__getstate__(self))
......@@ -94,29 +214,14 @@ class Simulator (BGBaseSystem):
return state
def __setstate__(self, state):
operation = 'restart'
try:
self.logger.log(1, "restart: initializing base system class")
self.logger.log(1, "%s: initializing base system class", operation)
BGBaseSystem.__setstate__(self, state)
self.process_groups.item_cls = BGSimProcessGroup
self.node_card_cache = dict()
try:
self.failed_components = state['failed_components']
except KeyError:
self.failed_components = set()
try:
self.config_file = state['config_file']
except KeyError:
self.config_file = os.path.expandvars(get_config_option('system', 'def_file', ""))
if self.config_file:
self.logger.log(1, "restart: loading machine configuration")
self.configure(self.config_file)
self.logger.log(1, "restart: restoring partition state")
self._restore_partition_state(state)
self.logger.log(1, "restart: recomputing partition state")
self._recompute_partition_state()
self._common_init_restart(spec=state)
except:
self.logger.error("A fatal error occurred while restarting the system component", exc_info=True)
print "A fatal error occurred while restarting the system component. Terminating."
self.logger.error("A fatal error occurred while %sing the system component", operation, exc_info=True)
print("A fatal error occurred while restarting the system component. Terminating.")
sys.exit(1)
def save_me(self):
......@@ -413,3 +518,34 @@ class Simulator (BGBaseSystem):
def list_failed_components(self, component_names):
return list(self.failed_components)
list_failed_components = exposed(list_failed_components)
@locking
@exposed
def find_job_location(self, arg_list, end_times):
"""This is a wrapper around find_job_location that can be used to create a circumstance
that can allow finding of errors. This also provides entry and exit logging."""
self.logger.debug("Simulator:starting find_job_location")
result = super(Simulator, self).find_job_location(arg_list, end_times)
#lets do at least 10 seconds of work.
#self.logger.debug("Simulator:starting *** do_something_intense *** ")
with self._lock_a:
self.do_something_intense(worktime=4000)
#self.logger.debug("Simulator:complete *** do_something_intense *** ")
self.logger.debug("Simulator:complete find_job_location")
return result
@exposed
def reserve_resources_until(self, location, new_time, jobid):
"""This is a wrapper around reserve_resources_until that can be used to create a circumstance
that can allow finding of errors. This also provides entry and exit logging."""
self.logger.debug("Simulator:starting reserve_resources_until")
result = super(Simulator, self).reserve_resources_until(location, new_time, jobid)
#lets do at least 10 seconds of work.
ident = uuid.uuid4().hex
self.logger.debug("Simulator:starting *** %s do_something_intense *** ", ident)
with self._lock_a:
self.do_something_intense(worktime=4000)
self.logger.debug("Simulator:complete *** %s do_something_intense *** ", ident)
self.logger.debug("Simulator:complete reserve_resources_until")
return result
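This commit starts the simulator's update thread but adds no shutdown path beyond the kill queue checked in _run_update_state. A hypothetical stop method (not part of this commit), relying only on the node_update_thread_kill_queue and node_update_thread_dead conventions used above, might look like:

def stop_update_thread(self, join_timeout=30.0):
    """Hypothetical shutdown hook; assumes only the queue/flag conventions
    used by _run_update_state above. Not part of this commit."""
    # Any item on the kill queue makes _run_update_state leave its loop on the
    # next pass; the loop sleeps update_thread_timeout seconds between passes.
    self.node_update_thread_kill_queue.put('exit')
    self.node_update_thread.join(join_timeout)
    if not self.node_update_thread_dead:
        self.logger.warning('_run_update_state did not exit within %s seconds', join_timeout)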
......@@ -111,6 +111,7 @@ class ServiceLocator (Component):
service_name -- name of the service to register
location -- location of the service
"""
self.logger.info("register(%r, %r)" % (service_name, location))
try:
service = self.services[service_name]
except KeyError:
......@@ -147,6 +148,8 @@ class ServiceLocator (Component):
except KeyError:
self.logger.debug("locate(%r) [not registered]" % (service_name))
return ""
else:
self.logger.debug("locate(%r) [registered]" % (service_name))
return service.location
locate = exposed(locate)
......
......@@ -293,7 +293,7 @@ def _log_xmlrpc_error(runid, fault):
None
'''
_logger.error('XMLRPC Fault recieved while fetching child %s status:', runid)
_logger.error('XMLRPC Fault received while fetching child %s status:', runid)
_logger.error('Child %s: Fault code: %s', runid, fault.faultCode)
_logger.error('Child %s: Fault string: %s', runid,
fault.faultString)
......
"""Resource management for Cray ALPS based systems"""
import logging
import threading
import thread
import time
import sys
import xmlrpclib
......@@ -21,6 +20,9 @@ from Cobalt.Exceptions import JobValidationError
from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Util import compact_num_list, expand_num_list
from Cobalt.Util import init_cobalt_config, get_config_option
from Cobalt.Util import extract_traceback, sanatize_password, get_current_thread_identifier
from Queue import Queue
_logger = logging.getLogger(__name__)
init_cobalt_config()
......@@ -29,7 +31,8 @@ UPDATE_THREAD_TIMEOUT = int(get_config_option('alpssystem', 'update_thread_timeo
TEMP_RESERVATION_TIME = int(get_config_option('alpssystem', 'temp_reservation_time', 300))
SAVE_ME_INTERVAL = float(get_config_option('alpssystem', 'save_me_interval', 10.0))
#default 20 minutes to account for boot.
PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem', 'pending_startup_timeout', 1200))
# DISABLED: Not used
#PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem', 'pending_startup_timeout', 1200))
APKILL_CMD = get_config_option('alps', 'apkill', '/opt/cray/alps/default/bin/apkill')
DRAIN_MODE = get_config_option('system', 'drain_mode', 'first-fit')
#cleanup time in seconds
......@@ -50,7 +53,6 @@ MCDRAM_TO_HBMCACHEPCT = {'flat':'0', 'cache':'100', 'split':'25', 'equal':'50',
VALID_MCDRAM_MODES = ['flat', 'cache', 'split', 'equal', '0', '25', '50', '100']
VALID_NUMA_MODES = ['a2a', 'hemi', 'quad', 'snc2', 'snc4']
def chain_loc_list(loc_list):
'''Take a list of compact Cray locations,
expand and concatenate them.
......@@ -78,26 +80,27 @@ class CraySystem(BaseSystem):
'''
start_time = time.time()
self.update_thread_timeout = UPDATE_THREAD_TIMEOUT
super(CraySystem, self).__init__(*args, **kwargs)
_logger.info('BASE SYSTEM INITIALIZED')
self._common_init_restart()
_logger.info('ALPS SYSTEM COMPONENT READY TO RUN')
_logger.info('Initilaization complete in %s sec.', (time.time() -
_logger.info('Initialization complete in %s sec.', (time.time() -
start_time))
def _common_init_restart(self, spec=None):
'''Common routine for cold and restart intialization of the system
'''Common routine for cold and restart initialization of the system
component.
'''
try:
self.system_size = int(get_config_option('system', 'size'))
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
_logger.critical('ALPS SYSTEM: ABORT STARTUP: System size must be specified in the [system] section of the cobalt configuration file.')
self.logger.critical('ALPS SYSTEM: ABORT STARTUP: System size must be specified in the [system] section of the cobalt configuration file.')
sys.exit(1)
if DRAIN_MODE not in DRAIN_MODES:
#abort startup, we have a completely invalid config.
_logger.critical('ALPS SYSTEM: ABORT STARTUP: %s is not a valid drain mode. Must be one of %s.',
self.logger.critical('ALPS SYSTEM: ABORT STARTUP: %s is not a valid drain mode. Must be one of %s.',
DRAIN_MODE, ", ".join(DRAIN_MODES))
sys.exit(1)
#initilaize bridge.
......@@ -110,14 +113,14 @@ class CraySystem(BaseSystem):
try:
ALPSBridge.init_bridge()
except ALPSBridge.BridgeError:
_logger.error('Bridge Initialization failed. Retrying.')
self.logger.error('Bridge Initialization failed. Retrying.')
Cobalt.Util.sleep(10)
except ComponentLookupError:
_logger.warning('Error reaching forker. Retrying.')
self.logger.warning('Error reaching forker. Retrying.')
Cobalt.Util.sleep(10)
else:
bridge_pending = False
_logger.info('BRIDGE INITIALIZED')
self.logger.info('BRIDGE INITIALIZED')
#process manager setup
if spec is None:
self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup)
......@@ -133,9 +136,16 @@ class CraySystem(BaseSystem):
self.process_manager = spec['process_manager']
self.process_manager.pgroup_type = ALPSProcessGroup
self.logger.debug('pg type %s', self.process_manager.process_groups.item_cls)
self.process_manager.update_launchers()
self.pending_start_timeout = PENDING_STARTUP_TIMEOUT
_logger.info('PROCESS MANAGER INTIALIZED')
# DISABLED: Why are we calling this here? It's called below in the update thread.
# Any error in update_launchers will cause CraySystem to fail to start.
# The update thread below wraps this call with error handling and should be used instead.
#self.process_manager.update_launchers()
self.logger.info('PROCESS MANAGER INITIALIZED')
# DISABLED: Not used
# self.pending_start_timeout = PENDING_STARTUP_TIMEOUT
#resource management setup
self.nodes = {} #cray node_id: CrayNode
self.node_name_to_id = {} #cray node name to node_id map
......@@ -158,8 +168,12 @@ class CraySystem(BaseSystem):
#state update thread and lock
self._node_lock = threading.RLock()
self._gen_node_to_queue()
self.node_update_thread = thread.start_new_thread(self._run_update_state, tuple())
_logger.info('UPDATE THREAD STARTED')
self.node_update_thread_kill_queue = Queue()
self.node_update_thread_dead = False
self.logger.info("_run_update_state thread starting.")
self.node_update_thread = threading.Thread(target=self._run_update_state)
self.node_update_thread.start()
self.logger.info("_run_update_state thread started:%s", self.node_update_thread)
self.killing_jobs = {}
#hold on to the initial spec in case nodes appear out of nowhere.
self.init_spec = None
......@@ -350,23 +364,57 @@ class CraySystem(BaseSystem):
return json.dumps(node_info)
return node_info
def _run_and_wrap(self, update_func):
self.logger.info('_run_and_wrap %s, tid:%s', update_func, get_current_thread_identifier())
update_func_name = update_func.__name__
bool_error = False