Commit d7f57ef8 authored by Swann Perarnau's avatar Swann Perarnau

Merge branch 'process-management' into 'master'

Improve process management

See merge request !30
parents fd393d12 5b550e0b
Pipeline #4602 passed with stages
in 55 seconds
......@@ -6,9 +6,13 @@ import logging
import signal
import os
import nrm.messaging
import uuid
import sys
import collections
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
logger = logging.getLogger('nrm')
KillArgs = collections.namedtuple("Kill", ["uuid"])
class CommandLineInterface(object):
......@@ -18,7 +22,13 @@ class CommandLineInterface(object):
def __init__(self):
pass
def do_signal(self, signum, stackframe):
def do_signal(self, uuid, signum, stackframe):
if uuid:
logger.info("received signal %d, killing the application..",
signum)
self.do_kill(KillArgs(uuid))
logger.info("killed the application.", signum)
else:
logger.info("received signal %d, exiting", signum)
exit(1)
......@@ -29,10 +39,28 @@ class CommandLineInterface(object):
self.client = nrm.messaging.UpstreamRPCClient(upstream_client_param)
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
def handler(signum, frame):
self.do_signal(None, signum, frame)
signal.signal(signal.SIGINT, handler)
self.client.wait_connected()
def do_listen(self, argv):
""" Connect to the NRM and listen for pub/sub messages."""
upstream_pub_port = 2345
upstream_pub_param = "tcp://localhost:%d" % (upstream_pub_port)
self.pub_client = nrm.messaging.UpstreamPubClient(upstream_pub_param)
self.pub_client.wait_connected()
while(True):
msg = self.pub_client.recvmsg()
if argv.uuid:
uuid = getattr(msg, 'container_uuid', None)
if argv.container == uuid:
logger.info("pub message", msg)
else:
logger.info("pub message", msg)
def do_run(self, argv):
""" Connect to the NRM and ask to spawn a container and run a command
in it.
......@@ -43,13 +71,14 @@ class CommandLineInterface(object):
# the command a container uuid as a way to make sure that we can make
# the command idempotent.
environ = os.environ
container_uuid = argv.ucontainername or str(uuid.uuid4())
command = {'api': 'up_rpc_req',
'type': 'run',
'manifest': argv.manifest,
'path': argv.command,
'args': argv.args,
'environ': dict(environ),
'container_uuid': str(argv.ucontainername),
'container_uuid': container_uuid,
}
msg = RPC_MSG['run'](**command)
# command fsm
......@@ -58,28 +87,23 @@ class CommandLineInterface(object):
erreof = False
exitmsg = None
self.client.sendmsg(msg)
while(True):
# the first message tells us if we started a container or not
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'stdout', 'stderr', 'exit',
'process_start', 'process_exit']
assert msg.type == 'process_start'
def handler(signum, frame):
self.do_signal(msg.container_uuid, signum, frame)
signal.signal(signal.SIGINT, handler)
if msg.type == 'start':
if state == 'init':
state = 'started'
logger.info("container started: %r", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg.type == 'process_start':
if state == 'init':
state = 'started'
logger.info("process started in existing "
"container: %r""", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg.type == 'stdout':
while(True):
msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep'
assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']
if msg.type == 'stdout':
logger.info("container msg: %r", msg)
if msg.payload == 'eof':
outeof = True
......@@ -89,16 +113,14 @@ class CommandLineInterface(object):
erreof = True
elif msg.type == 'process_exit':
logger.info("process ended: %r", msg)
break
elif msg.type == 'exit':
if state == 'started':
state = 'exiting'
exitmsg = msg
else:
logger.info("unexpected exit message: %r", msg)
logger.error("unexpected message: %r", msg)
if outeof and erreof and state == 'exiting':
state = 'exit'
logger.info("container ended: %r", exitmsg)
logger.info("command ended: %r", exitmsg)
sys.exit(int(exitmsg.status))
break
def do_list(self, argv):
......@@ -182,6 +204,13 @@ class CommandLineInterface(object):
parser_list = subparsers.add_parser("list")
parser_list.set_defaults(func=self.do_list)
# listen
parser_listen = subparsers.add_parser("listen")
parser_listen.add_argument("-u", "--uuid",
help="container uuid to listen for",
default=None)
parser_listen.set_defaults(func=self.do_listen)
# setpowerlimit
parser_setpower = subparsers.add_parser("setpower")
parser_setpower.add_argument("-f", "--follow",
......
......@@ -162,12 +162,13 @@ class ContainerManager(object):
if uuid in self.containers:
c = self.containers[uuid]
logger.debug("killing %r:", c)
for p in c.processes.values():
try:
c.process.proc.terminate()
p.terminate()
except OSError:
pass
logging.error("OS error: could not terminate process.")
def list(self):
"""List the containers in the system."""
return [{'uuid': c.uuid, 'pid': c.process.pid} for c in
self.containers.values()]
return [{'uuid': c.uuid, 'pid': c.processes.keys()}
for c in self.containers.values()]
......@@ -91,7 +91,13 @@ class Controller(object):
def planify(self, target, machineinfo):
"""Plan the next action for the control loop."""
try:
total_power = machineinfo['energy']['power']['total']
except TypeError:
logging.error("\"machineinfo\" malformed. Can not run "
"control loop.")
return (None, None)
direction = None
if total_power < target:
direction = 'i'
......
......@@ -88,7 +88,7 @@ class Daemon(object):
}
pid, container = self.container_manager.create(params)
container_uuid = container.uuid
if len(container.processes.keys()) == 1:
if len(container.processes) == 1:
if container.power['policy']:
container.power['manager'] = PowerPolicyManager(
container.resources['cpus'],
......@@ -99,37 +99,30 @@ class Daemon(object):
p = container.power['profile']
p['start'] = self.machine_info['energy']['energy']
p['start']['time'] = self.machine_info['time']
update = {'api': 'up_rpc_rep',
'type': 'start',
update = {'api': 'up_pub',
'type': 'container_start',
'container_uuid': container_uuid,
'errno': 0 if container else -1,
'pid': pid,
'power': container.power['policy'] or dict()
}
self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update),
client)
# setup io callbacks
outcb = partial(self.do_children_io, client,
container_uuid, 'stdout')
errcb = partial(self.do_children_io, client,
container_uuid, 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
else:
self.upstream_pub_server.sendmsg(
PUB_MSG['container_start'](**update))
# now deal with the process itself
update = {'api': 'up_rpc_rep',
'type': 'process_start',
'container_uuid': container_uuid,
'pid': pid,
}
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_start'](**update), client)
# setup io callbacks
outcb = partial(self.do_children_io, client,
container_uuid, 'stdout')
errcb = partial(self.do_children_io, client,
container_uuid, 'stderr')
outcb = partial(self.do_children_io, client, container_uuid,
'stdout')
errcb = partial(self.do_children_io, client, container_uuid,
'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
elif msg.type == 'kill':
logger.info("asked to kill container: %r", msg)
response = self.container_manager.kill(msg.container_uuid)
......@@ -161,7 +154,12 @@ class Daemon(object):
def do_sensor(self):
self.machine_info = self.sensor_manager.do_update()
logger.info("current state: %r", self.machine_info)
try:
total_power = self.machine_info['energy']['power']['total']
except TypeError:
logger.error("power sensor format malformed, "
"can not report power upstream.")
else:
msg = {'api': 'up_pub',
'type': 'power',
'total': total_power,
......@@ -205,16 +203,30 @@ class Daemon(object):
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
container = self.container_manager.pids[pid]
clientid = container.clientids[pid]
remaining_pids = [p for p in container.processes.keys()
if p != pid]
# first, send a process_exit
msg = {'api': 'up_rpc_rep',
'type': 'process_exit',
'status': str(status),
'container_uuid': container.uuid,
}
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_exit'](**msg), clientid)
# Remove the pid of process that is finished
container.processes.pop(pid, None)
self.container_manager.pids.pop(pid, None)
logger.info("Process %s in Container %s has finised.",
pid, container.uuid)
if not remaining_pids:
msg['type'] = 'exit'
msg['profile_data'] = dict()
# if this is the last process in the container,
# kill everything
if len(container.processes) == 0:
# deal with container exit
msg = {'api': 'up_pub',
'type': 'container_exit',
'container_uuid': container.uuid,
'profile_data': dict(),
}
pp = container.power
if pp['policy']:
pp['manager'].reset_all()
......@@ -233,18 +245,8 @@ class Daemon(object):
container.uuid, diff)
msg['profile_data'] = diff
self.container_manager.delete(container.uuid)
self.upstream_rpc_server.sendmsg(
RPC_MSG['exit'](**msg), clientid)
else:
msg['type'] = 'process_exit'
# Remove the pid of process that is finished
container.processes.pop(pid, None)
self.container_manager.pids.pop(pid, None)
logger.info("Process %s in Container %s has finised.",
pid, container.uuid)
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_exit'](**msg), clientid)
self.upstream_pub_server.sendmsg(
PUB_MSG['container_exit'](**msg))
else:
logger.debug("child update ignored")
pass
......
......@@ -29,24 +29,24 @@ MSGFORMATS['up_rpc_req'] = {'list': {},
'kill': {'container_uuid': basestring},
'setpower': {'limit': basestring},
}
MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'errno': int,
'pid': int,
'power': dict},
'list': {'payload': list},
MSGFORMATS['up_rpc_rep'] = {'list': {'payload': list},
'stdout': {'container_uuid': basestring,
'payload': basestring},
'stderr': {'container_uuid': basestring,
'payload': basestring},
'exit': {'container_uuid': basestring,
'status': basestring,
'profile_data': dict},
'process_start': {'container_uuid': basestring},
'process_start': {'container_uuid': basestring,
'pid': int},
'process_exit': {'container_uuid': basestring,
'status': basestring},
'getpower': {'limit': basestring},
}
MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float}}
MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float},
'container_start': {'container_uuid': basestring,
'errno': int,
'power': dict},
'container_exit': {'container_uuid': basestring,
'profile_data': dict},
}
# Mirror of the message formats, using namedtuples as the actual transport
# for users of this messaging layer.
......@@ -171,9 +171,55 @@ class UpstreamPubServer(object):
self.address = address
self.zmq_context = zmq.Context()
self.socket = self.zmq_context.socket(zmq.PUB)
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.bind(address)
def sendmsg(self, msg):
"""Sends a message."""
logger.debug("sending message: %r", msg)
self.socket.send(msg2wire(msg))
class UpstreamPubClient(object):
"""Implements the message layer client to the upstream Pub API."""
def __init__(self, address):
self.address = address
self.zmq_context = zmq.Context()
self.socket = self.zmq_context.socket(zmq.SUB)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.connect(address)
def wait_connected(self):
"""Creates a monitor socket and wait for the connect event."""
monitor = self.socket.get_monitor_socket()
while True:
msg = zmq.utils.monitor.recv_monitor_message(monitor)
logger.debug("monitor message: %r", msg)
if int(msg['event']) == zmq.EVENT_CONNECTED:
logger.debug("socket connected")
break
self.socket.disable_monitor()
def recvmsg(self):
"""Receives a message and returns it."""
frames = self.socket.recv_multipart()
logger.debug("received message: %r", frames)
assert len(frames) == 1
return wire2msg(frames[0])
def do_recv_callback(self, frames):
"""Receives a message from zmqstream.on_recv, passing it to a user
callback."""
logger.info("receiving message: %r", frames)
assert len(frames) == 1
msg = wire2msg(frames[0])
assert self.callback
self.callback(msg)
def setup_recv_callback(self, callback):
"""Setup a ioloop-backed callback for receiving messages."""
self.stream = zmqstream.ZMQStream(self.socket)
self.callback = callback
self.stream.on_recv(self.do_recv_callback)
......@@ -22,6 +22,12 @@ def upstream_pub_server():
return nrm.messaging.UpstreamPubServer("ipc:///tmp/nrm-pytest-pub")
@pytest.fixture
def upstream_pub_client():
"""Fixture for a server handle on the upstream PUB API"""
return nrm.messaging.UpstreamPubClient("ipc:///tmp/nrm-pytest-pub")
@pytest.fixture
def dummy_msg():
"""Fixture for a dummy valid message."""
......@@ -73,3 +79,13 @@ def test_rpc_server_callback(upstream_rpc_client, upstream_rpc_server,
def test_pub_server_send(upstream_pub_server, dummy_msg):
upstream_pub_server.sendmsg(dummy_msg)
def test_pub_connection(upstream_pub_client, upstream_pub_server):
upstream_pub_client.wait_connected()
def test_pub_client_recv(upstream_pub_server, upstream_pub_client, dummy_msg):
upstream_pub_server.sendmsg(dummy_msg)
msg = upstream_pub_client.recvmsg()
assert msg == dummy_msg
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment