Commit 95d9a2a6 authored by Paul Rich's avatar Paul Rich
Browse files

Base component changes to allow for multiforker support to work.

parent cb0ce41e
......@@ -15,6 +15,7 @@ import logging
import time
import threading
import xmlrpclib
import socket
import Cobalt
import Cobalt.Proxy
......@@ -42,7 +43,7 @@ except Exception, e:
def state_file_location():
'''Grab the location of the Cobalt statefiles.
'''Grab the location of the Cobalt statefiles.
default: /var/spool/cobalt
......@@ -51,8 +52,9 @@ def state_file_location():
def run_component (component_cls, argv=None, register=True, state_name=False,
cls_kwargs={}, extra_getopt='', time_out=10,
single_threaded=False):
'''Run the Cobalt component.
single_threaded=False, seq_num=0, aug_comp_name=False,
state_name_match_component=False):
'''Run the Cobalt component.
arguments:
......@@ -64,6 +66,8 @@ def run_component (component_cls, argv=None, register=True, state_name=False,
extra_getopt
time_out
single_threaded
seq_num
aug_comp_name
This will run until a the component is terminated.
......@@ -91,6 +95,16 @@ def run_component (component_cls, argv=None, register=True, state_name=False,
elif item[0] == '-d':
level = logging.DEBUG
# form new component name for running on multiple hosts override in
# component_cls passed in
if aug_comp_name:
component_cls.name = '_'.join([component_cls.name, socket.gethostname(),
str(seq_num)])
if state_name_match_component:
# Request that the statefile name match the augmented component name
if not state_name:
state_name = component_cls.name
logging.getLogger().setLevel(level)
Cobalt.Logging.setup_logging(component_cls.implementation, console_timestamp=True)
......
......@@ -46,9 +46,9 @@ __all__ = [
class Service (Data):
fields = Data.fields + ["tag", "name", "location"]
def __init__ (self, spec):
Data.__init__(self, spec)
spec = spec.copy()
......@@ -63,50 +63,50 @@ class Service (Data):
class ServiceDict (DataDict):
item_cls = Service
key = "name"
class ServiceLocator (Component):
"""Generic implementation of the service-location component.
Methods:
register -- register a service (exposed)
unregister -- remove a service from the registry (exposed)
locate -- retrieve the location of a service (exposed)
get_services -- part of the query interface from DataSet (exposed)
"""
name = "service-location"
# A default logger for the class is placed here.
# Assigning an instance-level logger is supported,
# and expected in the case of multiple instances.
logger = logging.getLogger("Cobalt.Components.ServiceLocator")
def __init__ (self, *args, **kwargs):
"""Initialize a new ServiceLocator.
All arguments are passed to the component constructor.
"""
Component.__init__(self, *args, **kwargs)
self.services = ServiceDict()
def __getstate__(self):
state = {}
state.update(Component.__getstate__(self))
state.update({
'slp_version': 1})
return state
def __setstate__(self, state):
Component.__setstate__(self, state)
def register (self, service_name, location):
"""Register the availability of a service.
Arguments:
service_name -- name of the service to register
location -- location of the service
......@@ -121,10 +121,10 @@ class ServiceLocator (Component):
service.location = location
service.touch()
register = exposed(register)
def unregister (self, service_name):
"""Remove a service from the registry.
Arguments:
service_name -- name of the service to remove
"""
......@@ -135,10 +135,10 @@ class ServiceLocator (Component):
else:
self.logger.info("unregister(%r)" % (service_name))
unregister = exposed(unregister)
def locate (self, service_name):
"""Retrieve the location for a service.
Arguments:
service_name -- name of the service to look up
"""
......@@ -149,7 +149,7 @@ class ServiceLocator (Component):
return ""
return service.location
locate = exposed(locate)
def get_services (self, specs):
"""Query interface "Get" method."""
return self.services.q_get(specs)
......@@ -157,20 +157,20 @@ class ServiceLocator (Component):
class PollingServiceLocator (ServiceLocator):
"""ServiceLocator with active expiration.
Methods:
check_services -- ping services (automatic)
"""
implementation = "polling"
logger = logging.getLogger("Cobalt.Components.PollingServiceLocator")
def check_services (self):
"""Ping each service to check its availability.
Unregister unresponsive services.
"""
for service in self.services.values():
......@@ -186,34 +186,34 @@ class PollingServiceLocator (ServiceLocator):
class TimingServiceLocator (ServiceLocator):
"""ServiceLocator with passive expiration.
Attributes:
expire -- number of seconds to expire a service
Methods:
expire_services -- check service timestamps (automatic)
"""
implementation = "slp"
logger = logging.getLogger("Cobalt.Components.TimingServiceLocator")
def __init__ (self, expire=180, *args, **kwargs):
"""Initialize a TimingServiceLocator.
Keyword arguments:
expire -- Number of seconds when services expire.
Additional arguments are passed to ServiceLocator.
"""
ServiceLocator.__init__(self, *args, **kwargs)
self.expire = expire
def expire_services (self):
"""Check each service timestamp.
Unregister expired services.
"""
now = time.time()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment