Commit 579b717d authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch 'cray-merge-slp-base' into 'develop'

Base component changes to allow for multiforker support to work.

Changes to allow multiple alps_script_forkers to run at once.

See merge request !14
parents 9dc323cc 8a7842da
......@@ -15,6 +15,7 @@ import logging
import time
import threading
import xmlrpclib
import socket
import Cobalt
import Cobalt.Proxy
......@@ -42,7 +43,7 @@ except Exception, e:
def state_file_location():
'''Grab the location of the Cobalt statefiles.
'''Grab the location of the Cobalt statefiles.
default: /var/spool/cobalt
......@@ -51,21 +52,38 @@ def state_file_location():
def run_component (component_cls, argv=None, register=True, state_name=False,
cls_kwargs={}, extra_getopt='', time_out=10,
single_threaded=False):
'''Run the Cobalt component.
arguments:
component_cls
argv
register
state_name
cls_kwargs
extra_getopt
time_out
single_threaded
This will run until a the component is terminated.
single_threaded=False, seq_num=0, aug_comp_name=False,
state_name_match_component=False):
'''Run the specified Cobalt component until recieving signal to terminate.
Args::
component_cls: underlying component library class to use for execution
argv: component-specific arguments [Default: None]
register: If true, register the component with the service-locator [Default: True]
state_name: Name of statefile to use for a component. If false, do not
generate a statefile [Default: False]
cls_kwargs: keyword arguments to pass to the underlying component class
[Default: {}]
extra_getopt: extra options to getopt [Default: '']
time_out: timeout for default task iteration in seconds. [Default: 10]
single_threaded: If true, run as a single-threaded componenent.
[Default: False]
seq_num: Sequence number of the componenent. This should be incremented
for multiple components on one host. [Default: 0]
aug_comp_name: Add the hostname and a sequence number to the component
name registered with the service-loactor if True.
[Default: False]
state_name_match_component: Make the statefile name match the component
name in the service-locator if using the
augmented name. [Default: False]
Returns:
Exit status of the component
Notes:
Components that make use of the fork system call must use
single-threaded mode.
'''
......@@ -91,6 +109,18 @@ def run_component (component_cls, argv=None, register=True, state_name=False,
elif item[0] == '-d':
level = logging.DEBUG
# form new component name for running on multiple hosts.
# This includes a hostname and the sequence number to allow for multiple
# copies of the same component to be run. This is primarily used with
# forkers at this time.
if aug_comp_name:
component_cls.name = '_'.join([component_cls.name, socket.gethostname(),
str(seq_num)])
if state_name_match_component:
# Request that the statefile name match the augmented component name
if not state_name:
state_name = component_cls.name
logging.getLogger().setLevel(level)
Cobalt.Logging.setup_logging(component_cls.implementation, console_timestamp=True)
......
......@@ -46,9 +46,9 @@ __all__ = [
class Service (Data):
fields = Data.fields + ["tag", "name", "location"]
def __init__ (self, spec):
Data.__init__(self, spec)
spec = spec.copy()
......@@ -63,50 +63,50 @@ class Service (Data):
class ServiceDict (DataDict):
item_cls = Service
key = "name"
class ServiceLocator (Component):
"""Generic implementation of the service-location component.
Methods:
register -- register a service (exposed)
unregister -- remove a service from the registry (exposed)
locate -- retrieve the location of a service (exposed)
get_services -- part of the query interface from DataSet (exposed)
"""
name = "service-location"
# A default logger for the class is placed here.
# Assigning an instance-level logger is supported,
# and expected in the case of multiple instances.
logger = logging.getLogger("Cobalt.Components.ServiceLocator")
def __init__ (self, *args, **kwargs):
"""Initialize a new ServiceLocator.
All arguments are passed to the component constructor.
"""
Component.__init__(self, *args, **kwargs)
self.services = ServiceDict()
def __getstate__(self):
state = {}
state.update(Component.__getstate__(self))
state.update({
'slp_version': 1})
return state
def __setstate__(self, state):
Component.__setstate__(self, state)
def register (self, service_name, location):
"""Register the availability of a service.
Arguments:
service_name -- name of the service to register
location -- location of the service
......@@ -121,10 +121,10 @@ class ServiceLocator (Component):
service.location = location
service.touch()
register = exposed(register)
def unregister (self, service_name):
"""Remove a service from the registry.
Arguments:
service_name -- name of the service to remove
"""
......@@ -135,10 +135,10 @@ class ServiceLocator (Component):
else:
self.logger.info("unregister(%r)" % (service_name))
unregister = exposed(unregister)
def locate (self, service_name):
"""Retrieve the location for a service.
Arguments:
service_name -- name of the service to look up
"""
......@@ -149,7 +149,7 @@ class ServiceLocator (Component):
return ""
return service.location
locate = exposed(locate)
def get_services (self, specs):
"""Query interface "Get" method."""
return self.services.q_get(specs)
......@@ -157,20 +157,20 @@ class ServiceLocator (Component):
class PollingServiceLocator (ServiceLocator):
"""ServiceLocator with active expiration.
Methods:
check_services -- ping services (automatic)
"""
implementation = "polling"
logger = logging.getLogger("Cobalt.Components.PollingServiceLocator")
def check_services (self):
"""Ping each service to check its availability.
Unregister unresponsive services.
"""
for service in self.services.values():
......@@ -186,34 +186,34 @@ class PollingServiceLocator (ServiceLocator):
class TimingServiceLocator (ServiceLocator):
"""ServiceLocator with passive expiration.
Attributes:
expire -- number of seconds to expire a service
Methods:
expire_services -- check service timestamps (automatic)
"""
implementation = "slp"
logger = logging.getLogger("Cobalt.Components.TimingServiceLocator")
def __init__ (self, expire=180, *args, **kwargs):
"""Initialize a TimingServiceLocator.
Keyword arguments:
expire -- Number of seconds when services expire.
Additional arguments are passed to ServiceLocator.
"""
ServiceLocator.__init__(self, *args, **kwargs)
self.expire = expire
def expire_services (self):
"""Check each service timestamp.
Unregister expired services.
"""
now = time.time()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment