"""Resource management for Cray ALPS based systems"""
import logging
import threading
import time
import sys
import xmlrpclib
import json
import ConfigParser
import Cobalt.Util
import Cobalt.Proxy  # Cobalt.Proxy.ComponentProxy is used below; imported explicitly rather than relying on a transitive import
import Cobalt.Components.system.AlpsBridge as ALPSBridge
from Cobalt.Components.base import Component, exposed, automatic, query, locking
from Cobalt.Components.system.base_system import BaseSystem
from Cobalt.Components.system.CrayNode import CrayNode
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
from Cobalt.Components.system.ALPSProcessGroup import ALPSProcessGroup
from Cobalt.Exceptions import ComponentLookupError
from Cobalt.Exceptions import JobNotInteractive
from Cobalt.Exceptions import JobValidationError
from Cobalt.DataTypes.ProcessGroup import ProcessGroup
from Cobalt.Util import compact_num_list, expand_num_list
from Cobalt.Util import init_cobalt_config, get_config_option
from Cobalt.Util import extract_traceback, sanatize_password, get_current_thread_identifier
from Queue import Queue

_logger = logging.getLogger(__name__)
init_cobalt_config()

UPDATE_THREAD_TIMEOUT = int(get_config_option('alpssystem', 'update_thread_timeout', 10))
TEMP_RESERVATION_TIME = int(get_config_option('alpssystem', 'temp_reservation_time', 300))
SAVE_ME_INTERVAL = float(get_config_option('alpssystem', 'save_me_interval', 10.0))
#default 20 minutes to account for boot.
# DISABLED: Not used
#PENDING_STARTUP_TIMEOUT = float(get_config_option('alpssystem', 'pending_startup_timeout', 1200))
APKILL_CMD = get_config_option('alps', 'apkill', '/opt/cray/alps/default/bin/apkill')
DRAIN_MODE = get_config_option('system', 'drain_mode', 'first-fit')
#cleanup time in seconds
CLEANUP_DRAIN_WINDOW = get_config_option('system', 'cleanup_drain_window', 300)
#SSD information
DEFAULT_MIN_SSD_SIZE = int(get_config_option('alpssystem', 'min_ssd_size', -1))

#Epsilon for backfilling.  This system does not do this on a per-node basis.
BACKFILL_EPSILON = int(get_config_option('system', 'backfill_epsilon', 120))
ELOGIN_HOSTS = [host for host in get_config_option('system', 'elogin_hosts', '').split(':')]
if ELOGIN_HOSTS == ['']:
    ELOGIN_HOSTS = []
DRAIN_MODES = ['first-fit', 'backfill']
CLEANING_ID = -1
DEFAULT_MCDRAM_MODE = get_config_option('alpssystem', 'default_mcdram_mode', 'cache')
DEFAULT_NUMA_MODE = get_config_option('alpssystem', 'default_numa_mode', 'quad')
MCDRAM_TO_HBMCACHEPCT = {'flat':'0', 'cache':'100', 'split':'25', 'equal':'50', '0':'0', '25':'25', '50':'50', '100':'100'}
VALID_MCDRAM_MODES = ['flat', 'cache', 'split', 'equal', '0', '25', '50', '100']
VALID_NUMA_MODES = ['a2a', 'hemi', 'quad', 'snc2', 'snc4']
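# Descriptive note: MCDRAM_TO_HBMCACHEPCT maps a requested MCDRAM mode to the
# percentage of MCDRAM configured as cache (e.g. 'cache' -> '100', 'flat' -> '0');
# numeric strings pass through unchanged.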

def chain_loc_list(loc_list):
    '''Take a list of compact Cray locations,
    expand and concatenate them.

    '''
    retlist = []
    for locs in loc_list:
        retlist.extend(expand_num_list(locs))
    return retlist
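
# Illustrative example (not from the original source): assuming
# Cobalt.Util.expand_num_list expands compact range strings,
# chain_loc_list(['1-3', '7']) would return [1, 2, 3, 7].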


class CraySystem(BaseSystem):
    '''Cray/ALPS-specific system component.  Behaviors should go here.  Direct
    ALPS interaction through BASIL/other APIs should go through the ALPSBridge
    (or other bridge) module as appropriate.

    '''
    name = "system"
    implementation = "alps_system"
    logger = _logger

    def __init__(self, *args, **kwargs):
        '''Initialize the system component.  Read initial states from the
        bridge and get the current state.

        '''
        start_time = time.time()
        self.update_thread_timeout = UPDATE_THREAD_TIMEOUT
        super(CraySystem, self).__init__(*args, **kwargs)
        _logger.info('BASE SYSTEM INITIALIZED')
        self._common_init_restart()
        _logger.info('ALPS SYSTEM COMPONENT READY TO RUN')
        _logger.info('Initialization complete in %s sec.', (time.time() -
                start_time))

    def _common_init_restart(self, spec=None):
        '''Common routine for cold and restart initialization of the system
        component.

        '''
        try:
            self.system_size = int(get_config_option('system', 'size'))
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            self.logger.critical('ALPS SYSTEM: ABORT STARTUP: System size must be specified in the [system] section of the cobalt configuration file.')
            sys.exit(1)
        if DRAIN_MODE not in DRAIN_MODES:
            #abort startup, we have a completely invalid config.
            self.logger.critical('ALPS SYSTEM: ABORT STARTUP: %s is not a valid drain mode.  Must be one of %s.',
                DRAIN_MODE, ", ".join(DRAIN_MODES))
            sys.exit(1)
        #initialize bridge.
        bridge_pending = True
        while bridge_pending:
            # purge stale children from prior run.  Also ensure the
            # system_script_forker is currently up.
            # These attempts may fail due to system_script_forker not being up.
            # We don't want to trash the statefile in this case.
            try:
                ALPSBridge.init_bridge()
            except ALPSBridge.BridgeError:
                self.logger.error('Bridge Initialization failed.  Retrying.')
                Cobalt.Util.sleep(10)
            except ComponentLookupError:
                self.logger.warning('Error reaching forker.  Retrying.')
                Cobalt.Util.sleep(10)
            else:
                bridge_pending = False
                self.logger.info('BRIDGE INITIALIZED')
        #process manager setup
        if spec is None:
            self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup)
        else:
            spec_version = spec.get('alps_system_statefile_version', 1)
            if spec_version <= 1:
                # Compat for old version of process manager information that was stored as a dict
                # rather than an actual object.  Yes, this results in a double initialize. Ugly, but
                # doesn't hurt anything, yet.
                self.process_manager = ProcessGroupManager(pgroup_type=ALPSProcessGroup).__setstate__(spec['process_manager'])
                self.logger.debug('pg type %s', self.process_manager.process_groups.item_cls)
            else:
                self.process_manager = spec['process_manager']
                self.process_manager.pgroup_type = ALPSProcessGroup
                self.logger.debug('pg type %s', self.process_manager.process_groups.item_cls)

        # DISABLED: Why are we calling this here?  It's called below in the update thread.
        # Any error in update_launchers will cause CraySystem to fail to start.
        # The update thread below wraps this call and protects against failures; use that instead.
        #self.process_manager.update_launchers()
        self.logger.info('PROCESS MANAGER INITIALIZED')

        # DISABLED: Not used
        # self.pending_start_timeout = PENDING_STARTUP_TIMEOUT

        #resource management setup
        self.nodes = {} #cray node_id: CrayNode
        self.node_name_to_id = {} #cray node name to node_id map
        self.alps_reservations = {} #cobalt jobid : AlpsReservation object
        if spec is not None:
            self.alps_reservations = spec['alps_reservations']
        self._init_nodes_and_reservations()
        if spec is not None:
            node_info = spec.get('node_info', {})
            for nid, node in node_info.items():
                try:
                    self.nodes[nid].reset_info(node)
                except: #check the exception types later.  Carry on otherwise.
                    self.logger.warning("Node nid: %s not found in restart information.  Bringing up node in a clean configuration.", nid)
        #storage for pending job starts.  Allows us to handle slow starts vs
        #user deletes
        self.pending_starts = {} #jobid: time this should be cleared.
        self.nodes_by_queue = {} #queue:[node_ids]
        #populate initial state
        #state update thread and lock
        self._node_lock = threading.RLock()
        self._gen_node_to_queue()
        self.node_update_thread_kill_queue = Queue()
        self.node_update_thread_dead = False
        self.logger.info("_run_update_state thread starting.")
        self.node_update_thread = threading.Thread(target=self._run_update_state)
        self.node_update_thread.start()
        self.logger.info("_run_update_state thread started:%s", self.node_update_thread)
        self.killing_jobs = {}
        #hold on to the initial spec in case nodes appear out of nowhere.
        self.init_spec = None
        if spec is not None:
            self.init_spec = spec

    def __getstate__(self):
        '''Save process, alps-reservation information, along with base
        information'''
        state = {}
        state.update(super(CraySystem, self).__getstate__())
        state['alps_system_statefile_version'] = 2
        state['process_manager'] = self.process_manager
        state['alps_reservations'] = self.alps_reservations
        state['node_info'] = self.nodes
        return state

    def __setstate__(self, state):
        start_time = time.time()
        _logger.info('INITIALIZING FROM ALPS SYSTEM STATE FILE VERSION %s', state.get('alps_system_statefile_version', None))
        super(CraySystem, self).__setstate__(state)
        _logger.info('BASE SYSTEM INITIALIZED')
        self._common_init_restart(state)
        _logger.info('ALPS SYSTEM COMPONENT READY TO RUN')
        _logger.info('Reinitialization complete in %s sec.', (time.time() -
                start_time))

    def save_me(self):
        '''Automatically save a copy of the state of the system component.'''
        #should we be holding the block lock as well?
        Component.save(self)
    save_me = automatic(save_me, SAVE_ME_INTERVAL)
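
    # Note: automatic() registers save_me to be run periodically by the
    # component's automatic-method machinery, here every SAVE_ME_INTERVAL
    # seconds (the save_me_interval config option, default 10).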

    def _init_nodes_and_reservations(self):
        '''Initialize nodes from ALPS bridge data'''

        retnodes = {}
        pending = True
        while pending:
            try:
                # None of these queries has strictly degenerate data.  Inventory
                # is needed for raw reservation data.  System gets memory and a
                # much more compact representation of data.  Reservednodes gives
                # which nodes are reserved.
                inventory = ALPSBridge.fetch_inventory()
                _logger.info('INVENTORY FETCHED')
                system = ALPSBridge.extract_system_node_data(ALPSBridge.system())
                _logger.info('SYSTEM DATA FETCHED')
                reservations = ALPSBridge.fetch_reservations()
                _logger.info('ALPS RESERVATION DATA FETCHED')
                # reserved_nodes = ALPSBridge.reserved_nodes()
                ssd_enabled = ALPSBridge.fetch_ssd_enable()
                _logger.info('CAPMC SSD ENABLED DATA FETCHED')
                ssd_info = ALPSBridge.fetch_ssd_static_data(raw=True) #2017-09-15: API doc is wrong.  Need RAW and work on cname here.
                _logger.info('CAPMC SSD DETAIL DATA FETCHED')
                ssd_diags = ALPSBridge.fetch_ssd_diags()
                _logger.info('CAPMC SSD DIAG DATA FETCHED')
            except Exception:
                #don't crash out here.  That may trash a statefile.
                _logger.error('Possible transient encountered during initialization. Retrying.',
                        exc_info=True)
                Cobalt.Util.sleep(10)
            else:
                pending = False

        self._assemble_nodes(inventory, system, ssd_enabled, ssd_info, ssd_diags)
        #Reversing the node name to id lookup is going to save a lot of cycles.
        for node in self.nodes.values():
            self.node_name_to_id[node.name] = node.node_id
        _logger.info('NODE INFORMATION INITIALIZED')
        _logger.info('ALPS REPORTS %s NODES', len(self.nodes))
        # self._assemble_reservations(reservations, reserved_nodes)
        return

    def _assemble_nodes(self, inventory, system, ssd_enabled, ssd_info, ssd_diags):
        '''merge together the INVENTORY and SYSTEM query data to form as
        complete a picture of a node as we can.

        Args:
            inventory - ALPS QUERY(INVENTORY) data
            system - ALPS QUERY(SYSTEM) data
            ssd_enabled - CAPMC get_ssd_enable data
            ssd_info - CAPMC get_ssds data
            ssd_diags - CAPMC get_ssd_diags data

        Returns:
            None

        Side Effects:
            Populates the node dictionary

        '''
        nodes = {}
        for nodespec in inventory['nodes']:
            node = CrayNode(nodespec)
            node.managed = True
            nodes[node.node_id] = node
        for node_id, nodespec in system.iteritems():
            nodes[node_id].attributes.update(nodespec['attrs'])
            # Should this be a different status?
            nodes[node_id].role = nodespec['role'].upper()
            if nodes[node_id].role.upper() not in ['BATCH']:
                nodes[node_id].status = 'down'
            nodes[node_id].status = nodespec['state']
        self._update_ssd_data(nodes, ssd_enabled, ssd_info, ssd_diags)
        self.nodes = nodes

    def _update_ssd_data(self, nodes, ssd_enabled=None, ssd_info=None, ssd_diags=None):
        '''Update/add ssd data from CAPMC'''
        if ssd_enabled is not None:
            for ssd_data in ssd_enabled['nids']:
                try:
                    nodes[str(ssd_data['nid'])].attributes['ssd_enabled'] = int(ssd_data['ssd_enable'])
                except KeyError:
                    _logger.warning('ssd info present for nid %s, but not reported in ALPS.', ssd_data['nid'])
        if ssd_info is not None:
            # Cray broke this query.  Need to remap based on cray-cname.  Yes, not having the nid is a violation of their own API.
            for ssd_data_key, ssd_data_val in ssd_info.items():
                if ssd_data_key not in ['e', 'err_msg']:
                    found = False
                    for node in nodes.values():
                        if node.name == ssd_data_key:
                            node.attributes['ssd_info'] = ssd_data_val[0]
                            found = True
                            break
                    if not found:
                        _logger.warning('ssd data for %s found, but no node found.', ssd_data_key)
            #This is the corrected query where we can reconstruct the nid index.
            # for ssd_data in ssd_info['nids']:
                # try:
                    # nodes[str(ssd_data['nid'])].attributes['ssd_info'] = ssd_data
                # except KeyError:
                    # _logger.warning('ssd info present for nid %s, but not reported in ALPS.', ssd_data['nid'])
        if ssd_diags is not None:
            for diag_info in ssd_diags['ssd_diags']:
                try:
                    node = nodes[str(diag_info['nid'])]
                except KeyError:
                    _logger.warning('ssd diag data present for nid %s, but not reported in ALPS.', diag_info['nid'])
                else:
                    for field in ['life_remaining', 'ts', 'firmware', 'percent_used']:
                        node.attributes['ssd_info'][field] = diag_info[field]


    def _assemble_reservations(self, reservations, reserved_nodes):
        # FIXME: we can recover reservations now.  Implement this.
        pass

    def _gen_node_to_queue(self):
        '''(Re)Generate a mapping for fast lookup of node-id's to queues.'''
        with self._node_lock:
            self.nodes_by_queue = {}
            for node in self.nodes.values():
                for queue in node.queues:
                    if queue in self.nodes_by_queue.keys():
                        self.nodes_by_queue[queue].add(node.node_id)
                    else:
                        self.nodes_by_queue[queue] = set([node.node_id])

    @exposed
    def get_nodes(self, as_dict=False, node_ids=None, params=None, as_json=False):
        '''fetch the node dictionary.

            as_dict  - Return node information as a dictionary keyed to string
                        node_id value.
            node_ids - A list of node names to return, if None, return all nodes
                       (default None).
            params   - If requesting a dict, only request this list of
                       parameters of the node.
            as_json  - Encode to json before sending.  Useful on large systems.

            returns the node dictionary.  Can return underlying node data as
            dictionary for XMLRPC purposes

        '''
        def node_filter(node):
            if node_ids is not None:
                return (str(node[0]) in [str(nid) for nid in node_ids])
            return True

        node_info = None
        if as_dict:
            retdict = {k:v.to_dict(True, params) for k, v in self.nodes.items()}
            node_info = dict(filter(node_filter, retdict.items()))
        else:
            node_info = dict(filter(node_filter, self.nodes.items()))
        if as_json:
            return json.dumps(node_info)
        return node_info
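
    # Illustrative call (a sketch, not from the original source): assuming
    # CrayNode.to_dict(True, params) limits output to the listed parameters,
    #   get_nodes(as_dict=True, node_ids=['1', '2'], params=['name', 'queues'], as_json=True)
    # would return a JSON object keyed by node_id ('1', '2') containing only
    # those parameters.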

    def _run_and_wrap(self, update_func):
        self.logger.info('_run_and_wrap %s, tid:%s', update_func, get_current_thread_identifier())
        update_func_name = update_func.__name__
        bool_error = False
        td = -1.0
        ts = time.time()
        try:
            update_func()
        except Exception:
            te = time.time()
            tb_str = sanatize_password('\n'.join(extract_traceback()))
            td = te - ts
            self.logger.error('_run_and_wrap(%s): td:%s error:%s' % (update_func_name, td, tb_str))
            bool_error = True
        else:
            te = time.time()
            td = te - ts
            bool_error = False
        return update_func_name, bool_error, td
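
    # For reference: _run_and_wrap returns (name, error_flag, duration), e.g.
    # ('update_node_state', False, 0.42) for an update call that succeeded in
    # 0.42 seconds; error_flag is True when the wrapped call raised.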

    def _run_update_state(self):
        '''automated node update functions on the update timer go here.'''
        try:
            self.logger.info('_run_update_state starting, tid:%s, queue:%s',
                              get_current_thread_identifier(), self.node_update_thread_kill_queue.qsize())
            while self.node_update_thread_kill_queue.empty() is True:
                self.logger.info('_run_update_state running, tid:%s', get_current_thread_identifier())
                # Each of these is wrapped in its own log-and-preserve block.
                # The outer try is there to ensure the thread update timeout happens.
                metadata_lst = []
                metadata_lst.append(self._run_and_wrap(self.process_manager.update_launchers))
                metadata_lst.append(self._run_and_wrap(self.update_node_state))
                metadata_lst.append(self._run_and_wrap(self._get_exit_status))
                any_error = False
                for func_name, error, td in metadata_lst:
                    if error is True:
                        any_error = True
                if any_error is True:
                    self.logger.critical("_run_update_state: error occurred, timings below.")
                    for func_name, error, td in metadata_lst:
                        self.logger.critical("%s: %s", func_name, td)
                self.logger.info('_run_update_state sleeping for %s, tid:%s', self.update_thread_timeout,
                                  get_current_thread_identifier())
                Cobalt.Util.sleep(self.update_thread_timeout)
            self.logger.critical('_run_update_state exiting, tid:%s', get_current_thread_identifier())
            self.node_update_thread_kill_queue.get(timeout=1.0)
            self.node_update_thread_dead = True
        finally:
            self.node_update_thread_dead = True
        self.logger.critical('_run_update_state dead, tid:%s', get_current_thread_identifier())
        return

    def _reconstruct_node(self, inven_node, inventory):
        '''Reconstruct a node from statefile information.  Needed whenever we
        find a new node.  If no statefile information from the original cobalt
        invocation exists, bring up a node in default states and mark node
        administratively down.

        This node was disabled and invisible to ALPS at the time Cobalt was
        initialized and so we have no current record of that node.

        '''
        nid = inven_node['node_id']
        new_node = None
        #construct basic node from inventory
        for node_info in inventory['nodes']:
            if int(node_info['node_id']) == int(nid):
                new_node = CrayNode(node_info)
                break
        if new_node is None:
            #we have a phantom node?
            self.logger.error('Unable to find inventory information for nid: %s', nid)
            return
        # if we have information from the statefile we need to repopulate the
        # node with the saved data.
        # Perhaps this should be how I construct all node data anyway?
        if self.init_spec is not None:
            node_info = self.init_spec.get('node_info', {})
            try:
                new_node.reset_info(node_info[str(nid)])
                self.logger.warning('Node %s reconstructed.', nid)
            except:
                self.logger.warning("Node nid: %s not found in restart information.  Bringing up node in a clean configuration.", nid, exc_info=True)
                #put into admin_down
                new_node.admin_down = True
                new_node.status = 'down'
                self.logger.warning('Node %s marked down.', nid)
        new_node.managed = True
        self.nodes[str(nid)] = new_node
        self.logger.warning('Node %s added to tracking.', nid)


    @exposed
    def update_node_state(self):
        '''update the state of cray nodes. Check reservation status and system
        status as reported by ALPS

        '''
        #Check cleanup progress.  Check ALPS reservations.  Check allocated
        #nodes.  If there is no resource reservation and the node is not in
        #current alps reservations, the node is ready to schedule again.
        now = time.time()
        startup_time_to_clear = []
        #clear pending starttimes.
        for jobid, start_time in self.pending_starts.items():
            if int(now) > int(start_time):
                startup_time_to_clear.append(jobid)
        for jobid in startup_time_to_clear:
            del self.pending_starts[jobid]

        self.check_killing_aprun()
        with self._node_lock:
            fetch_time_start = time.time()
            try:
                #I have seen problems with the kitchen-sink query here, where
                #the output gets truncated on its way into Cobalt.
                #inventory = ALPSBridge.fetch_inventory(resinfo=True) #This is a full-refresh,
                #determine if summary may be used under normal operation
                #updated for >= 1.6 interface
                inven_nodes = ALPSBridge.extract_system_node_data(ALPSBridge.system())
                reservations = ALPSBridge.fetch_reservations()
                #reserved_nodes = ALPSBridge.reserved_nodes()
                # Fetch SSD diagnostic data and enabled flags. I would hope these change in event of dead ssd
                ssd_enabled = ALPSBridge.fetch_ssd_enable()
                ssd_diags = ALPSBridge.fetch_ssd_diags()
            except (ALPSBridge.ALPSError, ComponentLookupError):
                _logger.warning('Error contacting ALPS for state update.  Aborting this update',
                        exc_info=True)
                return
            inven_reservations = reservations.get('reservations', [])
            fetch_time_start = time.time()
            #_logger.debug("time in ALPS fetch: %s seconds", (time.time() - fetch_time_start))
            start_time = time.time()
            self._detect_rereservation(inven_reservations)
            # check our reservation objects.  If a res object doesn't correspond
            # to any backend reservations, this reservation object should be
            # dropped
            alps_res_to_delete = []
            current_alps_res_ids = [int(res['reservation_id']) for res in
                    inven_reservations]
            res_jobid_to_delete = []
            if self.alps_reservations == {}:
                # if we have nodes in cleanup-pending but no alps reservations,
                # then the nodes in cleanup pending are considered idle (or
                # at least not in cleanup).  Hardware check can catch these
                # later. This catches leftover reservations from hard-shutdowns
                # while running.
                for node in self.nodes.values():
                    if node.status in ['cleanup', 'cleanup-pending']:
                        node.status = 'idle'
            for alps_res in self.alps_reservations.values():
                if alps_res.jobid in self.pending_starts.keys():
                    continue #Get to this only after timeout happens
                #find alps_id associated reservation
                if int(alps_res.alps_res_id) not in current_alps_res_ids:
                    for node_id in alps_res.node_ids:
                        if not self.nodes[str(node_id)].reserved:
                            #pending hardware status update
                            self.nodes[str(node_id)].status = 'idle'
                    res_jobid_to_delete.append(alps_res.jobid)
                    _logger.info('job %s: Nodes %s cleanup complete.', alps_res.jobid, compact_num_list(alps_res.node_ids))
            for jobid in res_jobid_to_delete:
                _logger.info('%s: ALPS reservation for this job complete.', jobid)
                try:
                    del self.alps_reservations[str(jobid)]
                except KeyError:
                    _logger.warning('Job %s: Attempted to remove ALPS reservation for this job multiple times', jobid)
                if self.alps_reservations.get(int(jobid), None) is not None:
                    # in case of type leakage
                    _logger.warning('Job %s: ALPS reservation found with integer key: deleting', jobid)
                    del self.alps_reservations[jobid]
            #process group should already be on the way down since cqm released the
            #resource reservation
            cleanup_nodes = [node for node in self.nodes.values()
                             if node.status in ['cleanup-pending', 'cleanup']]
            #If we have a block marked for cleanup, send a release message.
            released_res_jobids = []
            cleaned_nodes = []
            for node in cleanup_nodes:
                found = False
                for alps_res in self.alps_reservations.values():
                    if str(node.node_id) in alps_res.node_ids:
                        found = True
                        if alps_res.jobid not in released_res_jobids:
                            #send only one release per iteration
                            apids = alps_res.release()
                            if apids is not None:
                                for apid in apids:
                                    self.signal_aprun(apid)
                            released_res_jobids.append(alps_res.jobid)
                if not found:
                    # There is no alps reservation to release, cleanup is
                    # already done.  This happens with very poorly timed
                    # qdel requests. Status will be set properly with the
                    # subsequent hardware status check.
                    _logger.info('Node %s cleanup complete.', node.node_id)
                    node.status = 'idle'
                    cleaned_nodes.append(node)
            for node in cleaned_nodes:
                cleanup_nodes.remove(node)

            #find hardware status
            #so we do this only once for nodes being added.
            #full inventory fetch is expensive.
            recon_inventory = None
            for inven_node in inven_nodes.values():
                if self.nodes.has_key(str(inven_node['node_id'])):
                    node = self.nodes[str(inven_node['node_id'])]
                    node.role = inven_node['role'].upper()
                    node.attributes.update(inven_node['attrs'])
                    if node.reserved:
                        #node marked as reserved.
                        if self.alps_reservations.has_key(str(node.reserved_jobid)):
                            node.status = 'busy'
                        else:
                            # check to see if the resource reservation should be
                            # released.
                            if node.reserved_until >= now:
                                node.status = 'allocated'
                            else:
                                node.release(user=None, jobid=None, force=True)
                    else:
                        node.status = inven_node['state'].upper()
                        if node.role.upper() not in ['BATCH'] and node.status == 'idle':
                            node.status = 'alps-interactive'
                else:
                    # Apparently, we CAN add nodes on the fly.  The node would
                    # have been disabled.  We need to add a new node and update
                    # its state.
                    _logger.warning('Unknown node %s found. Starting reconstruction.', inven_node['node_id'])
                    try:
                        if recon_inventory is None:
                            recon_inventory = ALPSBridge.fetch_inventory()
                    except:
                        _logger.error('Failed to fetch inventory.  Will retry on next pass.', exc_info=True)
                    else:
                        self._reconstruct_node(inven_node, recon_inventory)
                   # _logger.error('UNS: ALPS reports node %s but not in our node list.',
                   #               inven_node['node_id'])
            # Update SSD data:
            self._update_ssd_data(self.nodes, ssd_enabled=ssd_enabled, ssd_diags=ssd_diags)
            #should down win over running in terms of display?
            #keep node that are marked for cleanup still in cleanup
            for node in cleanup_nodes:
                node.status = 'cleanup-pending'
        #_logger.debug("time in UNS lock: %s seconds", (time.time() - start_time))
        return

    def _detect_rereservation(self, inven_reservations):
        '''Detect and update the ALPS reservation associated with a running job.
        We are only concerned with BATCH reservations.  Others would be
        associated with running jobs, and should not be touched.

        '''
        def _construct_alps_res():
            with self._node_lock:
                job_nodes = [node.node_id for node in self.nodes.values()
                        if node.reserved_jobid == int(alps_res['batch_id'])]
            new_resspec = {'reserved_nodes': job_nodes,
                           'reservation_id': str(alps_res['reservation_id']),
                           'pagg_id': 0 #unknown.  Not used here.
                            }
            new_jobspec = {'jobid': int(alps_res['batch_id']),
                           'user' : alps_res['user_name']}

            return ALPSReservation(new_jobspec, new_resspec, self.nodes)

        replaced_reservation = None
        for alps_res in inven_reservations:
            try:
                #This traversal is terrible. May want to hide this in the API
                #somewhere
                if alps_res['ApplicationArray'][0]['Application'][0]['CommandArray'][0]['Command'][0]['cmd'] != 'BASIL':
                    # Not a reservation we're in direct control of.
                    continue
            except (KeyError, IndexError):
                #not a batch reservation
                continue
            if str(alps_res['batch_id']) in self.alps_reservations.keys():
                # This is a reservation we may already know about
                if (int(alps_res['reservation_id']) ==
                        self.alps_reservations[str(alps_res['batch_id'])].alps_res_id):
                    # Already know about this one
                    continue
                # we have a re-reservation.  If this has a batch id, we need
                # to add it to our list of tracked reservations, and inherit
                # other reservation details.  We can pull the reservation
                # information out of reserve_resources_until.

                # If this is a BATCH reservation and no hardware has that
                # reservation id, then this reservation needs to be released
                # Could happen if we have a job starting right at the RRU
                # boundary.
                new_alps_res = _construct_alps_res()
                tracked_res = self.alps_reservations.get(new_alps_res.jobid, None)
                if tracked_res is not None:
                    try:
                        apids = tracked_res.release()
                    except ALPSBridge.ALPSError:
                        # backend reservation probably is gone, which is why
                        # we are here in the first place.
                        pass
                self.alps_reservations[str(alps_res['batch_id'])] = new_alps_res
            else:
                #this is a basil reservation we don't know about already.
                new_alps_res = _construct_alps_res()
                if len(new_alps_res.node_ids) == 0:
                    # This reservation has no resources, so Cobalt's internal
                    # resource reservation tracking has no record.  This needs to
                    # be removed.
                    new_alps_res.release()
                else:
                    self.alps_reservations[str(alps_res['batch_id'])] = new_alps_res
        return

    def signal_aprun(self, aprun_id, signame='SIGINT'):
        '''Signal an aprun by aprun id (application_id).  Does not block.
        Use check_killing_aprun to determine completion/termination.  Does not
        depend on the host the aprun(s) was launched from.

        Input:
            aprun_id - integer application id number.
            signame  - string name of signal to send (default: SIGINT)
        Notes:
            Valid signals to apkill are:
            SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGABRT, SIGUSR1, SIGUSR2, SIGURG,
            and SIGWINCH (from apkill(1) man page.)  Also allowing SIGKILL.

        '''
        #Expect changes with an API update

        #mark legal signals from docos
        if (signame not in ['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM', 'SIGABRT',
            'SIGUSR1', 'SIGUSR2', 'SIGURG','SIGWINCH', 'SIGKILL']):
            raise ValueError('%s is not a legal value for signame.' % signame)
        try:
            retval = Cobalt.Proxy.ComponentProxy('system_script_forker').fork(
                    [APKILL_CMD, '-%s' % signame, '%d' % int(aprun_id)],
                    'aprun_termination', '%s cleanup:'% aprun_id)
            _logger.info("killing backend ALPS application_id: %s", aprun_id)
        except xmlrpclib.Fault:
            _logger.warning("XMLRPC Error while killing backend job: %s, will retry.",
                    aprun_id, exc_info=True)
        except:
            _logger.critical("Unknown Error while killing backend job: %s, will retry.",
                    aprun_id, exc_info=True)
        else:
            self.killing_jobs[aprun_id] = retval
        return

    def check_killing_aprun(self):
        '''Check that apkill commands have completed and clean them from the
        system_script_forker.  Allows for non-blocking cleanup initiation.

        '''

        try:
            system_script_forker = Cobalt.Proxy.ComponentProxy('system_script_forker')
        except:
            self.logger.critical("Cannot connect to system_script_forker.",
                    exc_info=True)
            return
        complete_jobs = []
        rev_killing_jobs = dict([(v,k) for (k,v) in self.killing_jobs.iteritems()])
        removed_jobs = []
        current_killing_jobs = system_script_forker.get_children(None, self.killing_jobs.values())

        for job in current_killing_jobs:
            if job['complete']:
                del self.killing_jobs[rev_killing_jobs[int(job['id'])]]
                removed_jobs.append(job['id'])
        system_script_forker.cleanup_children(removed_jobs)
        return

    @exposed
    def find_queue_equivalence_classes(self, reservation_dict, active_queue_names, passthrough_blocking_res_list=[]):
        '''Given a list of reservations and a list of active queues from the queue-manager
        via the scheduler, returns a list of dictionaries containing the current active
        reservations on the machine as well as a list of active queues that have nodes
        associated with them.   From this the scheduler can determine which jobs have
        any chance at all of being eligible to run.

        Inputs:
            reservation_dict -- a mapping of active reservations to resources.
                                These will block any job in a normal queue.
            active_queue_names -- A list of queues that are currently enabled.
                                  Queues that are not in the 'running' state
                                  are ignored.
            passthrough_blocking_res_list -- Not used on Cray systems currently.
                                             This is for handling hardware that
                                             supports partitioned interconnect
                                             networks.

        Output:
            A list containing dictionaries with reservations and active queues with associated nodes.

        Side effects:
            None

        Internal Data:
            This does make use of a list of current queues associated with nodes.
            This is done for speed, to prevent a lot of needless iteration.
            The nodes_by_queue association is updated any time that there is a change
            in the mapping of queues to nodes, when nodeadm --queue is invoked.

        Notes:
            This always returns a single "equivalence class" no matter what the
            current queue-node binding on the machine is.

        '''
        with self._node_lock:
            equiv_class = [{'reservations': reservation_dict.keys(),
                 'queues': [queue_name for queue_name in self.nodes_by_queue.keys()
                            if queue_name in active_queue_names]}]
        return equiv_class
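
    # Illustrative return value: with one active reservation 'res-1' and active
    # queues 'default' and 'debug' that both have nodes assigned, this returns
    #   [{'reservations': ['res-1'], 'queues': ['default', 'debug']}]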

    def _setup_special_locations(self, job):
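        '''Assemble the location constraints for a job as sets of node id
        strings: forbidden and required locations supplied with the job,
        explicitly requested locations from attrs location=, and nodes whose
        SSD is disabled or smaller than the requested ssd_size when the job
        sets attrs ssds=required.'''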
        forbidden = set([str(loc) for loc in chain_loc_list(job.get('forbidden', []))])
        required = set([str(loc) for loc in chain_loc_list(job.get('required', []))])
        requested_locations = set([str(n) for n in expand_num_list(job['attrs'].get('location', ''))])
        # If ssds are required, add nodes without working SSDs to the forbidden list
        ssd_unavail = set([])
        if job.get('attrs', {}).get("ssds", "none").lower() == "required":
            ssd_min_size = int(job.get('attrs', {}).get("ssd_size", DEFAULT_MIN_SSD_SIZE)) * 1000000000 #convert to bytes
            ssd_unavail.update(set([str(node.node_id) for node in self.nodes.values()
                                  if (node.attributes['ssd_enabled'] == 0 or
                                      int(node.attributes.get('ssd_info', {'size': DEFAULT_MIN_SSD_SIZE})['size'])  < ssd_min_size)
                                ]))
        return (forbidden, required, requested_locations, ssd_unavail)

    def _assemble_queue_data(self, job, idle_only=True, drain_time=None):
        '''put together data for a queue, or queue-like reservation structure.

        Input:
            job - dictionary of job data.
            idle_only - [default: True] if True, return only idle nodes.
                        Otherwise return nodes in any non-down status.

        return count of idle resources, and a list of valid nodes to run on.
        if idle_only is set to false, returns a set of candidate draining nodes.


        '''
        # RESERVATION SUPPORT: Reservation queues are ephemeral, so we will
        # not find the queue normally. In the event of a reservation we'll
        # have to intersect required nodes with the idle and available
        # we also have to forbid a bunch of locations, in  this case.
        unavailable_nodes = []
        forbidden, required, requested_locations, ssd_unavail = self._setup_special_locations(job)
        requested_loc_in_forbidden = False
        for loc in requested_locations:
            if loc in forbidden:
                #don't spam the logs.
                requested_loc_in_forbidden = True
                break
        if job['queue'] not in self.nodes_by_queue.keys():
            # Either a new queue with no resources, or possibly a reservation;
            # need to do extra work for a reservation
            node_id_list = list(required - forbidden - ssd_unavail)
        else:
            node_id_list = list(set(self.nodes_by_queue[job['queue']]) - forbidden - ssd_unavail)
        if requested_locations != set([]): # handle attrs location= requests
            job_set = set([str(nid) for nid in requested_locations])
            if job['queue'] not in self.nodes_by_queue.keys():
                #we're in a reservation and need to further restrict nodes.
                if job_set <= set(node_id_list):
                    # We are in a reservation and there are no forbidden nodes.
                    node_id_list = list(requested_locations - ssd_unavail)
                else:
                    # We can't run this job.  Insufficient resources in this
                    # reservation to do so.  Don't risk blocking anything.
                    node_id_list = []
            else:
                # Check to see if the job is requesting resources that are not
                # available in the queue specified and raise an exception.  This
                # results in a warning.
                if not requested_locations.issubset(set(self.nodes_by_queue[job['queue']])):
                    raise ValueError("requested locations not in queue")
                #normal queues.  Restrict to the non-reserved nodes.
                if job_set <= set([str(node_id) for node_id in
                                    self.nodes_by_queue[job['queue']]]):
                    node_id_list = list(requested_locations)
                    if not set(node_id_list).isdisjoint(forbidden):
                        # this job has requested locations that are a part of an
                        # active reservation.  Remove locations and drop available
                        # nodecount appropriately.
                        node_id_list = list(set(node_id_list) - forbidden - ssd_unavail)
                    # if not requested_loc_in_forbidden:
                        # raise ValueError("forbidden locations not in queue")
        with self._node_lock:
            if idle_only:
                unavailable_nodes = [node_id for node_id in node_id_list
                        if self.nodes[str(node_id)].status not in ['idle']]
            else:
                unavailable_nodes = [node_id for node_id in node_id_list
                        if self.nodes[str(node_id)].status in
                        self.nodes[str(node_id)].DOWN_STATUSES]
            if drain_time is not None:
                unavailable_nodes.extend([node_id for node_id in node_id_list
                    if (self.nodes[str(node_id)].draining and
                        (self.nodes[str(node_id)].drain_until - BACKFILL_EPSILON) < int(drain_time))])
        for node_id in set(unavailable_nodes):
            node_id_list.remove(node_id)
        return sorted(node_id_list, key=lambda nid: int(nid))
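
    # Note: the list returned above is sorted numerically by node id
    # (e.g. '2', '3', '10'), not lexicographically, so later first-fit
    # selection proceeds in ascending nid order.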

    def _select_first_nodes(self, job, node_id_list):
        '''Given a list of nids, select the first node count nodes from the
        list.  This is the target for alternate allocator replacement.

        Input:
            job - dictionary of job data from the scheduler
            node_id_list - a list of possible candidate nodes

        Return:
            A list of nodes.  [] if insufficient nodes for the allocation.

        Note: hold the node lock while doing this.  We really don't want an
        update to happen while doing this.

        '''
        ret_nodes = []
        with self._node_lock:
            if int(job['nodes']) <= len(node_id_list):
                node_id_list.sort(key=lambda nid: int(nid))
                ret_nodes = node_id_list[:int(job['nodes'])]
        return ret_nodes
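
    # Illustrative example: with job['nodes'] == 3 and
    # node_id_list == ['10', '2', '3', '4'], the numeric sort gives
    # ['2', '3', '4', '10'] and ['2', '3', '4'] is returned.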

    def _select_first_nodes_prefer_memory_match(self, job, node_id_list):
        '''Given a list of nids, select the first node count nodes from the
        list.  Prefer nodes that match the memory modes for a given job, then
        go in nid order.

        Input:
            job - dictionary of job data from the scheduler
            node_id_list - a list of possible candidate nodes

        Return:
            A list of nodes.  [] if insufficient nodes for the allocation.

        Note: hold the node lock while doing this.  We really don't want an
        update to happen while doing this.

        '''
        ssd_required = (job.get('attrs', {}).get("ssds", "none").lower() == "required")
        if job.get('attrs', {}).get('mcdram', None) is None or job.get('attrs', {}).get('numa', None) is None:
            # insufficient information to include a mode match
            return self._select_first_nodes(job, node_id_list)
        ret_nids = []
        with self._node_lock:
            considered_nodes = [node for node in self.nodes.values() if node.node_id in node_id_list]
            for node in considered_nodes:
                if (node.attributes['hbm_cache_pct'] == MCDRAM_TO_HBMCACHEPCT[job['attrs']['mcdram']] and
                        node.attributes['numa_cfg'] == job['attrs']['numa']):
                    ret_nids.append(int(node.node_id))
            if len(ret_nids) < int(job['nodes']):
                node_id_list.sort(key=lambda nid: int(nid))
                for nid in node_id_list:
                    if int(nid) not in ret_nids:
                        ret_nids.append(int(nid))
                        if len(ret_nids) >= int(job['nodes']):
                            break
        return ret_nids[:int(job['nodes'])]
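
    # Selection order above: candidate nodes whose hbm_cache_pct and numa_cfg
    # already match the job's requested mcdram/numa modes are taken first, then
    # remaining slots are filled in ascending nid order from node_id_list.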


    def _associate_and_run_immediate(self, job, resource_until_time, node_id_list):
        '''Given a list of idle node ids, choose a set that can run a job
        immediately, if a set exists in the node_id_list.

        Inputs:
            job - Dictionary of job data
            node_id_list - a list of string node id values

        Side Effects:
            Will reserve resources in ALPS and will set resource reservations on
            allocated nodes.

        Return:
            None if no match, otherwise the pairing of a jobid and set of nids
            that have been allocated to a job.

        '''
        compact_locs = None
        if int(job['nodes']) <= len(node_id_list):
            #this job can be run immediately
            to_alps_list = self._select_first_nodes_prefer_memory_match(job, node_id_list)
            job_locs = self._ALPS_reserve_resources(job, resource_until_time,
                    to_alps_list)
            if job_locs is not None and len(job_locs) == int(job['nodes']):
                compact_locs = compact_num_list(job_locs)
                #temporary reservation until job actually starts
                #FIXME: data race, self.pending_starts can be modified without locks
                self.pending_starts[job['jobid']] = resource_until_time
                self.reserve_resources_until(compact_locs, resource_until_time, job['jobid'])
        return compact_locs
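
    # On success this returns a compact location string for the allocated nids
    # (e.g. '20-23', assuming compact_num_list renders consecutive ids as a
    # range); None is returned if the ALPS reservation could not be made for
    # the full node count.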

    @locking
    @exposed
    def find_job_location(self, arg_list, end_times, pt_blocking_locations=[]):
        '''Given a list of jobs, and when jobs are ending, return a set of
        locations mapped to a jobid that can be run.  Also, set up draining
        as-needed to run top-scored jobs and backfill when possible.

        Called once per equivalence class.

        Args:
            arg_list: A list of dictionaries containing information on jobs to
                   consider.
            end_times: list containing a mapping of locations and the times jobs
                    running on those locations are scheduled to end.  End times
                    are in seconds from Epoch UTC.
            pt_blocking_locations: Not used for this system.  Used in partitioned