Commit ebce003e authored by Paul Rich

Merge branch 'develop'

parents 2d475e8c 179969d9
= Changes from previous Cobalt Versions =
== Changes to 1.0.14 ==
* Fix for cluster systems where nodes that weren't numbered could cause
showres/qstat to fail with an error when displaying locations.
* TS (task start) records now include the task's location; TE (task end)
records include the location plus start and end times for the task in
seconds from epoch (UTC). Sample records appear after this list.
* Memory management scripts have been updated so that they can handle larger
Cray XC40 systems.
* Memory management boot records have been enhanced with start and end times
in seconds from epoch (UTC), and with the blocked locations on the
boot-start record.
* Fix for a potential crash in the memory mode switching script when
rebooting from certain memory mode mixes.
* Group membership checks for queues are now being handled more efficiently.
* Boot start and boot end records from the memory mode management scripts now
use a corrected date format in the message timestamp that is consistent
with other accounting records.
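
For illustration, the enhanced task records described above might look like
the following (a sketch only: the jobid, task id, and locations are made up,
and the exact keyval ordering depends on how the record is serialized):

    04/07/2017 16:20:00;TS;12345;task_id=3 start=1491582000.0 location=['nid00012', 'nid00013']
    04/07/2017 17:20:00;TE;12345;task_id=3 task_runtime=3600.0 start=1491582000.0 end=1491585600.0 location=['nid00012', 'nid00013']
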
== Changes to 1.0.13 ==
* Added a web-service interface client, "cweb", which presents job,
reservation, and node data over a REST API. It is based on the gronkd
daemon from
......
@@ -11,8 +11,6 @@ Source0: %{name}-%{version}.tar.gz
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
BuildRequires: systemd-rpm-macros
Requires: python >= 2.7
Requires: python-lockfile
Requires: python-python-daemon
%package -n cobalt-clients
Version: %{version}
@@ -33,7 +31,7 @@ The Cobalt Resource Management System
%{python} setup.py build
%define client_wrapper_dir /usr/libexec/cobalt
%define python_wrapper_dir %{client_wrapper_dir}/bin
%define python_site_packages %{_libdir}%{python}/site-packages
%define python_site_packages %{_libdir}/%{python}/site-packages
%build
cd src/clients && make PROGPREFIX=%{client_wrapper_dir}
@@ -54,6 +52,8 @@ install -m 755 src/clients/cobalt-admin ${RPM_BUILD_ROOT}%{_sbindir}
%{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/bg_mpirun_forker.py ${RPM_BUILD_ROOT}%{_sbindir}
%{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/bg_runjob_forker.py ${RPM_BUILD_ROOT}%{_sbindir}
%{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/system_script_forker.py ${RPM_BUILD_ROOT}%{_sbindir}
%{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/alps_script_forker.py ${RPM_BUILD_ROOT}%{_sbindir}
%{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/alpssystem.py ${RPM_BUILD_ROOT}%{_sbindir}
%{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/gravina.py ${RPM_BUILD_ROOT}%{_sbindir}
%{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/partadm.py ${RPM_BUILD_ROOT}%{_sbindir}
%{__mv} ${RPM_BUILD_ROOT}%{python_wrapper_dir}/setres.py ${RPM_BUILD_ROOT}%{_sbindir}
......
@@ -705,6 +705,7 @@ class Job (StateMachine):
if self.user_hold:
dbwriter.log_to_db(self.user, "user_hold", "job_prog", JobProgMsg(self))
self.current_task_start = time.time()
self.initializing = False
@@ -786,6 +787,7 @@ class Job (StateMachine):
if self.ion_kerneloptions == False:
self.ion_kerneloptions = None
self.runid = state.get("runid", None)
self.current_task_start = state.get("current_task_start", None)
#for old statefiles, make sure to update the dependency state on restart:
self.__dep_hold = False
self.initializing = False
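
Pulling current_task_start out of the statefile with state.get and a None
default keeps restarts compatible with statefiles written before this field
existed. A minimal standalone sketch of the pattern (class name hypothetical):

    # Sketch: restoring a field that older statefiles do not contain.
    class RestorableJob(object):
        def __setstate__(self, state):
            # Older statefiles lack 'current_task_start'; default to None
            # instead of raising KeyError on restart.
            self.current_task_start = state.get("current_task_start", None)

    job = RestorableJob()
    job.__setstate__({})  # simulate an old statefile with the field absent
    assert job.current_task_start is None
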
@@ -885,7 +887,8 @@ class Job (StateMachine):
else:
self.__max_job_timer = Timer()
self.__max_job_timer.start()
task_start = accounting.task_start(self.jobid, self.runid)
self.current_task_start = time.time()
task_start = accounting.task_start(self.jobid, self.runid, self.current_task_start, self.location)
accounting_logger.info(task_start)
logger.info(task_start)
return Job.__rc_success
@@ -894,7 +897,9 @@ class Job (StateMachine):
'''get exit code from system component'''
def end_time_and_log():
self.__max_job_timer.stop()
task_end = accounting.task_end(self.jobid, self.runid, self.__max_job_timer.elapsed_times[-1])
task_end = accounting.task_end(self.jobid, self.runid, self.__max_job_timer.elapsed_times[-1], self.current_task_start,
time.time(), self.location)
self.current_task_start = None
accounting_logger.info(task_end)
logger.info(task_end)
try:
@@ -3260,21 +3265,18 @@ class Restriction (Data):
retval = False
retstr = "You are not allowed to submit to the '%s' queue (group restriction)" % self.queue.name
queue_groups = self.value.split(':')
try:
if '*' in queue_groups:
retval = True
retstr = ""
elif grp.getgrgid(pwd.getpwnam(job['user']).pw_gid).gr_name in queue_groups:
retval = True
retstr = ""
else:
all_groups = grp.getgrall()
for group in all_groups:
if group.gr_name in queue_groups and job['user'] in group.gr_mem:
retval = True
retstr = ""
except KeyError:
retstr = "Group could not be verified for queue restriction."
if '*' in queue_groups:
return(True,"")
for group_name in queue_groups:
try:
if job['user'] in grp.getgrnam(group_name).gr_mem:
retval = True
retstr = ""
break
except KeyError:
retstr = "Group could not be verified for queue restriction."
return retval, retstr
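
The rewritten check resolves each allowed group by name and tests membership
directly, instead of walking the full group database with grp.getgrall(). A
standalone sketch of the same logic (the function name is hypothetical); note
that gr_mem lists only supplementary members, so a user whose primary group is
the queue group would need an additional pwd lookup, as the previous code
performed:

    import grp

    def user_in_queue_groups(user, queue_groups):
        '''Return True if user is a supplementary member of any allowed group.'''
        if '*' in queue_groups:
            return True
        for group_name in queue_groups:
            try:
                if user in grp.getgrnam(group_name).gr_mem:
                    return True
            except KeyError:
                pass  # group not defined on this host
        return False
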
def maxuserjobs(self, job, queuestate=None):
......
@@ -255,26 +255,31 @@ def confirmed (reservation_id, requester):
return entry("Y", reservation_id, {'requester':requester})
def task_start(job_id, task_id):
def task_start(job_id, task_id, start_time, location):
'''Indicate a task has started. Typically this would indicate that add_process_groups has been called successfully.
Args:
job_id - id of job that this task belongs to
task_id - id of the task launched
start_time - time when the task started as seconds from epoch
location - a list of locations that this task is running on
'''
return entry("TS", job_id, {'task_id': task_id})
return entry("TS", job_id, {'task_id': task_id, 'start': start_time, 'location': location})
def task_end(job_id, task_id, task_runtime):
def task_end(job_id, task_id, task_runtime, start_time, end_time, location):
'''Indicate a task has ended.
Args:
job_id - id of job that this task belongs to
task_id - id of the task launched
task_runtime - The running time of the task in seconds. Start time for this is the task_start record timestamp.
start_time - time when the task started as seconds from epoch
end_time - time when the task ended as seconds from epoch
location - a list of locations that this task is running on
'''
return entry("TE", job_id, {'task_id': task_id, 'task_runtime': task_runtime})
return entry("TE", job_id, {'task_id': task_id, 'task_runtime': task_runtime, 'start': start_time, 'end': end_time, 'location': location})
class DatetimeFileHandler (BaseRotatingHandler):
......
@@ -50,7 +50,7 @@ logger.addHandler(syslog)
ACCOUNTING_LOG_PATH = '/var/log/pbs/boot'
ACCOUNTING_MSG_FMT = "%s;%s;%s;%s" # Date, Type, Jobid, keyvals
ACCOUNTING_DATE_FMT = "%d/%m/%Y %H:%M:%S"
ACCOUNTING_DATE_FMT = "%m/%d/%Y %H:%M:%S"
def dict_to_keyval_str(dct):
'''put a record keyval dict into a string format for pbs logging'''
@@ -104,7 +104,12 @@ def exec_fetch_output(cmd, args, timeout=None):
cmd_list = [cmd]
cmd_list.extend(args)
proc = Popen(cmd_list, stdout=PIPE, stderr=PIPE)
stdout = ""
stderr = ""
while(True):
curr_stdout, curr_stderr = proc.communicate()
stdout += curr_stdout
stderr += curr_stderr
if endtime is not None and int(time.time()) >= endtime:
#signal and kill
timeout_trip = True
@@ -114,8 +119,12 @@
if proc.poll() is not None:
break
time.sleep(POLL_INT)
stdout, stderr = proc.communicate()
try:
curr_stdout, curr_stderr = proc.communicate()
stdout += curr_stdout
stderr += curr_stderr
except ValueError:
pass # Everything is closed and terminated.
if timeout_trip:
raise RuntimeError("%s timed out!" % cmd)
if proc.returncode != 0:
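
For reference, a minimal standalone sketch of the collect-output-with-timeout
pattern this function implements (the names and POLL_INT value are
assumptions, not Cobalt's API). Popen.communicate() waits for the process to
exit and closes the pipes, which is why the committed code tolerates a
ValueError on a second call; the sketch below polls first and calls
communicate() exactly once:

    import time
    from subprocess import Popen, PIPE

    POLL_INT = 0.25  # seconds between liveness checks (assumed value)

    def run_with_timeout(cmd_list, timeout=None):
        '''Run cmd_list and return (stdout, stderr); raise RuntimeError on timeout.

        Caveat: while polling, a child that fills the OS pipe buffer will
        block; very chatty commands would need a reader thread or select loop.
        '''
        proc = Popen(cmd_list, stdout=PIPE, stderr=PIPE)
        endtime = None if timeout is None else time.time() + timeout
        while proc.poll() is None:
            if endtime is not None and time.time() >= endtime:
                proc.kill()
                proc.communicate()  # reap the killed child
                raise RuntimeError("%s timed out!" % cmd_list[0])
            time.sleep(POLL_INT)
        return proc.communicate()  # safe: the process has already exited
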
@@ -225,7 +234,6 @@ def main():
label = "%s/%s/%s" % (user, jobid, bootid)
accounting_log_filename = '%s-%s' % (time.strftime('%Y%m%d-boot', time.gmtime()), bootid)
current_node_cfg = get_current_modes(node_list)
nodes_to_modify = []
initial_modes = {}
@@ -240,8 +248,8 @@ def main():
initial_modes[mode] = [int(nid)]
nodes_to_modify.append(int(nid))
initial_mode_list = []
for mode, node_list in initial_modes.items():
initial_mode_list.append('%s:%s' % (mode, compact_num_list(node_list)))
for mode, mod_node_list in initial_modes.items():
initial_mode_list.append('%s:%s' % (mode, compact_num_list(mod_node_list)))
# assuming that mode change is immediately followed by reboot. Modify when
# current setting inspection available.
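
Renaming the loop variable to mod_node_list stops it from shadowing the outer
node_list, which the reboot_info block below still needs. For illustration,
the grouping emits one mode:nidlist token per initial mode (compact_num_list
is Cobalt's nid-range compactor; the stand-in and sample data below are
hypothetical):

    # Hypothetical stand-in for Cobalt's compact_num_list:
    def compact_num_list(nids):
        return ','.join(str(n) for n in sorted(nids))

    initial_modes = {'cache:quad': [13, 12], 'flat:quad': [20]}
    initial_mode_list = ['%s:%s' % (mode, compact_num_list(mod_node_list))
                         for mode, mod_node_list in initial_modes.items()]
    # e.g. ['cache:quad:12,13', 'flat:quad:20']
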
@@ -250,18 +258,21 @@ def main():
reboot_info = {'bootid': bootid,
'boot_time': 'N/A',
'rebooted': compact_num_list(nodes_to_modify),
'blocked': compact_num_list(node_list),
'blocked': node_list,
'from_mode': ','.join(initial_mode_list),
'to_mode': '%s:%s:%s' %(mcdram_mode, numa_mode, compact_num_list(node_list)),
'to_mode': '%s:%s:%s' % (mcdram_mode, numa_mode, node_list),
'successful': False,
'start': None,
'end': None,
}
if len(nodes_to_modify) != 0: #if we don't have to reboot, don't go through this.
accounting_start_msg = ACCOUNTING_MSG_FMT % (time.strftime(ACCOUNTING_DATE_FMT, time.gmtime()), 'BS', jobid,
"bootid=%s" % bootid)
"bootid=%s blocked=%s" % (bootid, reboot_info['blocked']))
with open(os.path.join(ACCOUNTING_LOG_PATH, accounting_log_filename), "a+") as acc_file:
acc_file.write(accounting_start_msg + '\n')
logger.info("%s", accounting_start_msg)
start = time.time()
reboot_info['start'] = start
compact_nodes_to_modify = compact_num_list(nodes_to_modify)
try:
if not reset_modes(compact_nodes_to_modify, mcdram_mode, numa_mode,
@@ -285,7 +296,9 @@ def main():
reboot_info['successful'] = success
finally:
reboot_info['boot_time'] = int(time.time() - start)
end = time.time()
reboot_info['end'] = end
reboot_info['boot_time'] = int(end - start)
accounting_end_msg = ACCOUNTING_MSG_FMT % (time.strftime(ACCOUNTING_DATE_FMT, time.gmtime()), 'BE', jobid,
dict_to_keyval_str(reboot_info))
with open(os.path.join(ACCOUNTING_LOG_PATH, accounting_log_filename), "a+") as acc_file:
......
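
With these changes the boot accounting stream carries blocked locations on
the boot-start (BS) record and epoch start/end seconds on the boot-end (BE)
record. Illustrative lines only; every value below is made up and keyval
order depends on dict iteration:

    04/07/2017 16:20:00;BS;12345;bootid=42 blocked=12-13,20
    04/07/2017 16:25:00;BE;12345;bootid=42 boot_time=300 rebooted=12-13 blocked=12-13,20 from_mode=cache:quad:12-13 to_mode=flat:quad:12-13,20 successful=True start=1491582000.0 end=1491582300.0
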