client_utils.py 45.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
"""
This module defines Cobalt Client utility functions.
The contents are:
   - Functions that abstract component interaction
   - Common client utility functions
   - Argument Parsing callback functions. These functions are prefixed by 'cb_' 
     and are defined at the end of the module.
"""
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
import sys
import pwd
import os.path
import xmlrpclib
import ConfigParser
import re
import logging
18
import time
19
import json
20
21

import Cobalt.Util
George Rojas's avatar
George Rojas committed
22
from Cobalt.Proxy import ComponentProxy
23
24
from Cobalt.Util import parse_geometry_string
from Cobalt.arg_parser import ArgParse
George Rojas's avatar
George Rojas committed
25
from Cobalt.Exceptions import ComponentLookupError, JobValidationError
26
27
28
29
30
31

logger   = None # Logging instance. setup_logging needs to be called first thing.

#
# Constant used for tagging what needs to be replaced with the current working directory
CWD_TAG     =  "<REPLACE WITH CWD>"

# Component names; these key client_data.components and are passed to component_call
SYSMGR      = 'system'
QUEMGR      = 'queue-manager'
SLPMGR      = 'service-location'
SCHMGR      = 'scheduler'

#
# env Tags
COL_TAG = "<<*COL*>>"
EQL_TAG = "<<*EQL*>>"
# Escaped separators: a literal backslash followed by ':' or '='.  Raw strings give
# the same two-character values as the old "\:" / "\=" literals without relying on
# invalid escape sequences (which newer Pythons warn about).
ESC_COL = r"\:"
ESC_EQL = r"\="

# posix return statuses
AUTH_FAIL       = 2
BAD_OPTION_FAIL = 3
GENERAL_FAIL    = 1
SUCCESS         = 0

class Logger(object):
    """
    Client logger that splits output across two named loggers.

    info() goes to the 'cobalt_client_stdout' logger, which has a stdout
    handler; debug/warning/error/critical/fatal go to the
    'cobalt_client_stderr' logger, which has a stderr handler.  The stderr
    logger is also installed as Cobalt.Util.logger.
    """

    def __init__(self, level):
        """
        Attach the stdout/stderr handlers and set both loggers to level.
        """
        self.h_stdout = logging.StreamHandler(sys.stdout)
        self.h_stdout.setLevel(logging.INFO)
        self.stdout_logger = logging.getLogger('cobalt_client_stdout')
        self.stdout_logger.addHandler(self.h_stdout)
        self.stdout_logger.setLevel(level)

        self.h_stderr = logging.StreamHandler(sys.stderr)
        self.h_stderr.setLevel(logging.DEBUG)
        self.stderr_logger  = logging.getLogger('cobalt_client_stderr')
        self.stderr_logger.addHandler(self.h_stderr)
        self.stderr_logger.setLevel(level)

        Cobalt.Util.logger = self.stderr_logger

    def setLevel(self, level):
        """
        set logging level on both the stdout and stderr loggers
        """
        self.stdout_logger.setLevel(level)
        self.stderr_logger.setLevel(level)

    def debug(self, msg, *args, **kwargs):
        """
        print debug message (stderr logger)
        """
        self.stderr_logger.debug(msg, *args, **kwargs)

    def info(self, msg, *args, **kwargs):
        """
        print info message (stdout logger)
        """
        self.stdout_logger.info(msg, *args, **kwargs)

    def warn(self, msg, *args, **kwargs):
        """
        Alias of warning(), kept for existing callers.
        """
        self.warning(msg, *args, **kwargs)

    def warning(self, msg, *args, **kwargs):
        """
        print warning message (stderr logger)
        """
        # logging.Logger.warn is deprecated (removed in Python 3.13); use warning.
        self.stderr_logger.warning(msg, *args, **kwargs)

    def error(self, msg, *args, **kwargs):
        """
        print error message (stderr logger)
        """
        self.stderr_logger.error(msg, *args, **kwargs)

    def critical(self, msg, *args, **kwargs):
        """
        print critical message (stderr logger)
        """
        self.stderr_logger.critical(msg, *args, **kwargs)

    def fatal(self, msg, *args, **kwargs):
        """
        print fatal message (stderr logger)

        With no keyword arguments, exc_info=True is passed so the exception
        currently being handled (if any) is included.
        """
        # (A stray debug `print str(kwargs)` that polluted stdout was removed.)
        if not kwargs:
            self.stderr_logger.fatal(msg, *args, exc_info = True)
        else:
            self.stderr_logger.fatal(msg, *args, **kwargs)

class client_data(object):
    """
    Process-wide client state shared by the helpers in this module.

    Only the class attributes are used; the class is a namespace and is not
    meant to be instantiated.
    """
    # name of the running client command (assigned by setup_logging)
    curr_cmd = ''

    # one cache entry per component name: 'conn' holds the proxy created by
    # component_call, 'defer' the defer flag that proxy was created with
    components = dict(
        (comp_name, {'conn': None, 'defer': True})
        for comp_name in (QUEMGR, SYSMGR, SCHMGR, SLPMGR))

def print_usage(parser, errmsg = "No arguments or options provided"):
    """
    Print the command's usage line, then log errmsg as an error unless it is None.
    """
    parser.parser.print_usage()
    if errmsg is None:
        return
    logger.error(errmsg + '\n')

def component_call(comp_name, defer, func_name, args, exit_on_error = True):
    """
    Call func_name(*args) on the Cobalt component comp_name and return the result.

    The ComponentProxy for comp_name is cached in client_data.components and
    reused while it was created with the same defer flag; otherwise a new
    proxy is created and cached.

    Failures (ComponentLookupError, xmlrpclib.Fault, any other exception) go
    through one handler:
      - exit_on_error True (the default): log at error level, then sys.exit(1).
      - exit_on_error False: log at debug level, then re-raise the exception.
    """

    def component_error(msg, *fmt_args, **kwargs):
        """
        Report a failure.  Must be called from inside an except clause: the
        non-exiting path re-raises the exception currently being handled.
        """
        errmsg = 'component error: ' + msg
        if not exit_on_error:
            logger.debug(errmsg, *fmt_args, **kwargs)
            raise
        logger.error(errmsg, *fmt_args, **kwargs)
        sys.exit(1)

    # Trace the call that is about to be made.
    trace = ['component: "%s.%s", defer: %s\n  %s(\n' % (comp_name, func_name, str(defer), func_name)]
    trace.extend('     %s,\n' % (str(arg),) for arg in args)
    trace.append('     )\n\n')
    logger.debug(''.join(trace))

    cache_entry = client_data.components[comp_name]
    need_new_proxy = cache_entry['conn'] is None or cache_entry['defer'] != defer
    if need_new_proxy:
        try:
            comp = ComponentProxy(comp_name, defer = defer)
        except ComponentLookupError:
            component_error("Failed to connect to %s\n", comp_name)
        except Exception as err:
            component_error("Following exception occured in %s: %s\n", comp_name, err)
        cache_entry['conn']  = comp
        cache_entry['defer'] = defer
    else:
        comp = cache_entry['conn']

    result = None
    try:
        result = getattr(comp, func_name)(*args)
    except xmlrpclib.Fault as fault:
        component_error("XMLRPC failure %s in %s.%s\n", fault, comp_name, func_name)
    except Exception as err:
        component_error("Following exception occured while trying to execute %s.%s: %s\n", comp_name, func_name, err)
    return result

def run_jobs(jobs,location,user):
    """
    Ask the queue manager to run jobs at location on behalf of user.

    Exits with status 1 unless the system component reports exactly one
    partition named location.  Otherwise returns the queue manager's
    'run_jobs' response; location is passed as location.split(':').
    """
    matches = component_call(SYSMGR, True, 'get_partitions', ([{'name': location}],))
    if len(matches) == 1:
        return component_call(QUEMGR, True, 'run_jobs', (jobs, location.split(':'), user))
    logger.error("cannot find partition named '%s'" % location)
    sys.exit(1)

def add_queues(jobs,parser,user,info):
    """
    Add the queues described by jobs through the queue manager.

    parser.args holds the queue names given on the command line; info is the
    spec passed to 'get_queues' to fetch the existing queues.  Exits with
    status 1 if a requested name already exists or if no name was given.
    Prints a table of the added queue names and returns the queue manager's
    'add_queues' response.
    """
    existing_queues = component_call(QUEMGR, True, 'get_queues', (info,))
    # Build the name set once instead of rebuilding a list for every argument.
    existing_names = set(q.get('name') for q in existing_queues)

    if any(qname in existing_names for qname in parser.args):
        logger.error('queue already exists')
        sys.exit(1)
    elif len(parser.args) < 1:
        logger.error('Must specify queue name')
        sys.exit(1)

    response = component_call(QUEMGR, True, 'add_queues', (jobs, user))
    datatoprint = [('Added Queues', )] + [(q.get('name'), ) for q in response]
    print_tabular(datatoprint)
    return response

def del_queues(jobs,force,user):
    """
    Delete queues through the queue manager ('del_queues' with jobs, force, user).

    Prints a table of the queue names the queue manager reports as deleted
    and returns its response.
    """
    deleted = component_call(QUEMGR, True, 'del_queues',(jobs, force, user))
    rows = [('Deleted Queues', )]
    rows.extend((q.get('name'), ) for q in deleted)
    print_tabular(rows)
    return deleted

def set_scores(score, jobids, user):
    """
    Send a score change for the given jobs to the queue manager.

    Calls 'adjust_job_scores' with one {'jobid': id} spec per entry of jobids
    and str(score).  Whether the score is applied as an absolute value or an
    adjustment is decided by the queue manager (not visible here).  Logs the
    job ids it reports as updated, or "no jobs matched" if none.
    """
    specs = [dict(jobid=jid) for jid in jobids]
    updated = component_call(QUEMGR, True, 'adjust_job_scores', (specs, str(score), user))

    if updated:
        logger.info("updating scores for jobs: %s" % ", ".join(map(str, updated)))
    else:
        logger.info("no jobs matched")

def get_cp_option(section, option):
    """
    Return option from section of the Cobalt config, or None if the option is not set.

    Exits with status 1 if the section does not exist or if any other error
    occurs while reading the config.
    """
    try:
        CP = read_config()
        opt = CP.get(section, option)
    except ConfigParser.NoOptionError:
        opt = None
    except ConfigParser.NoSectionError:
        logger.error("No section %s in Cobalt config" % section)
        sys.exit(1)
    except Exception:
        # Was a bare except; narrowed so KeyboardInterrupt/SystemExit are not
        # misreported as a config error.
        logger.error("Unknown error when getting config option")
        sys.exit(1)
    return opt

def get_cqm_option(cqm_option):
    """
    Return the [cqm] config option cqm_option split on ':' into a list,
    or None when the option is unset or empty.
    """
    raw_value = get_cp_option('cqm', cqm_option)
    if not raw_value:
        return None
    return raw_value.split(":")

class header_info(object):
    """
    Choose the column header list used for job/queue listings.

    The class attributes hold the built-in header lists.  __init__ stores the
    chosen list in self.header based on the parser options, the [cqm] config,
    and the (C)QSTAT_HEADER / (C)QSTAT_HEADER_FULL environment variables.
    """
    # built-in headers; long_header is used to query the queue-manager
    default_header = ['JobID','User','WallTime','Nodes','State','Location']

    full_header = ['JobID','JobName','User','WallTime','QueuedTime','RunTime','TimeRemaining','Nodes','State',
                   'Location','Mode','Procs','Preemptable','Queue','StartTime','Index']

    long_header = ['JobID','JobName','User','WallTime','QueuedTime','RunTime','TimeRemaining','Nodes','State',
                   'Location','Mode','Procs','Preemptable','User_Hold','Admin_Hold','Queue','StartTime','Index',
                   'SubmitTime','Path','OutputDir','ErrorPath','OutputPath','Envs','Command','Args','Kernel',
                   'KernelOptions', 'ION_Kernel', 'ION_KernelOptions', 'Project','Dependencies','short_state','Notify','Score','Maxtasktime','attrs',
                   'dep_frac','user_list','Geometry']

    custom_header      = None
    custom_header_full = None

    header  = None

    def __init__(self,parser):
        """
        Resolve custom headers (cobalt.conf, then environment, then --header)
        and pick self.header from the parser options.
        """
        self.custom_header      = get_cqm_option('cqstat_header')
        self.custom_header_full = get_cqm_option('cqstat_header_full')

        # Environment overrides the config; the CQSTAT_* name wins over QSTAT_*.
        for env_name in ('CQSTAT_HEADER', 'QSTAT_HEADER'):
            if env_name in os.environ:
                self.custom_header = os.environ[env_name].split(':')
                break
        for env_name in ('CQSTAT_HEADER_FULL', 'QSTAT_HEADER_FULL'):
            if env_name in os.environ:
                self.custom_header_full = os.environ[env_name].split(':')
                break

        opts = parser.options
        if opts.header is not None:
            self.custom_header = opts.header

        if opts.Q is not None:
            self.header = ['Name','Users','Groups','MinTime','MaxTime','MaxRunning',
                           'MaxQueued','MaxUserNodes','MaxNodeHours','TotalNodes','State']
        elif opts.full is not None and opts.long is not None:
            self.header = self.long_header
        elif opts.full and self.custom_header_full is not None:
            self.header = self.custom_header_full
        elif opts.full and opts.long is None:
            self.header = self.full_header
        elif self.custom_header is not None:
            self.header = self.custom_header
        else:
            self.header = self.default_header

def sleep(t):
    """
    Thin wrapper around Cobalt.Util.sleep(t).
    """
    util_sleep = Cobalt.Util.sleep
    util_sleep(t)

def sec_to_str(t):
    """
    Return Cobalt.Util.sec_to_str(t); wrapped here so the client-side format can be changed in one place.
    """
    convert = Cobalt.Util.sec_to_str
    return convert(t)

def get_timeformat(runtime, dayf = False):
    """
    Format a duration in seconds as "HH:MM:SS".

    With dayf set, whole days are split off and, when there is at least one,
    the result is "Nd HH:MM:SS"; without dayf the hours are not wrapped.
    """
    total_minutes, secs = divmod(runtime, 60)
    hrs, mins = divmod(total_minutes, 60)
    day_count = 0
    if dayf:
        day_count, hrs = divmod(hrs, 24)
    if day_count >= 1:
        return "%dd %02d:%02d:%02d" % (day_count, hrs, mins, secs)
    return "%02d:%02d:%02d" % (hrs, mins, secs)

def get_elapsed_time(starttime, endtime, dayf = False):
    """
    Format the time between two timestamps (endtime - starttime) with get_timeformat.
    """
    return get_timeformat(endtime - starttime, dayf)

def print_tabular(rows):
    """
    Print rows as a table via Cobalt.Util.print_tabular (wrapped so it can be customized here).
    """
    tabulate = Cobalt.Util.print_tabular
    tabulate(rows)

def printTabular(rows, centered = None, with_header_info=True):
    """
    Print rows as a table via Cobalt.Util.printTabular.

    centered defaults to a fresh empty list per call; with_header_info is passed through unchanged.
    """
    if centered is None:
        centered = []
    Cobalt.Util.printTabular(rows, centered, with_header_info)

def print_vertical(rows):
    """
    Print rows in vertical layout via Cobalt.Util.print_vertical (wrapped so it can be customized here).
    """
    show = Cobalt.Util.print_vertical
    show(rows)

def merge_nodelist(nodelist):
    """
    Return Cobalt.Util.merge_nodelist(nodelist) (wrapped so it can be customized here).
    """
    merge = Cobalt.Util.merge_nodelist
    return merge(nodelist)

def validate_jobid_args(parser):
    """
    Return the job ids parsed from parser.args (via get_jobids).

    When no positional arguments were given, prints usage with
    "No Jobid(s) given" and exits with status 1.
    """
    if not parser.no_args():
        return get_jobids(parser.args)
    print_usage(parser, "No Jobid(s) given")
    sys.exit(1)

def hold_release_command(doc_str,rev_str,ver_str):
    """
    Place (qhold) or release (qrls) user holds on the jobs named on the command line.

    Shared implementation of the qhold and qrls clients.
    client_data.curr_cmd selects the behavior; any other command exits with
    status 1.

    doc_str -- ArgParse option definition text; '__revision__' and
               '__version__' in it are replaced with rev_str and ver_str.

    Flow: look up the requested jobs owned by the current user ('get_jobs'),
    apply the update ('set_jobs'), then report which jobs changed and why the
    rest did not.  For qrls with the deps option, the jobs' dependency lists
    are cleared instead and only the updated job ids are listed.
    """
    # setup logging for client
    setup_logging(logging.INFO)

    # no other commands other than qhold and qrls can use this function
    if client_data.curr_cmd != 'qrls' and client_data.curr_cmd != 'qhold':
        logger.error('This function only works for "qhold" and "qrls" commands')
        sys.exit(1)

    # list of callback with its arguments
    callbacks = [ [ cb_debug, () ] ]

    # Get the version information
    opt_def =  doc_str.replace('__revision__',rev_str)
    opt_def =  opt_def.replace('__version__',ver_str)

    parser = ArgParse(opt_def,callbacks)

    user = getuid()

    # Set required default values: None

    parser.parse_it() # parse the command line

    # Find which requested jobs exist for this user, with their current hold state.
    all_jobs       = validate_jobid_args(parser)
    check_specs    = [{'tag':'job', 'user':user, 'jobid':jobid, 'user_hold':'*'} for jobid in all_jobs]
    check_response = component_call(QUEMGR, False, 'get_jobs', (check_specs,))
    jobs_existed   = [j.get('jobid') for j in check_response]
    all_jobs       = all_jobs.union(set(jobs_existed))
    update_specs   = [{'tag':'job', 'user':user, 'jobid':jobid, 'user_hold':"*", 'is_active':"*"} for jobid in jobs_existed]

    if client_data.curr_cmd == 'qhold':
        updates = {'user_hold':True}
    elif client_data.curr_cmd == 'qrls':
        if parser.options.deps != None:
            updates = {'all_dependencies': []}
        else:
            updates = {'user_hold':False}

    update_response = component_call(QUEMGR, False, 'set_jobs', (update_specs,updates,user))

    if client_data.curr_cmd == 'qrls':
        if parser.options.deps != None:
            logger.info("   Removed dependencies from jobs: ")
            for j in update_response:
                logger.info("      %s" % j.get("jobid"))
            return # We are done exit

    # Classify every requested job so each failure can be explained below.
    jobs_found     = [j.get('jobid') for j in update_response]
    found_set      = set(jobs_found)
    jobs_not_found = list(all_jobs.difference(set(jobs_existed)))

    # jobs reported as completed, plus jobs that existed but were not updated
    jobs_completed = [j.get('jobid') for j in update_response if j.get('has_completed')] + \
        list(set(jobs_existed).difference(found_set))

    jobs_had_hold    = [j.get('jobid') for j in check_response if j.get('user_hold') and j.get('jobid') in found_set]
    jobs_active      = [j.get('jobid') for j in update_response if j.get('is_active')]

    # Initialize the following list as empty. The ones needed will not be empty after the following logic executes.
    pending_holds        = []
    jobs_no_hold         = []
    jobs_no_pending_hold = []

    if client_data.curr_cmd == 'qhold':

        pending_holds    = [j.get('jobid') for j in update_response if j.get('user_hold') and j.get('is_active')]
        # Loop-invariant: build the exclusion set once rather than per job.
        explained        = set(jobs_completed + jobs_had_hold + jobs_active)
        unknown_failures = [j.get('jobid') for j in update_response if not j.get('user_hold') and
                            j.get('jobid') not in explained]

        # new holds and failed holds
        had_hold_set = set(jobs_had_hold)
        new_stuff    = [j.get('jobid') for j in update_response if j.get('user_hold') and j.get('jobid') not in had_hold_set]
        failed_stuff = list(all_jobs.difference(set(new_stuff)))
        msg_str1     = "Placed user hold on jobs: "
        msg_str2     = "   Failed to place user hold on jobs: "
        msg_str3     = "to place the 'user hold'"

    elif client_data.curr_cmd == 'qrls':

        jobs_no_hold         = list(found_set.difference(set(jobs_had_hold)))
        jobs_no_pending_hold = list(set(jobs_no_hold).intersection(set(jobs_active)))
        # Loop-invariant: build the exclusion set once rather than per job.
        explained            = set(jobs_completed + jobs_no_pending_hold + jobs_active)
        unknown_failures     = [j.get('jobid') for j in update_response if j.get('user_hold') and
                                j.get('jobid') not in explained]

        had_hold_set = set(jobs_had_hold)
        new_stuff    = [j.get('jobid') for j in update_response if not j.get('user_hold') and j.get('jobid') in had_hold_set]
        failed_stuff = list(all_jobs.difference(set(new_stuff)))
        msg_str1     = "   Removed user hold on jobs: "
        msg_str2     = "   Failed to remove user hold on jobs: "
        msg_str3     = "to release the 'user hold'"

        # set this back to a empty list so it does not get used
        jobs_had_hold = []


    if not check_response and not update_response:
        logger.error("   No jobs found.")
        logger.error("Failed to match any jobs")
    else:
        logger.debug("Response: %s" % (update_response,))

    if len(failed_stuff) > 0:

        logger.info(msg_str2)

        for jobid in failed_stuff:

            if jobid in jobs_not_found:
                logger.info("      job %s not found" % (jobid,))

            elif jobid in jobs_completed:
                logger.info("      job %s has already completed" % (jobid,))

            elif jobid in jobs_had_hold:
                if jobid in pending_holds:
                    logger.info("      job %s already has a pending 'user hold'" % (jobid,))
                else:
                    logger.info("      job %s already in state 'user hold'" % (jobid,))

            elif jobid in jobs_no_pending_hold:
                logger.info("      job %s is already active and does not have a pending 'user hold'" % (jobid,))

            elif jobid in jobs_active:
                logger.info("      job %s is already active" % (jobid,))

            elif jobid in jobs_no_hold:
                logger.info("      job %s does not have a 'user hold'" % (jobid,))

            elif jobid in unknown_failures:
                logger.info("      job %s encountered an unexpected problem while attempting %s" % (jobid,msg_str3))

            else:
                logger.error("job %s not properly categorized" % (jobid,))
                sys.exit(1)

    if len(new_stuff) > 0:
        logger.info(msg_str1)
        for jobid in new_stuff:
            if client_data.curr_cmd == 'qhold':
                if jobid in pending_holds:
                    logger.info("      %s (pending)" % (jobid,))
                else:
                    logger.info("      %s" % (jobid,))
            else:
                logger.info("      %s" % (jobid,))


def get_options(spec,opts,opt2spec,parser):
    """
    Copy parsed option values into opts and spec.

    spec     -- dict keyed by destination name.  For each option given on the
                command line (and not in the opts-only list below), the value
                is stored under its destination; the first option seen for a
                destination wins.
    opts     -- dict keyed by option name.  'version' and 'debug' are reset to
                False first.  A given option is stored only if opts has no
                entry for it yet.  An option not given is set to False, except:
                qsub's queue/kernel/ion_kernel/cwd copy the value the caller
                already placed in spec, and attrs (qsub/qalter) becomes {}.
    opt2spec -- filled with option name -> destination name ('help' and
                'debug' are skipped).
    parser   -- iterable of (option name, destination, value) tuples.

    Returns the number of entries written to spec.
    """
    # options that are recorded in opts but never copied into spec
    opts_only = ['verbose','version', 'force','res_id','cycle_id','force_id','modify_res']

    opts['version'] = False
    opts['debug']   = False

    seen_dests = set() # destinations already written to spec
    added = 0

    if client_data.curr_cmd == 'qsub':
        opts['forcenoval'] = False # not used at all, but assign it for old qsub

    for optstr, deststr, optval in parser:
        if optstr == 'help':
            continue
        if optstr == 'debug':
            # debug only lands in opts, never in spec or opt2spec
            if optval is not None:
                opts[optstr] = True
            continue

        if optval is None:
            # option absent from the command line
            if optstr in ['queue', 'kernel', 'ion_kernel', 'cwd'] and client_data.curr_cmd == 'qsub':
                opts[optstr] = spec[deststr]
            elif optstr == 'attrs' and client_data.curr_cmd in ['qsub','qalter']:
                opts[optstr] = {}
            else:
                opts[optstr] = False
        else:
            # keep any value already set by a callback or elsewhere
            opts.setdefault(optstr, optval)
            if deststr not in seen_dests and optstr not in opts_only:
                spec[deststr] = optval
                seen_dests.add(deststr)
                added += 1

        opt2spec[optstr] = deststr

    return added

def getuid():
    """
    Return the login name of the process's real user id (from the passwd database).
    """
    return pwd.getpwuid(os.getuid()).pw_name

def getcwd():
    """
    Return the process's current working directory.
    """
    cwd = os.getcwd()
    return cwd

def getpath():
    """
    Return the PATH environment variable (raises KeyError if it is unset).
    """
    environment = os.environ
    return environment['PATH']

def setumask(umask):
    """
    Set the process file-creation mask to umask; the previous mask is discarded.
    """
    os.umask(umask)

def setup_logging(level):
    """
    Create the module-level Logger at level and record the client command name.

    Calls after the first successful one return immediately.
    client_data.curr_cmd becomes the basename of sys.argv[0] with every
    '.py' occurrence removed.
    """
    global logger

    if hasattr(logger, 'already_setup'):
        return

    fresh = Logger(level)
    fresh.already_setup = True
    logger = fresh
    client_data.curr_cmd = os.path.basename(sys.argv[0]).replace('.py','')

def read_config():
    """
    Return a ConfigParser loaded from the files listed in Cobalt.CONFIG_FILES.

    ConfigParser.read silently skips files that cannot be opened.
    """
    config = ConfigParser.ConfigParser()
    config.read(Cobalt.CONFIG_FILES)
    return config

def validate_geometry(geometry,nodes):
    """
    Check geometry against nodes with Cobalt.Util.validate_geometry.

    Exits with status 1 on failure: for a JobValidationError its message is
    logged, followed by "Jobs not altered."; any other exception logs
    "Invalid Geometry".
    """
    try:
        Cobalt.Util.validate_geometry(geometry, nodes)
    except JobValidationError as err:
        logger.error(err.message)
        logger.error( "Jobs not altered.")
        sys.exit(1)
    except Exception:
        # Was a bare except; KeyboardInterrupt/SystemExit now propagate instead
        # of being reported as an invalid geometry.
        logger.error("Invalid Geometry")
        sys.exit(1)

def system_info():
    """
    Return (sys_type, job_types) for this system.

    sys_type is the [bgsystem] bgtype config value, or 'bgl' if it cannot be
    read.  'bgp' and 'bgq' each have their own job_types list; every other
    type gets ['co', 'vn', 'script', 'interactive'].
    """
    try:
        CP = read_config()
        sys_type = CP.get('bgsystem', 'bgtype')
    except Exception:
        # Was a bare except; narrowed so KeyboardInterrupt/SystemExit propagate.
        sys_type = 'bgl'

    # Must be a single if/elif/else chain.  Previously a separate `if` for
    # 'bgq' made the trailing else overwrite the 'bgp' job types.
    if sys_type == 'bgp':
        job_types = ['smp', 'co', 'dual', 'vn', 'script']
    elif sys_type == 'bgq':
        job_types = ['c1', 'c2', 'c4', 'c8', 'c16', 'c32', 'c64', 'script', 'interactive']
    else:
        job_types = ['co', 'vn', 'script', 'interactive']
    return (sys_type,job_types)

def get_jobids(args):
    """
    Return the set of integer job ids parsed from args.

    A literal '*' argument is skipped and is not added to the result.  Any
    other argument that int() rejects is logged as an error and the client
    exits with status 1.
    """
    jobids = set()
    for arg in args:
        if arg == '*':
            # NOTE(review): '*' is accepted but not added, which preserves the
            # existing behavior (the old code assigned it to a dead local and
            # then skipped the add).  Confirm whether wildcard support was
            # intended before changing this: it would affect every caller.
            continue
        try:
            jobid = int(arg)
        except (TypeError, ValueError):
            logger.error("jobid must be an integer: %s",str(arg))
            sys.exit(1)
        jobids.add(jobid)
    return jobids

def get_filters():
    """
    Return the filter list from the [cqm] 'filters' option, or [] when it is not configured.
    """
    configured = get_cqm_option('filters')
    return [] if configured is None else configured

def process_filters(filters, spec):
    """
    Run each filter in filters against spec via Cobalt.Util.processfilter.

    processfilter presumably updates spec in place (TODO confirm in
    Cobalt.Util).  If any filter raises, the error and filter are logged and
    the client exits with status 1.
    """
    for current_filter in filters:
        try:
            Cobalt.Util.processfilter(current_filter, spec)
        except Exception as err:
            logger.error("Filter failure: please contact Administrator: %s: %s", err, current_filter)
            sys.exit(1)

def validate_conflicting_options(parser, option_lists):
    """
    Exit with status 1 if more than one option of any mutually exclusive group was given.

    option_lists is a sequence of groups; each group is a sequence of
    attribute names on parser.options.  An option counts as given when its
    value is not None.  The error message names every given option of the
    first offending group.
    """
    for mutex_option_list in option_lists:
        # Collected per group: previously the list was shared across groups
        # (leaking options from earlier groups into the message) and its first
        # entry was dropped from the message.
        given = [opt for opt in mutex_option_list
                 if getattr(parser.options, opt) is not None]
        if len(given) > 1:
            errmsg = 'Option combinations not allowed with: %s option(s)' % ", ".join(given)
            logger.error(errmsg)
            sys.exit(1)

def parse_datetime(datetime_str):
    """
    Return Cobalt.Util.parse_datetime(datetime_str).
    """
    parse = Cobalt.Util.parse_datetime
    return parse(datetime_str)

def cobalt_date(date):
    """
    Format date (a time tuple / struct_time) as 'YYYY_MM_DD-HH:MM'.
    """
    cobalt_format = '%Y_%m_%d-%H:%M'
    return time.strftime(cobalt_format, date)

def boot_block(block, user, jobid, resid=None):
    """
    Boot block through the system component on behalf of user/jobid and wait for it.

    This is always done on behalf of a process outside of Cobalt's server
    components.  The boot timeout sent to the system component comes from
    the [bgsystem] terminal_boot_timeout option (default 300).  Status
    strings reported while the boot runs are printed to stdout.

    Returns AUTH_FAIL if the boot request is refused, GENERAL_FAIL if the boot
    ends in 'failed', otherwise SUCCESS.  NOTE: polling has no client-side
    limit; it stops only once a status of 'complete' or 'failed' is seen.
    """
    # set a timeout pulled from the config file.
    Cobalt.Util.init_cobalt_config()
    timeout = int(Cobalt.Util.get_config_option('bgsystem', 'terminal_boot_timeout' , 300))

    accepted = component_call(SYSMGR, False, 'initiate_proxy_boot', (block, user, jobid, resid, timeout), False)
    if not accepted:
        logger.error("Boot request for block %s failed authorization." % (block, ))
        return AUTH_FAIL

    # give the system component a moment to initiate the boot
    sleep(3)

    # Poll: first wait until the boot shows up, then until it finishes.
    result = SUCCESS
    boot_seen = False
    while True:
        boot_id, status, status_strings = component_call(SYSMGR, False, 'get_boot_statuses_and_strings', (block,))
        if not boot_seen:
            if boot_id is not None:
                boot_seen = True
        else:
            if status_strings not in ([], None):
                print("\n".join(status_strings))
            if status in ['complete', 'failed']:
                component_call(SYSMGR, False, 'reap_boot', (block,))
                if status == 'failed':
                    result = GENERAL_FAIL
                break
        sleep(1)

    if result:
        logger.error("Boot for location %s failed."% (block,))
    else:
        logger.info("Boot for location %s complete."% (block,))
    return result

#
# Callback functions for argument parsing defined below
#

def cb_debug(option, opt_str, value, parser, *args):
    """
    Option callback that turns on debug logging.

    Sets the module logger to DEBUG, logs the command line that was used
    (program base name followed by its arguments), and sets the option's
    destination attribute to True.
    """
    logger.setLevel(logging.DEBUG)

    # Record how this command was invoked.
    prog_name = os.path.basename(sys.argv[0])
    cmd_line = '\n' + prog_name + ' ' + ' '.join(sys.argv[1:]) + '\n'
    logger.debug(cmd_line)

    setattr(parser.values, option.dest, True)  # set the option
803

804
def cb_nodes(option, opt_str, value, parser, *args):
    """
    Option callback: check that a node count lies in 1..system size and store it.

    The system size is the 'size' option of the [system] section of the config
    returned by read_config(); any failure while reading or converting it falls
    back to 1024.  args[0] (type_int) keeps the value as given when true and
    stores str(value) otherwise.  Exits with status 1 when the count is out of
    range.
    """
    store_raw = args[0]
    # Problems reading the configured size are deliberately tolerated; a
    # default size is used instead.
    try:
        max_nodes = int(read_config().get('system', 'size'))
    except:
        max_nodes = 1024

    if not 0 < value <= max_nodes:
        logger.error("node count out of realistic range")
        sys.exit(1)

    setattr(parser.values, option.dest, value if store_raw else str(value))  # set the option
823
824
825
826
827

def cb_gtzero(option,opt_str,value,parser,*args):
    """
    Option callback: reject values that are not strictly positive.

    args[0] selects how the value is stored: as given when true, as str(value)
    otherwise.  Exits with status 1 when value <= 0.
    """
    keep_raw = args[0]
    if value <= 0:
        logger.error(opt_str + " is " + str(value) + " which is greater <= to zero")
        sys.exit(1)

    stored = value if keep_raw else str(value)
    setattr(parser.values, option.dest, stored)  # set the option
838

839
840
841


def cb_score(option, opt_str, value, parser, *args):
    """
    Validate that the value entered is a number and store it as a string.

    The value is only checked with float(); the stored value is str(value),
    not the converted float.  Exits with status 1 if value is not numeric.
    """
    try:
        float(value)
    except (TypeError, ValueError, OverflowError):
        logger.error('%s is %s which is not number value' % (opt_str,value))
        sys.exit(1)
    setattr(parser.values, option.dest, str(value))  # set the option
851

852
853
854
855
856
857
def cb_time(option,opt_str,value,parser,*args):
    """
    This callback will validate the time, convert it to minutes and store it.

    args[0] (dt_allowed): accept a leading '+' or '-'; the sign is recorded on
        parser.__timeop__ and stripped before parsing.
    args[1] (seconds): store the result multiplied by 60 (seconds) when true.
    args[2] (type_int): store an int when true, otherwise a str.
    Exits with status 1 if Cobalt.Util.get_time raises TimeFormatError.
    """
    dt_allowed = args[0] # delta time flag
    seconds    = args[1] # convert to seconds if true
    type_int   = args[2] # return int
    _time      = value

    # A leading sign marks a relative change.  Slicing (instead of value[0])
    # keeps an empty string from raising IndexError before validation.
    if dt_allowed and value[:1] in ('+', '-'):
        _time = value[1:]
        parser.__timeop__ = value[0]

    # ensure time is actually in minutes
    try:
        minutes = Cobalt.Util.get_time(_time)
    except Cobalt.Exceptions.TimeFormatError as e:
        logger.error("invalid time specification: %s" % e.args[0])
        sys.exit(1)

    _time = 60 * minutes if seconds else minutes
    if not type_int:
        _time = str(_time)

    setattr(parser.values, option.dest, _time) # set the option
882
883
884
885
886
887
888
889
890
891
892
893

def cb_umask(option,opt_str,value,parser,*args):
    """
    Convert the umask octal string value to int and store it.

    Exits with status 1 if value is not a valid octal number.
    """
    try:
        um = int(value, 8)
    except (TypeError, ValueError):
        logger.error("Invalid umask value %s",value)
        sys.exit(1)
    setattr(parser.values, option.dest, um) # set the option

George Rojas's avatar
George Rojas committed
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
def _check_dependencies(dependency_string):
    """
    Warn about job dependencies that do not match jobs currently in the queue.

    dependency_string is a ':'-separated list of job ids, or 'none' (any case)
    meaning all dependencies are being removed.  The numeric entries are looked
    up with the queue manager's get_jobs; every listed entry whose text does not
    equal a returned jobid is reported.  This only logs; it never exits.
    """
    if dependency_string.lower() == 'none':
        #we are removing all job dependencies.
        logger.info("Removing job dependencies")
        return

    deps = set(dependency_string.split(":"))

    query = []
    for dep in deps:
        try:
            query.append({"jobid": int(dep)})
        except ValueError:
            # Non-numeric entries cannot match a jobid; they end up in
            # `missing` below and are reported.
            pass

    jobs = component_call(QUEMGR, True, 'get_jobs', (query,))

    job_ids = set(str(j["jobid"]) for j in jobs)

    missing = deps.difference(job_ids)

    if missing:
        logger.error("WARNING: dependencies %s do not match jobs currently in the queue" % ":".join(missing))

919
920
921
922
def cb_upd_dep(option,opt_str,value,parser,*args):
    """
    Option callback: warn about unknown dependencies, then store them as a list.

    The ':'-separated value is split into a list; if its first entry is 'none'
    (any case) an empty list is stored instead.
    """
    _check_dependencies(value)
    pieces = value.split(":")
    cleared = pieces[0].lower() == "none"
    setattr(parser.values, option.dest, [] if cleared else pieces)  # set the option

def cb_dep(option,opt_str,value,parser,*args):
    """
    Option callback: warn about dependencies that are not in the queue, then
    store the raw ':'-separated dependency string unchanged.
    """
    _check_dependencies(value)
    setattr(parser.values, option.dest, value)  # set the option

def cb_split(option,opt_str,value,parser,*args):
    """
    Split the string value on the delimiter given as args[0] and store the
    resulting list.
    """
    delim = args[0] # delimiter to use for splitting the string value
    # str.split already returns a new list; no copy via comprehension is needed.
    setattr(parser.values, option.dest, value.split(delim)) # set the option

def cb_env(option,opt_str,value,parser,*args):
    """
    Option callback: parse 'KEY=VAL[:KEY=VAL...]' into a dict and store it.

    Escaped separators (ESC_COL / ESC_EQL) are swapped for placeholder tags
    before splitting and restored as literal ':' and '=' in the values.  The
    raw option string is also recorded in args[0]['env'].  Exits with status 1
    if any entry has no '='.
    """
    opts = args[0]
    protected = value.replace(ESC_COL, COL_TAG).replace(ESC_EQL, EQL_TAG)
    # Only split on a ':' that starts a new NAME= assignment, so a ':' inside a
    # value (e.g. a path list) is kept.
    pairs = [chunk.split('=', 1) for chunk in re.split(r':(?=\w+\b=)', protected)]

    malformed = [kv for kv in pairs if len(kv) != 2]
    if malformed:
        logger.error( "Improperly formatted argument to env : %r" % malformed[0])
        sys.exit(1)

    env = dict((name, text.replace(COL_TAG, ':').replace(EQL_TAG, '='))
               for name, text in pairs)
    setattr(parser.values, option.dest, env)  # set the option
    opts['env'] = value
 
def cb_path(option,opt_str,value,parser,*args):
    """
    This callback will validate the path and store it.

    args[0] (opts): dict that receives the raw value under the option's long
        name with all '-' characters removed.
    args[1] (use_cwd): when true, a path not starting with '/' is prefixed with
        CWD_TAG + '/' instead of being checked; otherwise the path's parent
        directory must exist.
    Exits with status 1 if the parent directory does not exist.
    """
    opts     = args[0]
    use_cwd  = args[1]
    _path = value
    if not _path.startswith('/') and use_cwd:
        _path = CWD_TAG + '/' + value
    else:
        # A bare file name has an empty dirname, meaning the current directory;
        # os.path.isdir('') is False, so map it to os.curdir before checking.
        parent = os.path.dirname(_path) or os.curdir
        if not os.path.isdir(parent):
            logger.error("directory %s does not exist" % _path)
            sys.exit(1)
    setattr(parser.values,option.dest,_path) # set the option
    optstr = option.get_opt_string().replace('-','')
    opts[optstr] = value

def cb_geometry(option,opt_str,value,parser,*args):
    """
    This callback will validate the geometry value and store it.

    The result of parse_geometry_string(value) is stored on the option's
    destination and the raw value in args[0][option.dest].  Exits with
    status 1 if parsing fails.
    """
    opts = args[0]
    try:
        geom_str = parse_geometry_string(value)
    except Exception:
        # Nothing was parsed yet, so report what the user actually entered.
        logger.error("Invalid geometry entered: %s" % value)
        sys.exit(1)
    setattr(parser.values,option.dest,geom_str) # set the option
    opts[option.dest] = value

995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
def cb_bgq_geo(option,opt_str,value,parser,*args):
    """
    This callback will validate the bgq geometry value and store it.

    value must match Cobalt.Util.bgq_node_geo_re (AxBxCxDxE); the captured
    groups are stored as a list of ints.  Exits with status 1 otherwise.
    """
    match = Cobalt.Util.bgq_node_geo_re.match(value)
    if match is None:
        logger.error("Invalid Geometry. Geometry must be in the form of AxBxCxDxE")
        sys.exit(1)
    setattr(parser.values, option.dest, [int(nodect) for nodect in match.groups()])

1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
def cb_attrs(option,opt_str,value,parser,*args):
    """
    This callback will validate the attributes specified and store them.

    value is a ':'-separated list of entries, each turned into a dict item:
        KEY=VAL  -> {KEY: VAL}
        no_KEY   -> {KEY: "false"}
        KEY      -> {KEY: "true"}
    Exits with status 1 if the option was already given, or if an entry
    contains more than one '='.
    """
    if getattr(parser.values, option.dest) is not None:
        logger.error("Multiple --attrs options not supported.  Specify multiple attributes as follows: --attrs FOO=1:BAR=2")
        sys.exit(1)

    newoptsattrs = {}
    for attr in value.split(":"):
        fields = attr.split("=")
        if len(fields) == 2:
            key, attr_value = fields
            newoptsattrs[key] = attr_value
        elif len(fields) == 1:
            if attr.startswith("no_"):
                newoptsattrs[attr[3:]] = "false"
            else:
                newoptsattrs[attr] = "true"
        else:
            logger.error( "Improperly formatted argument to attrs : %s" % attr)
            sys.exit(1)
    setattr(parser.values, option.dest, newoptsattrs) # set the option
George Rojas's avatar
George Rojas committed
1030
1031

def _validate_users(users):
1032
    """
George Rojas's avatar
George Rojas committed
1033
    This function will validate the user list
1034
    """
George Rojas's avatar
George Rojas committed
1035
    user_list = [auth_user for auth_user in users.split(':')]  
1036
1037
1038
1039
1040
1041
    for auth_user in user_list:
        try:
            pwd.getpwnam(auth_user)
        except KeyError:
            logger.error("user %s does not exist." % auth_user)
            sys.exit(1)
George Rojas's avatar
George Rojas committed
1042
1043
        except Exception, e:
            logger.error("UNKNOWN FAILURE: user %s." % (auth_user,),e)
1044
            sys.exit(1)
George Rojas's avatar
George Rojas committed
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
    return user_list
    
def cb_user_list(option, opt_str, value, parser, *args):
    """
    Option callback: validate a ':'-separated user list and store it as a list.

    The user returned by getuid() is put at the front of the list when not
    already present.  The raw string is recorded in args[0][option.dest].
    """
    opts = args[0]
    requester = getuid()
    validated = _validate_users(value)
    if requester not in validated:
        validated.insert(0, requester)
    setattr(parser.values, option.dest, validated)
    opts[option.dest] = value
    
George Rojas's avatar
George Rojas committed
1059
1060
1061
1062
1063
1064
1065
1066
1067
def cb_res_users(option, opt_str, value, parser, *args):
    """
    Option callback: store the reservation user string exactly as given.

    The wildcard '*' is accepted without checking; any other value must be a
    ':'-separated list of existing users (checked by _validate_users).
    """
    is_wildcard = (value == '*')
    if not is_wildcard:
        _validate_users(value)
    setattr(parser.values, option.dest, value)
    
rojas's avatar
rojas committed
1068
def _set_mode(option, opt_str, value, parser, *args):
    """
    Validate value as a job mode and store it on parser.values.mode.

    The mode may only be set once: repeating the same mode is a no-op, and a
    different one is an error.  The mode must be one of the job types returned
    by system_info(); on a 'bgp' system, 'co' is stored as 'SMP'.  Exits with
    status 1 on any validation failure.
    """
    sys_type, job_types = system_info()

    current = parser.values.mode
    if current is not None:
        if current != value:
            logger.error("Mode already set to '%s' and trying to set it again to '%s'", current, value)
            sys.exit(1)
        return

    if value not in job_types:
        logger.error("Specifed mode '%s' not valid, valid modes are\n%s" % (value, "\n".join(job_types)))
        sys.exit(1)

    mode = 'SMP' if (value == 'co' and sys_type == 'bgp') else value
    setattr(parser.values, 'mode', mode)  # set the option

def cb_interactive(option, opt_str, value, parser, *args):
    """
    Option callback for interactive mode: equivalent to setting the mode to
    'interactive' (the option's own value is ignored).
    """
    return _set_mode(option, opt_str, 'interactive', parser, *args)

def cb_mode(option, opt_str, value, parser, *args):
    """
    Option callback: validate and store the requested job mode via _set_mode.
    """
    return _set_mode(option, opt_str, value, parser, *args)
1100
1101
1102
1103
1104
1105

def cb_cwd(option,opt_str,value,parser,*args):
    """
    Option callback: store value as the working directory.

    Exits with status 1 if value is not an existing directory.
    """
    if os.path.isdir(value):
        setattr(parser.values, option.dest, value)
        return
    logger.error("dir '%s' is not a directory" % value)
    sys.exit(1)

def cb_setqueues(option,opt_str,value,parser,*args):
    """
    get the set queue data from specified option

    Translates one queue-changing option into parser.values.qdata (a dict of
    queue properties) and records the option name (dashes removed) in
    parser.values.setq_opt:
        stopq / startq / drainq / killq -> {'state': 'stopped'|'running'|'draining'|'dead'}
        policy VALUE                    -> {'policy': VALUE}
        unsetq 'p1 p2'                  -> {'p1': None, 'p2': None}
        setq 'p1=v1 p2=v2'              -> {'p1': 'v1', 'p2': 'v2'}
    Property names are lower-cased.  For maxtime/mintime, an hh:mm:ss value is
    stored as a minute count (hours*60 + minutes; seconds are dropped), while a
    single number or '*' is stored as given.
    Exits with status 1 if a second queue-changing option is seen or a value is
    malformed.
    """
    optstr = opt_str.replace('-','')
    #  only one set queue option is allowed at a time
    if hasattr(parser.values,'setq_opt'):
        prev_opt = parser.values.setq_opt
        logger.error('Only one option that sets queue data is allowed: options %s and %s encountered' %
                     (prev_opt,optstr))
        sys.exit(1)

    state_for_opt = {'stopq': 'stopped', 'startq': 'running',
                     'drainq': 'draining', 'killq': 'dead'}
    if optstr in state_for_opt:
        qdata = {'state': state_for_opt[optstr]}
    elif optstr == 'policy':
        qdata = {'policy':value}
    elif optstr == 'unsetq':
        # split() with no argument ignores repeated/leading/trailing whitespace.
        qdata = dict((prop.lower(), None) for prop in value.split())
    elif optstr == 'setq':
        props = [p.split('=') for p in value.split()]
        for p in props:
            if len(p) != 2:
                logger.error("Improperly formatted argument to setq : %r" % p)
                sys.exit(1)
        qdata = {}
        for prop, val in props:
            if prop.lower() in ('maxtime', 'mintime'):
                fields = val.split(':')
                if len(fields) not in (1, 3):
                    logger.error('Time for ' + prop + ' is not valid, must be in hh:mm:ss or mm format')
                    sys.exit(1)
                for field in fields:
                    if field == '*':
                        continue
                    try:
                        int(field)
                    except ValueError:
                        logger.error(prop + ' value is not a number')
                        sys.exit(1)
                if len(fields) == 3:
                    # '*' is only meaningful as the whole value; in hh:mm:ss
                    # form it cannot be converted to minutes.
                    if '*' in fields:
                        logger.error('Time for ' + prop + ' is not valid, must be in hh:mm:ss or mm format')
                        sys.exit(1)
                    val = str(int(fields[0])*60 + int(fields[1]))
            qdata[prop.lower()] = val
    parser.values.qdata    = qdata
    parser.values.setq_opt = optstr


George Rojas's avatar
George Rojas committed
1167
def _setbool_attr(parser, opt_str, attr, true_opt_list):
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
    """
    Set the specified attr to true if opt string in the true option list.
    Will generate an error if the attr is already set.
    """
    if hasattr(parser.values,attr):
        val = getattr(parser.values,attr)
        if  val != None:
            logger.error("Attribute %s already set" % attr)
            sys.exit(1)
    if opt_str in true_opt_list:
        setattr(parser.values,attr,True)
    else:
        setattr(parser.values,attr,False)

def cb_hold(option,opt_str,value,parser,*args):
    """
    handles the (user | admin) hold and release attributes

    Option strings containing 'user' set user_hold; all others set admin_hold.
    Option strings containing neither 'user' nor 'admin' (plain hold/release)
    are only accepted when the [client] option
    allow_cqadm_hold_and_release_options is 'true' (any case); otherwise the
    process exits with status 1.
    """
    if 'user' in opt_str:
        hold_str = 'user_hold'
    else:
        hold_str = 'admin_hold'
        if 'admin' not in opt_str:
            cp_opt = get_cp_option('client', 'allow_cqadm_hold_and_release_options')
            allowed = cp_opt is not None and cp_opt.lower() == 'true'
            if not allowed:
                logger.error('Options --hold and --release have been deprecated')
                sys.exit(1)

    _setbool_attr(parser, opt_str, hold_str, ['--hold', '--user-hold', '--admin-hold'])
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212

def cb_passthrough(option,opt_str,value,parser,*args):
    """
    handles the block_passthrough attribute: True for --block_passthrough,
    False for any other option string routed to this callback.
    """
    enabling_opts = ['--block_passthrough']
    _setbool_attr(parser, opt_str, 'block_passthrough', enabling_opts)

        
def cb_date(option,opt_str,value,parser,*args):
    """
    parse date

    Parses value with parse_datetime and stores the result on the option's
    destination.  When extra callback args are given, args[0] (allow_now)
    decides whether the keyword 'now' (any case) is accepted; if it is, 'now'
    is replaced by the current local time formatted by cobalt_date.
    Exits with status 1 if parsing fails or 'now' is not allowed.
    """
    try:
        _value = value
        if args and _value.lower() == 'now':
            allow_now = args[0]
            if not allow_now:
                # A bare `raise` here had no active exception to re-raise.
                raise ValueError("'now' is not allowed for this option")
            _value = cobalt_date(time.localtime(time.time()))

        starttime = parse_datetime(_value)
        logger.info("Got starttime %s" % (sec_to_str(starttime)))
    except Exception as e:
        logger.error("start time '%s. Error: %s' is invalid", value, e)
        logger.error("start time is expected to be in the format: YYYY_MM_DD-HH:MM")
        sys.exit(1)
    setattr(parser.values,option.dest,starttime)
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244

def cluster_display_node_info():
    '''fetch information for display in nodeadm and nodelist for cluster systems.

    returns:
    header - headers to print out for display
    output - data to be displayed about nodes on a cluster system

    Each output row is [host, ':'-joined queues, state, backfill].  The queues
    are the host's sorted queue assignments followed by the queues of any
    reservation (from the active=True get_reservations query) whose partitions
    list the host.  Backfill is '-' unless the node is 'idle' and appears under
    a non-zero backfill end time, in which case it is the HH:MM:SS remaining
    until that end time (clamped at 00:00:00).
    '''

    statuses = component_call(SYSMGR, False, 'get_node_status', ())
    queue_data = component_call(SYSMGR, False, 'get_queue_assignments', ())
    end_times_to_nodes = component_call(SYSMGR, False, 'get_backfill_windows', ())
    reservations = component_call(SCHMGR, False, 'get_reservations', ([{'queue':'*', 'partitions':'*', 'active':True}],))

    header = [['Host', 'Queue', 'State', 'Backfill']]
    # One timestamp for the whole listing so every row uses the same "now"
    # (time.time() returns a float; backfill math is done in whole seconds).
    now = int(time.time())
    #build output list
    output = []
    for node_status in statuses:
        host_name = node_status[0]
        state = node_status[1]

        queues = sorted(queue for queue in queue_data if host_name in queue_data[queue])
        for res in reservations:
            if host_name in res['partitions'].split(':') and res['queue'] not in queues:
                queues.append(res['queue'])

        backfill_time = '-'
        if state == 'idle':
            for end_time in end_times_to_nodes:
                if int(end_time) != 0 and host_name in end_times_to_nodes[end_time]:
                    remaining = max(0, int(end_time) - now)
                    hours, rem = divmod(remaining, 3600)
                    minutes, seconds = divmod(rem, 60)
                    backfill_time = "%02d:%02d:%02d" % (hours, minutes, seconds)
        output.append([host_name, ":".join(queues), state, backfill_time])

    return header, output
1277
1278
1279



1280
1281
1282
1283
1284
def _extract_res_node_ranges(res_nodes):
    '''Expand every entry of res_nodes with Cobalt.Util.expand_num_list and
    return all resulting node ids as one flat list, preserving order.'''
    return [nid
            for nidlist in res_nodes
            for nid in Cobalt.Util.expand_num_list(nidlist)]
1285

1286
1287
1288
1289
1290
1291
1292
def _setup_res_info():
    '''set up the reservation-to-node info so showres shows associated
    reservation queues when active.

    Returns a dict mapping each node id (as a str) to the list of queues of
    the reservations covering it, in the order get_reservations returned them
    (queried with active=True).
    '''
    reservations = component_call(SCHMGR, False, 'get_reservations',
            ([{'queue':'*', 'partitions':'*', 'active':True}],))
    res_queues = {}
    for res in reservations:
        res_queue = res['queue']
        for nid in _extract_res_node_ranges(res['partitions'].split(':')):
            res_queues.setdefault(str(nid), []).append(res_queue)
    return res_queues

Paul Rich's avatar
Paul Rich committed
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318

def sec_to_human_time(raw_time):
    '''Format a number of seconds for pretty printing.

    Returns "00:00" when raw_time <= 0.  Otherwise returns M:SS, H:MM:SS or
    D:HH:MM:SS, choosing the shortest form whose leading field is non-zero.
    '''
    if raw_time <= 0:
        return "00:00"
    total_minutes, secs = divmod(raw_time, 60)
    total_hours, mins = divmod(total_minutes, 60)
    days, hours = divmod(total_hours, 24)
    if days > 0:
        return '%d:%02d:%02d:%02d' % (days, hours, mins, secs)
    if hours > 0:
        return '%d:%02d:%02d' % (hours, mins, secs)
    return '%d:%02d' % (mins, secs)

1319
1320
1321
def print_node_list():
    '''fetch and print a list of node information with default headers

    Node fields are requested from the system component's get_nodes using the
    display header, with aliased columns translated (Backfill -> drain_until).
    Rows are sorted and printed with printTabular; if no nodes are returned an
    informational message is logged instead.
    '''
    header = ['Node_id', 'Name', 'Queues', 'Status', 'Backfill']
    header_aliases = {'Backfill': 'drain_until'}
    # Names to request from get_nodes: display headers with aliases applied.
    fetch_header = [header_aliases.get(heading, heading) for heading in header]
    nodes = json.loads(component_call(SYSMGR, False, 'get_nodes',
            (True, None, fetch_header, True)))

    #TODO: Allow headers to be specified by the user in the future.
    # Defined unconditionally so the 'queues' branch below can never hit an
    # unbound name if the header changes.
    res_queues = {}
    if 'queues' in [h.lower() for h in header]:
        res_queues = _setup_res_info()

    now = int(time.time())
    if not nodes:
        logger.info('System has no nodes defined')
        return

    print_nodes = []
    for node in nodes.values():
        entry = []
        for key in fetch_header:
            field = key.lower()
            if field == 'node_id':
                entry.append(int(node[field]))
            elif field == 'drain_until':
                if node[field] is not None:
                    entry.append(sec_to_human_time(node[field] - now))
                else:
                    entry.append('-')
            elif field == 'queues':
                # Node queues followed by queues of reservations on this node.
                queues = list(node[field]) + res_queues.get(str(node['node_id']), [])
                entry.append(':'.join(queues))
            else:
                entry.append(node[field])
        print_nodes.append(entry)
    printTabular([header] + sorted(print_nodes))

def print_node_details(args):
1362
1363
1364
1365
1366
    '''fetch and print a detailed view of node information

        args - list of nodes to fetch detailed information on.

    '''
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
    def gen_printable_value(value):
        if isinstance(value, dict):
            retval = ', '.join(['%s: %s'% (k, gen_printable_value(v)) for k, v in
                value.iteritems()])
        elif isinstance(value, list):
            retval = ', '.join([gen_printable_value(v) for v in value])
        else:
            retval = str(value)
        return retval

1377
1378
    nodes = component_call(SYSMGR, False, 'get_nodes',
            (True, expand_node_args(args)))
1379
    res_queues = _setup_res_info()
1380
1381
1382
1383
1384
1385
    for node in nodes.values():
        header_list = []
        value_list = []
        header_list.append('node_id')
        value_list.append(node['node_id'])
        for key, value in node.iteritems():
1386
            if key == 'node_id':