Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
AIG-public
Cobalt
Commits
475ba747
Commit
475ba747
authored
Aug 23, 2017
by
Paul Rich
Browse files
Revert "Merge branch 'revert-
cd19412c
' into 'develop'"
This reverts merge request !63
parent
ff6627a9
Changes
4
Hide whitespace changes
Inline
Side-by-side
man/cobalt.conf.5
View file @
475ba747
...
...
@@ -554,7 +554,11 @@ seconds. The default is 300 seconds.
.B update_thread_timeout
The polling interval for state updates from ALPS in seconds. The default is
10 seconds.
.SS [capmc]
.TP
.B path
Path to CAPMC command front-end. If unset, the default is /opt/cray/capmc/default/bin/capmc
.TP
.SS [system]
.TP
.B backfill_epsillon
...
...
src/lib/Components/system/AlpsBridge.py
View file @
475ba747
...
...
@@ -7,6 +7,7 @@
import
logging
import
xml.etree
import
xmlrpclib
import
json
from
cray_messaging
import
InvalidBasilMethodError
,
BasilRequest
from
cray_messaging
import
parse_response
,
ALPSError
from
Cobalt.Proxy
import
ComponentProxy
...
...
@@ -21,7 +22,8 @@ init_cobalt_config()
FORKER
=
get_config_option
(
'alps'
,
'forker'
,
'system_script_forker'
)
BASIL_PATH
=
get_config_option
(
'alps'
,
'basil'
,
'/home/richp/alps-simulator/apbasil.sh'
)
# Make sure that you have key and cert set for CAPMC operaition and paths are established in the exec environment
CAPMC_PATH
=
get_config_option
(
'capmc'
,
'path'
,
'/opt/cray/capmc/default/bin/capmc'
)
_RUNID_GEN
=
IncrID
()
CHILD_SLEEP_TIMEOUT
=
float
(
get_config_option
(
'alps'
,
'child_sleep_timeout'
,
1.0
))
...
...
@@ -72,7 +74,7 @@ def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
params
[
key
]
=
val
if
node_id_list
is
not
None
:
params
[
'node_list'
]
=
[
int
(
i
)
for
i
in
node_id_list
]
retval
=
_call_sys_forker
(
BASIL_PATH
,
str
(
BasilRequest
(
'RESERVE'
,
retval
=
_call_sys_forker
_basil
(
BASIL_PATH
,
str
(
BasilRequest
(
'RESERVE'
,
params
=
params
)))
return
retval
...
...
@@ -96,7 +98,7 @@ def release(alps_res_id):
'''
params
=
{
'reservation_id'
:
alps_res_id
}
retval
=
_call_sys_forker
(
BASIL_PATH
,
str
(
BasilRequest
(
'RELEASE'
,
retval
=
_call_sys_forker
_basil
(
BASIL_PATH
,
str
(
BasilRequest
(
'RELEASE'
,
params
=
params
)))
return
retval
...
...
@@ -119,7 +121,7 @@ def confirm(alps_res_id, pg_id):
'''
params
=
{
'pagg_id'
:
pg_id
,
'reservation_id'
:
alps_res_id
}
retval
=
_call_sys_forker
(
BASIL_PATH
,
str
(
BasilRequest
(
'CONFIRM'
,
retval
=
_call_sys_forker
_basil
(
BASIL_PATH
,
str
(
BasilRequest
(
'CONFIRM'
,
params
=
params
)))
return
retval
...
...
@@ -128,7 +130,7 @@ def system():
information'''
params
=
{}
req
=
BasilRequest
(
'QUERY'
,
'SYSTEM'
,
params
)
return
_call_sys_forker
(
BASIL_PATH
,
str
(
req
))
return
_call_sys_forker
_basil
(
BASIL_PATH
,
str
(
req
))
def
fetch_inventory
(
changecount
=
None
,
resinfo
=
False
):
'''fetch the inventory for the machine
...
...
@@ -148,7 +150,7 @@ def fetch_inventory(changecount=None, resinfo=False):
#TODO: add a flag for systems with version <=1.4 of ALPS
req
=
BasilRequest
(
'QUERY'
,
'INVENTORY'
,
params
)
#print str(req)
return
_call_sys_forker
(
BASIL_PATH
,
str
(
req
))
return
_call_sys_forker
_basil
(
BASIL_PATH
,
str
(
req
))
def
fetch_reservations
():
'''fetch reservation data. This includes reservation metadata but not the
...
...
@@ -157,12 +159,12 @@ def fetch_reservations():
'''
params
=
{
'resinfo'
:
True
,
'nonodes'
:
True
}
req
=
BasilRequest
(
'QUERY'
,
'INVENTORY'
,
params
)
return
_call_sys_forker
(
BASIL_PATH
,
str
(
req
))
return
_call_sys_forker
_basil
(
BASIL_PATH
,
str
(
req
))
def
reserved_nodes
():
params
=
{}
req
=
BasilRequest
(
'QUERY'
,
'RESERVEDNODES'
,
params
)
return
_call_sys_forker
(
BASIL_PATH
,
str
(
req
))
return
_call_sys_forker
_basil
(
BASIL_PATH
,
str
(
req
))
def
fetch_aggretate_reservation_data
():
'''correlate node and reservation data to get which nodes are in which
...
...
@@ -187,6 +189,99 @@ def extract_system_node_data(node_data):
del
node_info
[
'role'
]
return
ret_nodeinfo
def
fetch_ssd_static_data
(
nid_list
=
None
,
by_cname
=
False
):
'''Get static SSD information from CAPMC.
Args:
nid_list - optional list of nodes as a comma-delimited, hyphenated string (default None).
by_cname - if True, returns the nodes keyed by Cray cname (default False
Returns:
A dictionary with call status, Consult CAPMC documentation for details
Notes:
Consult CAPMC v1.2 or ls' call for more information.
'''
args
=
[
'get_ssds'
]
if
nid_list
is
not
None
:
args
.
extend
([
'-n'
,
nid_list
])
ret_info
=
_call_sys_forker_capmc
(
CAPMC_PATH
,
args
)
if
not
by_cname
:
# Because everything else in this system works on nids.
fixed_ret_info
=
{}
fixed_ret_info
[
'nids'
]
=
[]
for
key
,
val
in
ret_info
.
items
():
if
key
not
in
[
'e'
,
'err_msg'
]:
fixed_val
=
val
val
[
'cname'
]
=
key
fixed_ret_info
[
'nids'
].
append
(
fixed_val
)
else
:
fixed_ret_info
[
key
]
=
val
ret_info
=
fixed_ret_info
return
ret_info
def
fetch_ssd_enable
(
nid_list
=
None
):
'''Get SSD enable flags from CAPMC.
Args:
nid_list - optional list of nodes as a comma-delimited, hyphenated string (default None).
Returns:
A dictionary with call status, and list of nid dicts of the form {"ssd_enable": val, "nid": id}
Notes:
Consult CAPMC v1.2 or later documentation for details on 'get_ssd_enable' call for more information.
'''
args
=
[
'get_ssd_enable'
]
if
nid_list
is
not
None
:
args
.
extend
([
'-n'
,
nid_list
])
return
_call_sys_forker_capmc
(
CAPMC_PATH
,
args
)
def
fetch_ssd_diags
(
nid_list
=
None
,
raw
=
False
):
'''Get static SSD information from CAPMC.
Args:
nid_list - optional list of nodes as a comma-delimited, hyphenated string (default None).
raw - If true, do not make records consistient with other CAPMC calls output. (default False).
Returns:
A dictionary with call status, Consult CAPMC documentation for details
Notes:
Consult CAPMC v1.2 or ls' call for more information.
This call to CAPMC, unlike others, returns 'ssd_diags' as a list of dictionaries as a top-level
object, not 'nids'. Size is in GB (10^3 not 2^10) instead of bytes. 'serial_num' is equivalent
to 'serial_number' in CAPMC's get_ssds call. Both keys are converted to match 'get_ssds' output.
'''
args
=
[
'get_ssd_diags'
]
if
nid_list
is
not
None
:
args
.
extend
([
'-n'
,
nid_list
])
ret_info
=
_call_sys_forker_capmc
(
CAPMC_PATH
,
args
)
if
not
raw
:
# Not all consistency is foolish.
fixed_ret_info
=
{}
fixed_ret_info
[
'e'
]
=
ret_info
[
'e'
]
fixed_ret_info
[
'err_msg'
]
=
ret_info
[
'err_msg'
]
fixed_ret_info
[
'ssd_diags'
]
=
[]
diag_info
=
ret_info
[
'ssd_diags'
]
for
info
in
diag_info
:
fixed_diag_info
=
{}
for
diag_key
,
diag_val
in
info
.
items
():
if
diag_key
not
in
[
'serial_num'
,
'size'
]:
fixed_diag_info
[
diag_key
]
=
diag_val
elif
diag_key
==
'serial_num'
:
fixed_diag_info
[
'serial_number'
]
=
diag_val
elif
diag_key
==
'size'
:
# It's storage so apparently we're using 10^3 instead of 2^10
# Going from GB back to bytes
fixed_diag_info
[
diag_key
]
=
int
(
1000000000
*
int
(
diag_val
))
fixed_ret_info
[
'ssd_diags'
].
append
(
fixed_diag_info
)
ret_info
=
fixed_ret_info
return
ret_info
def
_log_xmlrpc_error
(
runid
,
fault
):
'''Log an xmlrpc error.
...
...
@@ -205,34 +300,35 @@ def _log_xmlrpc_error(runid, fault):
_logger
.
debug
(
'Traceback information: for runid %s'
,
runid
,
exc_info
=
True
)
def
_call_sys_forker
(
basil_path
,
in_str
):
'''Make a call through to BASIL wait until we get output and clean up
child info.
def
_call_sys_forker
(
path
,
tag
,
label
,
args
=
None
,
in_str
=
None
):
'''Make a call through the system_script_forker to get output from a cray command.
Args:
basil_path: path to the BAISL executable. May be overriden for
test environments.
in_str: A string of XML to send to 'apbasil'
path - path to the command
tag - string tag for call
label - label for logging on call
args - arguments to command (default None)
in_str - string to send to stdin of command (default None)
Returns:
The XML response parsed into a Python dictionary.
stdout as a string
Exceptions:
Will raise a xmlrpclib.Fault if communication with the bridge
and/or system component fails completely at startup.
Notes:
This will block until 'apbasil' completion. 'apbasil' messages can
be failrly large for things sent to stdout.
Notes:
This is currently a blocking call until command completion.
'''
runid
=
None
#_RUNID_GEN.next()
i
runid
=
None
#_RUNID_GEN.next()
resp
=
None
cmd
=
[
path
]
if
args
is
not
None
:
cmd
.
extend
(
args
)
try
:
child
=
ComponentProxy
(
FORKER
).
fork
(
[
basil_path
]
,
'apbridge'
,
child
=
ComponentProxy
(
FORKER
).
fork
(
cmd
,
'apbridge'
,
'alps'
,
None
,
None
,
runid
,
in_str
,
True
)
runid
=
child
except
Exception
:
...
...
@@ -253,8 +349,8 @@ def _call_sys_forker(basil_path, in_str):
# invalid. If we never got one, then let the
# caller handle the error.
if
child
[
'exit_status'
]
!=
0
:
_logger
.
error
(
"
BASIL
returned a status of %s"
,
child
[
'exit_status'
])
_logger
.
error
(
"
%s
returned a status of
%s, stderr:
%s"
,
cmd
,
child
[
'exit_status'
]
,
"
\n
"
.
join
(
child
[
'stderr'
])
)
resp
=
child
[
'stdout_string'
]
try
:
ComponentProxy
(
FORKER
).
cleanup_children
([
runid
])
...
...
@@ -265,7 +361,31 @@ def _call_sys_forker(basil_path, in_str):
if
complete
:
break
sleep
(
CHILD_SLEEP_TIMEOUT
)
return
resp
def
_call_sys_forker_basil
(
basil_path
,
in_str
):
'''Make a call through to BASIL wait until we get output and clean up
child info.
Args:
basil_path: path to the BAISL executable. May be overriden for
test environments.
in_str: A string of XML to send to 'apbasil'
Returns:
The XML response parsed into a Python dictionary.
Exceptions:
Will raise a xmlrpclib.Fault if communication with the bridge
and/or system component fails completely at startup.
Notes:
This will block until 'apbasil' completion. 'apbasil' messages can
be failrly large for things sent to stdout.
'''
resp
=
_call_sys_forker
(
basil_path
,
'apbridge'
,
'alps'
,
in_str
=
in_str
)
parsed_resp
=
{}
try
:
parsed_resp
=
parse_response
(
resp
)
...
...
@@ -274,6 +394,27 @@ def _call_sys_forker(basil_path, in_str):
raise
exc
return
parsed_resp
def
_call_sys_forker_capmc
(
capmc_path
,
args
):
'''Call a CAPMC command and recieve response'''
resp
=
_call_sys_forker
(
capmc_path
,
'apbridge'
,
'capmc_ssd'
,
args
=
args
)
parsed_response
=
{}
try
:
parsed_response
=
json
.
loads
(
resp
)
except
TypeError
:
_logger
.
error
(
"Bad type recieved for CAPMC response, expected %s got %s."
,
type
(
""
),
type
(
resp
))
raise
except
ValueError
:
_logger
.
error
(
"Invalid JSON string returned: %s"
,
resp
)
else
:
err_code
=
parsed_response
.
get
(
'e'
,
None
)
err_msg
=
parsed_response
.
get
(
'err_msg'
,
None
)
if
err_code
is
None
:
raise
ValueError
(
'Error code in CAPMC response not provided. Invalid response recieved. %s'
,
resp
)
if
int
(
err_code
)
!=
0
:
raise
ValueError
(
'Error Code %s recieved. Message: %s'
,
err_code
,
err_msg
)
return
parsed_response
def
print_node_names
(
spec
):
'''Debugging utility to print nodes returned by ALPS'''
print
spec
[
'reservations'
]
...
...
src/lib/Components/system/CraySystem.py
View file @
475ba747
...
...
@@ -201,6 +201,12 @@ class CraySystem(BaseSystem):
reservations
=
ALPSBridge
.
fetch_reservations
()
_logger
.
info
(
'ALPS RESERVATION DATA FETCHED'
)
# reserved_nodes = ALPSBridge.reserved_nodes()
ssd_enabled
=
ALPSBridge
.
fetch_ssd_enable
()
_logger
.
info
(
'CAPMC SSD ENABLED DATA FETCHED'
)
ssd_info
=
ALPSBridge
.
fetch_ssd_static_data
()
_logger
.
info
(
'CAPMC SSD DETAIL DATA FETCHED'
)
ssd_diags
=
ALPSBridge
.
fetch_ssd_diags
()
_logger
.
info
(
'CAPMC SSD DIAG DATA FETCHED'
)
except
Exception
:
#don't crash out here. That may trash a statefile.
_logger
.
error
(
'Possible transient encountered during initialization. Retrying.'
,
...
...
@@ -209,7 +215,7 @@ class CraySystem(BaseSystem):
else
:
pending
=
False
self
.
_assemble_nodes
(
inventory
,
system
)
self
.
_assemble_nodes
(
inventory
,
system
,
ssd_enabled
,
ssd_info
,
ssd_diags
)
#Reversing the node name to id lookup is going to save a lot of cycles.
for
node
in
self
.
nodes
.
values
():
self
.
node_name_to_id
[
node
.
name
]
=
node
.
node_id
...
...
@@ -218,10 +224,23 @@ class CraySystem(BaseSystem):
# self._assemble_reservations(reservations, reserved_nodes)
return
def
_assemble_nodes
(
self
,
inventory
,
system
):
def
_assemble_nodes
(
self
,
inventory
,
system
,
ssd_enabled
,
ssd_info
,
ssd_diags
):
'''merge together the INVENTORY and SYSTEM query data to form as
complete a picture of a node as we can.
Args:
inventory - ALPS QUERY(INVENTORY) data
system - ALPS QUERY(SYSTEM) data
ssd_enable - CAPMC get_ssd_enable data
ssd_info - CAPMC get_ssds data
ssd_diags - CAPMC get_ssd_diags data
Returns:
None
Side Effects:
Populates the node dictionary
'''
nodes
=
{}
for
nodespec
in
inventory
[
'nodes'
]:
...
...
@@ -235,8 +254,34 @@ class CraySystem(BaseSystem):
if
nodes
[
node_id
].
role
.
upper
()
not
in
[
'BATCH'
]:
nodes
[
node_id
].
status
=
'down'
nodes
[
node_id
].
status
=
nodespec
[
'state'
]
self
.
_update_ssd_data
(
nodes
,
ssd_enabled
,
ssd_info
,
ssd_diags
)
self
.
nodes
=
nodes
def
_update_ssd_data
(
self
,
nodes
,
ssd_enabled
=
None
,
ssd_info
=
None
,
ssd_diags
=
None
):
'''Update/add ssd data from CAPMC'''
if
ssd_enabled
is
not
None
:
for
ssd_data
in
ssd_enabled
[
'nids'
]:
try
:
nodes
[
str
(
ssd_data
[
'nid'
])].
attributes
[
'ssd_enabled'
]
=
int
(
ssd_data
[
'ssd_enable'
])
except
KeyError
:
_logger
.
warning
(
'ssd info present for nid %s, but not reported in ALPS.'
,
ssd_data
[
'nid'
])
if
ssd_info
is
not
None
:
for
ssd_data
in
ssd_info
[
'nids'
]:
try
:
nodes
[
str
(
ssd_data
[
'nid'
])].
attributes
[
'ssd_info'
]
=
ssd_data
except
KeyError
:
_logger
.
warning
(
'ssd info present for nid %s, but not reported in ALPS.'
,
ssd_data
[
'nid'
])
if
ssd_diags
is
not
None
:
for
diag_info
in
ssd_diags
[
'ssd_diags'
]:
try
:
node
=
nodes
[
str
(
diag_info
[
'nid'
])]
except
KeyError
:
_logger
.
warning
(
'ssd diag data present for nid %s, but not reported in ALPS.'
,
ssd_data
[
'nid'
])
else
:
for
field
in
[
'life_remaining'
,
'ts'
,
'firmware'
,
'percent_used'
]:
node
.
attributes
[
'ssd_info'
][
field
]
=
diag_info
[
field
]
def
_assemble_reservations
(
self
,
reservations
,
reserved_nodes
):
# FIXME: we can recover reservations now. Implement this.
pass
...
...
@@ -340,6 +385,7 @@ class CraySystem(BaseSystem):
self
.
nodes
[
str
(
nid
)]
=
new_node
self
.
logger
.
warning
(
'Node %s added to tracking.'
,
nid
)
@
exposed
def
update_node_state
(
self
):
'''update the state of cray nodes. Check reservation status and system
...
...
@@ -370,6 +416,9 @@ class CraySystem(BaseSystem):
inven_nodes
=
ALPSBridge
.
extract_system_node_data
(
ALPSBridge
.
system
())
reservations
=
ALPSBridge
.
fetch_reservations
()
#reserved_nodes = ALPSBridge.reserved_nodes()
# Fetch SSD diagnostic data and enabled flags. I would hope these change in event of dead ssd
ssd_enabled
=
ALPSBridge
.
fetch_ssd_enable
()
ssd_diags
=
ALPSBridge
.
fetch_ssd_diags
()
except
(
ALPSBridge
.
ALPSError
,
ComponentLookupError
):
_logger
.
warning
(
'Error contacting ALPS for state update. Aborting this update'
,
exc_info
=
True
)
...
...
@@ -484,6 +533,8 @@ class CraySystem(BaseSystem):
self
.
_reconstruct_node
(
inven_node
,
recon_inventory
)
# _logger.error('UNS: ALPS reports node %s but not in our node list.',
# inven_node['node_id'])
# Update SSD data:
self
.
_update_ssd_data
(
self
.
nodes
,
ssd_enabled
=
ssd_enabled
,
ssd_diags
=
ssd_diags
)
#should down win over running in terms of display?
#keep node that are marked for cleanup still in cleanup
for
node
in
cleanup_nodes
:
...
...
src/lib/client_utils.py
View file @
475ba747
...
...
@@ -1376,8 +1376,8 @@ def print_node_details(args):
retval
=
str
(
value
)
return
retval
nodes
=
component_call
(
SYSMGR
,
False
,
'get_nodes'
,
(
True
,
expand_node_args
(
args
)))
nodes
=
json
.
loads
(
component_call
(
SYSMGR
,
False
,
'get_nodes'
,
(
True
,
expand_node_args
(
args
)
,
None
,
True
)
))
res_queues
=
_setup_res_info
()
for
node
in
nodes
.
values
():
header_list
=
[]
...
...
@@ -1393,6 +1393,10 @@ def print_node_details(args):
if
res_queues
.
get
(
str
(
node
[
'node_id'
]),
False
):
queues
.
extend
(
res_queues
[
str
(
node
[
'node_id'
])])
value_list
.
append
(
':'
.
join
(
queues
))
elif
key
==
'attributes'
:
for
attr_key
,
attr_val
in
value
.
items
():
header_list
.
append
(
key
+
'.'
+
attr_key
)
value_list
.
append
(
gen_printable_value
(
attr_val
))
else
:
header_list
.
append
(
key
)
value_list
.
append
(
gen_printable_value
(
value
))
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment