"""Resource management for Cray ALPS based systems"""

import logging
import threading
import thread
import time
import sys
import xmlrpclib
import json
import ConfigParser
import Cobalt.Util
import Cobalt.Proxy
import Cobalt.Components.system.AlpsBridge as ALPSBridge
from Cobalt.Components.base import Component, exposed, automatic, query, locking
from Cobalt.Components.system.base_system import BaseSystem
from Cobalt.Components.system.CrayNode import CrayNode
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
from Cobalt.Components.system.ALPSProcessGroup import ALPSProcessGroup
from Cobalt.Exceptions import ComponentLookupError
from Cobalt.Exceptions import JobNotInteractive
from Cobalt.Exceptions import JobValidationError
from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Util import compact_num_list, expand_num_list
from Cobalt.Util import init_cobalt_config, get_config_option

_logger = logging.getLogger(__name__)
init_cobalt_config()

UPDATE_THREAD_TIMEOUT = int(get_config_option('alpssystem', 'update_thread_timeout', 10))
TEMP_RESERVATION_TIME = int(get_config_option('alpssystem', 'temp_reservation_time', 300))
SAVE_ME_INTERVAL = float(get_config_option('alpssystem', 'save_me_interval', 10.0))
#default 20 minutes to account for boot.
PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem', 'pending_startup_timeout', 1200))
APKILL_CMD = get_config_option('alps', 'apkill', '/opt/cray/alps/default/bin/apkill')
DRAIN_MODE = get_config_option('system', 'drain_mode', 'first-fit')
#cleanup time in seconds
CLEANUP_DRAIN_WINDOW = get_config_option('system', 'cleanup_drain_window', 300)

#Epsilon (in seconds) for backfilling.  This is applied system-wide, not per-node.
BACKFILL_EPSILON = int(get_config_option('system', 'backfill_epsilon', 120))
ELOGIN_HOSTS = [host for host in get_config_option('system', 'elogin_hosts', '').split(':')]
if ELOGIN_HOSTS == ['']:
    ELOGIN_HOSTS = []
DRAIN_MODES = ['first-fit', 'backfill']
CLEANING_ID = -1
DEFAULT_MCDRAM_MODE = get_config_option('alpssystem', 'default_mcdram_mode', 'cache')
DEFAULT_NUMA_MODE = get_config_option('alpssystem', 'default_numa_mode', 'quad')
MCDRAM_TO_HBMCACHEPCT = {'flat':'0', 'cache':'100', 'split':'25', 'equal':'50', '0':'0', '25':'25', '50':'50', '100':'100'}
VALID_MCDRAM_MODES = ['flat', 'cache', 'split', 'equal', '0', '25', '50', '100']
VALID_NUMA_MODES = ['a2a', 'hemi', 'quad', 'snc2', 'snc4']
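
# Illustrative cobalt config entries for the options read above.  The values
# shown are examples only (mostly the in-code defaults), not site settings:
#
#   [alpssystem]
#   update_thread_timeout = 10
#   temp_reservation_time = 300
#   save_me_interval = 10.0
#   pending_startup_timeout = 1200
#   default_mcdram_mode = cache
#   default_numa_mode = quad
#
#   [system]
#   size = 10
#   drain_mode = backfill
#   cleanup_drain_window = 300
#   backfill_epsilon = 120
#   elogin_hosts = elogin01:elogin02
#
#   [alps]
#   apkill = /opt/cray/alps/default/bin/apkill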


def chain_loc_list(loc_list):
    '''Take a list of compact Cray locations,
    expand and concatenate them.

    '''
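    # Example (illustrative): chain_loc_list(['1-3', '7']) -> [1, 2, 3, 7],
    # relying on Cobalt.Util.expand_num_list to expand each compact range.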
    retlist = []
    for locs in loc_list:
        retlist.extend(expand_num_list(locs))
    return retlist


class CraySystem(BaseSystem):
    '''Cray/ALPS-specific system component.  Behaviors should go here.  Direct
    ALPS interaction through BASIL/other APIs should go through the ALPSBridge
    (or other bridge) module as appropriate.

    '''
    name = "system"
    implementation = "alps_system"
    logger = _logger

    def __init__(self, *args, **kwargs):
        '''Initialize the system component.  Read initial node states from the
        ALPS bridge and capture the current state.

        '''
        start_time = time.time()
        super(CraySystem, self).__init__(*args, **kwargs)
        _logger.info('BASE SYSTEM INITIALIZED')
        self._common_init_restart()
        _logger.info('ALPS SYSTEM COMPONENT READY TO RUN')
        _logger.info('Initialization complete in %s sec.', (time.time() -
                start_time))

    def _common_init_restart(self, spec=None):
        '''Common routine for cold and restart initialization of the system
        component.

        '''
        try:
            self.system_size = int(get_config_option('system', 'size'))
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            _logger.critical('ALPS SYSTEM: ABORT STARTUP: System size must be specified in the [system] section of the cobalt configuration file.')
            sys.exit(1)
        if DRAIN_MODE not in DRAIN_MODES:
            #abort startup, we have a completely invalid config.
            _logger.critical('ALPS SYSTEM: ABORT STARTUP: %s is not a valid drain mode.  Must be one of %s.',
                DRAIN_MODE, ", ".join(DRAIN_MODES))
            sys.exit(1)
        #initialize bridge.
        bridge_pending = True
        while bridge_pending:
            # purge stale children from prior run.  Also ensure the
            # system_script_forker is currently up.
            # These attempts may fail due to system_script_forker not being up.
            # We don't want to trash the statefile in this case.
            try:
                ALPSBridge.init_bridge()
            except ALPSBridge.BridgeError:
                _logger.error('Bridge Initialization failed.  Retrying.')
                Cobalt.Util.sleep(10)
            except ComponentLookupError:
                _logger.warning('Error reaching forker.  Retrying.')
                Cobalt.Util.sleep(10)
            else:
                bridge_pending = False
                _logger.info('BRIDGE INITIALIZED')
        #process manager setup
        if spec is None:
            self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup)
        else:
            self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup).__setstate__(spec['process_manager'])
            self.logger.debug('pg type %s', self.process_manager.process_groups.item_cls)
        #self.process_manager.forkers.append('alps_script_forker')
        self.process_manager.update_launchers()
        self.pending_start_timeout = PENDING_STARTUP_TIMEOUT
        _logger.info('PROCESS MANAGER INITIALIZED')
        #resource management setup
        self.nodes = {} #cray node_id: CrayNode
        self.node_name_to_id = {} #cray node name to node_id map
        self.alps_reservations = {} #cobalt jobid : AlpsReservation object
        if spec is not None:
            self.alps_reservations = spec['alps_reservations']
        self._init_nodes_and_reservations()
        if spec is not None:
            node_info = spec.get('node_info', {})
            for nid, node in node_info.items():
                try:
                    self.nodes[nid].reset_info(node)
                except Exception: #check the exception types later.  Carry on otherwise.
                    self.logger.warning("Node nid: %s not found in restart information.  Bringing up node in a clean configuration.", nid)
        #storage for pending job starts.  Allows us to handle slow starts vs
        #user deletes
        self.pending_starts = {} #jobid: time this should be cleared.
        self.nodes_by_queue = {} #queue:[node_ids]
        #populate initial state
        #state update thread and lock
        self._node_lock = threading.RLock()
        self._gen_node_to_queue()
        self.node_update_thread = thread.start_new_thread(self._run_update_state, tuple())
        _logger.info('UPDATE THREAD STARTED')
        self.current_equivalence_classes = []
        self.killing_jobs = {}
        #hold on to the initial spec in case nodes appear out of nowhere.
        self.init_spec = None
        if spec is not None:
            self.init_spec = spec

    def __getstate__(self):
        '''Save process, alps-reservation information, along with base
        information'''
        state = {}
        state.update(super(CraySystem, self).__getstate__())
        state['alps_system_statefile_version'] = 1
        state['process_manager'] = self.process_manager.__getstate__()
        state['alps_reservations'] = self.alps_reservations
        state['node_info'] = self.nodes
        return state

    def __setstate__(self, state):
        start_time = time.time()
        super(CraySystem, self).__setstate__(state)
        _logger.info('BASE SYSTEM INITIALIZED')
        self._common_init_restart(state)
        _logger.info('ALPS SYSTEM COMPONENT READY TO RUN')
        _logger.info('Reinitialization complete in %s sec.', (time.time() -
                start_time))

    def save_me(self):
        '''Automatically save a copy of the state of the system component.'''
        #should we be holding the block lock as well?
        Component.save(self)
    save_me = automatic(save_me, SAVE_ME_INTERVAL)

    def _init_nodes_and_reservations(self):
        '''Initialize nodes from ALPS bridge data'''

        retnodes = {}
        pending = True
        while pending:
            try:
                # None of these queries has strictly degenerate data.  Inventory
                # is needed for raw reservation data.  System gets memory and a
                # much more compact representation of data.  Reservednodes gives
                # which nodes are reserved.
                inventory = ALPSBridge.fetch_inventory()
                _logger.info('INVENTORY FETCHED')
                system = ALPSBridge.extract_system_node_data(ALPSBridge.system())
                _logger.info('SYSTEM DATA FETCHED')
                reservations = ALPSBridge.fetch_reservations()
                _logger.info('ALPS RESERVATION DATA FETCHED')
                # reserved_nodes = ALPSBridge.reserved_nodes()
                ssd_enabled = ALPSBridge.fetch_ssd_enable()
                _logger.info('CAPMC SSD ENABLED DATA FETCHED')
                ssd_info = ALPSBridge.fetch_ssd_static_data()
                _logger.info('CAPMC SSD DETAIL DATA FETCHED')
                ssd_diags = ALPSBridge.fetch_ssd_diags()
                _logger.info('CAPMC SSD DIAG DATA FETCHED')
            except Exception:
                #don't crash out here.  That may trash a statefile.
                _logger.error('Possible transient error encountered during initialization. Retrying.',
                        exc_info=True)
                Cobalt.Util.sleep(10)
            else:
                pending = False

        self._assemble_nodes(inventory, system, ssd_enabled, ssd_info, ssd_diags)
        #Reversing the node name to id lookup is going to save a lot of cycles.
        for node in self.nodes.values():
            self.node_name_to_id[node.name] = node.node_id
        _logger.info('NODE INFORMATION INITIALIZED')
        _logger.info('ALPS REPORTS %s NODES', len(self.nodes))
        # self._assemble_reservations(reservations, reserved_nodes)
        return

    def _assemble_nodes(self, inventory, system, ssd_enabled, ssd_info, ssd_diags):
        '''merge together the INVENTORY and SYSTEM query data to form as
        complete a picture of a node as we can.

        Args:
            inventory - ALPS QUERY(INVENTORY) data
            system - ALPS QUERY(SYSTEM) data
            ssd_enabled - CAPMC get_ssd_enable data
            ssd_info - CAPMC get_ssds data
            ssd_diags - CAPMC get_ssd_diags data

        Returns:
            None

        Side Effects:
            Populates the node dictionary

        '''
        nodes = {}
        for nodespec in inventory['nodes']:
            node = CrayNode(nodespec)
            node.managed = True
            nodes[node.node_id] = node
        for node_id, nodespec in system.iteritems():
            nodes[node_id].attributes.update(nodespec['attrs'])
            # Should this be a different status?
            nodes[node_id].role = nodespec['role'].upper()
            if nodes[node_id].role.upper() not in ['BATCH']:
                nodes[node_id].status = 'down'
            nodes[node_id].status = nodespec['state']
        self._update_ssd_data(nodes, ssd_enabled, ssd_info, ssd_diags)
        self.nodes = nodes

    def _update_ssd_data(self, nodes, ssd_enabled=None, ssd_info=None, ssd_diags=None):
        '''Update/add ssd data from CAPMC'''
        if ssd_enabled is not None:
            for ssd_data in ssd_enabled['nids']:
                try:
                    nodes[str(ssd_data['nid'])].attributes['ssd_enabled'] = int(ssd_data['ssd_enable'])
                except KeyError:
                    _logger.warning('ssd info present for nid %s, but not reported in ALPS.', ssd_data['nid'])
        if ssd_info is not None:
            for ssd_data in ssd_info['nids']:
                try:
                    nodes[str(ssd_data['nid'])].attributes['ssd_info'] = ssd_data
                except KeyError:
                    _logger.warning('ssd info present for nid %s, but not reported in ALPS.', ssd_data['nid'])
        if ssd_diags is not None:
            for diag_info in ssd_diags['ssd_diags']:
                try:
                    node = nodes[str(diag_info['nid'])]
                except KeyError:
                    _logger.warning('ssd diag data present for nid %s, but not reported in ALPS.', diag_info['nid'])
                else:
                    for field in ['life_remaining', 'ts', 'firmware', 'percent_used']:
                        node.attributes['ssd_info'][field] = diag_info[field]


    def _assemble_reservations(self, reservations, reserved_nodes):
        # FIXME: we can recover reservations now.  Implement this.
        pass

    def _gen_node_to_queue(self):
        '''(Re)Generate a mapping for fast lookup of node-id's to queues.'''
        with self._node_lock:
            self.nodes_by_queue = {}
            for node in self.nodes.values():
                for queue in node.queues:
                    if queue in self.nodes_by_queue.keys():
                        self.nodes_by_queue[queue].add(node.node_id)
                    else:
                        self.nodes_by_queue[queue] = set([node.node_id])

    @exposed
    def get_nodes(self, as_dict=False, node_ids=None, params=None, as_json=False):
        '''fetch the node dictionary.

            as_dict  - Return node information as a dictionary keyed to string
                        node_id value.
            node_ids - A list of node names to return, if None, return all nodes
                       (default None).
            params   - If requesting a dict, only request this list of
                       parameters of the node.
            as_json  - Encode to json before sending.  Useful on large systems.

            returns the node dictionary.  Can return underlying node data as
            dictionary for XMLRPC purposes

        '''
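        # Illustrative XML-RPC style call (client-side names are hypothetical):
        #   system = Cobalt.Proxy.ComponentProxy('system')
        #   nodes = system.get_nodes(True, None, ['name', 'queues', 'status'], False)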
        def node_filter(node):
            if node_ids is not None:
                return (str(node[0]) in [str(nid) for nid in node_ids])
            return True

        node_info = None
        if as_dict:
            retdict = {k:v.to_dict(True, params) for k, v in self.nodes.items()}
            node_info = dict(filter(node_filter, retdict.items()))
        else:
            node_info = dict(filter(node_filter, self.nodes.items()))
        if as_json:
            return json.dumps(node_info)
        return node_info

    def _run_update_state(self):
        '''automated node update functions on the update timer go here.'''

        def _run_and_wrap(func):
            try:
                func()
            except Exception:
                # Prevent this thread from dying.
                _logger.critical('Error in _run_update_state', exc_info=True)

        while True:
            # Each of these is wrapped in its own log-and-preserve block.
            # The outer try is there to ensure the thread update timeout happens.
            _run_and_wrap(self.process_manager.update_launchers)
            _run_and_wrap(self.update_node_state)
            _run_and_wrap(self._get_exit_status)
            Cobalt.Util.sleep(UPDATE_THREAD_TIMEOUT)

    def _reconstruct_node(self, inven_node, inventory):
        '''Reconstruct a node from statefile information.  Needed whenever we
        find a new node.  If no statefile information from the original cobalt
        invocation exists, bring up a node in default states and mark node
        administratively down.

        This node was disabled and invisible to ALPS at the time Cobalt was
        initialized and so we have no current record of that node.

        '''
        nid = inven_node['node_id']
        new_node = None
        #construct basic node from inventory
        for node_info in inventory['nodes']:
            if int(node_info['node_id']) == int(nid):
                new_node = CrayNode(node_info)
                break
        if new_node is None:
            #we have a phantom node?
            self.logger.error('Unable to find inventory information for nid: %s', nid)
            return
        # if we have information from the statefile we need to repopulate the
        # node with the saved data.
        # Perhaps this should be how I construct all node data anyway?
        if self.init_spec is not None:
            node_info = self.init_spec.get('node_info', {})
            try:
                new_node.reset_info(node_info[str(nid)])
                self.logger.warning('Node %s reconstructed.', nid)
            except Exception:
                self.logger.warning("Node nid: %s not found in restart information.  Bringing up node in a clean configuration.", nid, exc_info=True)
                #put into admin_down
                new_node.admin_down = True
                new_node.status = 'down'
                self.logger.warning('Node %s marked down.', nid)
        new_node.managed = True
        self.nodes[str(nid)] = new_node
        self.logger.warning('Node %s added to tracking.', nid)


    @exposed
    def update_node_state(self):
        '''update the state of cray nodes. Check reservation status and system
        status as reported by ALPS

        '''
        #Check cleanup progress.  Check ALPS reservations.  Check allocated
        #nodes.  If there is no resource reservation and the node is not in
        #current alps reservations, the node is ready to schedule again.
        now = time.time()
        startup_time_to_clear = []
        #clear pending starttimes.
        for jobid, start_time in self.pending_starts.items():
            if int(now) > int(start_time):
                startup_time_to_clear.append(jobid)
        for jobid in startup_time_to_clear:
            del self.pending_starts[jobid]

        self.check_killing_aprun()
        with self._node_lock:
            fetch_time_start = time.time()
            try:
                #I have seen problems with the kitchen-sink query here, where
                #the output gets truncated on its way into Cobalt.
                #inventory = ALPSBridge.fetch_inventory(resinfo=True) #This is a full-refresh,
                #determine if summary may be used under normal operation
                #updated for >= 1.6 interface
                inven_nodes = ALPSBridge.extract_system_node_data(ALPSBridge.system())
                reservations = ALPSBridge.fetch_reservations()
                #reserved_nodes = ALPSBridge.reserved_nodes()
                # Fetch SSD diagnostic data and enabled flags.  These should
                # change in the event of a dead SSD.
                ssd_enabled = ALPSBridge.fetch_ssd_enable()
                ssd_diags = ALPSBridge.fetch_ssd_diags()
            except (ALPSBridge.ALPSError, ComponentLookupError):
                _logger.warning('Error contacting ALPS for state update.  Aborting this update',
                        exc_info=True)
                return
            inven_reservations = reservations.get('reservations', [])
            fetch_time_start = time.time()
            #_logger.debug("time in ALPS fetch: %s seconds", (time.time() - fetch_time_start))
            start_time = time.time()
            self._detect_rereservation(inven_reservations)
            # check our reservation objects.  If a res object doesn't correspond
            # to any backend reservations, this reservation object should be
            # dropped
            alps_res_to_delete = []
            current_alps_res_ids = [int(res['reservation_id']) for res in
                    inven_reservations]
            res_jobid_to_delete = []
            if self.alps_reservations == {}:
                # if we have nodes in cleanup-pending but no alps reservations,
                # then the nodes in cleanup pending are considered idle (or
                # at least not in cleanup).  Hardware check can catch these
                # later. This catches leftover reservations from hard-shutdowns
                # while running.
                for node in self.nodes.values():
                    if node.status in ['cleanup', 'cleanup-pending']:
                        node.status = 'idle'
            for alps_res in self.alps_reservations.values():
                if alps_res.jobid in self.pending_starts.keys():
                    continue #Get to this only after timeout happens
                #find alps_id associated reservation
                if int(alps_res.alps_res_id) not in current_alps_res_ids:
                    for node_id in alps_res.node_ids:
                        if not self.nodes[str(node_id)].reserved:
                            #pending hardware status update
                            self.nodes[str(node_id)].status = 'idle'
                    res_jobid_to_delete.append(alps_res.jobid)
                    _logger.info('job %s: Nodes %s cleanup complete.', alps_res.jobid, compact_num_list(alps_res.node_ids))
            for jobid in res_jobid_to_delete:
                _logger.info('%s: ALPS reservation for this job complete.', jobid)
                try:
                    del self.alps_reservations[str(jobid)]
                except KeyError:
                    _logger.warning('Job %s: Attempted to remove ALPS reservation for this job multiple times', jobid)
                if self.alps_reservations.get(int(jobid), None) is not None:
                    # in case of type leakage
                    _logger.warning('Job %s: ALPS reservation found with integer key: deleting', jobid)
                    del self.alps_reservations[jobid]
            #process group should already be on the way down since cqm released the
            #resource reservation
            cleanup_nodes = [node for node in self.nodes.values()
                             if node.status in ['cleanup-pending', 'cleanup']]
            #If we have a block marked for cleanup, send a release message.
            released_res_jobids = []
            cleaned_nodes = []
            for node in cleanup_nodes:
                found = False
                for alps_res in self.alps_reservations.values():
                    if str(node.node_id) in alps_res.node_ids:
                        found = True
                        if alps_res.jobid not in released_res_jobids:
                            #send only one release per iteration
                            apids = alps_res.release()
                            if apids is not None:
                                for apid in apids:
                                    self.signal_aprun(apid)
                            released_res_jobids.append(alps_res.jobid)
                if not found:
                    # There is no alps reservation to release, cleanup is
                    # already done.  This happens with very poorly timed
                    # qdel requests. Status will be set properly with the
                    # subsequent hardware status check.
                    _logger.info('Node %s cleanup complete.', node.node_id)
                    node.status = 'idle'
                    cleaned_nodes.append(node)
            for node in cleaned_nodes:
                cleanup_nodes.remove(node)

            #find hardware status
            #so we do this only once for nodes being added.
            #full inventory fetch is expensive.
            recon_inventory = None
            for inven_node in inven_nodes.values():
                if self.nodes.has_key(str(inven_node['node_id'])):
                    node = self.nodes[str(inven_node['node_id'])]
                    node.role = inven_node['role'].upper()
                    node.attributes.update(inven_node['attrs'])
                    if node.reserved:
                        #node marked as reserved.
                        if self.alps_reservations.has_key(str(node.reserved_jobid)):
                            node.status = 'busy'
                        else:
                            # check to see if the resource reservation should be
                            # released.
                            if node.reserved_until >= now:
                                node.status = 'allocated'
                            else:
                                node.release(user=None, jobid=None, force=True)
                    else:
                        node.status = inven_node['state'].upper()
                        if node.role.upper() not in ['BATCH'] and node.status == 'idle':
                            node.status = 'alps-interactive'
                else:
                    # Apparently, we CAN add nodes on the fly.  The node would
                    # have been disabled.  We need to add a new node and update
                    # its state.
                    _logger.warning('Unknown node %s found. Starting reconstruction.', inven_node['node_id'])
                    try:
                        if recon_inventory is None:
                            recon_inventory = ALPSBridge.fetch_inventory()
                    except Exception:
                        _logger.error('Failed to fetch inventory.  Will retry on next pass.', exc_info=True)
                    else:
                        self._reconstruct_node(inven_node, recon_inventory)
                   # _logger.error('UNS: ALPS reports node %s but not in our node list.',
                   #               inven_node['node_id'])
            # Update SSD data:
            self._update_ssd_data(self.nodes, ssd_enabled=ssd_enabled, ssd_diags=ssd_diags)
            #should down win over running in terms of display?
            #keep nodes that are marked for cleanup still in cleanup
            for node in cleanup_nodes:
                node.status = 'cleanup-pending'
        #_logger.debug("time in UNS lock: %s seconds", (time.time() - start_time))
        return

    def _detect_rereservation(self, inven_reservations):
        '''Detect and update the ALPS reservation associated with a running job.
        We are only concerned with BATCH reservations.  Others would be
        associated with running jobs, and should not be touched.

        '''
        def _construct_alps_res():
            with self._node_lock:
                job_nodes = [node.node_id for node in self.nodes.values()
                        if node.reserved_jobid == int(alps_res['batch_id'])]
            new_resspec = {'reserved_nodes': job_nodes,
                           'reservation_id': str(alps_res['reservation_id']),
                           'pagg_id': 0 #unknown.  Not used here.
                            }
            new_jobspec = {'jobid': int(alps_res['batch_id']),
                           'user' : alps_res['user_name']}

            return ALPSReservation(new_jobspec, new_resspec, self.nodes)

        replaced_reservation = None
        for alps_res in inven_reservations:
            try:
                #This traversal is terrible. May want to hide this in the API
                #somewhere
                if alps_res['ApplicationArray'][0]['Application'][0]['CommandArray'][0]['Command'][0]['cmd'] != 'BASIL':
                    # Not a reservation we're in direct control of.
                    continue
            except (KeyError, IndexError):
                #not a batch reservation
                continue
            if str(alps_res['batch_id']) in self.alps_reservations.keys():
                # This is a reservation we may already know about
                if (int(alps_res['reservation_id']) ==
                        self.alps_reservations[str(alps_res['batch_id'])].alps_res_id):
                    # Already know about this one
                    continue
                # we have a re-reservation.  If this has a batch id, we need
                # to add it to our list of tracked reservations, and inherit
                # other reservation details.  We can pull the reservation
                # information out of reserve_resources_until.

                # If this is a BATCH reservation and no hardware has that
                # reservation id, then this reservation needs to be released
                # Could happen if we have a job starting right at the RRU
                # boundary.
                new_alps_res = _construct_alps_res()
                tracked_res = self.alps_reservations.get(new_alps_res.jobid, None)
                if tracked_res is not None:
                    try:
                        apids = tracked_res.release()
                    except ALPSBridge.ALPSError:
                        # backend reservation probably is gone, which is why
                        # we are here in the first place.
                        pass
                self.alps_reservations[str(alps_res['batch_id'])] = new_alps_res
            else:
                #this is a basil reservation we don't know about already.
                new_alps_res = _construct_alps_res()
                if len(new_alps_res.node_ids) == 0:
                    # This reservation has no resources, so Cobalt's internal
                    # resource reservation tracking has no record.  This needs to
                    # be removed.
                    new_alps_res.release()
                else:
                    self.alps_reservations[str(alps_res['batch_id'])] = new_alps_res
        return

    def signal_aprun(self, aprun_id, signame='SIGINT'):
        '''Signal an aprun by aprun id (application_id).  Does not block.
        Use check_killing_aprun to determine completion/termination.  Does not
        depend on the host the aprun(s) was launched from.

        Input:
            aprun_id - integer application id number.
            signame  - string name of signal to send (default: SIGINT)
        Notes:
            Valid signals to apkill are:
            SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGABRT, SIGUSR1, SIGUSR2, SIGURG,
            and SIGWINCH (from apkill(1) man page.)  Also allowing SIGKILL.

        '''
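        # Illustrative effect (apid value is an example): signal_aprun(12345,
        # 'SIGTERM') forks "apkill -SIGTERM 12345" via the system_script_forker.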
        #Expect changes with an API update

        #mark legal signals from the apkill documentation
        if (signame not in ['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM', 'SIGABRT',
            'SIGUSR1', 'SIGUSR2', 'SIGURG','SIGWINCH', 'SIGKILL']):
            raise ValueError('%s is not a legal value for signame.' % signame)
        try:
            retval = Cobalt.Proxy.ComponentProxy('system_script_forker').fork(
                    [APKILL_CMD, '-%s' % signame, '%d' % int(aprun_id)],
                    'aprun_termination', '%s cleanup:'% aprun_id)
            _logger.info("killing backend ALPS application_id: %s", aprun_id)
        except xmlrpclib.Fault:
            _logger.warning("XMLRPC Error while killing backend job: %s, will retry.",
                    aprun_id, exc_info=True)
        except Exception:
            _logger.critical("Unknown Error while killing backend job: %s, will retry.",
                    aprun_id, exc_info=True)
        else:
            self.killing_jobs[aprun_id] = retval
        return

    def check_killing_aprun(self):
        '''Check that apkill commands have completed and clean them from the
        system_script_forker.  Allows for non-blocking cleanup initiation.

        '''

        try:
            system_script_forker = Cobalt.Proxy.ComponentProxy('system_script_forker')
        except Exception:
            self.logger.critical("Cannot connect to system_script_forker.",
                    exc_info=True)
            return
        complete_jobs = []
        rev_killing_jobs = dict([(v,k) for (k,v) in self.killing_jobs.iteritems()])
        removed_jobs = []
        current_killing_jobs = system_script_forker.get_children(None, self.killing_jobs.values())

        for job in current_killing_jobs:
            if job['complete']:
                del self.killing_jobs[rev_killing_jobs[int(job['id'])]]
                removed_jobs.append(job['id'])
        system_script_forker.cleanup_children(removed_jobs)
        return

    @exposed
    def find_queue_equivalence_classes(self, reservation_dict,
            active_queue_names, passthrough_blocking_res_list=[]):
        '''Aggregate queues together that can impact each other in the same
        general pass (both drain and backfill pass) in find_job_location.
        Equivalence classes will then be used in find_job_location to consider
        placement of jobs and resources, in separate passes.  If multiple
        equivalence classes are returned, then they must contain orthogonal sets
        of resources.

        Inputs:
        reservation_dict -- a mapping of active reservations to resources.
                            These will block any job in a normal queue.
        active_queue_names -- A list of queues that are currently enabled.
                              Queues that are not in the 'running' state
                              are ignored.
        passthrough_blocking_res_list -- Not used on Cray systems currently.  This is
                                  for handling hardware that supports
                                  partitioned interconnect networks.

        Output:
        A list of dictionaries of queues that may impact each other while
        scheduling resources.

        Side effects:
        None

        Internal Data:
        queue_assignments: a mapping of queues to schedulable locations.

        '''
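        # Illustrative return shape (names/values are examples only):
        #   [{'queues': ['default', 'debug'], 'reservations': ['res-1']}]
        # The internal 'data' node-id set is dropped before returning.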
        equiv = []
        node_active_queues = set([])
        self.current_equivalence_classes = [] #reverse mapping of queues to nodes
        for node in self.nodes.values():
            if node.managed and node.schedulable:
                #only consider nodes that we are scheduling.
                node_active_queues = set([])
                for queue in node.queues:
                    if queue in active_queue_names:
                        node_active_queues.add(queue)
                if node_active_queues == set([]):
                    #this node has nothing active.  The next check can get
                    #expensive, so skip it.
                    continue
            #determine the queues that overlap.  Hardware has to be included so
            #that reservations can be mapped into the equiv classes.
            found_a_match = False
            for e in equiv:
                for queue in node_active_queues:
                    if queue in e['queues']:
                        e['data'].add(node.node_id)
                        e['queues'] = e['queues'] | set(node_active_queues)
                        found_a_match = True
                        break
                if found_a_match:
                    break
            if not found_a_match:
                equiv.append({'queues': set(node_active_queues),
                              'data': set([node.node_id]),
                              'reservations': set()})
        #second pass to merge queue lists based on hardware
        real_equiv = []
        for eq_class in equiv:
            found_a_match = False
            for e in real_equiv:
                if e['queues'].intersection(eq_class['queues']):
                    e['queues'].update(eq_class['queues'])
                    e['data'].update(eq_class['data'])
                    found_a_match = True
                    break
            if not found_a_match:
                real_equiv.append(eq_class)
        equiv = real_equiv
        #add in reservations:
        for eq_class in equiv:
            for res_name in reservation_dict:
                for node_hunk in reservation_dict[res_name].split(":"):
                    for node_id in expand_num_list(node_hunk):
                        if str(node_id) in eq_class['data']:
                            eq_class['reservations'].add(res_name)
                            break
            #don't send what could be a large block list back in the return
            for key in eq_class:
                eq_class[key] = list(eq_class[key])
            del eq_class['data']
            self.current_equivalence_classes.append(eq_class)
        return equiv

    @staticmethod
    def _setup_special_locaitons(job):
        forbidden = set([str(loc) for loc in chain_loc_list(job.get('forbidden', []))])
        required = set([str(loc) for loc in chain_loc_list(job.get('required', []))])
        requested_locations = set([str(n) for n in expand_num_list(job['attrs'].get('location', ''))])
        return (forbidden, required, requested_locations)
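    # Illustrative (hypothetical job dict): with job['forbidden'] == ['1-2'] and
    # job['attrs']['location'] == '4-6', _setup_special_locaitons returns
    # ({'1', '2'}, set(), {'4', '5', '6'}).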

    def _assemble_queue_data(self, job, idle_only=True, drain_time=None):
        '''put together data for a queue, or queue-like reservation structure.

        Input:
            job - dictionary of job data.
            idle_only - [default: True] if True, return only idle nodes.
                        Otherwise return nodes in any non-down status.
            drain_time - [default: None] if set, also exclude nodes whose drain
                         window ends (less BACKFILL_EPSILON) before this time.

        Returns a sorted list of valid node ids to run on.  If idle_only is set
        to False, the returned nodes are the candidates for draining instead.


        '''
        # RESERVATION SUPPORT: Reservation queues are ephemeral, so we will
        # not find the queue normally.  In the event of a reservation we'll
        # have to intersect required nodes with the idle and available nodes.
        # We also have to forbid a number of locations in this case.
        unavailable_nodes = []
        forbidden, required, requested_locations = self._setup_special_locaitons(job)
        requested_loc_in_forbidden = False
        for loc in requested_locations:
            if loc in forbidden:
                #don't spam the logs.
                requested_loc_in_forbidden = True
                break
        if job['queue'] not in self.nodes_by_queue.keys():
            # Either a new queue with no resources, or possibly a reservation;
            # we need to do extra work for a reservation.
            node_id_list = list(required - forbidden)
        else:
            node_id_list = list(set(self.nodes_by_queue[job['queue']]) - forbidden)
        if requested_locations != set([]): # handle attrs location= requests
            job_set = set([str(nid) for nid in requested_locations])
            if job['queue'] not in self.nodes_by_queue.keys():
                #we're in a reservation and need to further restrict nodes.
                if job_set <= set(node_id_list):
                    # We are in a reservation and there are no forbidden nodes.
                    node_id_list = list(requested_locations)
                else:
                    # We can't run this job.  Insufficient resources in this
                    # reservation to do so.  Don't risk blocking anything.
                    node_id_list = []
            else:
                #normal queues.  Restrict to the non-reserved nodes.
                if job_set <= set([str(node_id) for node_id in
                                    self.nodes_by_queue[job['queue']]]):
                    node_id_list = list(requested_locations)
                    if not set(node_id_list).isdisjoint(forbidden):
                        # this job has requested locations that are a part of an
                        # active reservation.  Remove locations and drop available
                        # nodecount appropriately.
                        node_id_list = list(set(node_id_list) - forbidden)
                else:
                    node_id_list = []
                    if not requested_loc_in_forbidden:
                        raise ValueError("forbidden locations not in queue")
        with self._node_lock:
            if idle_only:
                unavailable_nodes = [node_id for node_id in node_id_list
                        if self.nodes[str(node_id)].status not in ['idle']]
            else:
                unavailable_nodes = [node_id for node_id in node_id_list
                        if self.nodes[str(node_id)].status in
                        self.nodes[str(node_id)].DOWN_STATUSES]
            if drain_time is not None:
                unavailable_nodes.extend([node_id for node_id in node_id_list
                    if (self.nodes[str(node_id)].draining and
                        (self.nodes[str(node_id)].drain_until - BACKFILL_EPSILON) < int(drain_time))])
        for node_id in set(unavailable_nodes):
            node_id_list.remove(node_id)
        return sorted(node_id_list, key=lambda nid: int(nid))

    def _select_first_nodes(self, job, node_id_list):
        '''Given a list of nids, select the first node count nodes from the
        list.  This is the target for alternate allocator replacement.

        Input:
            job - dictionary of job data from the scheduler
            node_id_list - a list of possible candidate nodes

        Return:
            A list of nodes.  [] if insufficient nodes for the allocation.

        Note: hold the node lock while doing this.  We really don't want an
        update to happen while doing this.

        '''
        ret_nodes = []
        with self._node_lock:
            if int(job['nodes']) <= len(node_id_list):
                node_id_list.sort(key=lambda nid: int(nid))
                ret_nodes = node_id_list[:int(job['nodes'])]
        return ret_nodes
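    # Illustrative: for a job requesting 2 nodes and node_id_list ['10', '2', '7'],
    # _select_first_nodes returns ['2', '7'] (lowest nids first).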

    def _select_first_nodes_prefer_memory_match(self, job, node_id_list):
        '''Given a list of nids, select the first node count nodes from the
        list.  Prefer nodes that match the memory modes for a given job, then
        go in nid order.

        Input:
            job - dictionary of job data from the scheduler
            node_id_list - a list of possible candidate nodes

        Return:
            A list of nodes.  [] if insufficient nodes for the allocation.

        Note: hold the node lock while doing this.  We really don't want an
        update to happen while doing this.

        '''
        if job.get('attrs', {}).get('mcdram', None) is None or job.get('attrs', {}).get('numa', None) is None:
            # insufficient information to include a mode match
            return self._select_first_nodes(job, node_id_list)
        ret_nids = []
        with self._node_lock:
            considered_nodes = [node for node in self.nodes.values() if node.node_id in node_id_list]
            for node in considered_nodes:
                if (node.attributes['hbm_cache_pct'] == MCDRAM_TO_HBMCACHEPCT[job['attrs']['mcdram']] and
                        node.attributes['numa_cfg'] == job['attrs']['numa']):
                    ret_nids.append(int(node.node_id))
            if len(ret_nids) < int(job['nodes']):
                node_id_list.sort(key=lambda nid: int(nid))
                for nid in node_id_list:
                    if int(nid) not in ret_nids:
                        ret_nids.append(int(nid))
                        if len(ret_nids) >= int(job['nodes']):
                            break
        return ret_nids[:int(job['nodes'])]

    def _associate_and_run_immediate(self, job, resource_until_time, node_id_list):
        '''Given a list of idle node ids, choose a set that can run a job
        immediately, if a set exists in the node_id_list.

        Inputs:
            job - Dictionary of job data
            node_id_list - a list of string node id values

        Side Effects:
            Will reserve resources in ALPS and will set resource reservations on
            allocated nodes.

        Return:
            None if no match, otherwise the pairing of a jobid and set of nids
            that have been allocated to a job.

        '''
        compact_locs = None
        if int(job['nodes']) <= len(node_id_list):
            #this job can be run immediately
            to_alps_list = self._select_first_nodes_prefer_memory_match(job, node_id_list)
            job_locs = self._ALPS_reserve_resources(job, resource_until_time,
                    to_alps_list)
            if job_locs is not None and len(job_locs) == int(job['nodes']):
                compact_locs = compact_num_list(job_locs)
                #temporary reservation until job actually starts
                self.pending_starts[job['jobid']] = resource_until_time
                self.reserve_resources_until(compact_locs, resource_until_time, job['jobid'])
        return compact_locs

    @locking
    @exposed
    def find_job_location(self, arg_list, end_times, pt_blocking_locations=[]):
        '''Given a list of jobs, and when jobs are ending, return a set of
        locations mapped to a jobid that can be run.  Also, set up draining
        as-needed to run top-scored jobs and backfill when possible.

        Called once per equivalence class.

        Args:
            arg_list: A list of dictionaries containing information on jobs to
                   consider.
            end_times: list containing a mapping of locations and the times jobs
                    running on those locations are scheduled to end.  End times
                    are in seconds from Epoch UTC.
            pt_blocking_locations: Not used for this system.  Used in partitioned
                                interconnect schemes. A list of locations that
                                should not be used to prevent passthrough issues
                                with other scheduler reservations.

        Returns:
        A mapping of jobids to locations where the job can run immediately.

        Side Effects:
        May set draining flags and backfill windows on nodes.
        If nodes are being returned to run, set ALPS reservations on them.

        Notes:
        The reservation set on ALPS resources is unconfirmed at this point.
        This reservation may time out.  The forker when it confirms will detect
        this and will re-reserve as needed.  The alps reservation id may change
        in this case post job startup.

        pt_blocking_locations may be used later to block out nodes that are
        impacted by warmswap operations.

        This function *DOES NOT* hold the component lock.

        '''
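        # Illustrative return shape (values are examples only): {1234: ['20-23']},
        # i.e. a jobid mapped to its allocated location(s) as a compact node list.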
        now = time.time()
        resource_until_time = now + TEMP_RESERVATION_TIME
        with self._node_lock:
            # only valid for this scheduler iteration.
            self._clear_draining_for_queues(arg_list[0]['queue'])
            #check if we can run immediately, if not drain.  Keep going until all
            #nodes are marked for draining or have a pending run.
            best_match = {} #jobid: list(locations)
            for job in arg_list:
                label = '%s/%s' % (job['jobid'], job['user'])
                # walltime is in minutes.  We should really fix the storage of
                # that --PMR
                job_endtime = now + (int(job['walltime']) * 60)
                try:
                    node_id_list = self._assemble_queue_data(job, drain_time=job_endtime)
                    available_node_list = self._assemble_queue_data(job, idle_only=False)
                except ValueError:
                    _logger.warning('Job %s: requesting locations that are not in requested queue.',
                            job['jobid'])
                    continue
                if int(job['nodes']) > len(available_node_list):
                    # Insufficient operational nodes for this job at all
                    continue
                elif len(node_id_list) == 0:
                    pass #allow for draining pass to run.
                elif int(job['nodes']) <= len(node_id_list):