Commit 7786fb66 authored by Swann Perarnau

Merge branch 'downstream-api' into 'master'

Improve the downstream API

See merge request !9
parents be059812 66e4c85d
#!/usr/bin/env python2
from __future__ import print_function
import argparse
import json
import logging
import os
import signal
import sys
import time
import uuid
import zmq
from zmq.eventloop import ioloop, zmqstream
logger = logging.getLogger('nrm-dummy-application')
class DownstreamApplication(object):
"""Implements a downstream client."""
def __init__(self):
pass
def do_signal(self, signum, frame):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
def do_shutdown(self):
update = {'type': 'application',
'event': 'exit',
'uuid': self.app_uuid,
}
self.downstream_pub.send_json(update)
ioloop.IOLoop.current().stop()
def do_downstream_receive(self, parts):
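        # Expected message: a single-part JSON dict carrying our 'uuid' and a
        # 'command' field, either 'threads' (with an integer 'payload') or
        # 'exit'; anything else is logged and dropped.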
logger.info("receiving message from downstream: %r", parts)
if len(parts) != 1:
logger.error("unexpected msg length, dropping it")
return
msg = json.loads(parts[0])
if isinstance(msg, dict):
uuid = msg['uuid']
if uuid != self.app_uuid:
return
command = msg.get('command')
if command is None:
logger.error("missing command in message")
return
elif command == 'threads':
newth = msg['payload']
if newth >= 1 and newth <= self.max:
self.nt = newth
update = {'type': 'application',
'event': 'threads',
'payload': self.nt,
'uuid': self.app_uuid,
}
self.downstream_pub.send_json(update)
elif command == 'exit':
self.do_shutdown()
else:
logger.error("bad command")
return
def do_progress_report(self):
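        # Dummy progress metric: time elapsed since the last report, scaled by
        # the fraction of allowed threads in use and an arbitrary constant.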
now = time.time()
seconds = now - self.last_update
ratio = float(self.nt)/float(self.max)
progress = seconds*ratio*42
update = {'type': 'application',
'event': 'progress',
'payload': progress,
'uuid': self.app_uuid,
}
self.downstream_pub.send_json(update)
self.last_update = now
def setup(self):
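        # The ipc endpoints mirror the daemon side: we publish to
        # nrm-downstream-in (the daemon's SUB socket) and subscribe to
        # nrm-downstream-out (the daemon's PUB socket).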
context = zmq.Context()
downstream_pub_socket = context.socket(zmq.PUB)
downstream_sub_socket = context.socket(zmq.SUB)
downstream_pub_param = "ipc:///tmp/nrm-downstream-in"
downstream_sub_param = "ipc:///tmp/nrm-downstream-out"
downstream_pub_socket.connect(downstream_pub_param)
downstream_sub_socket.connect(downstream_sub_param)
# we want to receive everything for now
downstream_sub_filter = ""
downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
logger.info("downstream pub socket connected to: %s",
downstream_pub_param)
logger.info("downstream sub socket connected to: %s",
downstream_sub_param)
# link sockets to events
self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)
self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
self.downstream_sub.on_recv(self.do_downstream_receive)
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
# periodic update on progress
self.progress = ioloop.PeriodicCallback(self.do_progress_report, 1000)
self.progress.start()
# retrieve our container uuid
self.container_uuid = os.environ.get('container')
if self.container_uuid is None:
logger.error("missing container uuid")
            sys.exit(1)
self.app_uuid = str(uuid.uuid4())
logger.info("client uuid: %r", self.app_uuid)
        # send a hello to the daemon
update = {'type': 'application',
'event': 'start',
'container': self.container_uuid,
'uuid': self.app_uuid,
'progress': True,
'threads': {'min': 1, 'cur': self.nt, 'max': self.max},
}
self.downstream_pub.send_json(update)
def main(self):
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("threads", help="starting number of threads",
type=int, default=16)
parser.add_argument("maxthreads", help="max number of threads",
type=int, default=32)
args = parser.parse_args()
# deal with logging
if args.verbose:
logger.setLevel(logging.DEBUG)
self.nt = args.threads
self.max = args.maxthreads
self.setup()
self.last_update = time.time()
ioloop.IOLoop.current().start()
if __name__ == "__main__":
ioloop.install()
logging.basicConfig(level=logging.INFO)
app = DownstreamApplication()
app.main()
#!/usr/bin/env python2
from __future__ import print_function
import nrm
import nrm.client
if __name__ == "__main__":
nrm.client.runner()
from __future__ import print_function
import logging
logger = logging.getLogger('nrm')
class Application(object):
"""Information about a downstream API user."""
thread_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'noop': 'max'},
's_ask_d': {'done': 'stable', 'noop': 'min'},
'max': {'d': 'max_ask_d'},
'min': {'i': 'min_ask_i'},
'max_ask_d': {'done': 'stable', 'noop': 'noop'},
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
def __init__(self, uuid, container, progress, threads):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
def do_thread_transition(self, event):
"""Update the thread fsm state."""
transitions = self.thread_fsm_table[self.thread_state]
if event in transitions:
self.thread_state = transitions[event]
def get_allowed_thread_requests(self):
return self.thread_fsm_table[self.thread_state].keys()
def update_threads(self, msg):
"""Update the thread tracking."""
newth = msg['payload']
curth = self.threads['cur']
if newth == curth:
self.do_thread_transition('noop')
else:
self.do_thread_transition('done')
self.threads['cur'] = newth
def update_progress(self, msg):
"""Update the progress tracking."""
assert self.progress
class ApplicationManager(object):
"""Manages the tracking of applications: users of the downstream API."""
def __init__(self):
self.applications = dict()
def register(self, msg):
"""Register a new downstream application."""
uuid = msg['uuid']
container = msg['container']
progress = msg['progress']
threads = msg['threads']
self.applications[uuid] = Application(uuid, container, progress,
threads)
def delete(self, uuid):
"""Delete an application from the register."""
del self.applications[uuid]
from __future__ import print_function
import argparse
import logging
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
logger = logging.getLogger('nrm-client')
class Client(object):
def __init__(self):
self.buf = ''
self.nt = 16
self.max = 32
self.server = None
def setup_shutdown(self):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
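    # Commands arriving on the raw zmq STREAM connection are single
    # characters: 'd' (drop one thread), 'i' (add one thread), 'n' (report the
    # current thread count) and 'q' (quit).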
def get_server_message(self):
buf = self.buf
begin = 0
ret = ''
while begin < len(buf):
if buf[begin] in ['d', 'i', 'n', 'q']:
ret = buf[begin]
off = 1
else:
break
begin = begin + off
yield ret
self.buf = buf[begin:]
return
def do_receive(self, parts):
logger.info("receive stream: " + repr(parts))
if len(parts[1]) == 0:
if self.server:
                # server disconnected, let's quit
self.setup_shutdown()
return
else:
self.server = parts[0]
self.buf = self.buf + parts[1]
for m in self.get_server_message():
logger.info(m)
if m == 'd':
if self.nt == 1:
ret = "min"
else:
self.nt -= 1
ret = "done (%d)" % self.nt
elif m == 'i':
if self.nt == self.max:
ret = "max"
else:
self.nt += 1
ret = "done (%d)" % self.nt
elif m == 'n':
ret = "%d" % self.nt
elif m == 'q':
ret = ''
self.setup_shutdown()
self.stream.send(self.server, zmq.SNDMORE)
self.stream.send(ret)
def do_signal(self, signum, frame):
logger.critical("received signal: " + repr(signum))
self.setup_shutdown()
def do_shutdown(self):
ioloop.IOLoop.current().stop()
def main(self):
# command line options
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("threads", help="starting number of threads",
type=int, default=16)
parser.add_argument("maxthreads", help="max number of threads",
type=int, default=32)
args = parser.parse_args()
# deal with logging
if args.verbose:
logger.setLevel(logging.DEBUG)
self.nt = args.threads
self.max = args.maxthreads
# read env variables for connection
connect_addr = "localhost"
connect_port = 1234
connect_param = "tcp://%s:%d" % (connect_addr, connect_port)
# create connection
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.connect(connect_param)
logger.info("connected to: " + connect_param)
self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_receive)
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
ioloop.IOLoop.current().start()
def runner():
ioloop.install()
logging.basicConfig(level=logging.INFO)
client = Client()
client.main()
from __future__ import print_function
from applications import ApplicationManager
from containers import ContainerManager
from resources import ResourceManager
from functools import partial
import json
import logging
import os
import re
from resources import ResourceManager
import sensor
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
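# Request/response state machine used over the stream protocol: after asking
# 'i' or 'd', the daemon waits for the application to answer 'done', 'max' or
# 'min' before it is allowed to issue another request.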
application_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'max': 'max'},
's_ask_d': {'done': 'stable', 'min': 'min'},
'max': {'d': 'max_ask_d'},
'min': {'i': 'min_ask_i'},
'max_ask_d': {'done': 'stable', 'min': 'nop'},
'min_ask_i': {'done': 'stable', 'max': 'nop'},
'nop': {}}
logger = logging.getLogger('nrm')
class Application(object):
def __init__(self, identity):
self.identity = identity
self.buf = ''
self.state = 'stable'
def append_buffer(self, msg):
self.buf = self.buf + msg
def do_transition(self, msg):
transitions = application_fsm_table[self.state]
if msg in transitions:
self.state = transitions[msg]
else:
pass
def get_allowed_requests(self):
return application_fsm_table[self.state].keys()
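    # Replies on the stream are 'min', 'max', 'done (<count>)' or a bare
    # thread count; get_messages() maps them to the fsm events 'min', 'max',
    # 'done' and 'ok' respectively.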
def get_messages(self):
buf = self.buf
begin = 0
off = 0
ret = ''
while begin < len(buf):
if buf.startswith('min', begin):
ret = 'min'
off = len(ret)
elif buf.startswith('max', begin):
ret = 'max'
off = len(ret)
elif buf.startswith('done (', begin):
n = re.split("done \((\d+)\)", buf[begin:])[1]
ret = 'done'
off = len('done ()') + len(n)
else:
m = re.match("\d+", buf[begin:])
if m:
ret = 'ok'
off = m.end()
else:
break
begin = begin + off
yield ret
self.buf = buf[begin:]
return
class Daemon(object):
def __init__(self):
self.applications = {}
self.buf = ''
self.target = 1.0
def do_application_receive(self, parts):
logger.info("receiving application stream: %r", parts)
identity = parts[0]
if len(parts[1]) == 0:
            # empty frame indicates connect/disconnect
if identity in self.applications:
logger.info("known client disconnected")
del self.applications[identity]
def do_downstream_receive(self, parts):
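        # Downstream events are JSON dicts with 'type': 'application', an
        # 'event' of 'start', 'threads', 'progress' or 'exit', and the
        # application 'uuid' (see the dummy application above).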
logger.info("receiving downstream message: %r", parts)
if len(parts) != 1:
logger.error("unexpected msg length, dropping it: %r", parts)
return
msg = json.loads(parts[0])
if isinstance(msg, dict):
msgtype = msg.get('type')
event = msg.get('event')
if msgtype is None or msgtype != 'application' or event is None:
logger.error("wrong message format: %r", msg)
return
if event == 'start':
self.application_manager.register(msg)
elif event == 'threads':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_threads(msg)
elif event == 'progress':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif event == 'exit':
self.application_manager.delete(msg['uuid'])
else:
logger.info("new client: " + repr(identity))
self.applications[identity] = Application(identity)
else:
if identity in self.applications:
application = self.applications[identity]
# we need to unpack the stream into application messages
# messages can be: min, max, done (%d), %d
application.append_buffer(parts[1])
for m in application.get_messages():
application.do_transition(m)
logger.info("application now in state: %s",
application.state)
logger.error("unknown event: %r", event)
return
def do_upstream_receive(self, parts):
logger.info("receiving upstream message: %r", parts)
@@ -180,18 +128,27 @@ class Daemon(object):
def do_control(self):
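        # Power-driven control loop: compare the measured total power against
        # the target and ask each application whose state machine allows it to
        # add or drop one thread via the downstream socket.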
total_power = self.machine_info['energy']['power']['total']
for identity, application in self.applications.iteritems():
for identity, application in \
self.application_manager.applications.iteritems():
update = {'type': 'application',
'command': 'threads',
'uuid': identity,
'event': 'threads',
}
if total_power < self.target:
if 'i' in application.get_allowed_requests():
self.downstream.send_multipart([identity, 'i'])
application.do_transition('i')
if 'i' in application.get_allowed_thread_requests():
update['payload'] = application.threads['cur'] + 1
self.downstream_pub.send_json(update)
application.do_thread_transition('i')
elif total_power > self.target:
if 'd' in application.get_allowed_requests():
self.downstream.send_multipart([identity, 'd'])
application.do_transition('d')
if 'd' in application.get_allowed_thread_requests():
update['payload'] = application.threads['cur'] - 1
self.downstream_pub.send_json(update)
application.do_thread_transition('d')
else:
pass
logger.info("application now in state: %s", application.state)
continue
logger.info("application now in state: %s",
application.thread_state)
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
@@ -233,10 +190,9 @@ class Daemon(object):
ioloop.IOLoop.current().stop()
def main(self):
# Bind port for downstream clients
bind_port = 1234
# Bind address for downstream clients
bind_address = '*'
# PUB port for upstream clients
upstream_pub_port = 2345
# SUB port for upstream clients
@@ -244,35 +200,43 @@ class Daemon(object):
# setup application listening socket
context = zmq.Context()
downstream_socket = context.socket(zmq.STREAM)
downstream_pub_socket = context.socket(zmq.PUB)
downstream_sub_socket = context.socket(zmq.SUB)
upstream_pub_socket = context.socket(zmq.PUB)
upstream_sub_socket = context.socket(zmq.SUB)
downstream_bind_param = "tcp://%s:%d" % (bind_address, bind_port)
downstream_pub_param = "ipc:///tmp/nrm-downstream-out"
downstream_sub_param = "ipc:///tmp/nrm-downstream-in"
upstream_pub_param = "tcp://%s:%d" % (bind_address, upstream_pub_port)
upstream_sub_param = "tcp://%s:%d" % (bind_address, upstream_sub_port)
downstream_socket.bind(downstream_bind_param)
downstream_pub_socket.bind(downstream_pub_param)
downstream_sub_socket.bind(downstream_sub_param)
downstream_sub_filter = ""
downstream_sub_socket.setsockopt(zmq.SUBSCRIBE, downstream_sub_filter)
upstream_pub_socket.bind(upstream_pub_param)
upstream_sub_socket.bind(upstream_sub_param)
upstream_sub_filter = ""
upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)
logger.info("downstream socket bound to: %s", downstream_bind_param)
logger.info("downstream pub socket bound to: %s", downstream_pub_param)
logger.info("downstream sub socket bound to: %s", downstream_sub_param)
logger.info("upstream pub socket bound to: %s", upstream_pub_param)
logger.info("upstream sub socket connected to: %s", upstream_sub_param)
# register socket triggers
self.downstream = zmqstream.ZMQStream(downstream_socket)
self.downstream.on_recv(self.do_application_receive)
self.downstream_sub = zmqstream.ZMQStream(downstream_sub_socket)
self.downstream_sub.on_recv(self.do_downstream_receive)
self.upstream_sub = zmqstream.ZMQStream(upstream_sub_socket)
self.upstream_sub.on_recv(self.do_upstream_receive)
# create a stream to let ioloop deal with blocking calls on HWM
self.upstream_pub = zmqstream.ZMQStream(upstream_pub_socket)
self.downstream_pub = zmqstream.ZMQStream(downstream_pub_socket)
# create resource and container manager
# create managers
self.resource_manager = ResourceManager()
self.container_manager = ContainerManager(self.resource_manager)
self.application_manager = ApplicationManager()
# create sensor manager and make first measurement
self.sensor = sensor.SensorManager()
......