test_cray.py 74.4 KB
Newer Older
Paul Rich's avatar
Paul Rich committed
1
# Test Cray-specific utilities/calls.
2 3 4 5 6 7 8
# Minimal Cobalt configuration text for these tests. It is written to the
# first entry of Cobalt.CONFIG_FILES further down in this module, ahead of
# the Cobalt.Components.system imports.
SYSTEM_CONFIG_ENTRY = """
[system]
size: 10
elogin_hosts: foo:bar
"""
import Cobalt
import TestCobalt
9
import sys
10 11
import xml.etree.ElementTree
import xmlrpclib
12 13 14 15 16
# Overwrite the first Cobalt config file with the test configuration.
# The context manager closes the handle even if the write raises.
config_file = Cobalt.CONFIG_FILES[0]
with open(config_file, "w") as config_fp:
    config_fp.write(SYSTEM_CONFIG_ENTRY)

17 18 19 20
from mock import MagicMock, Mock, patch
# need to mock the import of a dependency library for sending messages to BASIL.
# None of these tests actually communicate with BASIL and this library should be a stub
# for these purposes
21 22 23 24 25
# ALPSError originates in cray_messaging and is passed through the ALPSBridge
# module under the same name; ValueError stands in for it on the mock.
cray_messaging_mock = MagicMock(ALPSError=ValueError)
# Register the stub so any import of 'cray_messaging' resolves to it.
sys.modules['cray_messaging'] = cray_messaging_mock
26

Paul Rich's avatar
Paul Rich committed
27 28
from nose.tools import raises
from testsuite.TestCobalt.Utilities.assert_functions import assert_match, assert_not_match
29 30


Paul Rich's avatar
Paul Rich committed
31 32 33 34 35 36 37 38 39 40 41
from Cobalt.Components.system.CrayNode import CrayNode
import Cobalt.Exceptions
import time
from Cobalt.Components.system.CraySystem import CraySystem
from Cobalt.Components.system.base_pg_manager import ProcessGroupManager
import Cobalt.Components.system.AlpsBridge as AlpsBridge


def is_match(a, b):
    '''Identity comparison: True only when a and b are the same object.'''
    same_object = a is b
    return same_object

42 43 44 45 46 47 48 49 50 51 52 53
def fake_alps_reserve(user, jobid, nodes, attrs, node_id_list):
    '''Mock for AlpsBridge.reserve method.

    user, jobid, nodes and attrs are accepted for signature compatibility and
    ignored.  node_id_list must be a list (list subclasses are accepted); it
    is handed back unchanged as the reserved nodes.

    Returns a dict with 'reserved_nodes' set to node_id_list and a fixed
    'reservation_id' of 1.

    Raises AssertionError when node_id_list is not a list.
    '''
    assert isinstance(node_id_list, list)
    return {'reserved_nodes': node_id_list,
            'reservation_id': 1,
            }

def return_none(*args, **kwargs):
    '''Accept any positional/keyword arguments, ignore them, return None.'''
    return


Paul Rich's avatar
Paul Rich committed
54 55 56 57 58 59 60 61 62 63 64 65 66
class TestCrayNode(object):
    '''Tests for CrayNode construction and status handling.'''

    def setup(self):
        '''Create a baseline node spec and a CrayNode built from it.'''
        self.spec = {'name':'test', 'state': 'UP', 'node_id': 1, 'role':'batch',
                'architecture': 'XT', 'SocketArray':['foo', 'bar'],
                }
        self.base_node = CrayNode(self.spec)

    def teardown(self):
        '''Drop the per-test fixtures.'''
        del self.spec
        del self.base_node

    def test_init(self):
        '''CrayNode.__init__: test initializer'''
        spec = {'name':'test', 'state': 'UP', 'node_id': 1, 'role':'batch',
                'architecture': 'XT', 'SocketArray':['foo', 'bar'],
                }
        node = CrayNode(spec)
        assert_match(node.status, 'idle', 'bad status')
        assert_match(node.node_id, 1, 'bad nodeid')
        assert_match(node.role, 'BATCH', 'bad role')
        # is_match compares by identity rather than equality.
        assert_match(node.attributes['architecture'], 'XT',
                'bad architecture',  is_match)
        assert_match(node.segment_details, ['foo', 'bar'],
                'bad segment')
        assert_match(node.ALPS_status, 'UNKNOWN',
                'bad default ALPS status')
        assert 'alps-interactive' in node.RESOURCE_STATUSES,(
                'alps-interactive not in resource statuses')

    def test_init_alps_states(self):
        '''CrayNode.__init__: alps states correctly set'''
        cray_state_list = ['UP', 'DOWN', 'UNAVAILABLE', 'ROUTING', 'SUSPECT',
                           'ADMIN', 'UNKNOWN', 'UNAVAIL', 'SWDOWN', 'REBOOTQ',
                           'ADMINDOWN']
        correct_alps_states = {'UP': 'idle', 'DOWN':'down', 'UNAVAILABLE':'down',
                               'ROUTING':'down', 'SUSPECT':'down', 'ADMIN':'down',
                               'UNKNOWN':'down', 'UNAVAIL': 'down', 'SWDOWN': 'down',
                               'REBOOTQ':'down', 'ADMINDOWN':'down'}
        for state in cray_state_list:
            self.spec['state'] = state
            node = CrayNode(self.spec)
            expected = correct_alps_states[state]
            # Report the ALPS state under test along with what it actually
            # mapped to, so a failure identifies the offending input.
            assert node.status == expected, (
                    "%s should map to %s, got %s" % (state, expected,
                        node.status))

    def test_non_cray_statuses(self):
        '''CrayNode.status: can set cobalt-tracking statuses.'''
        test_statuses = ['busy', 'cleanup-pending', 'allocated',
                'alps-interactive']
        for status in test_statuses:
            self.base_node.status = status
            assert_match(self.base_node.status, status, "failed validation")

class TestCraySystem(object):
109 110
    '''Test Cray system component functionality'''
    #SETUP AND TEARDOWN HELPERS
Paul Rich's avatar
Paul Rich committed
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
    @patch.object(AlpsBridge, 'init_bridge')
    @patch.object(CraySystem, '_init_nodes_and_reservations', return_value=None)
    @patch.object(CraySystem, '_run_update_state', return_value=None)
    def setup(self, *args, **kwargs):
        '''Build a CraySystem with five managed nodes (ids '1' through '5')
        in the 'default' queue, plus a one-node base job.

        The patch decorators replace AlpsBridge.init_bridge and the two
        CraySystem hooks while the system is constructed; the mocks they
        inject arrive in *args and are not used.
        '''
        self.system = CraySystem()
        self.base_spec = {'name':'test', 'state': 'UP', 'node_id': '1', 'role':'batch',
                'architecture': 'XT', 'SocketArray':['foo', 'bar'],
                'queues':['default'],
                }
        for node_num in range(1, 6):
            node_key = str(node_num)
            # base_spec is updated in place and then copied, so each node
            # receives its own spec dict.
            self.base_spec['name'] = "test%s" % node_num
            self.base_spec['node_id'] = node_key
            node_spec = dict(self.base_spec)
            self.system.nodes[node_key] = CrayNode(node_spec)
            self.system.node_name_to_id[node_spec['name']] = node_spec['node_id']
        for cray_node in self.system.nodes.values():
            cray_node.managed = True
        self.system._gen_node_to_queue()

        self.base_job = {'jobid':1, 'user':'crusher', 'attrs':{},
                'queue':'default', 'nodes': 1, 'walltime': 60,
                }
        self.fake_reserve_called = False
        Cobalt.Components.system.CraySystem.BACKFILL_EPSILON = 120
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "first-fit"
Paul Rich's avatar
Paul Rich committed
136 137 138 139

    def teardown(self):
        '''Discard the per-test system and job, then put the module-level
        BACKFILL_EPSILON and DRAIN_MODE back to 120 and "first-fit".'''
        del self.system
        del self.base_job
        cray_system_module = Cobalt.Components.system.CraySystem
        cray_system_module.BACKFILL_EPSILON = 120
        cray_system_module.DRAIN_MODE = "first-fit"
        self.fake_reserve_called = False

Paul Rich's avatar
Paul Rich committed
144

145 146 147 148 149
    # HELPER MOCK FUNCTIONS
    def fake_reserve(self, job, new_time, node_id_list):
        '''Mimic first-fit function of ALPS placement scheme

        Records that it was called, then returns the first job['nodes']
        entries of node_id_list, or an empty list when the job asks for more
        nodes than were offered.  new_time is ignored.
        '''
        # self gets overridden by the call within fjl to be the real system
        # component.
        self.fake_reserve_called = True
        requested = job['nodes']
        fits = requested <= len(node_id_list)
        return node_id_list[:int(requested)] if fits else []

    #TESTS
Paul Rich's avatar
Paul Rich committed
157
    def test_assemble_queue_data(self):
        '''CraySystem._assemble_queue_data: base functionality'''
        # Every fixture node is idle and in the job's 'default' queue.
        all_nodes = ['1', '2', '3', '4', '5']
        found = self.system._assemble_queue_data(self.base_job)
        assert_match(sorted(found), all_nodes, 'nodelist mismatch')
Paul Rich's avatar
Paul Rich committed
161 162

    def test_assemble_queue_data_bad_queue(self):
        '''CraySystem._assemble_queue_data: return nothing if queue for job doesn't exist'''
        # No fixture node is assigned to a queue named 'foo'.
        self.base_job['queue'] = 'foo'
        found = self.system._assemble_queue_data(self.base_job)
        assert_match(found, [], 'nonempty nodelist')
Paul Rich's avatar
Paul Rich committed
167 168

    def test_assemble_queue_data_multiple_queue(self):
        '''CraySystem._assemble_queue_data: return only proper queue nodes'''
        # Move nodes 1 and 4 out of 'default' and rebuild the queue mapping.
        for node_id, queues in (('1', ['foo']), ('4', ['bar'])):
            self.system.nodes[node_id].queues = queues
        self.system._gen_node_to_queue()
        found = self.system._assemble_queue_data(self.base_job)
        assert_match(sorted(found), ['2', '3', '5'], 'Wrong nodelist')
Paul Rich's avatar
Paul Rich committed
175 176

    def test_assemble_queue_data_multiple_queue_overlap(self):
        '''CraySystem._assemble_queue_data: return only proper queue nodes in overlapping queues'''
        queue_layout = (('1', ['foo', 'default', 'bar']),
                        ('4', ['default','bar']),
                        ('5', ['baz']))
        for node_id, queues in queue_layout:
            self.system.nodes[node_id].queues = queues
        self.system._gen_node_to_queue()

        # job aliases self.base_job; the queue is switched in place between
        # the lookups below.
        job = self.base_job
        found = self.system._assemble_queue_data(job)
        assert_match(sorted(found), ['1', '2', '3', '4'], 'Wrong nodelist')

        job['queue'] = 'foo'
        found = self.system._assemble_queue_data(job)
        assert_match(found, ['1'], 'Wrong nodelist')

        job['queue'] = 'bar'
        found = self.system._assemble_queue_data(job)
        assert_match(sorted(found), ['1', '4'], 'Wrong nodelist')

        job['queue'] = 'baz'
        found = self.system._assemble_queue_data(job)
        assert_match(found, ['5'], 'Wrong nodelist')
Paul Rich's avatar
Paul Rich committed
193

194 195
    def test_assemble_queue_data_idle(self):
        '''CraySystem._assemble_queue_data: return only idle nodes'''
        for node_id, status in (('1', 'busy'), ('4', 'ADMINDOWN')):
            self.system.nodes[node_id].status = status
        self.system._gen_node_to_queue()
        found = self.system._assemble_queue_data(self.base_job)
        assert_match(sorted(found), ['2','3','5'], 'Wrong nodelist')
Paul Rich's avatar
Paul Rich committed
201

202 203 204 205 206 207 208 209 210 211 212 213 214 215
    def test_assemble_queue_data_non_down(self):
        '''CraySystem._assemble_queue_data: return nodes that are not down'''
        first_pass = (('1', 'busy'), ('2', 'cleanup-pending'),
                      ('3', 'allocated'), ('4', 'ADMINDOWN'))
        for node_id, status in first_pass:
            self.system.nodes[node_id].status = status
        found = self.system._assemble_queue_data(self.base_job,
                idle_only=False)
        assert sorted(found) == ['1','2','3','5'], 'Wrong nodes in list %s' % found

        # Second pass: nodes 1 and 2 are now expected to drop out as well.
        for node_id, status in (('1', 'SUSPECT'), ('2', 'alps-interactive')):
            self.system.nodes[node_id].status = status
        found = self.system._assemble_queue_data(self.base_job,
                idle_only=False)
        assert sorted(found) == ['3','5'], 'Wrong nodes in list %s' % found
216

Paul Rich's avatar
Paul Rich committed
217
    def test_assemble_queue_data_attrs_location(self):
        '''CraySystem._assemble_queue_data: return only attr location loc'''
        self.base_job['attrs'] = {'location':'3'}
        found = self.system._assemble_queue_data(self.base_job)
        assert found == ['3'], 'Wrong node in list %s' % found

223 224 225
    def test_assemble_queue_data_attrs_location_repeats(self):
        '''CraySystem._assemble_queue_data: eliminate repeat location entries'''
        # Node 1 is listed twice in the location string; it must appear once.
        self.base_job['attrs'] = {'location':'1,1,2,3'}
        found = self.system._assemble_queue_data(self.base_job)
        assert sorted(found) == ['1', '2', '3'], 'Wrong node in list %s' % found

229 230
    @raises(ValueError)
    def test_assemble_queue_data_attrs_bad_location(self):
        '''CraySystem._assemble_queue_data: raise error for location completely outside of queue'''
        # The fixture only defines nodes '1' through '5'.
        self.base_job['attrs'] = {'location':'6'}
        # Only the raised ValueError matters, so the result is not bound.
        self.system._assemble_queue_data(self.base_job)
234

Paul Rich's avatar
Paul Rich committed
235
    def test_assemble_queue_data_attrs_location_multi(self):
        '''CraySystem._assemble_queue_data: return only attr location complex loc string'''
        # The location string mixes a range with a single node id.
        self.base_job['attrs'] = {'location':'1-3,5'}
        found = self.system._assemble_queue_data(self.base_job)
        assert sorted(found) == ['1','2','3','5'], 'Wrong nodes in list %s' % found

    def test_assemble_queue_data_forbidden_loc(self):
        '''CraySystem._assemble_queue_data: avoid reserved nodes'''
        # Everything except node 4 is listed as forbidden.
        self.base_job['forbidden'] = ['1-3','5']
        found = self.system._assemble_queue_data(self.base_job)
        assert sorted(found) == ['4'], 'Wrong nodes in list %s' % found

    def test_assemble_queue_data_forbidden_loc_attrs_loc(self):
        '''CraySystem._assemble_queue_data: avoid reserved nodes despite location being set'''
        job = self.base_job
        job['forbidden'] = ['1-3']
        job['attrs'] = {'location':'1-4'}
        found = self.system._assemble_queue_data(job)
        assert sorted(found) == ['4'], 'Wrong nodes in list %s' % found

    def test_assemble_queue_data_forbidden_loc_attrs_loc_complete(self):
        '''CraySystem._assemble_queue_data: avoid reserved nodes block location if superset'''
        # The requested location is exactly the forbidden set.
        job = self.base_job
        job['forbidden'] = ['1-3']
        job['attrs'] = {'location':'1-3'}
        found = self.system._assemble_queue_data(job)
        assert sorted(found) == [], 'Wrong nodes in list %s' % found

    def test_assemble_queue_data_forbidden_loc_attrs_loc_permit(self):
        '''CraySystem._assemble_queue_data: forbidden doesn't block everything'''
        # Forbidden and requested locations do not overlap here.
        job = self.base_job
        job['forbidden'] = ['1-3']
        job['attrs'] = {'location':'4-5'}
        found = self.system._assemble_queue_data(job)
        assert sorted(found) == ['4','5'], 'Wrong nodes in list %s' % found

    def test_assemble_queue_data_reserved_loc(self):
        '''CraySystem._assemble_queue_data: return reservation nodes'''
        job = self.base_job
        job['required'] = ['2-4']
        job['queue'] = 'reservation'
        found = self.system._assemble_queue_data(job)
        assert sorted(found) == ['2','3','4'], 'Wrong nodes in list %s' % found

    def test_assemble_queue_data_reserved_loc_idle_only(self):
        '''CraySystem._assemble_queue_data: return reservation nodes that are idle'''
        non_idle = (('1', 'busy'), ('2', 'cleanup-pending'),
                    ('3', 'allocated'), ('4', 'ADMINDOWN'))
        for node_id, status in non_idle:
            self.system.nodes[node_id].status = status
        job = self.base_job
        job['required'] = ['1-5']
        job['queue'] = 'reservation'
        found = self.system._assemble_queue_data(job)
        assert sorted(found) == ['5'], 'Wrong nodes in list %s' % found

286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
    def test_assemble_queue_data_reserved_loc_non_down(self):
        '''CraySystem._assemble_queue_data: return reservation nodes that are not down'''
        first_pass = (('1', 'busy'), ('2', 'cleanup-pending'),
                      ('3', 'allocated'), ('4', 'ADMINDOWN'))
        for node_id, status in first_pass:
            self.system.nodes[node_id].status = status
        job = self.base_job
        job['required'] = ['1-5']
        job['queue'] = 'reservation'
        found = self.system._assemble_queue_data(job,
                idle_only=False)
        assert sorted(found) == ['1','2','3','5'], 'Wrong nodes in list %s' % found

        # Second pass: nodes 1 and 2 change status; the reservation fields
        # are assigned again with the same values.
        for node_id, status in (('1', 'SUSPECT'), ('2', 'alps-interactive')):
            self.system.nodes[node_id].status = status
        job['required'] = ['1-5']
        job['queue'] = 'reservation'
        found = self.system._assemble_queue_data(job,
                idle_only=False)
        assert sorted(found) == ['3','5'], 'Wrong nodes in list %s' % found

Paul Rich's avatar
Paul Rich committed
305
    def test_assemble_queue_data_reserved_loc_location_set(self):
        '''CraySystem._assemble_queue_data: return reservation nodes for job with location set'''
        job = self.base_job
        job['required'] = ['1-4']
        job['attrs'] = {'location':'1,2,4'}
        job['queue'] = 'reservation'
        found = self.system._assemble_queue_data(job)
        assert sorted(found) == ['1','2','4'], 'Wrong nodes in list %s' % found
312

313 314 315 316 317 318 319
    def test_assemble_queue_data_attrs_location_blocked_nodes(self):
        '''CraySystem._assemble_queue_data: return only idle locations'''
        non_idle = (('1', 'busy'), ('2', 'cleanup-pending'),
                    ('3', 'allocated'), ('4', 'ADMINDOWN'))
        for node_id, status in non_idle:
            self.system.nodes[node_id].status = status
        self.base_job['attrs'] = {'location':'1-5'}
        found = self.system._assemble_queue_data(self.base_job)
        assert found == ['5'], 'Wrong node in list %s' % found

    def test_assemble_queue_data_attrs_location_all_blocked_nodes(self):
        '''CraySystem._assemble_queue_data: return no locations if attrs location nodes are
        all non idle'''
        non_idle = (('1', 'busy'), ('2', 'cleanup-pending'),
                    ('3', 'allocated'), ('4', 'ADMINDOWN'))
        for node_id, status in non_idle:
            self.system.nodes[node_id].status = status
        # The requested range covers only the four non-idle nodes.
        self.base_job['attrs'] = {'location':'1-4'}
        found = self.system._assemble_queue_data(self.base_job)
        assert found == [], 'Wrong node in list %s' % found
333

334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
    def test_assemble_queue_data_attrs_location_any_not_down(self):
        '''CraySystem._assemble_queue_data: attrs location return any not down'''
        statuses = (('1', 'busy'), ('2', 'cleanup-pending'),
                    ('3', 'allocated'), ('4', 'ADMINDOWN'))
        for node_id, status in statuses:
            self.system.nodes[node_id].status = status
        job = self.base_job
        job['attrs'] = {'location':'1-5'}
        job['nodes'] = 4
        found = self.system._assemble_queue_data(job, idle_only=False)
        assert found == ['1', '2', '3', '5'], 'Wrong node in list %s' % found

    def test_assemble_queue_data_attrs_location_any_not_down_drain_limit(self):
        '''CraySystem._assemble_queue_data: attrs location return any not down in drain window'''
        statuses = (('1', 'busy'), ('2', 'cleanup-pending'),
                    ('3', 'allocated'), ('4', 'ADMINDOWN'))
        for node_id, status in statuses:
            self.system.nodes[node_id].status = status
        # Second set_drain argument matches the node number in each case.
        for node_id, until, drain_arg in (('1', 500.0, 1), ('2', 600.0, 2),
                                          ('3', 700.0, 3)):
            self.system.nodes[node_id].set_drain(until, drain_arg)
        job = self.base_job
        job['attrs'] = {'location':'1-5'}
        job['nodes'] = 2
        # NOTE(review): node 3 (draining until 700) is expected to be left
        # out, presumably because of the BACKFILL_EPSILON of 120 -- confirm.
        found = self.system._assemble_queue_data(job,
                idle_only=False, drain_time=650)
        assert found == ['5'], 'Wrong node in list %s' % found

    def test_assemble_queue_data_attrs_location_any_not_down_drain_limit_no_ep(self):
        '''CraySystem._assemble_queue_data: attrs location return any not down in drain window no epsilon'''
        # Disable the epsilon before anything else is configured.
        Cobalt.Components.system.CraySystem.BACKFILL_EPSILON = 0
        statuses = (('1', 'busy'), ('2', 'cleanup-pending'),
                    ('3', 'allocated'), ('4', 'ADMINDOWN'))
        for node_id, status in statuses:
            self.system.nodes[node_id].status = status
        for node_id, until, drain_arg in (('1', 500.0, 1), ('2', 600.0, 2),
                                          ('3', 700.0, 3)):
            self.system.nodes[node_id].set_drain(until, drain_arg)
        job = self.base_job
        job['attrs'] = {'location':'1-5'}
        job['nodes'] = 2
        found = self.system._assemble_queue_data(job,
                idle_only=False, drain_time=650)
        assert found == ['3', '5'], 'Wrong node in list %s' % found

    def test_assemble_queue_data_non_draining(self):
        '''CraySystem._assemble_queue_data: return idle and non draining only'''
        for node_id, status in (('1', 'busy'), ('2', 'down'),
                                ('3', 'allocated')):
            self.system.nodes[node_id].status = status
        self.system.nodes['4'].set_drain(100, 1)
        found = self.system._assemble_queue_data(self.base_job,
                drain_time=150)
        assert_match(sorted(found), ['5'], "Bad Nodelist")

386
    def test_assemble_queue_data_within_draining(self):
        '''CraySystem._assemble_queue_data: return idle and draining if within
        time'''
        for node_id, status in (('1', 'busy'), ('2', 'down')):
            self.system.nodes[node_id].status = status
        self.system.nodes['3'].set_drain(50.0, 2)
        self.system.nodes['4'].set_drain(220.0, 1) #add in epsilon
        found = self.system._assemble_queue_data(self.base_job,
                drain_time=90.0)
        assert_match(sorted(found), ['4', '5'], "Bad Nodelist")

397
    def test_assemble_queue_data_match_draining(self):
        '''CraySystem._assemble_queue_data: return idle and matched drain node'''
        for node_id, status in (('1', 'busy'), ('2', 'down'),
                                ('3', 'allocated')):
            self.system.nodes[node_id].status = status
        self.system.nodes['4'].set_drain(220.0, 1) #add in epsilon
        found = self.system._assemble_queue_data(self.base_job,
                drain_time=100.0)
        assert_match(sorted(found), ['4', '5'], "Bad Nodelist")

407 408
    def test_find_queue_equivalence_classes_single(self):
        '''CraySystem.find_queue_equivalence_classes: single queue'''
        self.system._gen_node_to_queue()
        equivs = self.system.find_queue_equivalence_classes({}, ['default'], [])
        # Interpolate the actual count so a failure reports what was returned.
        assert len(equivs) == 1, (
                'Have %s equiv classes, should have 1.' %
                len(equivs))
        for equiv in equivs:
            assert equiv['queues'] == ['default'], 'mismatch in returned equiv class queues'

    def test_find_queue_equivalence_classes_overlap(self):
        '''CraySystem.find_queue_equivalence_classes: partial overlapping queues'''
        # Node 2 sits in both queues, linking 'foo' and 'default'.
        for node_id, queues in (('1', ['foo']), ('2', ['foo', 'default'])):
            self.system.nodes[node_id].queues = queues
        self.system._gen_node_to_queue()
        classes = self.system.find_queue_equivalence_classes({}, ['default', 'foo'], [])
        class_count = len(classes)
        assert class_count == 1, (
                'Have %s equiv classes, should have 1.' %
                class_count)
        for eq_class in classes:
            assert sorted(eq_class['queues']) == ['default', 'foo'], (
                    'mismatch in returned equiv class queues %s' %
                    eq_class['queues'])

429 430 431 432 433 434 435 436 437 438 439 440 441 442
    def test_find_queue_equivalence_classes_overlap_single_active(self):
        '''CraySystem.find_queue_equivalence_classes: partial overlapping queues one active queue only'''
        for node_id, queues in (('1', ['foo']), ('2', ['foo', 'default'])):
            self.system.nodes[node_id].queues = queues
        self.system._gen_node_to_queue()
        # Only 'foo' is passed as an active queue this time.
        classes = self.system.find_queue_equivalence_classes({}, ['foo'], [])
        class_count = len(classes)
        assert class_count == 1, (
                'Have %s equiv classes, should have 1.' %
                class_count)
        for eq_class in classes:
            assert sorted(eq_class['queues']) == ['foo'], (
                    'mismatch in returned equiv class queues %s' %
                    eq_class['queues'])

443 444
    def test_find_queue_equivalence_classes_disjoint(self):
        '''CraySystem.find_queue_equivalence_classes: disjoint queues'''
        # we return one class now, no matter what.
        for node_id in ('1', '2'):
            self.system.nodes[node_id].queues = ['foo']
        self.system._gen_node_to_queue()
        classes = self.system.find_queue_equivalence_classes({}, ['default', 'foo'], [])
        wanted = [{'reservations': [], 'queues': ['default', 'foo']}]
        assert classes == wanted, 'Expected %s, got %s' % (wanted, classes)
452

453 454 455 456
    def test_find_queue_equivalence_classes_disjoint_reservation(self):
        '''CraySystem.find_queue_equivalence_classes: bind reservation all eq classes'''
        for node_id in ('1', '2'):
            self.system.nodes[node_id].queues = ['foo']
        self.system._gen_node_to_queue()
        # Reservation 'test' covers nodes from both the 'foo' and 'default' queues.
        reservations = {'test':'1-2,4-5'}
        classes = self.system.find_queue_equivalence_classes(reservations, ['default', 'foo'], [])
        wanted = [{'reservations': ['test'], 'queues': ['default', 'foo']}]
        assert classes == wanted, 'Expected %s, got %s' % (wanted, classes)
461

462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479
    def test_find_queue_equivalence_classes_disjoint_fuse_res(self):
        '''CraySystem.find_queue_equivalence_classes: fuse equivalence classes with overlapping reservation'''
        for node_id in ('1', '2'):
            self.system.nodes[node_id].queues = ['foo']
        self.system._gen_node_to_queue()
        reservations = {'test':'1-2,4-5'}
        classes = self.system.find_queue_equivalence_classes(reservations, ['default', 'foo'], [])
        wanted = [ {'reservations': ['test'], 'queues': ['default', 'foo']}]
        assert classes == wanted, 'Expected %s, got %s' % (wanted, classes)

    def test_find_queue_equivalence_classes_disjoint_single_res(self):
        '''CraySystem.find_queue_equivalence_classes: bind only appropriate equivalence class'''
        for node_id in ('1', '2'):
            self.system.nodes[node_id].queues = ['foo']
        self.system._gen_node_to_queue()
        # Reservation 'test' only touches nodes left in the 'default' queue.
        reservations = {'test':'3-5'}
        classes = self.system.find_queue_equivalence_classes(reservations, ['default', 'foo'], [])
        wanted = [{'reservations': ['test'], 'queues': ['default', 'foo']}]
        assert classes == wanted, 'Expected %s, got %s' % (wanted, classes)

480 481 482
    def test_clear_draining_for_queues_full_clear(self):
        '''CraySystem._clear_draining_for_queues: clear queue's draining times'''
        # Mark every node as draining before the clear is requested.
        for cray_node in self.system.nodes.values():
            cray_node.set_drain(101.0, 300)
        self.system.find_queue_equivalence_classes({}, ['default'], [])
        self.system._clear_draining_for_queues()
        for cray_node in self.system.nodes.values():
            assert not cray_node.draining, "node %s marked as draining!" % cray_node.node_id

    def test_clear_draining_for_queues_multi_queue(self):
        '''CraySystem._clear_draining_for_queues: clear whole equivalence class'''
        for node_id, queues in (('1', ['foo']), ('2', ['foo', 'default'])):
            self.system.nodes[node_id].queues = queues
        self.system._gen_node_to_queue()
        for cray_node in self.system.nodes.values():
            cray_node.set_drain(100.0, 300)
        self.system.find_queue_equivalence_classes({}, ['default', 'foo'], [])
        self.system._clear_draining_for_queues()
        for cray_node in self.system.nodes.values():
            assert not cray_node.draining, "node %s marked as draining!" % cray_node.node_id

501 502 503 504 505 506 507 508 509
    def test_clear_draining_for_queues_one_equiv(self):
        '''CraySystem._clear_draining_for_queues: clear only one equivalence class'''
        # There is now one and only one equivalence class so everything should be cleared
        for node_id in ('1', '2'):
            self.system.nodes[node_id].queues = ['foo']
        self.system._gen_node_to_queue()
        for cray_node in self.system.nodes.values():
            cray_node.set_drain(100.0, 300)
        self.system.find_queue_equivalence_classes({}, ['default', 'foo'], [])
        self.system._clear_draining_for_queues()
        for cray_node in self.system.nodes.values():
            assert not cray_node.draining, "node %s marked as draining!" % cray_node.node_id

    def test_clear_draining_for_queues_reservation(self):
        '''CraySystem._clear_draining_for_queues: clear specified reservation nodes'''
        # There is now one and only one equivalence class so everything should be cleared
        # NOTE(review): despite the title, an empty reservation dict is
        # passed below -- confirm whether a reservation was meant here.
        for node_id in ('1', '2'):
            self.system.nodes[node_id].queues = ['foo']
        self.system._gen_node_to_queue()
        for cray_node in self.system.nodes.values():
            cray_node.set_drain(100.0, 300)
        self.system.find_queue_equivalence_classes({}, ['default', 'foo'], [])
        self.system._clear_draining_for_queues()
        for cray_node in self.system.nodes.values():
            assert not cray_node.draining, "node %s marked as draining!" % cray_node.node_id

527 528
    def test_select_nodes_for_draining_single_job(self):
        '''CraySystem._select_nodes_for_draining: drain nodes from a single job'''
        # One running job on nodes 1-3 with an end time of 100.
        running = [[['1-3'], 100]]
        for node_id in ('1', '2', '3'):
            self.system.nodes[node_id].status = 'busy'
        self.base_job['nodes'] = 4
        selected = self.system._select_nodes_for_draining(self.base_job,
                running)
        assert_match(sorted(selected), ['1', '2', '3', '4'], "Bad Selection.")

538 539 540 541 542 543 544 545 546 547 548 549
    def test_select_nodes_for_draining_user_location(self):
        '''CraySystem._select_nodes_for_draining: drain nodes for user specified location'''
        running = [[['1-3'], 100]]
        for node_id in ('1', '2', '3'):
            self.system.nodes[node_id].status = 'busy'
        job = self.base_job
        job['nodes'] = 4
        # The job pins itself to nodes 2-5.
        job['attrs'] = {'location':'2-5'}
        selected = self.system._select_nodes_for_draining(job,
                running)
        assert_match(sorted(selected), ['2', '3', '4', '5'], "Bad Selection.")

Paul Rich's avatar
Paul Rich committed
550 551
    def test_select_nodes_for_draining_prefer_running(self):
        '''CraySystem._select_nodes_for_draining: prefer nodes from running job'''
        running = [[['4-5'], 100]]
        for node_id in ('4', '5'):
            self.system.nodes[node_id].status = 'busy'
        self.base_job['nodes'] = 4
        selected = self.system._select_nodes_for_draining(self.base_job,
                running)
        assert_match(sorted(selected), ['1', '2', '4', '5'], "Bad Selection.")

    def test_select_nodes_for_draining_only_running(self):
        '''CraySystem._select_nodes_for_draining: fit entirely in running job if possible'''
        running = [[['2-5'], 100]]
        for node_id in ('2', '3', '4', '5'):
            self.system.nodes[node_id].status = 'busy'
        # Two nodes requested; the running job occupies four.
        self.base_job['nodes'] = 2
        selected = self.system._select_nodes_for_draining(self.base_job,
                running)
        assert_match(sorted(selected), ['2', '3'], "Bad Selection.")

    def test_select_nodes_for_draining_correct_time(self):
        '''CraySystem._select_nodes_for_draining: set correct drain times single job

        Node 5 is busy with an end of 100 and the job asks for five nodes.
        Every node 1-5 must end up draining for jobid 1 until 100.

        '''
        self.system.nodes['5'].status = 'busy'
        self.base_job['nodes'] = 5
        # Only the drain attributes written onto the nodes are checked here,
        # so the returned selection is not kept.
        self.system._select_nodes_for_draining(self.base_job, [[['5'], 100]])
        for nid in [str(num) for num in range(1, 6)]:
            node = self.system.nodes[nid]
            assert_match(node.draining, True, "Draining not set")
            assert_match(node.drain_jobid, 1, "Bad drain job")
            assert_match(node.drain_until, 100, "Bad drain time")

    def test_select_nodes_for_draining_multiple_running(self):
        '''CraySystem._select_nodes_for_draining: choose from shortest job to drain

        Two end_times entries exist: nodes 2-3 (busy) ending at 100.0 and
        nodes 4-5 (allocated) ending at 91.0.  A three-node request is
        expected to take node 1 plus nodes 4-5 and to drain them until 91.

        '''
        node_states = [('2', 'busy'), ('3', 'busy'),
                       ('4', 'allocated'), ('5', 'allocated')]
        for nid, state in node_states:
            self.system.nodes[nid].status = state
        self.base_job['nodes'] = 3
        running_end_times = [[['2-3'], 100.0], [['4-5'], 91.0]]
        selected = self.system._select_nodes_for_draining(self.base_job,
                running_end_times)
        assert_match(sorted(selected), ['1', '4', '5'], "Bad Selection")
        for nid in ('1', '4', '5'):
            node = self.system.nodes[nid]
            assert_match(node.draining, True, "Draining not set")
            assert_match(node.drain_jobid, 1, "Bad drain job")
            assert_match(node.drain_until, 91, "Bad drain time")

    def test_select_nodes_for_draining_select_multiple_running(self):
        '''CraySystem._select_nodes_for_draining: set time to longest if draining from multiple jobs

        Same node layout as the shortest-job case (2-3 busy until 100.0,
        4-5 allocated until 91.0), but the job needs all five nodes.  All of
        them must drain for jobid 1 until the later end of 100.

        '''
        for nid in ('2', '3'):
            self.system.nodes[nid].status = 'busy'
        for nid in ('4', '5'):
            self.system.nodes[nid].status = 'allocated'
        self.base_job['nodes'] = 5
        running_end_times = [[['2-3'], 100.0], [['4-5'], 91.0]]
        selected = self.system._select_nodes_for_draining(self.base_job,
                running_end_times)
        assert_match(sorted(selected), ['1', '2', '3', '4', '5'], "Bad Selection")
        for num in range(1, 6):
            node = self.system.nodes[str(num)]
            assert_match(node.draining, True, "Draining not set")
            assert_match(node.drain_jobid, 1, "Bad drain job")
            assert_match(node.drain_until, 100, "Bad drain time")

    def test_select_nodes_for_draining_select_queue(self):
        '''CraySystem._select_nodes_for_draining: confine to proper queue

        The job is submitted to queue 'bar'.  Only nodes 3, 4 and 5 list 'bar'
        among their queues, so a three-node request must select exactly those
        and drain them until 100 (the end given for busy node 5).  Busy node 2
        is in 'default' only and must not be picked.

        '''
        self.base_job['queue'] = 'bar'
        running_end_times = [[['5'], 100.0], [['2'], 50.0]]
        queue_layout = [
            ('1', ['default']),
            ('2', ['default']),
            ('3', ['bar']),
            ('4', ['default', 'bar']),
            ('5', ['default', 'bar']),
        ]
        for nid, queues in queue_layout:
            self.system.nodes[nid].queues = queues
        for nid in ('2', '5'):
            self.system.nodes[nid].status = 'busy'
        # Rebuild the queue bookkeeping after changing node queues by hand.
        self.system.find_queue_equivalence_classes({},['default', 'bar'],[])
        self.system._gen_node_to_queue()
        self.base_job['nodes'] = 3
        selected = self.system._select_nodes_for_draining(self.base_job,
                running_end_times)
        assert_match(sorted(selected), ['3', '4', '5'], "Bad Selection")
        for num in range(3, 6):
            node = self.system.nodes[str(num)]
            assert_match(node.draining, True, "Draining not set")
            assert_match(node.drain_jobid, 1, "Bad drain job")
            assert_match(node.drain_until, 100, "Bad drain time")

642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658
    def test_select_nodes_for_draining_select_cleaning(self):
        '''CraySystem._select_nodes_for_draining: include cleaning nodes if marked

        No jobs are running (empty end_times).  Nodes 2 and 5 are in cleanup
        states and already carry a drain set to now + 300 with jobid -1.  A
        five-node request must claim every node for jobid 1 until now + 300.

        '''
        now = int(time.time())
        drain_deadline = now + 300
        cleaning = [('2', 'cleanup'), ('5', 'cleanup-pending')]
        for nid, state in cleaning:
            self.system.nodes[nid].status = state
        for nid, _ in cleaning:
            self.system.nodes[nid].set_drain(now + 300, -1)
        self.base_job['nodes'] = 5
        selected = self.system._select_nodes_for_draining(self.base_job, [])
        assert_match(sorted(selected), ['1', '2', '3', '4', '5'], "Bad Selection")
        for num in range(1, 6):
            node = self.system.nodes[str(num)]
            assert_match(node.draining, True, "Draining not set")
            assert_match(node.drain_jobid, 1, "Bad drain job")
            assert_match(node.drain_until, drain_deadline, "Bad drain time")

659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676
    def test_select_nodes_for_draining_running_but_down(self):
        '''CraySystem._select_nodes_for_draining: do not drain down node if job still "running"

        Node 2 is down but is still covered by the '1-4' end_times entry.  The
        four-node request must skip it, select 1, 3, 4 and 5, and leave node 2
        with no drain attributes set.

        '''
        # A node that dies under a running job keeps appearing in the
        # end-times range until that job has finished terminating.
        running_end_times = [[['1-4'], 100.0]]
        self.system.nodes['2'].status = 'down'
        self.base_job['nodes'] = 4
        selected = self.system._select_nodes_for_draining(self.base_job,
                running_end_times)
        assert_match(sorted(selected), ['1', '3', '4', '5'], "Bad Selection")
        down_node = self.system.nodes['2']
        assert_match(down_node.draining, False, "Draining set")
        assert_match(down_node.drain_jobid, None, "Should not have drain_jobid", is_match)
        assert_match(down_node.drain_until, None, "Should not have drain_until", is_match)
        for nid in ('1', '3', '4', '5'):
            node = self.system.nodes[nid]
            assert_match(node.draining, True, "Draining not set")
            assert_match(node.drain_jobid, 1, "Bad drain job")
            assert_match(node.drain_until, 100.0, "Bad drain time")

677 678 679 680 681 682 683 684 685 686 687 688 689 690 691
    # common checks for find_job_location
    def assert_draining(self, nid, until, drain_jobid):
        '''Assert that a node is draining with the given deadline and job.

        nid -- node id; converted with str() to index self.system.nodes
        until -- expected value of the node's drain_until
        drain_jobid -- expected value of the node's drain_jobid

        '''
        node = self.system.nodes[str(nid)]
        assert node.draining, "Node %s should be draining" % nid
        assert_match(node.drain_until, until, "Bad drain_until: node %s" % nid)
        assert_match(node.drain_jobid, drain_jobid, "Bad drain_jobid: node %s" % nid)

    def assert_not_draining(self, nid):
        '''Assert that a node carries no drain state at all.

        nid -- node id; converted with str() to index self.system.nodes

        drain_until and drain_jobid are compared to None by identity
        (is_match), not by equality.

        '''
        node = self.system.nodes[str(nid)]
        assert not node.draining, "Node %s should not be draining" % nid
        for value, label in ((node.drain_until, "Bad drain_until: node %s"),
                             (node.drain_jobid, "Bad drain_jobid: node %s")):
            assert_match(value, None, label % nid, is_match)

692 693
    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_first_fit(self, *args, **kwargs):
        '''CraySystem.find_job_location: Assign basic job to nodes

        With time.time() frozen at 500.0 and nothing running, the base job
        (jobid 1) must be placed on node 1, with both the pending start and
        the node reservation recorded as 800.0.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "first-fit"
        expected_loc = {1: ['1']}
        expected_time = 800.0
        retval = self.system.find_job_location([self.base_job], [], [])
        assert retval == expected_loc, (
                'bad loc: expected %s, got %s' % (expected_loc, retval))
        assert self.system.pending_starts[1] == expected_time, (
                'bad pending start: expected %s, got %s' %
                (expected_time, self.system.pending_starts[1]))
        node = self.system.nodes['1']
        assert node.reserved_jobid == 1, 'Node not reserved'
        assert node.reserved_until == expected_time, (
                'reserved until expected 800.0, got %s' % node.reserved_until)
705

706 707 708 709
    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_first_fit_prior_job(self, *args, **kwargs):
        '''CraySystem.find_job_location: Assign second job to nodes

        Node 2 is already allocated to jobid 2 and reported in end_times.  The
        base job (jobid 1) must still land on node 1 with a pending start and
        reservation of 800.0.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "first-fit"
        occupied = self.system.nodes['2']
        occupied.status = 'allocated'
        occupied.reserved_jobid = 2
        running_end_times = [[['2'], int(time.time()) + 3600 ]]
        retval = self.system.find_job_location([self.base_job],
                running_end_times, [])
        expected_loc = {1: ['1']}
        assert retval == expected_loc, (
                'bad loc: expected %s, got %s' % (expected_loc, retval))
        assert self.system.pending_starts[1] == 800.0, (
                'bad pending start: expected %s, got %s' %
                (800.0, self.system.pending_starts[1]))
        target = self.system.nodes['1']
        assert target.reserved_jobid == 1, 'Node not reserved'
        assert target.reserved_until == 800.0, (
                'reserved until expected 800.0, got %s' % target.reserved_until)
722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754

    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_drain_one_eq(self, *args, **kwargs):
        '''CraySystem.find_job_location: Assign job to w/drain

        In backfill mode with nothing running, the base job (jobid 1) must be
        placed on node 1 with a pending start and reservation of 800.0, and no
        node may be left with drain state.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
        retval = self.system.find_job_location([self.base_job], [], [])
        assert retval == {1: ['1']}, 'bad loc: expected %s, got %s' % ({1: ['1']}, retval)
        assert self.system.pending_starts[1] == 800.0, (
                'bad pending start: expected %s, got %s' %
                (800.0, self.system.pending_starts[1]))
        assert self.system.nodes['1'].reserved_jobid == 1, 'Node not reserved'
        # The failure message now reports the value actually asserted (800.0);
        # it previously claimed 800.1.
        assert self.system.nodes['1'].reserved_until == 800.0, (
                'reserved until expected 800.0, got %s' % self.system.nodes['1'].reserved_until)
        for i in range(1, 6):
            self.assert_not_draining(i)

    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_drain_for_large(self, *args, **kwargs):
        '''CraySystem.find_job_location: Drain for large job, block other

        end_times reports node 1 in use until 600.  In backfill mode the
        five-node job (jobid 2) cannot start, so nodes 1-5 must all drain for
        it until 600, and the one-node job (jobid 3) must not get a location.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
        large_job = dict(self.base_job, jobid=2, nodes=5, walltime=500)
        small_job = dict(self.base_job, jobid=3, nodes=1, walltime=400)
        self.system.reserve_resources_until('1', 100, 1)
        self.system.nodes['1'].status = 'busy'
        self.system.find_queue_equivalence_classes({}, ['default'], [])
        retval = self.system.find_job_location([large_job, small_job],
                [[['1'], 600]], [])
        assert_match(retval, {}, "no location should be assigned")
        assert_match(self.system.pending_starts, {}, "no starts should be pending")
        for nid in range(1, 6):
            self.assert_draining(nid, 600, 2)

    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_first_fit_despite_larger(self, *args, **kwargs):
        '''CraySystem.find_job_location: First fit smaller job ahead of large job

        Node 1 is busy, so the five-node job (jobid 2) cannot run.  In
        first-fit mode the one-node job (jobid 3) behind it must start on
        node 2 with a pending start of 800.0, and no drain state may be set.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "first-fit"
        jobs = []
        jobs.append(dict(self.base_job))
        jobs.append(dict(self.base_job))
        jobs[0]['jobid'] = 2
        jobs[0]['nodes'] = 5
        jobs[0]['walltime'] = 500
        jobs[1]['jobid'] = 3
        jobs[1]['nodes'] = 1
        jobs[1]['walltime'] = 400
        self.system.reserve_resources_until('1', 600, 1)
        self.system.nodes['1'].status = 'busy'
        self.system.find_queue_equivalence_classes({}, ['default'], [])
        retval = self.system.find_job_location(jobs, [[['1'], 600]], [])
        assert_match(retval, {3: ['2']}, "bad location")
        # A pending start IS expected here; the message previously read
        # "no starts should be pending", copied from the drain test.
        assert_match(self.system.pending_starts, {3: 800.0}, "bad pending start")
        for i in range(1, 6):
            # first fit should never set drain characteristics.
            self.assert_not_draining(i)

    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_drain_on_running(self, *args, **kwargs):
        '''CraySystem.find_job_location: Drain: Favor running location

        Nodes 3-5 are busy until 600.  The three-node job (jobid 2) must drain
        exactly those nodes, leaving nodes 1-2 undrained so the two-node job
        (jobid 3) can start on them with a pending start of 800.0.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
        jobs = [
            dict(self.base_job, jobid=2, nodes=3, walltime=500),
            dict(self.base_job, jobid=3, nodes=2, walltime=400),
        ]
        self.system.reserve_resources_until('3-5', 100, 1)
        for nid in ('3', '4', '5'):
            self.system.nodes[nid].status = 'busy'
        self.system.find_queue_equivalence_classes({}, ['default'], [])
        retval = self.system.find_job_location(jobs, [[['3-5'], 600]], [])
        assert_match(retval, {3: ['1-2']}, 'bad location')
        assert_match(self.system.pending_starts, {3: 800.0}, "bad pending start")
        for nid in (3, 4, 5):
            self.assert_draining(nid, 600, 2)
        for nid in (1, 2):
            self.assert_not_draining(nid)

    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_no_drain_on_down(self, *args, **kwargs):
        '''CraySystem.find_job_location: Drain: Do not drain if insufficient hardware

        Node 3 is down, so the five-node job (jobid 2) cannot be satisfied
        while it stays down.  No node may be drained for it, and the two-node
        job (jobid 3) must start on nodes 1 and 4 with a pending start of
        800.0.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
        jobs = [
            dict(self.base_job, jobid=2, nodes=5, walltime=500),
            dict(self.base_job, jobid=3, nodes=2, walltime=400),
        ]
        self.system.reserve_resources_until('2,5', 100, 1)
        for nid, state in (('2', 'busy'), ('3', 'down'), ('5', 'busy')):
            self.system.nodes[nid].status = state
        self.system.find_queue_equivalence_classes({}, ['default'], [])
        retval = self.system.find_job_location(jobs, [[['2,5'], 600]], [])
        assert_match(retval, {3: ['1,4']}, 'bad location')
        assert_match(self.system.pending_starts, {3: 800.0}, "bad pending start")
        for nid in range(1, 6):
            self.assert_not_draining(nid)

    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(CraySystem, 'update_node_state')
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_drain_correct_queue(self, *args, **kwargs):
        '''CraySystem.find_job_location: Drain correct queue

        Nodes 1-2 are put in queues foo/default and nodes 4-5 in bar/default;
        nodes 2 and 5 are busy until 600.  Both jobs target queue 'bar'.  The
        two-node job (jobid 2) must drain nodes 4-5 until 600, nothing may
        start, and nodes 1-3 must stay undrained.

        *args absorbs the mock objects injected by the update_node_state and
        time.time patches.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
        jobs = [
            dict(self.base_job, jobid=2, nodes=2, walltime=500, queue='bar'),
            dict(self.base_job, jobid=3, nodes=1, walltime=400, queue='bar'),
        ]
        self.system.reserve_resources_until('2,5', 600, 1)
        self.system.update_nodes({'queues': 'foo:default'}, ['1', '2'], None)
        self.system.update_nodes({'queues': 'bar:default'}, ['4', '5'], None)
        for nid in ('2', '5'):
            self.system.nodes[nid].status = 'busy'
        self.system.find_queue_equivalence_classes({}, ['default', 'foo', 'bar'], [])
        retval = self.system.find_job_location(jobs, [[['2,5'], 600]], [])
        assert_match(retval, {}, 'bad location')
        assert_match(self.system.pending_starts, {}, "bad pending start")
        for nid in (4, 5):
            self.assert_draining(nid, 600, 2)
        for nid in (1, 2, 3):
            self.assert_not_draining(nid)

    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(CraySystem, 'update_node_state')
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_drain_correct_queue_run_short_job(self, *args, **kwargs):
        '''CraySystem.find_job_location: Drain correct queue, run short job

        Same queue layout as the drain-correct-queue case, but the busy nodes
        run until 1000 and the second job has a walltime of only 3.  Nodes 4-5
        must drain for jobid 2 until 1000 while jobid 3 is still started on
        node 4 with a pending start of 800.0.

        *args absorbs the mock objects injected by the update_node_state and
        time.time patches.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
        draining_job = dict(self.base_job, jobid=2, nodes=2, walltime=500,
                            queue='bar')
        short_job = dict(self.base_job, jobid=3, nodes=1, walltime=3,
                         queue='bar')
        self.system.reserve_resources_until('2,5', 1000, 1)
        self.system.update_nodes({'queues': 'foo:default'}, ['1', '2'], None)
        self.system.update_nodes({'queues': 'bar:default'}, ['4', '5'], None)
        for nid in ('2', '5'):
            self.system.nodes[nid].status = 'busy'
        self.system.find_queue_equivalence_classes({}, ['default', 'foo', 'bar'], [])
        retval = self.system.find_job_location([draining_job, short_job],
                [[['2,5'], 1000]], [])
        assert_match(retval, {3: ['4']}, 'bad location')
        assert_match(self.system.pending_starts, {3: 800.0}, "bad pending start")
        for nid in (4, 5):
            self.assert_draining(nid, 1000, 2)
        for nid in (1, 2, 3):
            self.assert_not_draining(nid)

    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_drain_only_required(self, *args, **kwargs):
        '''CraySystem.find_job_location: drain only attrs=location nodes

        The first job (jobid 2) requests location '1,3,5' and node 1 is busy
        until 600.  Only nodes 1, 3 and 5 may drain for it; nodes 2 and 4 must
        stay undrained.  The three-node second job then has no place to run,
        so nothing is started.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
        located_job = dict(self.base_job, jobid=2, nodes=3, walltime=500,
                           attrs={'location': '1,3,5'})
        other_job = dict(self.base_job, jobid=3, nodes=3, walltime=400)
        self.system.reserve_resources_until('1', 600, 1)
        self.system.nodes['1'].status = 'busy'
        self.system.find_queue_equivalence_classes({}, ['default'], [])
        retval = self.system.find_job_location([located_job, other_job],
                [[['1'], 600]], [])
        assert_match(retval, {}, 'bad location')
        assert_match(self.system.pending_starts, {}, "bad pending start")
        for nid in (1, 3, 5):
            self.assert_draining(nid, 600, 2)
        for nid in (2, 4):
            self.assert_not_draining(nid)

926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949
    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_drain_multiple(self, *args, **kwargs):
        '''CraySystem.find_job_location: drain for multiple jobs

        Nodes 1-4 are all busy with ends of 600 (node 1), 550 (nodes 2-3) and
        700 (node 4).  Three queued jobs must each get their own drain set:
        jobid 2 on nodes 2-3 until 550, jobid 3 on nodes 1 and 5 until 600 and
        jobid 4 on node 4 until 700.  Nothing is started.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
        job_specs = [(2, 2, 500), (3, 2, 400), (4, 1, 1500)]
        jobs = []
        for jobid, nodecount, walltime in job_specs:
            job = dict(self.base_job)
            job.update({'jobid': jobid, 'nodes': nodecount, 'walltime': walltime})
            jobs.append(job)
        self.system.reserve_resources_until('1', 600, 1)
        self.system.reserve_resources_until('2-3', 550, 5)
        self.system.reserve_resources_until('4', 700, 6)
        for nid in ('1', '2', '3', '4'):
            self.system.nodes[nid].status = 'busy'
        self.system.find_queue_equivalence_classes({}, ['default'], [])
        running_end_times = [[['2-3'], 550.0], [['1'], 600.0], [['4'], 700.0]]
        retval = self.system.find_job_location(jobs, running_end_times, [])
        assert_match(retval, {}, 'bad location')
        assert_match(self.system.pending_starts, {}, "bad pending start")
        for nid in (1, 5):
            self.assert_draining(nid, 600, 3)
        for nid in (2, 3):
            self.assert_draining(nid, 550, 2)
        self.assert_draining(4, 700, 4)

    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_drain_multiple_and_run(self, *args, **kwargs):
        '''CraySystem.find_job_location: drain for multiple jobs, run leftover

        Nodes 1 and 4 are busy until 600 and nodes 2-3 until 550.  jobid 2
        must drain nodes 2-3, jobid 3 must drain nodes 1 and 4, and the
        one-node jobid 4 must start on the remaining node 5 with a pending
        start of 800.0.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
        job_specs = [(2, 2, 500), (3, 2, 400), (4, 1, 1500)]
        jobs = []
        for jobid, nodecount, walltime in job_specs:
            job = dict(self.base_job)
            job.update({'jobid': jobid, 'nodes': nodecount, 'walltime': walltime})
            jobs.append(job)
        self.system.reserve_resources_until('1,4', 600, 1)
        self.system.reserve_resources_until('2-3', 550, 5)
        for nid in ('1', '2', '3', '4'):
            self.system.nodes[nid].status = 'busy'
        self.system.find_queue_equivalence_classes({}, ['default'], [])
        running_end_times = [[['2-3'], 550.0], [['1,4'], 600.0]]
        retval = self.system.find_job_location(jobs, running_end_times, [])
        assert_match(retval, {4: ['5']}, 'bad location')
        assert_match(self.system.pending_starts, {4: 800.0}, "bad pending start")
        for nid in (1, 4):
            self.assert_draining(nid, 600, 3)
        for nid in (2, 3):
            self.assert_draining(nid, 550, 2)
        self.assert_not_draining(5)

    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_no_drain_after_run(self, *args, **kwargs):
        '''CraySystem.find_job_location: no drain computation after run

        Same node state as the drain-multiple-and-run case, but the runnable
        one-node job (jobid 4) is queued second.  jobid 2 must drain nodes 2-3
        until 550, jobid 4 must start on node 5, and the job behind it
        (jobid 3) must not cause any drain: nodes 1, 4 and 5 stay undrained.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
        # Order matters: the job that can run sits ahead of jobid 3.
        job_specs = [(2, 2, 500), (4, 1, 1500), (3, 2, 400)]
        jobs = []
        for jobid, nodecount, walltime in job_specs:
            job = dict(self.base_job)
            job.update({'jobid': jobid, 'nodes': nodecount, 'walltime': walltime})
            jobs.append(job)
        self.system.reserve_resources_until('1,4', 600, 1)
        self.system.reserve_resources_until('2-3', 550, 5)
        for nid in ('1', '2', '3', '4'):
            self.system.nodes[nid].status = 'busy'
        self.system.find_queue_equivalence_classes({}, ['default'], [])
        running_end_times = [[['2-3'], 550.0], [['1,4'], 600.0]]
        retval = self.system.find_job_location(jobs, running_end_times, [])
        assert_match(retval, {4: ['5']}, 'bad location')
        assert_match(self.system.pending_starts, {4: 800.0}, "bad pending start")
        for nid in (2, 3):
            self.assert_draining(nid, 550, 2)
        for nid in (1, 4, 5):
            self.assert_not_draining(nid)
1029

1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068
    @patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
    @patch.object(time, 'time', return_value=500.000)
    def test_find_job_location_allocate_ignore_drain_for_reservation(self, *args, **kwargs):
        '''CraySystem.find_job_location: Ignore existing drain for reservation

        Phase one repeats the drain-for-large setup so that nodes 1-5 are all
        draining for jobid 2 until 600.  Phase two submits a four-node job in
        reservation queue 'R.test' over nodes 1-5; it must start on nodes 2-5
        despite the drain, and afterwards no node may carry drain state.

        *args absorbs the mock object injected by the time.time patch.

        '''
        Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
        running_end_times = [[['1'], 600]]
        queued_jobs = [
            dict(self.base_job, jobid=2, nodes=5, walltime=500),
            dict(self.base_job, jobid=3, nodes=1, walltime=400),
        ]
        self.system.reserve_resources_until('1', 100, 1)
        self.system.nodes['1'].status = 'busy'
        self.system.find_queue_equivalence_classes({}, ['default'], [])
        retval = self.system.find_job_location(queued_jobs, running_end_times, [])
        assert_match(retval, {}, "no location should be assigned")
        assert_match(self.system.pending_starts, {}, "no starts should be pending")
        for nid in range(1, 6):
            self.assert_draining(nid, 600, 2)
        # Every node is draining at this point.  Now submit a reservation job.
        # This can happen while a reservation is ending and a job is waiting
        # for resources inside that reservation to free up or finish cleanup.
        reservation_job = dict(self.base_job)
        reservation_job['jobid'] = 10
        reservation_job['nodes'] = 4 # takes the four nodes other than node 1
        reservation_job['walltime'] = 700 # longer than any possible drain window
        reservation_job['required'] = ['1-5']
        reservation_job['queue'] = 'R.test'
        self.system.find_queue_equivalence_classes({'test':'1-5'}, ['default'], [])
        retval = self.system.find_job_location([reservation_job],
                [[['1'], 600]], [])
        assert_match(retval, {10: ['2-5']}, "Bad Location Match")
        assert_match(self.system.pending_starts, {10: 800.0}, "Bad reservation pending start")
        for nid in range(1, 6):
            self.assert_not_draining(nid)

1069 1070
    def test_validate_job_normal(self):
        '''CraySystem.validate_job: valid job submission

        A one-node script-mode spec must come back with proccount and the
        numa/mcdram attrs filled in as listed in the expected dict.

        '''
        validated = self.system.validate_job({'mode':'script', 'nodecount': 1})
        expected = {
            'nodecount': 1,
            'proccount': 1,
            'mode': 'script',
            'attrs': {'numa': 'quad', 'mcdram': 'cache'},
        }
        assert_match(expected, validated, "Invalid spec returned")
1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086

    @raises(Cobalt.Exceptions.JobValidationError)
    def test_validate_job_reject_too_large(self):
        '''CraySystem.validate_job: reject too big job

        A nodecount of 9999 must make validate_job raise JobValidationError.

        '''
        oversized_spec = {'mode':'script', 'nodecount': 9999}
        # The call is expected to raise, so its return value is not kept.
        self.system.validate_job(oversized_spec)

    @raises(Cobalt.Exceptions.JobValidationError)
    def test_validate_job_reject_no_host(self):
        '''CraySystem.validate_job: reject missing ssh host

        An interactive-mode spec with qsub_host 'foo' must make validate_job
        raise JobValidationError.

        '''
        interactive_spec = {'mode':'interactive', 'nodecount': 1, 'qsub_host':'foo'}
        # The call is expected to raise, so its return value is not kept.
        self.system.validate_job(interactive_spec)
Paul Rich's avatar