AlpsBridge.py 9.65 KB
Newer Older
1
2
3
4
5
6
7
"""Bridge for communicating and fetching system state in ALPS."""

#This requires forking off an apbasil process, sending XML to its stdin and
#parsing XML from its stdout.  The call is synchronous; the system script
#forker, however, is not.  These calls are allowed to block for now.

import logging
8
import xml.etree
9
import xmlrpclib
10
11
from cray_messaging import InvalidBasilMethodError, BasilRequest
from cray_messaging import parse_response, ALPSError
12
13
14
from Cobalt.Proxy import ComponentProxy
from Cobalt.Data import IncrID
from Cobalt.Util import sleep
15
from Cobalt.Util import init_cobalt_config, get_config_option
16
from Cobalt.Util import compact_num_list, expand_num_list
17
18

_logger = logging.getLogger()
init_cobalt_config()

# Name handed to ComponentProxy when talking to the forker component
# (option 'forker' of 'alps', falling back to 'system_script_forker').
FORKER = get_config_option('alps', 'forker', 'system_script_forker')

# Executable that is forked to talk BASIL (option 'basil' of 'alps').
BASIL_PATH = get_config_option(
    'alps', 'basil', '/home/richp/alps-simulator/apbasil.sh')

_RUNID_GEN = IncrID()

# Delay handed to sleep() between polls of the forker for child status.
CHILD_SLEEP_TIMEOUT = float(
    get_config_option('alps', 'child_sleep_timeout', 1.0))

# Default 'nppn' for reservations; also multiplied by the node count to get
# the default 'width'.
DEFAULT_DEPTH = int(get_config_option('alps', 'default_depth', 72))
29
30

class BridgeError(Exception):
    '''Raised for failures that originate in the ALPS bridge itself, so that
    callers can tell them apart from other exceptions.

    '''

34
35
36
37
38
39
def init_bridge():
    '''Prepare the bridge for use by discarding leftover forker children.

    Every child the forker still holds under the 'apbridge' tag is asked to
    be cleaned up; after a restart or reinitialization those children are
    considered invalid.

    Exceptions:
        BridgeError if the children cannot be fetched or cleaned up.  The
        underlying failure is logged with its traceback first.

    '''
    forker = ComponentProxy(FORKER, defer=True)
    try:
        stale_ids = []
        for stale in forker.get_children('apbridge', None):
            stale_ids.append(int(stale['id']))
        forker.cleanup_children(stale_ids)
    except Exception:
        _logger.error('Unable to clear children from prior runs.  Init failed.',
                      exc_info=True)
        raise BridgeError('Bridge initialization failed.')

50
def reserve(user, jobid, nodecount, attributes=None, node_id_list=None):
    '''Reserve a set of nodes in ALPS by issuing a BASIL RESERVE request.

    Args:
        user: sent as the reservation's 'user_name'.
        jobid: sent as the reservation's 'batch_id'.
        nodecount: number of nodes wanted.  Only used to compute the default
                   'width' (nodecount * DEFAULT_DEPTH).
        attributes: optional dictionary of reservation settings.  The keys
                    read are 'width', 'depth', 'nppn', 'npps', 'nspn',
                    'reservation_mode', 'nppcu', 'p-state' and 'p-govenor'.
                    Missing 'width', 'nppn' and 'reservation_mode' fall back
                    to defaults; any setting whose value ends up None is
                    left out of the request.
        node_id_list: optional iterable of node ids.  Each id is converted
                      with int() and the list is sent as 'node_list'.

    Returns:
        The value returned by _call_sys_forker for the RESERVE request.

    '''
    if attributes is None:
        attributes = {}

    params = {}
    params['user_name'] = user
    params['batch_id'] = jobid

    # 'npps' used to be read only from the misspelled key 'nnps', which meant
    # a caller passing 'npps' was silently ignored.  Prefer the correct key
    # and keep honouring the old spelling for existing callers.
    npps = attributes.get('npps', None)
    if npps is None:
        npps = attributes.get('nnps', None)

    param_attrs = {}
    param_attrs['width'] = attributes.get('width', nodecount * DEFAULT_DEPTH)
    param_attrs['depth'] = attributes.get('depth', None)
    param_attrs['nppn'] = attributes.get('nppn', DEFAULT_DEPTH)
    param_attrs['npps'] = npps
    param_attrs['nspn'] = attributes.get('nspn', None)
    param_attrs['reservation_mode'] = attributes.get('reservation_mode',
                                                     'EXCLUSIVE')
    param_attrs['nppcu'] = attributes.get('nppcu', None)
    param_attrs['p-state'] = attributes.get('p-state', None)
    # NOTE(review): the 'p-govenor' spelling is kept because it is the key
    # handed on to BasilRequest -- confirm it matches what cray_messaging
    # expects before renaming it.
    param_attrs['p-govenor'] = attributes.get('p-govenor', None)

    # Only settings that actually have a value are sent.
    for key, val in param_attrs.items():
        if val is not None:
            params[key] = val

    if node_id_list is not None:
        params['node_list'] = [int(i) for i in node_id_list]

    request = BasilRequest('RESERVE', params=params)
    return _call_sys_forker(BASIL_PATH, str(request))
78

79
def release(alps_res_id):
    '''Release the nodes held by an ALPS reservation.

    May be used on a reservation that still has running jobs; in that case
    the reservation is released once those jobs exit or are terminated.

    Input:
        alps_res_id - id of the ALPS reservation to release.

    Returns:
        The value returned by _call_sys_forker for the RELEASE request.

    Side Effects:
        The ALPS reservation will be released.  New aprun attempts against
        the reservation will fail.

    Exceptions:
        None raised here directly.

    '''
    request = BasilRequest('RELEASE',
                           params={'reservation_id': alps_res_id})
    return _call_sys_forker(BASIL_PATH, str(request))

def confirm(alps_res_id, pg_id):
    '''Confirm an ALPS reservation.

    Call this once the process group id of the user job that the nodes are
    being reserved for is known.

    Input:
        alps_res_id - id of the reservation that is being confirmed.
        pg_id - process group id to bind the reservation to.

    Return:
        The value returned by _call_sys_forker for the CONFIRM request.

    Exceptions:
        None raised here directly.

    '''
    request = BasilRequest('CONFIRM',
                           params={'pagg_id': pg_id,
                                   'reservation_id': alps_res_id})
    return _call_sys_forker(BASIL_PATH, str(request))
125

Paul Rich's avatar
Paul Rich committed
126
127
128
129
130
131
132
def system():
    '''Run the BASIL SYSTEM query, which provides memory information.

    Returns:
        The value returned by _call_sys_forker for the query.

    '''
    request = BasilRequest('QUERY', 'SYSTEM', {})
    return _call_sys_forker(BASIL_PATH, str(request))

133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def fetch_inventory(changecount=None, resinfo=False):
    '''Fetch the inventory for the machine with a BASIL INVENTORY query.

    Input:
        changecount - changecount to send when only the deltas past a
                      certain point are wanted.  Not sent when None.
        resinfo - when true, also ask for information on current
                  reservations.

    Return:
        The value returned by _call_sys_forker: the machine information
        parsed from the XML response.

    '''
    query_params = {}
    if changecount is not None:
        query_params['changecount'] = changecount
    if resinfo:
        query_params['resinfo'] = True
    #TODO: add a flag for systems with version <=1.4 of ALPS
    request = BasilRequest('QUERY', 'INVENTORY', query_params)
    return _call_sys_forker(BASIL_PATH, str(request))

153
154
155
156
157
158
159
def fetch_reservations():
    '''Fetch reservation data with a BASIL INVENTORY query.

    The query asks for reservation information ('resinfo') and sets
    'nonodes', so the result carries the reservation metadata but not the
    reserved nodes.

    Return:
        The value returned by _call_sys_forker for the query.

    '''
    request = BasilRequest('QUERY', 'INVENTORY',
                           {'resinfo': True, 'nonodes' : True})
    return _call_sys_forker(BASIL_PATH, str(request))

162
def reserved_nodes():
    '''Run the BASIL RESERVEDNODES query with no parameters.

    Return:
        The value returned by _call_sys_forker for the query.

    '''
    request = BasilRequest('QUERY', 'RESERVEDNODES', {})
    return _call_sys_forker(BASIL_PATH, str(request))

167
168
169
170
171
def fetch_aggretate_reservation_data():
    '''Correlate node and reservation data to get which nodes are in which
    reservation.

    Not implemented yet: the function does nothing and returns None.

    '''
    return None
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189

def extract_system_node_data(node_data):
    '''Expand grouped node data into one record per node id.

    Args:
        node_data: dictionary with a 'nodes' list.  Every entry of that list
                   must carry 'node_ids' (expanded with expand_num_list),
                   'state' and 'role'; all of its other keys are treated as
                   attributes shared by the nodes of that entry.

    Returns:
        Dictionary keyed by str(node_id).  Each value is a dictionary with
        the keys 'node_id', 'state', 'role' and 'attrs', where 'attrs' holds
        the entry's remaining keys.

    Notes:
        node_data is left untouched.  Earlier versions deleted 'state',
        'node_ids' and 'role' from the caller's entries, so a second call on
        the same data failed with KeyError, and all nodes of an entry shared
        a single 'attrs' dictionary.  Each node now gets its own copy.

    '''
    per_node_keys = ('state', 'node_ids', 'role')
    ret_nodeinfo = {}
    for node_info in node_data['nodes']:
        # Everything that is not per-node bookkeeping is an attribute.
        group_attrs = dict((key, val) for key, val in node_info.items()
                           if key not in per_node_keys)
        #extract nodeids, construct from bulk data block...
        for node_id in expand_num_list(node_info['node_ids']):
            ret_nodeinfo[str(node_id)] = {
                'node_id': node_id,
                'state': node_info['state'],
                'role': node_info['role'],
                'attrs': dict(group_attrs),
            }
    return ret_nodeinfo

190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def _log_xmlrpc_error(runid, fault):
    '''Write the details of an XML-RPC fault to the log.

    The fault code and fault string are logged at error level.  A debug
    record carrying the current exception information is added as well.

    Args:
        runid: integer id of the current run in system_script_forker
        fault: xmlrpclib.Fault object raised by a component call

    Returns:
        None

    '''
    log_error = _logger.error
    log_error('XMLRPC Fault recieved while fetching child %s status:', runid)
    log_error('Child %s: Fault code: %s', runid, fault.faultCode)
    log_error('Child %s: Fault string: %s', runid, fault.faultString)
    _logger.debug('Traceback information: for runid %s', runid, exc_info=True)


209

210
def _call_sys_forker(basil_path, in_str):
    '''Make a call through to BASIL, wait until we get output and clean up
    the child info.

    Args:
        basil_path: path to the BASIL executable. May be overridden for
                    test environments.
        in_str: A string of XML to send to 'apbasil'

    Returns:
        The result of parse_response() applied to the child's stdout.

    Exceptions:
        Any exception raised by the initial fork call to the forker is
        logged and re-raised.
        xml.etree.ElementTree.ParseError is logged and re-raised if the
        response cannot be parsed.

    Notes:
        This will block until 'apbasil' completion.  'apbasil' messages can
        be fairly large for things sent to stdout.
        XML-RPC faults while polling or cleaning up are logged and the poll
        is retried after CHILD_SLEEP_TIMEOUT.

    '''
    # Imported here so that the ParseError lookup does not depend on another
    # module having already loaded the ElementTree submodule.
    from xml.etree.ElementTree import ParseError

    runid = None #_RUNID_GEN.next()
    resp = None
    try:
        # The forker hands back the id used to look the child up later.
        runid = ComponentProxy(FORKER).fork([basil_path], 'apbridge',
                'alps', None, None, runid, in_str, True)
    except Exception:
        _logger.critical("error communicating with bridge", exc_info=True)
        raise

    while True:
        #Is a timeout needed here?
        try:
            children = ComponentProxy(FORKER).get_children('apbridge', [runid])
        except xmlrpclib.Fault as fault:
            _log_xmlrpc_error(runid, fault)
            # Nothing usable came back.  Without this, 'children' would be
            # unbound on the first pass or stale on later ones.
            children = []
        complete = False
        for child in children:
            if not child['complete']:
                continue
            if child['lost_child'] and resp is None:
                # Use the original response.  This child object is invalid.
                # If we never got one, then let the caller handle the error.
                # NOTE(review): such a child is skipped on every pass, so the
                # loop keeps polling -- confirm this is intended.
                continue
            if child['exit_status'] != 0:
                _logger.error("BASIL returned a status of %s",
                        child['exit_status'])
            resp = child['stdout_string']
            try:
                ComponentProxy(FORKER).cleanup_children([runid])
            except xmlrpclib.Fault as fault:
                # Cleanup failed: keep the response and retry next pass.
                _log_xmlrpc_error(runid, fault)
            else:
                complete = True
        if complete:
            break
        sleep(CHILD_SLEEP_TIMEOUT)

    try:
        return parse_response(resp)
    except ParseError:
        _logger.error('Error parsing response "%s"', resp)
        # Bare raise keeps the original traceback.
        raise
276
277

def print_node_names(spec):
    '''Debugging utility to print nodes returned by ALPS.

    Prints spec['reservations'], then the first entry of spec['nodes'],
    then the 'name' of every node in spec['nodes'].

    '''
    print(spec['reservations'])
    nodes = spec['nodes']
    print(nodes[0])
    for entry in nodes:
        print(entry['name'])

if __name__ == '__main__':
    # Ad-hoc debugging entry point.  Other calls that can be swapped in:
    #   print_node_names(fetch_inventory(resinfo=True))
    #   fetch_inventory(changecount=0)
    #   extract_system_node_data(system())
    #   reserved_nodes()
    #   fetch_inventory(resinfo=True)
    print(fetch_reservations())