Commit 0b33186d authored by Paul Rich's avatar Paul Rich

Merge branch 'cluster_node_state_change' into 'develop'

Cluster node state change accounting log

See merge request aig/cobalt!125
parents f7da1f0f 4f1559ba
......@@ -483,6 +483,9 @@ This is a list of hostnames for nodes that the cluster system component can
schedule. Nodes may be added or removed, and the list of available nodes
is updated at restart.
.TP
.B log_dir
The directory in which to place the system state-change accounting logs.
.TP
.B epilogue
This is a colon-delimited set of scripts to run on a per-node basis on task
termination on a resource. If any script returns a non-zero exit status,
......
......@@ -11,6 +11,7 @@ import sys
import Cobalt
import pwd
import grp
import logging
import ConfigParser
import os
import Cobalt.Util
......@@ -20,6 +21,7 @@ from Cobalt.Data import DataDict
from Cobalt.Proxy import ComponentProxy
from Cobalt.Components.base import Component, exposed, automatic
from Cobalt.Util import config_true_values
import Cobalt.accounting as accounting
__all__ = [
"ClusterBaseSystem",
......@@ -44,7 +46,6 @@ if cluster_hostfile == None:
print '''ERROR: No "hostfile" entry in "cluster_system" section of cobalt config file'''
sys.exit(1)
class ClusterNode(object):
def __init__(self, name):
......@@ -143,6 +144,7 @@ class ClusterBaseSystem (Component):
def __init__ (self, *args, **kwargs):
Component.__init__(self, *args, **kwargs)
self._init_accounting_log()
self.process_groups = ProcessGroupDict()
self.active_queues = []
self.all_nodes = set()
......@@ -174,6 +176,21 @@ class ClusterBaseSystem (Component):
self.logger.info("allocation timeout set to %d seconds." % self.alloc_timeout)
self.logger.info("Minimum Backfill Window set to %d seconds." % self.MINIMUM_BACKFILL_WINDOW)
for node in self.all_nodes:
self._write_to_accounting_log(accounting.node_up(node, '__init__'))
def _init_accounting_log(self):
    '''Set up the PBS-style accounting logger for this component.

    Attaches a date-stamped DatetimeFileHandler under the configured
    log_dir (falling back to Cobalt.DEFAULT_LOG_DIRECTORY), using the
    accounting_log_prefix option (default: "system") as the file-name
    prefix.  The resulting logger is stored on self.accounting_logger.
    '''
    prefix = get_cluster_system_config("accounting_log_prefix", 'system')
    log_dir = get_cluster_system_config("log_dir", Cobalt.DEFAULT_LOG_DIRECTORY)
    # Expand any environment variables in the configured directory before
    # composing the strftime-style file name (e.g. system-20240101).
    log_path = os.path.join(os.path.expandvars(log_dir), "%s-%%Y%%m%%d" % prefix)
    self.accounting_logger = logging.getLogger("system.accounting")
    self.accounting_logger.addHandler(accounting.DatetimeFileHandler(log_path))
def _write_to_accounting_log(self, msg):
    '''Mirror an accounting record to both destinations.

    Every record goes to the component's ordinary log (self.logger) as
    well as the PBS-style accounting log (self.accounting_logger).
    '''
    for destination in (self.logger, self.accounting_logger):
        destination.info(msg)
def __getstate__(self):
state = {}
state.update(Component.__getstate__(self))
......@@ -185,6 +202,7 @@ class ClusterBaseSystem (Component):
def __setstate__(self, state):
Component.__setstate__( self, state)
self._init_accounting_log()
self.all_nodes = set()
self.active_queues = []
self.node_order = {}
......@@ -208,6 +226,10 @@ class ClusterBaseSystem (Component):
for queue in nonexistent_queues:
del self.queue_assignments[queue]
self.down_nodes = self.all_nodes & set(state.get('down_nodes', set()))
for node in self.down_nodes:
self._write_to_accounting_log(accounting.node_down(node, '__setstate__'))
for node in self.all_nodes - self.down_nodes:
self._write_to_accounting_log(accounting.node_up(node, '__setstate__'))
self.process_groups = ProcessGroupDict()
self.running_nodes = set()
self.alloc_only_nodes = {} # nodename:starttime
......@@ -879,6 +901,8 @@ class ClusterBaseSystem (Component):
changed.append(n)
if changed:
self.logger.info("%s marking nodes up: %s", user_name, ", ".join(changed))
for node in changed:
self._write_to_accounting_log(accounting.node_up(node, 'nodes_up', user=user_name))
return changed
nodes_up = exposed(nodes_up)
......@@ -890,6 +914,8 @@ class ClusterBaseSystem (Component):
changed.append(n)
if changed:
self.logger.info("%s marking nodes down: %s", user_name, ", ".join(changed))
for node in changed:
self._write_to_accounting_log(accounting.node_down(node, 'nodes_down', user=user_name))
return changed
nodes_down = exposed(nodes_down)
......@@ -1058,6 +1084,7 @@ class ClusterBaseSystem (Component):
self.logger.error("Job %s/%s: Failed to run epilogue on host "
"%s, marking node down", jobid, user, h, exc_info=True)
self.down_nodes.add(h)
self._write_to_accounting_log(accounting.node_down(h, 'clean_nodes', job_id=jobid, user=user))
self.running_nodes.discard(h)
def launch_script(self, config_option, host, jobid, user, group_name):
......@@ -1109,6 +1136,9 @@ class ClusterBaseSystem (Component):
exc_info=True)
self.cleaning_host_count[cleaning_process['jobid']] -= 1
self.down_nodes.add(cleaning_process['host'])
self._write_to_accounting_log(accounting.node_down(cleaning_process['host'],
'retry_cleaning_scripts',
job_id=cleaning_process['jobid']))
self.running_nodes.discard(cleaning_process['host'])
retry_cleaning_scripts = automatic(retry_cleaning_scripts,
......@@ -1237,6 +1267,7 @@ class ClusterBaseSystem (Component):
(user, jobid, host)
self.down_nodes.add(host)
self._write_to_accounting_log(accounting.node_down(host, "__mark_failed_cleaning", job_id=jobid, user=user))
self.running_nodes.discard(host)
self.cleaning_host_count[jobid] -= 1
cleaning_process['completed'] = True
......@@ -1295,4 +1326,3 @@ class ClusterBaseSystem (Component):
stats['nodect'] = len(loc_list)
stats['nproc'] = stats['nodect']
return stats
......@@ -27,12 +27,15 @@ RECORD_MAPPING = {'abort': 'A',
'hold_acquire': 'HA',
'hold_release': 'HR',
'reservation_altered': 'YA',
'node_up': 'NU',
'node_down': 'ND',
}
__all__ = ["abort", "begin", "checkpoint", "delete", "end", "finish",
"system_remove", "remove", "queue", "rerun", "start", "unconfirmed",
"confirmed", "task_start", "task_end", "DatetimeFileHandler",
"modify", 'hold_acquire', 'hold_release', 'reservation_altered']
"modify", 'hold_acquire', 'hold_release', 'reservation_altered',
'node_up', 'node_down']
def abort (job_id, user, resource_list, account=None, resource=RESOURCE_NAME):
"""Job was aborted by the server.
......@@ -588,6 +591,51 @@ def hold_release(job_id, hold_type, end_time, user, resource=RESOURCE_NAME):
return entry("HR", job_id, {'hold_type': hold_type, 'end': end_time, 'resource': resource, 'user':user})
def node_up(node, reason, job_id=None, user=None, resource=RESOURCE_NAME):
    '''Generate a PBS-style "NU" accounting record marking a node as up.

    Arguments:
    node -- name of the node that came up
    reason -- the function or reason for the state change
    job_id -- id of the job associated with the change, if any
    user -- user in whose context the change occurred, if any
    resource -- identifier of the resource Cobalt is managing, usually the
        system name (default: the resource_name option from the [system]
        section of cobalt.conf)

    Returns:
    A string accounting log message.
    '''
    # Optional fields are dropped from the record rather than logged as None.
    payload = dict(reason=reason, resource=resource, jobid=job_id, user=user)
    for optional in ('jobid', 'user'):
        if payload[optional] is None:
            del payload[optional]
    return entry("NU", node, payload)
def node_down(node, reason, job_id=None, user=None, resource=RESOURCE_NAME):
    '''Generate a PBS-style "ND" accounting record marking a node as down.

    Arguments:
    node -- name of the node that went down
    reason -- the function or reason for the state change
    job_id -- id of the job associated with the change, if any
    user -- user in whose context the change occurred, if any
    resource -- identifier of the resource Cobalt is managing, usually the
        system name (default: the resource_name option from the [system]
        section of cobalt.conf)

    Returns:
    A string accounting log message.
    '''
    # Optional fields are dropped from the record rather than logged as None.
    payload = dict(reason=reason, resource=resource, jobid=job_id, user=user)
    for optional in ('jobid', 'user'):
        if payload[optional] is None:
            del payload[optional]
    return entry("ND", node, payload)
class DatetimeFileHandler (BaseRotatingHandler):
......@@ -712,5 +760,3 @@ def serialize_td (timedelta_):
+ (timedelta_.microseconds / 1000000))
except AttributeError, ex:
raise ValueError(ex)
......@@ -25,6 +25,7 @@ STD_COBALT_CONFIG_FILE = '''
hostfile = cobalt.hostfile
simulation_mode = True
run_remote = false
log_dir = /tmp
[system]
size = 4
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment