"""Cobalt component base."""

3
__revision__ = '$Revision: 2130 $'
4
5
6
7
8

__all__ = ["Component", "exposed", "automatic", "run_component"]

import inspect
import os
9
import os.path
10
11
12
13
14
import cPickle
import pydoc
import sys
import getopt
import logging
15
import time
16
import threading
17
import xmlrpclib
18
import socket
19

20
import Cobalt
21
22
import Cobalt.Proxy
import Cobalt.Logging
23
from Cobalt.Server import BaseXMLRPCServer, XMLRPCServer, find_intended_location
24
from Cobalt.Data import get_spec_fields
Daniel Buettner's avatar
Daniel Buettner committed
25
from Cobalt.Exceptions import NoExposedMethod
Narayan Desai's avatar
Narayan Desai committed
26
from Cobalt.Statistics import Statistics
27
28
29
30
31
32
33
34
35
36
37
38
import Cobalt.Util
init_cobalt_config = Cobalt.Util.init_cobalt_config
get_config_option = Cobalt.Util.get_config_option
ParsingError = Cobalt.Util.ParsingError


try:
    print >>sys.stderr, "INFO: initializing configuration system"
    _config_files_read = init_cobalt_config()
    _missing_config_files = list(set(Cobalt.CONFIG_FILES).difference(set(_config_files_read)))
    if _missing_config_files:
        print >>sys.stderr, "Warning: one or more config files were not found: %s" % (str(_missing_config_files)[1:-1],)
39
except Exception, e:
40
41
    print >>sys.stderr, "ERROR: unable to parse config file:\n\t%s" % (e.message,)
    sys.exit(1)
42

43
44

def state_file_location():
    '''Return the directory in which Cobalt statefiles are kept.

    The value is read from the "location" option of the [statefiles]
    configuration section, with environment variable references expanded.

    default: /var/spool/cobalt

    '''
    configured = get_config_option('statefiles', "location", "/var/spool/cobalt")
    return os.path.expandvars(configured)
52

53
def run_component (component_cls, argv=None, register=True, state_name=False,
54
                   cls_kwargs={}, extra_getopt='', time_out=10,
55
56
                   single_threaded=False, seq_num=0, aug_comp_name=False,
                   state_name_match_component=False):
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
    '''Run the specified Cobalt component until recieving signal to terminate.

    Args::

        component_cls: underlying component library class to use for execution
        argv: component-specific arguments [Default: None]
        register: If true, register the component with the service-locator [Default: True]
        state_name: Name of statefile to use for a component.  If false, do not
                    generate a statefile [Default: False]
        cls_kwargs: keyword arguments to pass to the underlying component class
                    [Default: {}]
        extra_getopt: extra options to getopt [Default: '']
        time_out: timeout for default task iteration in seconds. [Default: 10]
        single_threaded: If true, run as a single-threaded componenent.
                         [Default: False]
        seq_num: Sequence number of the componenent.  This should be incremented
                 for multiple components on one host. [Default: 0]
        aug_comp_name: Add the hostname and a sequence number to the component
                       name registered with the service-loactor if True.
                       [Default: False]
        state_name_match_component: Make the statefile name match the component
                                    name in the service-locator if using the
                                    augmented name. [Default: False]

    Returns:
        Exit status of the component

    Notes:
        Components that make use of the fork system call must use
        single-threaded mode.
87
88
89

    '''

90
91
92
    if argv is None:
        argv = sys.argv
    try:
93
        (opts, arg) = getopt.getopt(argv[1:], 'D:d' + extra_getopt)
94
95
96
    except getopt.GetoptError, e:
        print >> sys.stderr, e
        print >> sys.stderr, "Usage:"
97
        print >> sys.stderr, "%s [-d] [-D pidfile] [--config-files file1:file2]" % (os.path.basename(argv[0]))
98
        sys.exit(1)
99

100
101
102
    # default settings
    daemon = False
    pidfile = ""
103
    level = logging.INFO
104
105
    # get user input
    for item in opts:
106
        if item[0] == '-D':
107
            daemon = True
108
109
110
            pidfile_name = item[1]
        elif item[0] == '-d':
            level = logging.DEBUG
111

112
113
114
115
    # form new component name for running on multiple hosts.
    # This includes a hostname and the sequence number to allow for multiple
    # copies of the same component to be run.  This is primarily used with
    # forkers at this time.
116
117
118
119
120
121
122
123
    if aug_comp_name:
        component_cls.name = '_'.join([component_cls.name, socket.gethostname(),
            str(seq_num)])
    if state_name_match_component:
        # Request that the statefile name match the augmented component name
        if not state_name:
            state_name = component_cls.name

124
    logging.getLogger().setLevel(level)
125
    Cobalt.Logging.setup_logging(component_cls.implementation, console_timestamp=True)
126
127
128
129
130

    if daemon:
        child_pid = os.fork()
        if child_pid != 0:
            return
131

132
        os.setsid()
133

134
135
136
        child_pid = os.fork()
        if child_pid != 0:
            os._exit(0)
137

138
139
140
141
        redirect_file = open("/dev/null", "w+")
        os.dup2(redirect_file.fileno(), sys.__stdin__.fileno())
        os.dup2(redirect_file.fileno(), sys.__stdout__.fileno())
        os.dup2(redirect_file.fileno(), sys.__stderr__.fileno())
142

143
144
        os.chdir(os.sep)
        os.umask(0)
145

146
147
148
        pidfile = open(pidfile_name or "/dev/null", "w")
        print >> pidfile, os.getpid()
        pidfile.close()
149

150
151
152
153
154
155
    if state_name:
        state_file_name = "%s/%s" % (state_file_location(), state_name)
        try:
            component = cPickle.load(open(state_file_name))
        except:
            component = component_cls(**cls_kwargs)
156
            component.logger.error("UNABLE TO LOAD STATE FROM %s.  STARTING WITH A BLANK SLATE.", state_file_name, exc_info=True)
157
158
159
        component.statefile = state_file_name
    else:
        component = component_cls(**cls_kwargs)
160

161
    location = find_intended_location(component)
162
    try:
163
164
165
        keypath = os.path.expandvars(get_config_option('communication', 'key'))
        certpath = os.path.expandvars(get_config_option('communication', 'cert'))
        capath = os.path.expandvars(get_config_option('communication', 'ca'))
166
167
    except:
        keypath = '/etc/cobalt.key'
168
169
        certpath = None
        capath = None
170

171
    if single_threaded:
172
173
        server = BaseXMLRPCServer(location, keyfile=keypath, certfile=certpath, 
                          cafile=capath, register=register, timeout=time_out)
174
    else:
175
176
        server = XMLRPCServer(location, keyfile=keypath, certfile=certpath,
                          cafile=capath, register=register, timeout=time_out)
177
178
179
180
181

    #Two components of the same type cannot be allowed to run at the same time.
    if component.name != 'service-location':
        address = Cobalt.Proxy.ComponentProxy('service-location').locate(component.name)
        if address:
182
            component.logger.critical("CRITICAL: Instance of component %s already registered.  Startup of this instance aborted.", component.name)
183
            sys.exit(1)
184
    server.register_instance(component)
185

186
187
188
189
    try:
        server.serve_forever()
    finally:
        server.server_close()
190
191
192

def exposed (func):
    """Mark a method as publicly exposed.

    Sets the attribute ``exposed = True`` on func and returns the same
    object, so it can be used as a decorator or called directly.

    Examples:
    class MyComponent (Component):
        @exposed
        def my_method (self, param1, param2):
            do_stuff()

    class MyComponent (Component):
        def my_method (self, param1, param2):
            do_stuff()
        my_method = exposed(my_method)
    """
    setattr(func, "exposed", True)
    return func

208
209
def automatic (func=None, period=10):
    """Mark a method to be run periodically.

    Sets three attributes on the function:
    automatic -- True
    automatic_period -- the given period
    automatic_ts -- -1

    Arguments:
    func -- the function to mark; when omitted a decorator is returned, so
            both @automatic and @automatic(period=30) work
    period -- value stored as automatic_period [Default: 10]

    Returns func itself, or a decorator when func is None.
    """
    def _mark (target):
        target.automatic = True
        target.automatic_period = period
        target.automatic_ts = -1
        return target
    if func is None:
        return _mark
    return _mark(func)

215
def locking (func):
    """Mark a function as being internally thread safe.

    Sets ``locking = True`` on func and returns it.  Component._dispatch and
    Component.do_tasks do not take the component lock around a call to a
    function carrying this mark.
    """
    func.locking = True
    return func

def readonly (func):
    """Mark a function as read-only -- no data effects in component inst.

    Sets ``readonly = True`` on func and returns it.  Nothing in this module
    reads the mark; presumably it is consumed elsewhere.
    """
    func.readonly = True
    return func

225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def query (func=None, **kwargs):
    """Mark a method to be marshalled as a query.

    Usable as @query, as @query(all_fields=...) or as a direct call
    query(func).  The marked function gets ``query = True`` and, unless
    all_fields is passed as a false value, ``query_all_fields = True``.

    Returns func itself, or a decorator when func is None.
    """
    want_all_fields = kwargs.get("all_fields", True)

    def _mark (target):
        target.query = True
        if want_all_fields:
            target.query_all_fields = True
        return target

    return _mark if func is None else _mark(func)

def marshal_query_result (items, specs=None):
    """Convert query result items for return to the caller.

    Arguments:
    items -- iterable of objects providing a to_rx(fields) method
    specs -- when not None, passed to get_spec_fields() to obtain the fields
             argument; otherwise fields is None

    Returns a list holding item.to_rx(fields) for every item, in order.
    """
    fields = None
    if specs is not None:
        fields = get_spec_fields(specs)
    marshalled = []
    for entry in items:
        marshalled.append(entry.to_rx(fields))
    return marshalled

class Component (object):
244

245
    """Base component.
246

247
248
249
250
    Intended to be served as an instance by Cobalt.Component.XMLRPCServer
    >>> server = Cobalt.Component.XMLRPCServer(location, keyfile)
    >>> component = Cobalt.Component.Component()
    >>> server.serve_instance(component)
251

252
253
254
    Class attributes:
    name -- logical component name (e.g., "queue-manager", "process-manager")
    implementation -- implementation identifier (e.g., "BlueGene/L", "BlueGene/P")
255

256
257
258
    Methods:
    save -- pickle the component to a file
    do_tasks -- perform automatic tasks for the component
259

260
    """
261

262
263
    name = "component"
    implementation = "generic"
264

265
266
    def __init__ (self, **kwargs):
        """Initialize a new component.
267

268
269
270
271
272
        Keyword arguments:
        statefile -- file in which to save state automatically
        """
        self.statefile = kwargs.get("statefile", None)
        if kwargs.get("register", True):
273
            self._registered_component=True
274
            Cobalt.Proxy.register_component(self)
275
276
        else:
            self._registered_component=False
277
        self.logger = logging.getLogger("%s %s" % (self.implementation, self.name))
278
279
        self._component_lock = threading.Lock()
        self._component_lock_acquired_time = None
Narayan Desai's avatar
Narayan Desai committed
280
        self.statistics = Statistics()
281
282
        self.logger.info("%s:%s component executing on %s", self.name,
                self.implementation, socket.gethostname())
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300

    def __getstate__(self):
        state = {}
        return {
            'base_component_version':1,
            'register_component':self._registered_component}

    def __setstate__(self, state):
        Cobalt.Util.fix_set(state)
        if hasattr(state, 'register_component') and state['register_component']:
            self._registered_component=True
            Cobalt.Proxy.register_component(self)
        else:
            self._registered_component=False
        self.logger = logging.getLogger("%s %s" % (self.implementation, self.name))
        self._component_lock = threading.Lock()
        self._component_lock_acquired_time = None
        self.statistics = Statistics()
301
302
        self.logger.info("%s:%s component executing on %s", self.name,
                self.implementation, socket.gethostname())
303
304
305
306
307
308

    def component_lock_acquire(self):
        entry_time = time.time()
        self._component_lock.acquire()
        self._component_lock_acquired_time = time.time()
        self.statistics.add_value('component_lock_wait', self._component_lock_acquired_time - entry_time)
309

310
311
312
313
    def component_lock_release(self):
        self.statistics.add_value('component_lock_held', time.time() - self._component_lock_acquired_time)
        self._component_lock_acquired_time = None
        self._component_lock.release()
314

315
316
    def save (self, statefile=None):
        """Pickle the component.
317

318
319
320
321
322
        Arguments:
        statefile -- use this file, rather than component.statefile
        """
        statefile = statefile or self.statefile
        if statefile:
323
            temp_statefile = statefile + ".temp"
324
            data = cPickle.dumps(self)
325
            try:
326
327
328
329
330
                fd = file(temp_statefile, "wb")
                fd.write(data)
                fd.close()
            except IOError, e:
                self.logger.error("statefile failure : %s" % e)
331
                return str(e)
332
333
            else:
                os.rename(temp_statefile, statefile)
334
335
                return "state saved to file: %s" % statefile
    save = exposed(save)
336

337
338
    def do_tasks (self):
        """Perform automatic tasks for the component.
339

340
341
342
343
344
        Automatic tasks are member callables with an attribute
        automatic == True.
        """
        for name, func in inspect.getmembers(self, callable):
            if getattr(func, "automatic", False):
345
                need_to_lock = not getattr(func, 'locking', False)
346
                if (time.time() - func.automatic_ts) > func.automatic_period:
347
                    if need_to_lock:
348
                        self.component_lock_acquire()
349
                    try:
350
                        mt1 = time.time()
351
352
                        func()
                    except:
353
354
355
356
                        self.logger.error("Automatic method %s failed" \
                                          % (name), exc_info=1)
                    finally:
                        mt2 = time.time()
357
358
                        if not need_to_lock:
                            self.component_lock_acquire()
359
                        self.statistics.add_value(name, mt2-mt1)
360
                        self.component_lock_release()
361
                        func.__dict__['automatic_ts'] = time.time()
362

363
364
    def _resolve_exposed_method (self, method_name):
        """Resolve an exposed method.
365

366
367
368
369
370
371
372
373
374
375
        Arguments:
        method_name -- name of the method to resolve
        """
        try:
            func = getattr(self, method_name)
        except AttributeError:
            raise NoExposedMethod(method_name)
        if not getattr(func, "exposed", False):
            raise NoExposedMethod(method_name)
        return func
376

Narayan Desai's avatar
Narayan Desai committed
377
    def _dispatch (self, method, args, dispatch_dict):
378
        """Custom XML-RPC dispatcher for components.
379

380
381
382
        method -- XML-RPC method name
        args -- tuple of paramaters to method
        """
Narayan Desai's avatar
Narayan Desai committed
383
384
385
386
387
388
389
390
391
        if method in dispatch_dict:
            method_func = dispatch_dict[method]
        else:
            try:
                method_func = self._resolve_exposed_method(method)
            except Exception, e:
                if getattr(e, "log", True):
                    self.logger.error(e, exc_info=True)
                raise xmlrpclib.Fault(getattr(e, "fault_code", 1), str(e))
392

393
        need_to_lock = not getattr(method_func, 'locking', False)
Narayan Desai's avatar
Narayan Desai committed
394
        if need_to_lock:
395
            self.component_lock_acquire()
396
        try:
Narayan Desai's avatar
Narayan Desai committed
397
398
            method_start = time.time()
            result = method_func(*args)
399
        except Exception, e:
400
401
            if getattr(e, "log", True):
                self.logger.error(e, exc_info=True)
402
            raise xmlrpclib.Fault(getattr(e, "fault_code", 1), str(e))
Narayan Desai's avatar
Narayan Desai committed
403
        finally:
404
            method_done = time.time()
405
406
            if not need_to_lock:
                self.component_lock_acquire()
407
            self.statistics.add_value(method, method_done - method_start)
408
            self.component_lock_release()
Narayan Desai's avatar
Narayan Desai committed
409
410
411
412
413
414
415
        if getattr(method_func, "query", False):
            if not getattr(method_func, "query_all_methods", False):
                margs = args[:1]
            else:
                margs = []
            result = marshal_query_result(result, *margs)
        return result
Narayan Desai's avatar
Narayan Desai committed
416
417
418

    @exposed
    def listMethods (self):
419
420
421
422
423
        """Custom XML-RPC introspective method list."""
        return [
            name for name, func in inspect.getmembers(self, callable)
            if getattr(func, "exposed", False)
        ]
Narayan Desai's avatar
Narayan Desai committed
424
425
426

    @exposed
    def methodHelp (self, method_name):
427
        """Custom XML-RPC introspective method help.
428

429
430
431
432
433
434
435
436
        Arguments:
        method_name -- name of method to get help on
        """
        try:
            func = self._resolve_exposed_method(method_name)
        except NoExposedMethod:
            return ""
        return pydoc.getdoc(func)
437

438
439
440
441
    def get_name (self):
        """The name of the component."""
        return self.name
    get_name = exposed(get_name)
442

443
444
445
446
    def get_implementation (self):
        """The implementation of the component."""
        return self.implementation
    get_implementation = exposed(get_implementation)
447
448
449
450
451

    def get_statistics (self):
        """Get current statistics about component execution"""
        return self.statistics.display()
    get_statistics = exposed(get_statistics)
452

453