base.py 15.3 KB
Newer Older
1
2
"""Cobalt component base."""

3
__revision__ = '$Revision: 2130 $'
4
5
6
7
8

__all__ = ["Component", "exposed", "automatic", "run_component"]

import inspect
import os
9
import os.path
10
11
12
13
14
import cPickle
import pydoc
import sys
import getopt
import logging
15
import time
16
import threading
17
import xmlrpclib
18
import socket
19

20
import Cobalt
21
22
import Cobalt.Proxy
import Cobalt.Logging
23
from Cobalt.Server import BaseXMLRPCServer, XMLRPCServer, find_intended_location
24
from Cobalt.Data import get_spec_fields
Daniel Buettner's avatar
Daniel Buettner committed
25
from Cobalt.Exceptions import NoExposedMethod
Narayan Desai's avatar
Narayan Desai committed
26
from Cobalt.Statistics import Statistics
27
28
29
30
31
32
33
34
35
36
37
38
import Cobalt.Util
init_cobalt_config = Cobalt.Util.init_cobalt_config
get_config_option = Cobalt.Util.get_config_option
ParsingError = Cobalt.Util.ParsingError


try:
    print >>sys.stderr, "INFO: initializing configuration system"
    _config_files_read = init_cobalt_config()
    _missing_config_files = list(set(Cobalt.CONFIG_FILES).difference(set(_config_files_read)))
    if _missing_config_files:
        print >>sys.stderr, "Warning: one or more config files were not found: %s" % (str(_missing_config_files)[1:-1],)
39
except Exception, e:
40
41
    print >>sys.stderr, "ERROR: unable to parse config file:\n\t%s" % (e.message,)
    sys.exit(1)
42

43
44

def state_file_location():
    '''Return the directory in which Cobalt statefiles are kept.

    The value is read from the "location" option of the "statefiles" config
    section and environment variables in it are expanded.

    default: /var/spool/cobalt

    '''
    configured_location = get_config_option('statefiles', "location", "/var/spool/cobalt")
    return os.path.expandvars(configured_location)
52

53
def run_component (component_cls, argv=None, register=True, state_name=False,
54
                   cls_kwargs={}, extra_getopt='', time_out=10,
55
56
                   single_threaded=False, seq_num=0, aug_comp_name=False,
                   state_name_match_component=False):
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
    '''Run the specified Cobalt component until recieving signal to terminate.

    Args::

        component_cls: underlying component library class to use for execution
        argv: component-specific arguments [Default: None]
        register: If true, register the component with the service-locator [Default: True]
        state_name: Name of statefile to use for a component.  If false, do not
                    generate a statefile [Default: False]
        cls_kwargs: keyword arguments to pass to the underlying component class
                    [Default: {}]
        extra_getopt: extra options to getopt [Default: '']
        time_out: timeout for default task iteration in seconds. [Default: 10]
        single_threaded: If true, run as a single-threaded componenent.
                         [Default: False]
        seq_num: Sequence number of the componenent.  This should be incremented
                 for multiple components on one host. [Default: 0]
        aug_comp_name: Add the hostname and a sequence number to the component
                       name registered with the service-loactor if True.
                       [Default: False]
        state_name_match_component: Make the statefile name match the component
                                    name in the service-locator if using the
                                    augmented name. [Default: False]

    Returns:
        Exit status of the component

    Notes:
        Components that make use of the fork system call must use
        single-threaded mode.
87
88
89

    '''

90
91
92
    if argv is None:
        argv = sys.argv
    try:
93
        (opts, arg) = getopt.getopt(argv[1:], 'D:d' + extra_getopt)
94
95
96
    except getopt.GetoptError, e:
        print >> sys.stderr, e
        print >> sys.stderr, "Usage:"
97
        print >> sys.stderr, "%s [-d] [-D pidfile] [--config-files file1:file2]" % (os.path.basename(argv[0]))
98
        sys.exit(1)
99

100
101
102
    # default settings
    daemon = False
    pidfile = ""
103
    level = logging.INFO
104
105
    # get user input
    for item in opts:
106
        if item[0] == '-D':
107
            daemon = True
108
109
110
            pidfile_name = item[1]
        elif item[0] == '-d':
            level = logging.DEBUG
111

112
113
114
115
    # form new component name for running on multiple hosts.
    # This includes a hostname and the sequence number to allow for multiple
    # copies of the same component to be run.  This is primarily used with
    # forkers at this time.
116
117
118
119
120
121
122
123
    if aug_comp_name:
        component_cls.name = '_'.join([component_cls.name, socket.gethostname(),
            str(seq_num)])
    if state_name_match_component:
        # Request that the statefile name match the augmented component name
        if not state_name:
            state_name = component_cls.name

124
    logging.getLogger().setLevel(level)
125
    Cobalt.Logging.setup_logging(component_cls.implementation, console_timestamp=True)
126
127
128
129
130

    if daemon:
        child_pid = os.fork()
        if child_pid != 0:
            return
131

132
        os.setsid()
133

134
135
136
        child_pid = os.fork()
        if child_pid != 0:
            os._exit(0)
137

138
139
140
141
        redirect_file = open("/dev/null", "w+")
        os.dup2(redirect_file.fileno(), sys.__stdin__.fileno())
        os.dup2(redirect_file.fileno(), sys.__stdout__.fileno())
        os.dup2(redirect_file.fileno(), sys.__stderr__.fileno())
142

143
144
        os.chdir(os.sep)
        os.umask(0)
145

146
147
148
        pidfile = open(pidfile_name or "/dev/null", "w")
        print >> pidfile, os.getpid()
        pidfile.close()
149

150
151
152
153
154
155
    if state_name:
        state_file_name = "%s/%s" % (state_file_location(), state_name)
        try:
            component = cPickle.load(open(state_file_name))
        except:
            component = component_cls(**cls_kwargs)
156
            component.logger.error("UNABLE TO LOAD STATE FROM %s.  STARTING WITH A BLANK SLATE.", state_file_name, exc_info=True)
157
158
159
        component.statefile = state_file_name
    else:
        component = component_cls(**cls_kwargs)
160

161
    location = find_intended_location(component)
162
    try:
163
164
165
        keypath = os.path.expandvars(get_config_option('communication', 'key'))
        certpath = os.path.expandvars(get_config_option('communication', 'cert'))
        capath = os.path.expandvars(get_config_option('communication', 'ca'))
166
167
    except:
        keypath = '/etc/cobalt.key'
168
169
        certpath = None
        capath = None
170

171
    if single_threaded:
172
173
        server = BaseXMLRPCServer(location, keyfile=keypath, certfile=certpath, 
                          cafile=capath, register=register, timeout=time_out)
174
    else:
175
176
        server = XMLRPCServer(location, keyfile=keypath, certfile=certpath,
                          cafile=capath, register=register, timeout=time_out)
177
178
179
180
181

    #Two components of the same type cannot be allowed to run at the same time.
    if component.name != 'service-location':
        address = Cobalt.Proxy.ComponentProxy('service-location').locate(component.name)
        if address:
182
            component.logger.critical("CRITICAL: Instance of component %s already registered.  Startup of this instance aborted.", component.name)
183
            sys.exit(1)
184
    server.register_instance(component)
185

186
187
188
189
    try:
        server.serve_forever()
    finally:
        server.server_close()
190
191
192

def exposed (func):
    """Flag a callable as publicly exposed and hand it back.

    Sets the attribute ``exposed`` to True on func and returns the same
    object, so it can be used as a decorator or called directly.

    Examples:
    class MyComponent (Component):
        @exposed
        def my_method (self, param1, param2):
            do_stuff()

    class MyComponent (Component):
        def my_method (self, param1, param2):
            do_stuff()
        my_method = exposed(my_method)
    """
    setattr(func, "exposed", True)
    return func

208
209
def automatic (func=None, period=10):
    """Mark a method to be run periodically.

    Usable in three equivalent ways:

        @automatic
        def task (self): ...

        @automatic(period=30)
        def task (self): ...

        task = automatic(task, 30)

    Arguments:
    func -- the callable to mark; when omitted, a decorator is returned
    period -- minimum number of seconds between two runs [Default: 10]

    The marked callable gets the attributes automatic (True),
    automatic_period (period) and automatic_ts (-1, the time of the last
    run).  It is returned unchanged otherwise.
    """
    def _mark (target):
        target.automatic = True
        target.automatic_period = period
        target.automatic_ts = -1
        return target
    if func is None:
        # Called as automatic(period=...): hand back the real decorator.
        return _mark
    return _mark(func)

215
def locking (func):
    """Flag a callable as internally thread safe and hand it back.

    Sets the attribute ``locking`` to True on func and returns the same
    object.
    """
    setattr(func, "locking", True)
    return func

def readonly (func):
    """Flag a callable as read-only -- no data effects in component inst.

    Sets the attribute ``readonly`` to True on func and returns the same
    object.
    """
    setattr(func, "readonly", True)
    return func

225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def query (func=None, **kwargs):
    """Mark a method to be marshalled as a query.

    Works both as a bare decorator (@query) and as a decorator factory
    (@query(all_fields=False)).

    Keyword arguments:
    all_fields -- when true (the default) the attribute query_all_fields is
                  also set to True on the marked callable

    The marked callable always gets the attribute query set to True and is
    returned unchanged otherwise.
    """
    want_all_fields = kwargs.get("all_fields", True)

    def _mark (target):
        if want_all_fields:
            target.query_all_fields = True
        target.query = True
        return target

    if func is None:
        # Factory form: the caller applies the returned decorator itself.
        return _mark
    return _mark(func)

def marshal_query_result (items, specs=None):
    """Convert query results into a list by calling to_rx on every item.

    Arguments:
    items -- iterable of objects providing a to_rx(fields) method
    specs -- optional specs; when given, the fields passed to to_rx are
             obtained from get_spec_fields(specs), otherwise None is passed
    """
    fields = None
    if specs is not None:
        fields = get_spec_fields(specs)
    marshalled = []
    for entry in items:
        marshalled.append(entry.to_rx(fields))
    return marshalled

class Component (object):
244

245
    """Base component.
246

247
248
249
250
    Intended to be served as an instance by Cobalt.Component.XMLRPCServer
    >>> server = Cobalt.Component.XMLRPCServer(location, keyfile)
    >>> component = Cobalt.Component.Component()
    >>> server.serve_instance(component)
251

252
253
254
    Class attributes:
    name -- logical component name (e.g., "queue-manager", "process-manager")
    implementation -- implementation identifier (e.g., "BlueGene/L", "BlueGene/P")
255

256
257
258
    Methods:
    save -- pickle the component to a file
    do_tasks -- perform automatic tasks for the component
259

260
    """
261

262
263
    name = "component"
    implementation = "generic"
264

265
266
    def __init__ (self, **kwargs):
        """Initialize a new component.
267

268
269
270
271
272
        Keyword arguments:
        statefile -- file in which to save state automatically
        """
        self.statefile = kwargs.get("statefile", None)
        if kwargs.get("register", True):
273
            self._registered_component=True
274
            Cobalt.Proxy.register_component(self)
275
276
        else:
            self._registered_component=False
277
        self.logger = logging.getLogger("%s %s" % (self.implementation, self.name))
278
279
        self._component_lock = threading.Lock()
        self._component_lock_acquired_time = None
Narayan Desai's avatar
Narayan Desai committed
280
        self.statistics = Statistics()
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304

    def __getstate__(self):
        state = {}
        return {
            'base_component_version':1,
            'register_component':self._registered_component}

    def __setstate__(self, state):
        Cobalt.Util.fix_set(state)
        if hasattr(state, 'register_component') and state['register_component']:
            self._registered_component=True
            Cobalt.Proxy.register_component(self)
        else:
            self._registered_component=False
        self.logger = logging.getLogger("%s %s" % (self.implementation, self.name))
        self._component_lock = threading.Lock()
        self._component_lock_acquired_time = None
        self.statistics = Statistics()

    def component_lock_acquire(self):
        entry_time = time.time()
        self._component_lock.acquire()
        self._component_lock_acquired_time = time.time()
        self.statistics.add_value('component_lock_wait', self._component_lock_acquired_time - entry_time)
305

306
307
308
309
    def component_lock_release(self):
        self.statistics.add_value('component_lock_held', time.time() - self._component_lock_acquired_time)
        self._component_lock_acquired_time = None
        self._component_lock.release()
310

311
312
    def save (self, statefile=None):
        """Pickle the component.
313

314
315
316
317
318
        Arguments:
        statefile -- use this file, rather than component.statefile
        """
        statefile = statefile or self.statefile
        if statefile:
319
            temp_statefile = statefile + ".temp"
320
            data = cPickle.dumps(self)
321
            try:
322
323
324
325
326
                fd = file(temp_statefile, "wb")
                fd.write(data)
                fd.close()
            except IOError, e:
                self.logger.error("statefile failure : %s" % e)
327
                return str(e)
328
329
            else:
                os.rename(temp_statefile, statefile)
330
331
                return "state saved to file: %s" % statefile
    save = exposed(save)
332

333
334
    def do_tasks (self):
        """Perform automatic tasks for the component.
335

336
337
338
339
340
        Automatic tasks are member callables with an attribute
        automatic == True.
        """
        for name, func in inspect.getmembers(self, callable):
            if getattr(func, "automatic", False):
341
                need_to_lock = not getattr(func, 'locking', False)
342
                if (time.time() - func.automatic_ts) > func.automatic_period:
343
                    if need_to_lock:
344
                        self.component_lock_acquire()
345
                    try:
346
                        mt1 = time.time()
347
348
                        func()
                    except:
349
350
351
352
                        self.logger.error("Automatic method %s failed" \
                                          % (name), exc_info=1)
                    finally:
                        mt2 = time.time()
353
354
                        if not need_to_lock:
                            self.component_lock_acquire()
355
                        self.statistics.add_value(name, mt2-mt1)
356
                        self.component_lock_release()
357
                        func.__dict__['automatic_ts'] = time.time()
358

359
360
    def _resolve_exposed_method (self, method_name):
        """Resolve an exposed method.
361

362
363
364
365
366
367
368
369
370
371
        Arguments:
        method_name -- name of the method to resolve
        """
        try:
            func = getattr(self, method_name)
        except AttributeError:
            raise NoExposedMethod(method_name)
        if not getattr(func, "exposed", False):
            raise NoExposedMethod(method_name)
        return func
372

Narayan Desai's avatar
Narayan Desai committed
373
    def _dispatch (self, method, args, dispatch_dict):
374
        """Custom XML-RPC dispatcher for components.
375

376
377
378
        method -- XML-RPC method name
        args -- tuple of paramaters to method
        """
Narayan Desai's avatar
Narayan Desai committed
379
380
381
382
383
384
385
386
387
        if method in dispatch_dict:
            method_func = dispatch_dict[method]
        else:
            try:
                method_func = self._resolve_exposed_method(method)
            except Exception, e:
                if getattr(e, "log", True):
                    self.logger.error(e, exc_info=True)
                raise xmlrpclib.Fault(getattr(e, "fault_code", 1), str(e))
388

389
        need_to_lock = not getattr(method_func, 'locking', False)
Narayan Desai's avatar
Narayan Desai committed
390
        if need_to_lock:
391
            self.component_lock_acquire()
392
        try:
Narayan Desai's avatar
Narayan Desai committed
393
394
            method_start = time.time()
            result = method_func(*args)
395
        except Exception, e:
396
397
            if getattr(e, "log", True):
                self.logger.error(e, exc_info=True)
398
            raise xmlrpclib.Fault(getattr(e, "fault_code", 1), str(e))
Narayan Desai's avatar
Narayan Desai committed
399
        finally:
400
            method_done = time.time()
401
402
            if not need_to_lock:
                self.component_lock_acquire()
403
            self.statistics.add_value(method, method_done - method_start)
404
            self.component_lock_release()
Narayan Desai's avatar
Narayan Desai committed
405
406
407
408
409
410
411
        if getattr(method_func, "query", False):
            if not getattr(method_func, "query_all_methods", False):
                margs = args[:1]
            else:
                margs = []
            result = marshal_query_result(result, *margs)
        return result
Narayan Desai's avatar
Narayan Desai committed
412
413
414

    @exposed
    def listMethods (self):
415
416
417
418
419
        """Custom XML-RPC introspective method list."""
        return [
            name for name, func in inspect.getmembers(self, callable)
            if getattr(func, "exposed", False)
        ]
Narayan Desai's avatar
Narayan Desai committed
420
421
422

    @exposed
    def methodHelp (self, method_name):
423
        """Custom XML-RPC introspective method help.
424

425
426
427
428
429
430
431
432
        Arguments:
        method_name -- name of method to get help on
        """
        try:
            func = self._resolve_exposed_method(method_name)
        except NoExposedMethod:
            return ""
        return pydoc.getdoc(func)
433

434
435
436
437
    def get_name (self):
        """The name of the component."""
        return self.name
    get_name = exposed(get_name)
438

439
440
441
442
    def get_implementation (self):
        """The implementation of the component."""
        return self.implementation
    get_implementation = exposed(get_implementation)
443
444
445
446
447

    def get_statistics (self):
        """Get current statistics about component execution"""
        return self.statistics.display()
    get_statistics = exposed(get_statistics)
448

449