slp.py 6.79 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
"""Implementations of the service-location component.

Classes:
ServiceLocator -- generic implementation
PollingServiceLocator -- network-aware
TimingServiceLocator -- timeout-based

The service-location component provides registration and lookup functions
to store the locations of dynamically addressed xmlrpc servers. It is used
directly by ComponentProxy to connect to components whose service location
is not configured statically locally.

This module currently provides three implementations of service-location:

The generic implementation, ServiceLocator, is completely passive, and
has no logic regarding the expiration or validity of the services and
locations registered. It is little more than a hash, storing service data
verbatim as it is registered and unregistered.

PollingServiceLocator extends the generic implementation by automatically
polling all registered services by calling their "ping" methods. This
verifies that (a) the component is serving at the specified location, and
(b) the component is responding.

TimingServiceLocator extends the generic implementation by automatically
expiring any service that has not registered in a given timeframe (specified
at initialization with the "expire" keyword argument). Services are expected
to register themselves regularly at an interval <= that timeframe.
"""

import logging
import sys
import socket
import time
from xmlrpclib import ServerProxy

import Cobalt.Logging
from Cobalt.Data import Data, DataDict
from Cobalt.Components.base import Component, exposed, automatic, query
from Cobalt.Server import XMLRPCServer


# Names exported by "from <module> import *": only the three locator
# implementations.  Service and ServiceDict are left out of star-imports.
__all__ = [
    "ServiceLocator",
    "PollingServiceLocator",
    "TimingServiceLocator",
]


class Service (Data):

    """Registry record describing one located service.

    Attributes:
    name -- name taken from the spec (required)
    location -- location taken from the spec (required)
    tag -- tag taken from the spec, "service" when the spec has none
    stamp -- time.time() value recorded at creation or by the last touch()
    """

    fields = Data.fields + ["tag", "name", "location"]

    def __init__ (self, spec):
        """Build a record from a spec mapping.

        Arguments:
        spec -- mapping with "name" and "location" keys and, optionally, "tag"

        A spec without "name" or "location" raises KeyError.
        """
        Data.__init__(self, spec)
        # Pop from a private copy so the caller's mapping is left untouched.
        details = spec.copy()
        self.name = details.pop("name")
        self.location = details.pop("location")
        self.tag = details.get("tag", "service")
        self.stamp = time.time()

    def touch(self):
        """Reset the timestamp to the current time."""
        self.stamp = time.time()
63
64
65


class ServiceDict (DataDict):

    """DataDict specialization that holds Service records.

    The base class is configured through two class attributes.
    NOTE(review): presumably DataDict builds items with `item_cls` and
    indexes each one by the field named in `key` -- confirm in Cobalt.Data.
    """

    key = "name"
    item_cls = Service


class ServiceLocator (Component):

    """Generic implementation of the service-location component.

    Methods:
    register -- register a service (exposed)
    unregister -- remove a service from the registry (exposed)
    locate -- retrieve the location of a service (exposed)
    get_services -- part of the query interface from DataSet (exposed)
    """

    name = "service-location"

    # A default logger for the class is placed here.
    # Assigning an instance-level logger is supported,
    # and expected in the case of multiple instances.
    logger = logging.getLogger("Cobalt.Components.ServiceLocator")

    def __init__ (self, *args, **kwargs):
        """Initialize a new ServiceLocator.

        All arguments are passed to the component constructor.
        The registry starts out empty.
        """
        Component.__init__(self, *args, **kwargs)
        self.services = ServiceDict()

    def __getstate__(self):
        """Return the state to persist.

        This is the Component state plus an 'slp_version' marker; the
        registry of services itself is not included.
        """
        state = {}
        state.update(Component.__getstate__(self))
        state.update({
                'slp_version': 1})
        return state

    def __setstate__(self, state):
        """Restore the Component state and make sure a registry exists.

        Arguments:
        state -- mapping produced by __getstate__
        """
        Component.__setstate__(self, state)
        # __getstate__ does not save the registry and restoring an object
        # normally bypasses __init__, so without this a restored locator would
        # fail with AttributeError on the first register/locate call.  Only
        # create the registry when the base class has not already provided one.
        if not hasattr(self, "services"):
            self.services = ServiceDict()

    def register (self, service_name, location):
        """Register the availability of a service.

        Arguments:
        service_name -- name of the service to register
        location -- location of the service

        A new name creates a record and is logged.  A known name only has
        its location replaced and its timestamp refreshed (not logged).
        """
        try:
            service = self.services[service_name]
        except KeyError:
            service = Service(dict(name=service_name, location=location))
            self.services[service_name] = service
            # Values are handed to the logger as arguments, so it formats them
            # itself and a tuple-valued argument cannot break the "%" operator.
            self.logger.info("register(%r, %r)", service_name, location)
        else:
            service.location = location
            service.touch()
    register = exposed(register)

    def unregister (self, service_name):
        """Remove a service from the registry.

        Arguments:
        service_name -- name of the service to remove

        Removing a name that is not registered is not an error; both
        outcomes are logged at info level.
        """
        try:
            del self.services[service_name]
        except KeyError:
            self.logger.info("unregister(%r) [not registered]", service_name)
        else:
            self.logger.info("unregister(%r)", service_name)
    unregister = exposed(unregister)

    def locate (self, service_name):
        """Retrieve the location for a service.

        Arguments:
        service_name -- name of the service to look up

        Returns the registered location, or "" when the name is unknown.
        """
        try:
            service = self.services[service_name]
        except KeyError:
            self.logger.debug("locate(%r) [not registered]", service_name)
            return ""
        return service.location
    locate = exposed(locate)

    def get_services (self, specs):
        """Query interface "Get" method.

        Arguments:
        specs -- query specs, passed unchanged to the registry's q_get
        """
        return self.services.q_get(specs)
    get_services = exposed(query(get_services))


class PollingServiceLocator (ServiceLocator):

    """ServiceLocator with active expiration.

    Methods:
    check_services -- ping services (automatic)
    """

    implementation = "polling"

    logger = logging.getLogger("Cobalt.Components.PollingServiceLocator")

    def check_services (self):
        """Ping every registered service and drop the ones that fail.

        A socket-level failure is logged as a warning, any other exception
        as an error.  In both cases the service is unregistered.
        """
        for entry in self.services.values():
            try:
                ServerProxy(entry.location).ping()
            except Exception:
                # Fetch the active exception through sys.exc_info() so the
                # handler does not depend on version-specific except syntax.
                failure = sys.exc_info()[1]
                if isinstance(failure, socket.error):
                    self.logger.warn(
                        "unable to contact %s [%s]" % (entry.name, failure))
                else:
                    self.logger.error(
                        "error in %s (%s)" % (entry.name, failure))
                self.unregister(entry.name)
    check_services = automatic(check_services)


class TimingServiceLocator (ServiceLocator):

    """ServiceLocator with passive expiration.

    Attributes:
    expire -- number of seconds to expire a service

    Methods:
    expire_services -- check service timestamps (automatic)
    """

    implementation = "slp"

    logger = logging.getLogger("Cobalt.Components.TimingServiceLocator")

    def __init__ (self, expire=180, *args, **kwargs):
        """Initialize a TimingServiceLocator.

        Keyword arguments:
        expire -- age in seconds beyond which a service is dropped
                  (default 180)

        Additional arguments are passed to ServiceLocator.
        """
        ServiceLocator.__init__(self, *args, **kwargs)
        self.expire = expire

    def expire_services (self):
        """Unregister every service whose timestamp is too old.

        A service is dropped when the time since its last stamp is strictly
        greater than self.expire seconds.
        """
        # One clock reading is shared by every comparison in this pass.
        now = time.time()
        for entry in self.services.values():
            age = now - entry.stamp
            if not (age > self.expire):
                continue
            self.logger.info("%s expired" % entry.name)
            self.unregister(entry.name)
    expire_services = automatic(expire_services)