diff --git a/nrm/messaging.py b/nrm/messaging.py index 8a266bfc6ca4f8d4e912411d0b3f81a145fb1bf9..6ff8e0724932327ccab0e3e766e8661f91e04ef4 100644 --- a/nrm/messaging.py +++ b/nrm/messaging.py @@ -18,7 +18,7 @@ logger = logging.getLogger('nrm') # list of APIs supported by this messaging layer. Each message is # indexed by its intended api user and the type of the message, along with # basic field type information. -APIS = ['up_rpc_req', 'up_rpc_rep', 'up_pub'] +APIS = ['up_rpc_req', 'up_rpc_rep', 'up_pub', 'down_event'] MSGFORMATS = {k: {} for k in APIS} MSGFORMATS['up_rpc_req'] = {'list': {}, 'run': {'manifest': basestring, @@ -47,6 +47,13 @@ MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float}, 'container_exit': {'container_uuid': basestring, 'profile_data': dict}, } +MSGFORMATS['down_event'] = {'application_start': + {'container_uuid': basestring, + 'application_uuid': basestring}, + 'application_exit': + {'application_uuid': basestring}, + 'progress': {'payload': int}, + } # Mirror of the message formats, using namedtuples as the actual transport # for users of this messaging layer. @@ -223,3 +230,20 @@ class UpstreamPubClient(object): self.stream = zmqstream.ZMQStream(self.socket) self.callback = callback self.stream.on_recv(self.do_recv_callback) + + +class DownstreamEventServer(UpstreamRPCServer): + + """Implements the message layer server for the downstream event API.""" + + def sendmsg(self, msg, client_uuid): + assert False, "Cannot send message from this side of the event stream." + + +class DownstreamEventClient(UpstreamRPCClient): + + """Implements the message layer client for the downstream event API.""" + + def recvmsg(self): + assert False, \ + "Cannot receive messages from this side of the event stream." diff --git a/test/test_messaging.py b/test/test_messaging.py index 7df4479ef1aa0ae0c0ab391447cc19d5754e83bd..8f3384707ad78e21427f402212844924ede354cd 100644 --- a/test/test_messaging.py +++ b/test/test_messaging.py @@ -28,6 +28,18 @@ def upstream_pub_client(): return nrm.messaging.UpstreamPubClient("ipc:///tmp/nrm-pytest-pub") +@pytest.fixture +def downstream_event_server(): + """Fixture for a server handle on the downstream event API""" + return nrm.messaging.DownstreamEventServer("ipc:///tmp/nrm-pytest-down") + + +@pytest.fixture +def downstream_event_client(): + """Fixture for a client handle on the downstream event API""" + return nrm.messaging.DownstreamEventClient("ipc:///tmp/nrm-pytest-down") + + @pytest.fixture def dummy_msg(): """Fixture for a dummy valid message.""" @@ -89,3 +101,26 @@ def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg): upstream_pub_server.sendmsg(dummy_msg) msg = upstream_pub_client.recvmsg() assert msg == dummy_msg + + +def test_down_connection(downstream_event_client, downstream_event_server): + downstream_event_client.wait_connected() + + +def test_down_event_send_recv(downstream_event_client, downstream_event_server, + dummy_msg): + downstream_event_client.sendmsg(dummy_msg) + msg, client = downstream_event_server.recvmsg() + assert msg == dummy_msg + assert client == downstream_event_client.uuid + + +def test_down_event_server_callback(downstream_event_client, + downstream_event_server, + dummy_msg, dummy_daemon): + downstream_event_server.setup_recv_callback(dummy_daemon.callback) + frames = [downstream_event_client.uuid, nrm.messaging.msg2wire(dummy_msg)] + downstream_event_server.do_recv_callback(frames) + assert dummy_daemon.called + assert dummy_daemon.msg == dummy_msg + assert dummy_daemon.client == downstream_event_client.uuid