Commit e857ab5e authored by Paul Rich
Browse files

Merge branch '29-fix-nodelist-slow' into 'master'

Resolve "Nodeadm -l/nodelist fetch is slow"

Closes #29 

A major speedup was achieved by doing two things:
1) Bypassing the large amount of recursion in the XMLRPC marshaler by converting the data to a JSON string before sending it (a flag controls whether dictionary data is returned in this form).

2) Adding a parameter restriction so callers can request only specific fields.  This is used in nodeadm -l and nodelist to reduce the amount of data being sent.

See merge request !14
parents 717ec9fd 7c962617
......@@ -28,8 +28,34 @@ class CrayNode(ClusterNode):
self.ALPS_status = 'UNKNOWN' #Assume unknown state.
CrayNode.RESOURCE_STATUSES.append('alps-interactive')
def to_dict(self):
return self.__dict__
def to_dict(self, cooked=False, params=None):
    '''Return a dictionary representation of a node.  Used to send data to
    clients/other components.

    Input:
        cooked - (default: False) If true, strip one leading '_' character
                 from attribute names.  Useful for display applications
                 (e.g. nodelist)
        params - (default: None) Optional iterable of field names.  Only
                 honored when cooked is true: the result is restricted to
                 fields whose cooked name matches one of these names,
                 compared case-insensitively.  None means all fields.

    Returns:
        Dictionary representation of CrayNode fields.  When cooked is
        false this is the node's own attribute dictionary, not a copy, so
        changes made to it are changes to the node.

    Notes:
        The output can be sent via XMLRPC without modification

    '''
    if not cooked:
        # Raw form: params is deliberately ignored here, matching the
        # behavior callers of the uncooked dictionary already rely on.
        return self.__dict__
    # Normalize the requested names once so each attribute costs a single
    # set lookup instead of a scan over the whole params list.
    wanted = None
    if params is not None:
        wanted = set(p.lower() for p in params)
    cooked_node = {}
    for key, val in self.__dict__.items():
        name = key[1:] if key.startswith('_') else key
        if wanted is None or name.lower() in wanted:
            cooked_node[name] = val
    return cooked_node
def __str__(self):
return str(self.to_dict())
......
......@@ -5,6 +5,7 @@ import threading
import thread
import time
import xmlrpclib
import json
import Cobalt.Util
import Cobalt.Components.system.AlpsBridge as ALPSBridge
......@@ -226,37 +227,36 @@ class CraySystem(BaseSystem):
@exposed
def get_nodes(self, as_dict=False, node_ids=None):
def get_nodes(self, as_dict=False, node_ids=None, params=None, as_json=False):
'''fetch the node dictionary.
node_ids - a list of node names to return, if None, return all nodes
(default None)
as_dict - Return node information as a dictionary keyed to string
node_id value.
node_ids - A list of node names to return, if None, return all nodes
(default None).
params - If requesting a dict, only request this list of
parameters of the node.
as_json - Encode to json before sending. Useful on large systems.
returns the node dictionary. Can return underlying node data as
dictionary for XMLRPC purposes
'''
def cook_node_dict(node):
'''strip leading '_' for display purposes'''
raw_node = node.to_dict()
cooked_node = {}
for key, val in raw_node.items():
if key.startswith('_'):
cooked_node[key[1:]] = val
else:
cooked_node[key] = val
return cooked_node
if node_ids is None:
if as_dict:
return {k:cook_node_dict(v) for k, v in self.nodes.items()}
else:
return self.nodes
def node_filter(node):
if node_ids is not None:
return (str(node[0]) in [str(nid) for nid in node_ids])
return True
node_info = None
if as_dict:
retdict = {k:v.to_dict(True, params) for k, v in self.nodes.items()}
node_info = dict(filter(node_filter, retdict.items()))
else:
if as_dict:
return {k:cook_node_dict(v) for k, v in self.nodes.items() if int(k) in node_ids}
else:
return {k:v for k,v in self.nodes.items() if int(k) in node_ids}
node_info = dict(filter(node_filter, self.nodes.items()))
if as_json:
return json.dumps(node_info)
return node_info
def _run_update_state(self):
'''automated node update functions on the update timer go here.'''
......
......@@ -16,6 +16,7 @@ import ConfigParser
import re
import logging
import time
import json
import Cobalt.Util
from Cobalt.Proxy import ComponentProxy
......@@ -1278,8 +1279,10 @@ def cluster_display_node_info():
def print_node_list():
'''fetch and print a list of node information with default headers'''
nodes = component_call(SYSMGR, False, 'get_nodes',
(True,))
header = ['Node_id', 'Name', 'Queues', 'Status']
nodes = json.loads(component_call(SYSMGR, False, 'get_nodes',
(True, None, header, True)))
reservations = component_call(SCHMGR, False, 'get_reservations', ([{'queue':'*', 'partitions':'*', 'active':True}],))
res_queues = {}
for res in reservations:
......@@ -1291,7 +1294,6 @@ def print_node_list():
res_queues[node].append(res['queue'])
if len(nodes) > 0:
header = ['Node_id', 'Name', 'Queues', 'Status']
print_nodes = []
for node in nodes.values():
entry = []
......
......@@ -81,10 +81,12 @@ class TestCraySystem(object):
node_dict=dict(self.base_spec)
self.system.nodes[str(i)] = CrayNode(node_dict)
self.system.node_name_to_id[node_dict['name']] = node_dict['node_id']
for node in self.system.nodes.values():
node.managed = True
self.system._gen_node_to_queue()
self.base_job = {'jobid':1, 'user':'crusher', 'attrs':{},
'queue':'default',
'queue':'default', 'nodes': 1,
}
def teardown(self):
......@@ -278,3 +280,21 @@ class TestCraySystem(object):
self.system._idle_nodes_by_queue())
assert nodecount == 0, 'Wrong nodecount'
assert nodelist == [], 'Wrong node in list %s' % nodelist
def fake_reserve(self, job, new_time, node_id_list):
    '''Test stand-in patched over CraySystem._ALPS_reserve_resources.

    Input:
        job - job dictionary; only the 'nodes' entry (requested node count)
              is read.
        new_time - accepted to match the patched method's call signature;
                   not used.
        node_id_list - candidate node ids to draw the reservation from.

    Returns:
        The first job['nodes'] entries of node_id_list when the request is
        smaller than the candidate list, otherwise an empty list.

    '''
    # Coerce once so the comparison and the slice agree on the same value;
    # a string count would otherwise compare incorrectly against len().
    requested = int(job['nodes'])
    # NOTE(review): strict '<' means a request for exactly
    # len(node_id_list) nodes gets nothing -- confirm this is intended.
    if requested < len(node_id_list):
        return node_id_list[:requested]
    return []
@patch.object(CraySystem, '_ALPS_reserve_resources', fake_reserve)
@patch.object(time, 'time', return_value=500.000)
def test_find_job_location_basic(self, *args, **kwargs):
    '''CraySystem.find_job_location: Assign basic job to nodes'''
    # time.time is patched to 500.0 and the ALPS reservation call is
    # replaced by fake_reserve, so no real clock or ALPS access happens.
    # The 1700.0 and 800.0 expectations below are presumably fixed offsets
    # from that patched time -- confirm against CraySystem.find_job_location.
    retval = self.system.find_job_location([self.base_job], [], [])
    assert retval == {1: ['1']}, 'bad loc: expected %s, got %s' % ({1: ['1']}, retval)
    assert self.system.pending_starts[1] == 1700.0, 'bad pending start: expected %s, got %s' % (1700.0, self.system.pending_starts[1])
    assert self.system.nodes['1'].reserved_jobid == 1, 'Node not reserved'
    # The failure message must read self.system.nodes; the test object has no
    # 'system_nodes' attribute, which would mask the real failure with an
    # AttributeError.
    assert self.system.nodes['1'].reserved_until == 800.0, 'reserved until expected 800.0, got %s' % self.system.nodes['1'].reserved_until
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment