Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
AIG-public
Cobalt
Commits
250d1e27
Commit
250d1e27
authored
Sep 23, 2016
by
Paul Rich
Browse files
Drain selection now restricts itself to attrs location nodes.
parent
60834980
Changes
2
Hide whitespace changes
Inline
Side-by-side
src/lib/Components/system/CraySystem.py
View file @
250d1e27
...
...
@@ -41,6 +41,16 @@ CLEANUP_DRAIN_WINDOW = get_config_option('system', 'cleanup_drain_window', 300)
DRAIN_MODES
=
[
'first-fit'
,
'backfill'
]
CLEANING_ID
=
-
1
def chain_loc_list(loc_list):
    '''Flatten a list of compact Cray location strings.

    Each entry of loc_list is expanded with expand_num_list and the
    expansions are concatenated, in order, into one list.

    '''
    return [loc
            for compact_locs in loc_list
            for loc in expand_num_list(compact_locs)]
class
ALPSProcessGroup
(
ProcessGroup
):
'''ALPS-specific ProcessGroup modifications.'''
...
...
@@ -615,8 +625,7 @@ class CraySystem(BaseSystem):
'''
equiv
=
[]
node_active_queues
=
set
([])
self
.
current_equivalence_classes
=
[]
#reverse mapping of queues to nodes
self
.
current_equivalence_classes
=
[]
#reverse mapping of queues to nodes
for
node
in
self
.
nodes
.
values
():
if
node
.
managed
and
node
.
schedulable
:
#only consider nodes that we are scheduling.
...
...
@@ -673,15 +682,12 @@ class CraySystem(BaseSystem):
return
equiv
def chain_loc_list(self, loc_list):
    '''Expand each compact Cray location string in loc_list and return
    all of the expanded entries as a single concatenated list.

    '''
    expanded = []
    for compact_locs in loc_list:
        # in-place += on a list consumes the iterable exactly like extend()
        expanded += expand_num_list(compact_locs)
    return expanded
@staticmethod
def _setup_special_locaitons(job):
    '''Gather the location restrictions attached to a job.

    Args:
        job: job dictionary.  'forbidden' and 'required' are optional
             lists of compact location strings; job['attrs'] must exist
             and may carry a compact 'location' string.

    Returns:
        A (forbidden, required, requested_locations) tuple.  Each member
        is a set holding the str() of every expanded location.

    '''
    def _str_set(locations):
        # normalize every expanded location to a string
        return set([str(loc) for loc in locations])

    forbidden = _str_set(chain_loc_list(job.get('forbidden', [])))
    required = _str_set(chain_loc_list(job.get('required', [])))
    # A missing 'location' attribute is expanded from the empty string.
    requested_locations = _str_set(
        expand_num_list(job['attrs'].get('location', '')))
    return (forbidden, required, requested_locations)
def
_assemble_queue_data
(
self
,
job
,
idle_only
=
True
,
drain_time
=
None
):
'''put together data for a queue, or queue-like reservation structure.
...
...
@@ -701,9 +707,7 @@ class CraySystem(BaseSystem):
# have to intersect required nodes with the idle and available
# we also have to forbid a bunch of locations, in this case.
unavailable_nodes
=
[]
forbidden
=
set
([
str
(
loc
)
for
loc
in
self
.
chain_loc_list
(
job
.
get
(
'forbidden'
,
[]))])
required
=
set
([
str
(
loc
)
for
loc
in
self
.
chain_loc_list
(
job
.
get
(
'required'
,
[]))])
requested_locations
=
set
([
str
(
n
)
for
n
in
expand_num_list
(
job
[
'attrs'
].
get
(
'location'
,
''
))])
forbidden
,
required
,
requested_locations
=
self
.
_setup_special_locaitons
(
job
)
requested_loc_in_forbidden
=
False
for
loc
in
requested_locations
:
if
loc
in
forbidden
:
...
...
@@ -745,7 +749,6 @@ class CraySystem(BaseSystem):
if
idle_only
:
unavailable_nodes
=
[
node_id
for
node_id
in
node_id_list
if
self
.
nodes
[
str
(
node_id
)].
status
not
in
[
'idle'
]]
#unavailable_nodes.append(node_id)
else
:
unavailable_nodes
=
[
node_id
for
node_id
in
node_id_list
if
self
.
nodes
[
str
(
node_id
)].
status
in
...
...
@@ -820,18 +823,18 @@ class CraySystem(BaseSystem):
Called once per equivalence class.
Input
:
arg_list
-
A list of dictionaries containning information on jobs to
Args:
:
arg_list
:
A list of dictionaries containing information on jobs to
consider.
end_times
-
list containing a mapping of locations and the times jobs
end_times
:
list containing a mapping of locations and the times jobs
running on those locations are scheduled to end. End times
are in seconds from Epoch UTC.
pt_blocking_locations
-
Not used for this system. Used in partitioned
pt_blocking_locations
:
Not used for this system. Used in partitioned
interconnect schemes. A list of locations that
should not be used to prevent passthrough issues
with other scheduler reservations.
Output
:
Returns
:
A mapping of jobids to locations to run a job to run immediately.
Side Effects:
...
...
@@ -847,7 +850,7 @@ class CraySystem(BaseSystem):
pt_blocking_locations may be used later to block out nodes that are
impacted by warmswap operations.
This function
doesn't
hold the component lock.
This function
*DOES NOT*
hold the component lock.
'''
now
=
time
.
time
()
...
...
@@ -858,21 +861,6 @@ class CraySystem(BaseSystem):
#check if we can run immediately, if not drain. Keep going until all
#nodes are marked for draining or have a pending run.
best_match
=
{}
#jobid: list(locations)
try
:
for
loc_time
in
end_times
:
loc_spec
=
","
.
join
(
loc_time
[
0
])
end_time
=
loc_time
[
1
]
for
loc
in
expand_num_list
(
loc_spec
):
if
self
.
nodes
[
str
(
loc
)].
reserved_jobid
is
not
None
:
# if the reserved_jobid is none, this job is already
# going for cleanup. Drain location selection
# handles that.
self
.
nodes
[
str
(
loc
)].
set_drain
(
end_time
,
self
.
nodes
[
str
(
loc
)].
reserved_jobid
)
except
(
KeyError
,
IndexError
):
err
=
"Invalid value for end_times: %s"
%
end_times
_logger
.
error
(
err
)
raise
ValueError
(
err
)
for
job
in
arg_list
:
label
=
'%s/%s'
%
(
job
[
'jobid'
],
job
[
'user'
])
# walltime is in minutes. We should really fix the storage of
...
...
@@ -886,13 +874,7 @@ class CraySystem(BaseSystem):
job
[
'jobid'
])
continue
if
int
(
job
[
'nodes'
])
>
len
(
available_node_list
):
# will happen with reserved jobs.
_logger
.
debug
(
'Reserved skip?'
)
continue
if
len
(
node_id_list
)
==
0
:
# There are definitely insufficient nodes to run this job
# trivial exclude. Don't break out of the whole thing, may
# have disjoint queues.
# Insufficient operational nodes for this job at all
continue
elif
int
(
job
[
'nodes'
])
<=
len
(
node_id_list
):
# enough nodes are in a working state to consider the job.
...
...
@@ -906,7 +888,7 @@ class CraySystem(BaseSystem):
_logger
.
info
(
"%s: Job selected for running on nodes %s"
,
label
,
compact_locs
)
break
#for now only select one location
el
if
DRAIN_MODE
in
[
'backfill'
,
'drain-only'
]:
if
DRAIN_MODE
in
[
'backfill'
,
'drain-only'
]:
# drain sufficient nodes for this job to run
drain_node_ids
=
self
.
_select_nodes_for_draining
(
job
,
end_times
)
...
...
@@ -986,6 +968,7 @@ class CraySystem(BaseSystem):
drain_list
=
[]
candidate_list
=
[]
cleanup_statuses
=
[
'cleanup'
,
'cleanup-pending'
]
forbidden
,
required
,
requested_locations
=
self
.
_setup_special_locaitons
(
job
)
try
:
node_id_list
=
self
.
_assemble_queue_data
(
job
,
idle_only
=
False
)
except
ValueError
:
...
...
@@ -993,7 +976,8 @@ class CraySystem(BaseSystem):
else
:
with
self
.
_node_lock
:
drain_time
=
None
# remove the following from the list:i
candidate_drain_time
=
None
# remove the following from the list:
# 1. idle nodes that are already marked for draining.
# 2. Nodes that are in an in-use status (busy, allocated).
# 3. Nodes marked for cleanup that are not allocated to a real
...
...
@@ -1002,25 +986,36 @@ class CraySystem(BaseSystem):
# is the right thing to do. --PMR
candidate_list
=
[]
candidate_list
=
[
nid
for
nid
in
node_id_list
if
((
self
.
nodes
[
str
(
nid
)].
status
in
[
'idle'
]
and
not
self
.
nodes
[
str
(
nid
)].
draining
)
or
(
self
.
nodes
[
str
(
nid
)].
status
in
cleanup_statuses
)
# and
#self.nodes[str(nid)].drain_jobid == CLEANING_ID)
if
(
not
self
.
nodes
[
str
(
nid
)].
draining
and
(
self
.
nodes
[
str
(
nid
)].
status
in
[
'idle'
])
or
(
self
.
nodes
[
str
(
nid
)].
status
in
cleanup_statuses
)
)]
for
nid
in
candidate_list
:
if
self
.
nodes
[
str
(
nid
)].
status
in
cleanup_statuses
:
drain_time
=
now
+
CLEANUP_DRAIN_WINDOW
candidate_
drain_time
=
now
+
CLEANUP_DRAIN_WINDOW
for
loc_time
in
end_times
:
running_nodes
=
[
str
(
nid
)
for
nid
in
expand_num_list
(
","
.
join
(
loc_time
[
0
]))
if
job
[
'queue'
]
in
self
.
nodes
[
str
(
nid
)].
queues
]
if
((
job
[
'queue'
]
in
self
.
nodes
[
str
(
nid
)].
queues
or
nid
in
required
)
and
not
self
.
nodes
[
str
(
nid
)].
draining
)]
for
nid
in
running_nodes
:
self
.
nodes
[
str
(
nid
)].
set_drain
(
loc_time
[
1
],
job
[
'jobid'
])
candidate_list
.
extend
(
running_nodes
)
candidate_drain_time
=
int
(
loc_time
[
1
])
if
len
(
candidate_list
)
>=
int
(
job
[
'nodes'
]):
# Enough nodes have been found to drain for this job
drain_time
=
int
(
loc_time
[
1
])
break
candidates
=
set
(
candidate_list
)
# We need to further restrict this list based on requested
# location and reservation avoidance data:
if
forbidden
!=
set
([]):
candidates
=
candidates
.
difference
(
forbidden
)
if
requested_locations
!=
set
([]):
candidates
=
candidates
.
intersection
(
requested_locations
)
candidate_list
=
list
(
candidates
)
if
len
(
candidate_list
)
>=
int
(
job
[
'nodes'
]):
drain_time
=
candidate_drain_time
if
drain_time
is
not
None
:
# order the node ids by id and drain-time. Longest drain
# first
...
...
testsuite/TestCobalt/TestComponents/test_cray.py
View file @
250d1e27
...
...
@@ -93,16 +93,20 @@ class TestCraySystem(object):
self
.
base_job
=
{
'jobid'
:
1
,
'user'
:
'crusher'
,
'attrs'
:{},
'queue'
:
'default'
,
'nodes'
:
1
,
'walltime'
:
60
,
}
self
.
fake_reserve_called
=
False
def
teardown
(
self
):
del
self
.
system
del
self
.
base_job
self
.
fake_reserve_called
=
False
# HELPER MOCK FUNCTIONS
def
fake_reserve
(
self
,
job
,
new_time
,
node_id_list
):
'''Mimic first-fit function of ALPS placement scheme'''
# self gets overriden by the call within fjl to be the real system
# component.
self
.
fake_reserve_called
=
True
ret_nodes
=
[]
if
job
[
'nodes'
]
<=
len
(
node_id_list
):
ret_nodes
=
node_id_list
[:
int
(
job
[
'nodes'
])]
...
...
@@ -288,7 +292,33 @@ class TestCraySystem(object):
nodelist
=
self
.
system
.
_assemble_queue_data
(
self
.
base_job
)
assert
nodelist
==
[],
'Wrong node in list %s'
%
nodelist
def
test_assemble_queue_data_attrs_non_draining
(
self
):
def test_assemble_queue_data_attrs_location_any_not_down(self):
    '''CraySystem._assemble_queue_data: attrs location return any not down'''
    # Put nodes 1-4 into assorted non-idle states; node 5 is left untouched.
    node_states = (
        ('1', 'busy'),
        ('2', 'cleanup-pending'),
        ('3', 'allocated'),
        ('4', 'ADMINDOWN'),
    )
    for node_name, state in node_states:
        self.system.nodes[node_name].status = state
    self.base_job['attrs'] = {'location':'1-5'}
    self.base_job['nodes'] = 4
    nodelist = self.system._assemble_queue_data(self.base_job,
                                                idle_only=False)
    # Only the ADMINDOWN node is expected to be filtered out.
    expected = ['1', '2', '3', '5']
    assert nodelist == expected, 'Wrong node in list %s' % nodelist
def test_assemble_queue_data_attrs_location_any_not_down_drain_limit(self):
    '''CraySystem._assemble_queue_data: attrs location return any not down in drain window'''
    nodes = self.system.nodes
    node_states = (
        ('1', 'busy'),
        ('2', 'cleanup-pending'),
        ('3', 'allocated'),
        ('4', 'ADMINDOWN'),
    )
    for node_name, state in node_states:
        nodes[node_name].status = state
    # (node, drain_until, drain_jobid); applied after every status is set.
    drains = (
        ('1', 500.0, 1),
        ('2', 600.0, 2),
        ('3', 700.0, 3),
    )
    for node_name, drain_until, drain_jobid in drains:
        nodes[node_name].set_drain(drain_until, drain_jobid)
    self.base_job['attrs'] = {'location':'1-5'}
    self.base_job['nodes'] = 2
    nodelist = self.system._assemble_queue_data(self.base_job,
                                                idle_only=False,
                                                drain_time=650)
    expected = ['3', '5']
    assert nodelist == expected, 'Wrong node in list %s' % nodelist
def
test_assemble_queue_data_non_draining
(
self
):
'''CraySystem._assemble_queue_data: return idle and non draining only'''
self
.
system
.
nodes
[
'1'
].
status
=
'busy'
self
.
system
.
nodes
[
'2'
].
status
=
'down'
...
...
@@ -298,7 +328,7 @@ class TestCraySystem(object):
drain_time
=
150
)
assert_match
(
sorted
(
nodelist
),
[
'5'
],
"Bad Nodelist"
)
def
test_assemble_queue_data_
attrs_
within_draining
(
self
):
def
test_assemble_queue_data_within_draining
(
self
):
'''CraySystem._assemble_queue_data: return idle and draining if within
time'''
self
.
system
.
nodes
[
'1'
].
status
=
'busy'
...
...
@@ -309,7 +339,7 @@ class TestCraySystem(object):
drain_time
=
90.0
)
assert_match
(
sorted
(
nodelist
),
[
'4'
,
'5'
],
"Bad Nodelist"
)
def
test_assemble_queue_data_
attrs_
match_draining
(
self
):
def
test_assemble_queue_data_match_draining
(
self
):
'''CraySystem._assemble_queue_data: return idle and matched drain node'''
self
.
system
.
nodes
[
'1'
].
status
=
'busy'
self
.
system
.
nodes
[
'2'
].
status
=
'down'
...
...
@@ -399,6 +429,18 @@ class TestCraySystem(object):
end_times
)
assert_match
(
sorted
(
drain_nodes
),
[
'1'
,
'2'
,
'3'
,
'4'
],
"Bad Selection."
)
def test_select_nodes_for_draining_user_location(self):
    '''CraySystem._select_nodes_for_draining: drain nodes for user specified location'''
    # A running job on nodes 1-3 is reported as ending at time 100.
    end_times = [[['1-3'], 100]]
    for node_name in ('1', '2', '3'):
        self.system.nodes[node_name].status = 'busy'
    self.base_job['nodes'] = 4
    # The requested location overlaps the running job on nodes 2 and 3 only.
    self.base_job['attrs'] = {'location': '2-5'}
    drain_nodes = self.system._select_nodes_for_draining(self.base_job,
                                                         end_times)
    assert_match(sorted(drain_nodes), ['2', '3', '4', '5'], "Bad Selection.")
def
test_select_nodes_for_draining_prefer_running
(
self
):
'''CraySystem._select_nodes_for_draining: prefer nodes from running job'''
end_times
=
[[[
'4-5'
],
100
]]
...
...
@@ -508,10 +550,26 @@ class TestCraySystem(object):
assert_match
(
self
.
system
.
nodes
[
str
(
i
)].
drain_jobid
,
1
,
"Bad drain job"
)
assert_match
(
self
.
system
.
nodes
[
str
(
i
)].
drain_until
,
now
+
300
,
"Bad drain time"
)
# common checks for find_job_location
def assert_draining(self, nid, until, drain_jobid):
    '''Check that node nid is draining until `until` for job drain_jobid.'''
    node = self.system.nodes[str(nid)]
    assert node.draining, "Node %s should be draining" % nid
    assert_match(node.drain_until, until,
                 "Bad drain_until: node %s" % nid)
    assert_match(node.drain_jobid, drain_jobid,
                 "Bad drain_jobid: node %s" % nid)
def assert_not_draining(self, nid):
    '''Check that node nid carries no drain state at all.'''
    node = self.system.nodes[str(nid)]
    assert not node.draining, "Node %s should not be draining" % nid
    # is_match is handed to assert_match as the comparison for the None checks.
    assert_match(node.drain_until, None,
                 "Bad drain_until: node %s" % nid, is_match)
    assert_match(node.drain_jobid, None,
                 "Bad drain_jobid: node %s" % nid, is_match)
@
patch
.
object
(
CraySystem
,
'_ALPS_reserve_resources'
,
fake_reserve
)
@
patch
.
object
(
time
,
'time'
,
return_value
=
500.000
)
def
test_find_job_location_allocate_first_fit
(
self
,
*
args
,
**
kwargs
):
'''CraySystem.find_job_locaton: Assign basic job to nodes'''
Cobalt
.
Components
.
system
.
CraySystem
.
DRAIN_MODE
=
"first-fit"
retval
=
self
.
system
.
find_job_location
([
self
.
base_job
],
[],
[])
assert
retval
==
{
1
:
[
'1'
]},
'bad loc: expected %s, got %s'
%
({
1
:
[
'1'
]},
retval
)
assert
self
.
system
.
pending_starts
[
1
]
==
800.0
,
(
...
...
@@ -521,11 +579,11 @@ class TestCraySystem(object):
assert
self
.
system
.
nodes
[
'1'
].
reserved_until
==
800.0
,
(
'reserved until expected 800.0, got %s'
%
self
.
system
.
nodes
[
'1'
].
reserved_until
)
@
patch
.
object
(
CraySystem
,
'_ALPS_reserve_resources'
,
fake_reserve
)
@
patch
.
object
(
time
,
'time'
,
return_value
=
500.000
)
def
test_find_job_location_allocate_first_fit_prior_job
(
self
,
*
args
,
**
kwargs
):
'''CraySystem.find_job_locaton: Assign second job to nodes'''
Cobalt
.
Components
.
system
.
CraySystem
.
DRAIN_MODE
=
"first-fit"
self
.
system
.
nodes
[
'2'
].
status
=
'allocated'
self
.
system
.
nodes
[
'2'
].
reserved_jobid
=
2
retval
=
self
.
system
.
find_job_location
([
self
.
base_job
],
...
...
@@ -537,3 +595,207 @@ class TestCraySystem(object):
assert
self
.
system
.
nodes
[
'1'
].
reserved_jobid
==
1
,
'Node not reserved'
assert
self
.
system
.
nodes
[
'1'
].
reserved_until
==
800.0
,
(
'reserved until expected 800.0, got %s'
%
self
.
system
.
nodes
[
'1'
].
reserved_until
)
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_drain_one_eq(self, *args, **kwargs):
    '''CraySystem.find_job_location: Assign job to w/drain'''
    # time.time() is patched to 500.0; the test expects both the pending
    # start and the node reservation to be stamped 800.0.
    Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
    retval = self.system.find_job_location([self.base_job], [], [])
    assert retval == {1: ['1']}, 'bad loc: expected %s, got %s' % (
        {1: ['1']}, retval)
    assert self.system.pending_starts[1] == 800.0, (
        'bad pending start: expected %s, got %s' %
        (800.0, self.system.pending_starts[1]))
    assert self.system.nodes['1'].reserved_jobid == 1, 'Node not reserved'
    # The message previously claimed 800.1 while the check is against 800.0.
    assert self.system.nodes['1'].reserved_until == 800.0, (
        'reserved until expected 800.0, got %s' %
        self.system.nodes['1'].reserved_until)
    # The job ran immediately, so no node may be left marked for draining.
    for i in range(1, 6):
        self.assert_not_draining(i)
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_drain_for_large(self, *args, **kwargs):
    '''CraySystem.find_job_location: Drain for large job, block other'''
    Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
    # Job 2 wants all five nodes; job 3 is a single-node job behind it.
    jobs = [
        dict(self.base_job, jobid=2, nodes=5, walltime=500),
        dict(self.base_job, jobid=3, nodes=1, walltime=400),
    ]
    system = self.system
    system.reserve_resources_until('1', 100, 1)
    system.nodes['1'].status = 'busy'
    system.find_queue_equivalence_classes([], ['default'], [])
    end_times = [[['1'], 600]]
    retval = system.find_job_location(jobs, end_times, [])
    assert_match(retval, {}, "no location should be assigned")
    assert_match(system.pending_starts, {}, "no starts should be pending")
    # Every node should be draining for job 2 until the running job ends.
    for nid in range(1, 6):
        self.assert_draining(nid, 600, 2)
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_first_fit_despite_larger(self, *args, **kwargs):
    '''CraySystem.find_job_location: First fit smaller job ahead of large job'''
    Cobalt.Components.system.CraySystem.DRAIN_MODE = "first-fit"
    jobs = []
    jobs.append(dict(self.base_job))
    jobs.append(dict(self.base_job))
    # Job 2 needs every node and cannot start; job 3 fits on one idle node.
    jobs[0]['jobid'] = 2
    jobs[0]['nodes'] = 5
    jobs[0]['walltime'] = 500
    jobs[1]['jobid'] = 3
    jobs[1]['nodes'] = 1
    jobs[1]['walltime'] = 400
    self.system.reserve_resources_until('1', 600, 1)
    self.system.nodes['1'].status = 'busy'
    self.system.find_queue_equivalence_classes([], ['default'], [])
    retval = self.system.find_job_location(jobs, [[['1'], 600]], [])
    assert_match(retval, {3: ['2']}, "bad location")
    # A start for job 3 IS expected here; the old message said the opposite.
    assert_match(self.system.pending_starts, {3: 800.0}, "bad pending start")
    for i in range(1, 6):
        # first fit should never set drain characteristics.
        self.assert_not_draining(i)
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_drain_on_running(self, *args, **kwargs):
    '''CraySystem.find_job_location: Drain: Favor running location'''
    Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
    jobs = [
        dict(self.base_job, jobid=2, nodes=3, walltime=500),
        dict(self.base_job, jobid=3, nodes=2, walltime=400),
    ]
    system = self.system
    system.reserve_resources_until('3-5', 100, 1)
    for node_name in ('3', '4', '5'):
        system.nodes[node_name].status = 'busy'
    system.find_queue_equivalence_classes([], ['default'], [])
    end_times = [[['3-5'], 600]]
    retval = system.find_job_location(jobs, end_times, [])
    assert_match(retval, {3: ['1-2']}, 'bad location')
    assert_match(system.pending_starts, {3: 800.0}, "bad pending start")
    # The busy nodes drain for job 2; the nodes handed to job 3 do not.
    for nid in range(3, 6):
        self.assert_draining(nid, 600, 2)
    for nid in range(1, 3):
        self.assert_not_draining(nid)
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_no_drain_on_down(self, *args, **kwargs):
    '''CraySystem.find_job_location: Drain: Do not drain if insufficient hardware'''
    Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
    jobs = [
        dict(self.base_job, jobid=2, nodes=5, walltime=500),
        dict(self.base_job, jobid=3, nodes=2, walltime=400),
    ]
    system = self.system
    system.reserve_resources_until('2,5', 100, 1)
    # Node 3 is down, so the five-node job can never be satisfied.
    for node_name, state in (('2', 'busy'), ('3', 'down'), ('5', 'busy')):
        system.nodes[node_name].status = state
    system.find_queue_equivalence_classes([], ['default'], [])
    end_times = [[['2,5'], 600]]
    retval = system.find_job_location(jobs, end_times, [])
    assert_match(retval, {3: ['1,4']}, 'bad location')
    assert_match(system.pending_starts, {3: 800.0}, "bad pending start")
    for nid in range(1, 6):
        self.assert_not_draining(nid)
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(CraySystem, 'update_node_state')
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_drain_correct_queue(self, *args, **kwargs):
    '''CraySystem.find_job_location: Drain correct queue'''
    Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
    # Both jobs are submitted to queue 'bar'.
    jobs = [
        dict(self.base_job, jobid=2, nodes=2, walltime=500, queue='bar'),
        dict(self.base_job, jobid=3, nodes=1, walltime=400, queue='bar'),
    ]
    system = self.system
    system.reserve_resources_until('2,5', 600, 1)
    system.update_nodes({'queues': 'foo:default'}, ['1', '2'], None)
    system.update_nodes({'queues': 'bar:default'}, ['4', '5'], None)
    for node_name in ('2', '5'):
        system.nodes[node_name].status = 'busy'
    system.find_queue_equivalence_classes([], ['default', 'foo', 'bar'], [])
    end_times = [[['2,5'], 600]]
    retval = system.find_job_location(jobs, end_times, [])
    assert_match(retval, {}, 'bad location')
    assert_match(system.pending_starts, {}, "bad pending start")
    # Only the nodes assigned to queue 'bar' may be drained for job 2.
    for nid in (4, 5):
        self.assert_draining(nid, 600, 2)
    for nid in (1, 2, 3):
        self.assert_not_draining(nid)
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(CraySystem, 'update_node_state')
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_drain_correct_queue_run_short_job(self, *args, **kwargs):
    '''CraySystem.find_job_location: Drain correct queue, run short job'''
    Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
    # Job 3 has a much shorter walltime than job 2 and is expected to start.
    jobs = [
        dict(self.base_job, jobid=2, nodes=2, walltime=500, queue='bar'),
        dict(self.base_job, jobid=3, nodes=1, walltime=3, queue='bar'),
    ]
    system = self.system
    system.reserve_resources_until('2,5', 1000, 1)
    system.update_nodes({'queues': 'foo:default'}, ['1', '2'], None)
    system.update_nodes({'queues': 'bar:default'}, ['4', '5'], None)
    for node_name in ('2', '5'):
        system.nodes[node_name].status = 'busy'
    system.find_queue_equivalence_classes([], ['default', 'foo', 'bar'], [])
    end_times = [[['2,5'], 1000]]
    retval = system.find_job_location(jobs, end_times, [])
    assert_match(retval, {3: ['4']}, 'bad location')
    assert_match(system.pending_starts, {3: 800.0}, "bad pending start")
    for nid in (4, 5):
        self.assert_draining(nid, 1000, 2)
    for nid in (1, 2, 3):
        self.assert_not_draining(nid)
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_allocate_drain_only_required(self, *args, **kwargs):
    '''CraySystem.find_job_location: drain only attrs=location nodes'''
    Cobalt.Components.system.CraySystem.DRAIN_MODE = "backfill"
    # Job 2 pins itself to nodes 1, 3 and 5 through its location attribute.
    jobs = [
        dict(self.base_job, jobid=2, nodes=3, walltime=500,
             attrs={'location': '1,3,5'}),
        dict(self.base_job, jobid=3, nodes=3, walltime=400),
    ]
    system = self.system
    system.reserve_resources_until('1', 600, 1)
    system.nodes['1'].status = 'busy'
    system.find_queue_equivalence_classes([], ['default'], [])
    end_times = [[['1'], 600]]
    retval = system.find_job_location(jobs, end_times, [])
    assert_match(retval, {}, 'bad location')
    assert_match(system.pending_starts, {}, "bad pending start")
    # Only the requested locations drain; the other nodes stay untouched.
    for nid in (1, 3, 5):
        self.assert_draining(nid, 600, 2)
    for nid in (2, 4):
        self.assert_not_draining(nid)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment