Commit 5e02bd63 authored by Eric Pershey

updates for the cluster node state accounting logs.

parent da8171a6
......@@ -11,6 +11,7 @@ import sys
import Cobalt
import pwd
import grp
import logging
import ConfigParser
import os
import Cobalt.Util
......@@ -20,6 +21,7 @@ from Cobalt.Data import DataDict
from Cobalt.Proxy import ComponentProxy
from Cobalt.Components.base import Component, exposed, automatic
from Cobalt.Util import config_true_values
import Cobalt.accounting as accounting
__all__ = [
"ClusterBaseSystem",
......@@ -44,7 +46,6 @@ if cluster_hostfile == None:
print '''ERROR: No "hostfile" entry in "cluster_system" section of cobalt config file'''
sys.exit(1)
class ClusterNode(object):
def __init__(self, name):
......@@ -143,6 +144,7 @@ class ClusterBaseSystem (Component):
def __init__ (self, *args, **kwargs):
Component.__init__(self, *args, **kwargs)
self._init_accounting_log()
self.process_groups = ProcessGroupDict()
self.active_queues = []
self.all_nodes = set()
......@@ -174,6 +176,21 @@ class ClusterBaseSystem (Component):
self.logger.info("allocation timeout set to %d seconds." % self.alloc_timeout)
self.logger.info("Minimum Backfill Window set to %d seconds." % self.MINIMUM_BACKFILL_WINDOW)
for node in self.all_nodes:
self._write_to_accounting_log(accounting.node_up(node, '__init__'))
def _init_accounting_log(self):
    '''Attach the PBS-style accounting file handler for this module.

    The records go to the "system.accounting" logger.  The handler is
    given the path "<log_dir>/<prefix>-%Y%m%d", where <prefix> is the
    "accounting_log_prefix" config value (default 'system') and <log_dir>
    is the "log_dir" config value (default Cobalt.DEFAULT_LOG_DIRECTORY)
    with environment variables expanded.

    This method is called from both __init__ and __setstate__.  Because
    logging.getLogger() returns the same logger object for a given name,
    a handler is only added when no DatetimeFileHandler is attached yet;
    adding one per call would write every accounting record several times.
    '''
    self.accounting_logger = logging.getLogger("system.accounting")
    for handler in self.accounting_logger.handlers:
        if isinstance(handler, accounting.DatetimeFileHandler):
            # Already wired up by an earlier call in this process.
            return
    filename = "%s-%%Y%%m%%d" % get_cluster_system_config("accounting_log_prefix", 'system')
    accounting_logdir = os.path.expandvars(get_cluster_system_config("log_dir", Cobalt.DEFAULT_LOG_DIRECTORY))
    self.accounting_logger.addHandler(accounting.DatetimeFileHandler(os.path.join(accounting_logdir, filename)))
def _write_to_accounting_log(self, msg):
'''Send to PBS-style accounting log for this module to accounting log and syslog'''
self.logger.info(msg)
self.accounting_logger.info(msg)
def __getstate__(self):
state = {}
state.update(Component.__getstate__(self))
......@@ -185,6 +202,7 @@ class ClusterBaseSystem (Component):
def __setstate__(self, state):
Component.__setstate__( self, state)
self._init_accounting_log()
self.all_nodes = set()
self.active_queues = []
self.node_order = {}
......@@ -208,6 +226,10 @@ class ClusterBaseSystem (Component):
for queue in nonexistent_queues:
del self.queue_assignments[queue]
self.down_nodes = self.all_nodes & set(state.get('down_nodes', set()))
for node in self.down_nodes:
self._write_to_accounting_log(accounting.node_down(node, '__setstate__'))
for node in self.all_nodes - self.down_nodes:
self._write_to_accounting_log(accounting.node_up(node, '__setstate__'))
self.process_groups = ProcessGroupDict()
self.running_nodes = set()
self.alloc_only_nodes = {} # nodename:starttime
......@@ -879,6 +901,8 @@ class ClusterBaseSystem (Component):
changed.append(n)
if changed:
self.logger.info("%s marking nodes up: %s", user_name, ", ".join(changed))
for node in changed:
self._write_to_accounting_log(accounting.node_up(node, 'nodes_up', user=user_name))
return changed
nodes_up = exposed(nodes_up)
......@@ -890,6 +914,8 @@ class ClusterBaseSystem (Component):
changed.append(n)
if changed:
self.logger.info("%s marking nodes down: %s", user_name, ", ".join(changed))
for node in changed:
self._write_to_accounting_log(accounting.node_down(node, 'nodes_down', user=user_name))
return changed
nodes_down = exposed(nodes_down)
......@@ -1058,6 +1084,7 @@ class ClusterBaseSystem (Component):
self.logger.error("Job %s/%s: Failed to run epilogue on host "
"%s, marking node down", jobid, user, h, exc_info=True)
self.down_nodes.add(h)
self._write_to_accounting_log(accounting.node_down(h, 'clean_nodes', job_id=jobid, user=user))
self.running_nodes.discard(h)
def launch_script(self, config_option, host, jobid, user, group_name):
......@@ -1109,6 +1136,9 @@ class ClusterBaseSystem (Component):
exc_info=True)
self.cleaning_host_count[cleaning_process['jobid']] -= 1
self.down_nodes.add(cleaning_process['host'])
# accounting.node_down() names its job identifier parameter ``job_id``;
# passing ``jobid`` raises TypeError on this failure-handling path.
self._write_to_accounting_log(accounting.node_down(cleaning_process['host'],
    'retry_cleaning_scripts',
    job_id=cleaning_process['jobid']))
self.running_nodes.discard(cleaning_process['host'])
retry_cleaning_scripts = automatic(retry_cleaning_scripts,
......@@ -1237,6 +1267,7 @@ class ClusterBaseSystem (Component):
(user, jobid, host)
self.down_nodes.add(host)
self._write_to_accounting_log(accounting.node_down(host, "__mark_failed_cleaning", job_id=jobid, user=user))
self.running_nodes.discard(host)
self.cleaning_host_count[jobid] -= 1
cleaning_process['completed'] = True
......
......@@ -27,12 +27,15 @@ RECORD_MAPPING = {'abort': 'A',
'hold_acquire': 'HA',
'hold_release': 'HR',
'reservation_altered': 'YA',
'node_up': 'NU',
'node_down': 'ND',
}
__all__ = ["abort", "begin", "checkpoint", "delete", "end", "finish",
"system_remove", "remove", "queue", "rerun", "start", "unconfirmed",
"confirmed", "task_start", "task_end", "DatetimeFileHandler",
"modify", 'hold_acquire', 'hold_release', 'reservation_altered']
"modify", 'hold_acquire', 'hold_release', 'reservation_altered',
'node_up', 'node_down']
def abort (job_id, user, resource_list, account=None, resource=RESOURCE_NAME):
"""Job was aborted by the server.
......@@ -588,6 +591,43 @@ def hold_release(job_id, hold_type, end_time, user, resource=RESOURCE_NAME):
return entry("HR", job_id, {'hold_type': hold_type, 'end': end_time, 'resource': resource, 'user':user})
def node_up(node, reason, job_id=None, user=None, resource=RESOURCE_NAME):
    '''Build the "NU" accounting record for a node that is up.

    Arguments:
    node -- the node that is up
    reason -- the function or reason for the change
    job_id -- id of the job in whose context the change happened;
        a false value (e.g. None) is recorded as an empty string
    user -- user in the context; a false value is recorded as an empty string
    resource -- identifier of the resource that Cobalt is managing
        (default: RESOURCE_NAME)

    Returns:
    Whatever entry() produces for the "NU" record type, the node and the
    message fields.
    '''
    message = {
        'reason': reason,
        'job_id': job_id or '',
        'user': user or '',
        'resource': resource,
    }
    return entry("NU", node, message)
def node_down(node, reason, job_id=None, user=None, resource=RESOURCE_NAME):
    '''Build the "ND" accounting record for a node that is down.

    Arguments:
    node -- the node that is down
    reason -- the function or reason for the change
    job_id -- id of the job in whose context the change happened;
        a false value (e.g. None) is recorded as an empty string
    user -- user in the context; a false value is recorded as an empty string
    resource -- identifier of the resource that Cobalt is managing
        (default: RESOURCE_NAME)

    Returns:
    Whatever entry() produces for the "ND" record type, the node and the
    message fields.
    '''
    message = {
        'reason': reason,
        'job_id': job_id or '',
        'user': user or '',
        'resource': resource,
    }
    return entry("ND", node, message)
class DatetimeFileHandler (BaseRotatingHandler):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment