Commit b83309be authored by Valentin Reis

Merge branch 'process-management' into powerlevel

parents f2638563 5ced1945
Pipeline #4375 failed with stages
in 2 minutes and 22 seconds
...@@ -6,6 +6,7 @@ import logging ...@@ -6,6 +6,7 @@ import logging
import signal import signal
import os import os
import nrm.messaging import nrm.messaging
import uuid
RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req'] RPC_MSG = nrm.messaging.MSGTYPES['up_rpc_req']
logger = logging.getLogger('nrm') logger = logging.getLogger('nrm')
...@@ -49,7 +50,7 @@ class CommandLineInterface(object): ...@@ -49,7 +50,7 @@ class CommandLineInterface(object):
'path': argv.command, 'path': argv.command,
'args': argv.args, 'args': argv.args,
'environ': dict(environ), 'environ': dict(environ),
'container_uuid': str(argv.ucontainername), 'container_uuid': argv.ucontainername or str(uuid.uuid4()),
} }
msg = RPC_MSG['run'](**command) msg = RPC_MSG['run'](**command)
# command fsm # command fsm
...@@ -58,28 +59,23 @@ class CommandLineInterface(object): ...@@ -58,28 +59,23 @@ class CommandLineInterface(object):
erreof = False erreof = False
exitmsg = None exitmsg = None
self.client.sendmsg(msg) self.client.sendmsg(msg)
while(True):
# the first message tells us if we started a container or not
msg = self.client.recvmsg() msg = self.client.recvmsg()
assert msg.api == 'up_rpc_rep' assert msg.api == 'up_rpc_rep'
assert msg.type in ['start', 'stdout', 'stderr', 'exit', assert msg.type in ['start', 'process_start']
'process_start', 'process_exit'] new_container = False
if msg.type == 'start': if msg.type == 'start':
if state == 'init': new_container = True
state = 'started' msg = self.client.recvmsg()
logger.info("container started: %r", msg) assert msg.type == 'process_start'
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg.type == 'process_start':
if state == 'init':
state = 'started' state = 'started'
logger.info("process started in existing " while(True):
"container: %r""", msg) msg = self.client.recvmsg()
else: assert msg.api == 'up_rpc_rep'
logger.info("unexpected start message: %r", state) assert msg.type in ['stdout', 'stderr', 'exit', 'process_exit']
exit(1)
elif msg.type == 'stdout': if msg.type == 'stdout':
logger.info("container msg: %r", msg) logger.info("container msg: %r", msg)
if msg.payload == 'eof': if msg.payload == 'eof':
outeof = True outeof = True
...@@ -89,7 +85,8 @@ class CommandLineInterface(object): ...@@ -89,7 +85,8 @@ class CommandLineInterface(object):
erreof = True erreof = True
elif msg.type == 'process_exit': elif msg.type == 'process_exit':
logger.info("process ended: %r", msg) logger.info("process ended: %r", msg)
break if not new_container:
state = 'exiting'
elif msg.type == 'exit': elif msg.type == 'exit':
if state == 'started': if state == 'started':
state = 'exiting' state = 'exiting'
......
/* Filename: downstream.c
*
* Description: This file contains the implementation of downstream API to
* transmit application context information to NRM.
*
* The application context information transmitted can be used to monitor
* application progress and/or invoke power policies to improve energy
* efficiency at the node level.
*/
#include "downstream_api.h"
#include<zmq.h>
#include<stdio.h>
#include<unistd.h>
#include<string.h>
#include<stdlib.h>
#include<assert.h>
/* Initialize a downstream context and announce the application to NRM.
 *
 * ctxt: context to fill in; must not be NULL.
 * uuid: application identifier; must not be NULL. The pointer is stored as-is
 *       (not copied), so the string must outlive the context.
 *
 * The endpoint is read from the NRM_ENV_URI environment variable and falls
 * back to NRM_DEFAULT_URI. The container identifier is read from the
 * ARGO_CONTAINER_UUID environment variable, which must be set.
 *
 * Returns 0. Every failure is reported through assert().
 */
int nrm_init(struct nrm_context *ctxt, const char *uuid)
{
    assert(ctxt != NULL);
    assert(uuid != NULL);
    const char *uri = getenv(NRM_ENV_URI);
    if(uri == NULL)
        uri = NRM_DEFAULT_URI;
    ctxt->container_uuid = getenv("ARGO_CONTAINER_UUID");
    assert(ctxt->container_uuid != NULL);
    ctxt->app_uuid = (char *)uuid;
    ctxt->context = zmq_ctx_new();
    assert(ctxt->context != NULL);
    ctxt->socket = zmq_socket(ctxt->context, ZMQ_PUB);
    assert(ctxt->socket != NULL);
    int err = zmq_connect(ctxt->socket, uri);
    assert(err == 0);
    char buf[512];
    snprintf(buf, sizeof(buf), NRM_START_FORMAT, ctxt->container_uuid,
             ctxt->app_uuid);
    /* NOTE(review): presumably gives the PUB socket time to finish connecting
     * so that the start message is not dropped -- confirm. */
    sleep(1);
    err = zmq_send(ctxt->socket, buf, strnlen(buf, sizeof(buf)), 0);
    assert(err > 0);
    /* Keep the call outside assert(): with NDEBUG the assert expression is
     * not evaluated, which would leave ctxt->time uninitialized. */
    err = clock_gettime(CLOCK_REALTIME, &ctxt->time);
    assert(err == 0);
    (void)err; /* silence set-but-unused warnings when asserts are disabled */
    ctxt->acc = 0;
    return 0;
}
/* Announce application exit to NRM and release the messaging resources.
 *
 * ctxt: context previously set up by nrm_init(); must not be NULL.
 *
 * Sends an NRM_EXIT_FORMAT message carrying the application uuid, then closes
 * the socket and destroys the ZeroMQ context.
 *
 * Returns 0. A failed send is reported through assert().
 */
int nrm_fini(struct nrm_context *ctxt)
{
    char msg[512];
    int sent;

    assert(ctxt != NULL);
    snprintf(msg, sizeof(msg), NRM_EXIT_FORMAT, ctxt->app_uuid);
    sent = zmq_send(ctxt->socket, msg, strnlen(msg, sizeof(msg)), 0);
    assert(sent > 0);
    zmq_close(ctxt->socket);
    zmq_ctx_destroy(ctxt->context);
    return 0;
}
/* Report application progress to NRM, rate-limited.
 *
 * ctxt:     context previously set up by nrm_init(); must not be NULL.
 * progress: amount of progress made since the previous call.
 *
 * Progress is accumulated in ctxt->acc. A message is only emitted once more
 * than NRM_RATELIMIT_THRESHOLD nanoseconds have elapsed since the last
 * emitted message (or since nrm_init()); it carries the accumulated value,
 * after which the accumulator is reset.
 *
 * Returns 0. A failed send is reported through assert().
 */
int nrm_send_progress(struct nrm_context *ctxt, unsigned long progress)
{
    char buf[512];
    struct timespec now;

    assert(ctxt != NULL);
    clock_gettime(CLOCK_REALTIME, &now);
    /* Integer arithmetic only: no round trip through double. */
    long long int timediff =
        (long long int)(now.tv_nsec - ctxt->time.tv_nsec) +
        1000000000LL * (long long int)(now.tv_sec - ctxt->time.tv_sec);
    ctxt->acc += progress;
    if(timediff > NRM_RATELIMIT_THRESHOLD)
    {
        snprintf(buf, sizeof(buf), NRM_PROGRESS_FORMAT, ctxt->acc,
                 ctxt->app_uuid);
        int err = zmq_send(ctxt->socket, buf, strnlen(buf, sizeof(buf)), 0);
        assert(err > 0);
        (void)err;
        ctxt->acc = 0;
        /* Only move the reference time when a message was actually sent.
         * Updating it on every call would measure the gap between calls, so
         * a caller reporting more often than the threshold would never
         * send anything. */
        ctxt->time = now;
    }
    return 0;
}
/* Report per-phase timing information to NRM, rate-limited.
 *
 * ctxt:         context previously set up by nrm_init(); must not be NULL.
 * cpu:          value placed in the "cpu" field of the message.
 * startCompute, endCompute, startBarrier, endBarrier:
 *               values placed verbatim in the corresponding message fields.
 *
 * A message is only emitted once more than NRM_RATELIMIT_THRESHOLD
 * nanoseconds have elapsed since the last emitted message (or since
 * nrm_init()); calls arriving sooner are dropped.
 *
 * Returns 0. A failed send is reported through assert().
 */
int nrm_send_phase_context(struct nrm_context *ctxt, int cpu,
        unsigned long long int startCompute,
        unsigned long long int endCompute,
        unsigned long long int startBarrier,
        unsigned long long int endBarrier)
{
    char buf[512];
    struct timespec now;

    assert(ctxt != NULL);
    clock_gettime(CLOCK_REALTIME, &now);
    /* Integer arithmetic only: no round trip through double. */
    long long int timediff =
        (long long int)(now.tv_nsec - ctxt->time.tv_nsec) +
        1000000000LL * (long long int)(now.tv_sec - ctxt->time.tv_sec);
    if(timediff > NRM_RATELIMIT_THRESHOLD)
    {
        snprintf(buf, sizeof(buf), NRM_PHASE_CONTEXT_FORMAT, cpu,
                 startCompute, endCompute, startBarrier, endBarrier,
                 ctxt->app_uuid);
        int err = zmq_send(ctxt->socket, buf, strnlen(buf, sizeof(buf)), 0);
        assert(err > 0);
        (void)err;
        /* Only move the reference time when a message was actually sent.
         * Updating it on every call would measure the gap between calls, so
         * a caller reporting more often than the threshold would never
         * send anything. */
        ctxt->time = now;
    }
    return 0;
}
/* Filename: downstream_api.h
*
* Includes required headers, functions and parameters used by NRM downstream
* interface
*
*/
#ifndef NRM_H
#define NRM_H 1
#include<time.h>
/* Minimum time in nanoseconds between messages: necessary for rate-limiting
 * progress reports. For now, 10 ms is the threshold. */
#define NRM_RATELIMIT_THRESHOLD (10000000LL)
/* Per-application state used by the downstream API. Filled in by nrm_init()
 * and passed to every other nrm_* call. */
struct nrm_context {
/* ZeroMQ context created in nrm_init() and destroyed in nrm_fini(). */
void *context;
/* ZeroMQ PUB socket connected to the downstream endpoint. */
void *socket;
/* Value of the ARGO_CONTAINER_UUID environment variable (not copied). */
char *container_uuid;
/* Application identifier passed to nrm_init() (pointer stored, not copied). */
char *app_uuid;
/* Reference timestamp compared against NRM_RATELIMIT_THRESHOLD. */
struct timespec time;
/* Progress accumulated by nrm_send_progress() since the last sent message. */
unsigned long acc;
};
/* Endpoint used when the NRM_ENV_URI environment variable is not set. */
#define NRM_DEFAULT_URI "ipc:///tmp/nrm-downstream-in"
/* Name of the environment variable that overrides the endpoint. */
#define NRM_ENV_URI "ARGO_NRM_DOWNSTREAM_IN_URI"
/* printf-style JSON templates for the messages sent downstream.
 * START: container uuid (%s), application uuid (%s). */
#define NRM_START_FORMAT "{\"type\":\"application\", \"event\":\"start\", \"container\": \"%s\", \"uuid\": \"%s\", \"progress\": true, \"threads\": null}"
/* PROGRESS: accumulated progress (%lu), application uuid (%s). */
#define NRM_PROGRESS_FORMAT "{\"type\":\"application\", \"event\":\"progress\", \"payload\": \"%lu\", \"uuid\": \"%s\"}"
/* PHASE_CONTEXT: cpu (%d), start/end compute and start/end barrier (%llu x4),
 * application uuid (%s). */
#define NRM_PHASE_CONTEXT_FORMAT "{\"type\":\"application\", \"event\":\"phase_context\", \"cpu\": \"%d\", \"startcompute\": \"%llu\", \"endcompute\": \"%llu\", \"startbarrier\": \"%llu\", \"endbarrier\": \"%llu\", \"uuid\": \"%s\"}"
/* EXIT: application uuid (%s). */
#define NRM_EXIT_FORMAT "{\"type\":\"application\", \"event\":\"exit\", \"uuid\": \"%s\"}"
/* Set up the context, connect to the downstream endpoint and send the
 * application start message. The second argument is the application uuid. */
int nrm_init(struct nrm_context *, const char *);
/* Send the application exit message and release the ZeroMQ resources. */
int nrm_fini(struct nrm_context *);
/* Accumulate progress and send it, subject to NRM_RATELIMIT_THRESHOLD. */
int nrm_send_progress(struct nrm_context *, unsigned long);
/* Send a phase_context message, subject to NRM_RATELIMIT_THRESHOLD. */
int nrm_send_phase_context(struct nrm_context *ctxt, int cpu, unsigned long
long int startCompute, unsigned long long int endCompute, unsigned
long long int startBarrier, unsigned long long int endBarrier);
#endif
...@@ -27,6 +27,7 @@ class Daemon(object): ...@@ -27,6 +27,7 @@ class Daemon(object):
def __init__(self, config): def __init__(self, config):
self.target = 100.0 self.target = 100.0
self.config = config self.config = config
self.container_owner = dict()
def do_downstream_receive(self, parts): def do_downstream_receive(self, parts):
logger.info("receiving downstream message: %r", parts) logger.info("receiving downstream message: %r", parts)
...@@ -105,33 +106,27 @@ class Daemon(object): ...@@ -105,33 +106,27 @@ class Daemon(object):
'type': 'start', 'type': 'start',
'container_uuid': container_uuid, 'container_uuid': container_uuid,
'errno': 0 if container else -1, 'errno': 0 if container else -1,
'pid': pid,
'power': container.power['policy'] or dict() 'power': container.power['policy'] or dict()
} }
self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update), self.upstream_rpc_server.sendmsg(RPC_MSG['start'](**update),
client) client)
# setup io callbacks self.container_owner[container.uuid] = client
outcb = partial(self.do_children_io, client,
container_uuid, 'stdout') # now deal with the process itself
errcb = partial(self.do_children_io, client,
container_uuid, 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
else:
update = {'api': 'up_rpc_rep', update = {'api': 'up_rpc_rep',
'type': 'process_start', 'type': 'process_start',
'container_uuid': container_uuid, 'container_uuid': container_uuid,
'pid': pid,
} }
self.upstream_rpc_server.sendmsg( self.upstream_rpc_server.sendmsg(
RPC_MSG['process_start'](**update), client) RPC_MSG['process_start'](**update), client)
# setup io callbacks # setup io callbacks
outcb = partial(self.do_children_io, client, outcb = partial(self.do_children_io, client, container_uuid,
container_uuid, 'stdout') 'stdout')
errcb = partial(self.do_children_io, client, errcb = partial(self.do_children_io, client, container_uuid,
container_uuid, 'stderr') 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb) container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb) container.processes[pid].stderr.read_until_close(errcb, errcb)
elif msg.type == 'kill': elif msg.type == 'kill':
logger.info("asked to kill container: %r", msg) logger.info("asked to kill container: %r", msg)
response = self.container_manager.kill(msg.container_uuid) response = self.container_manager.kill(msg.container_uuid)
...@@ -207,16 +202,30 @@ class Daemon(object): ...@@ -207,16 +202,30 @@ class Daemon(object):
if os.WIFEXITED(status) or os.WIFSIGNALED(status): if os.WIFEXITED(status) or os.WIFSIGNALED(status):
container = self.container_manager.pids[pid] container = self.container_manager.pids[pid]
clientid = container.clientids[pid] clientid = container.clientids[pid]
remaining_pids = [p for p in container.processes.keys()
if p != pid] # first, send a process_exit
msg = {'api': 'up_rpc_rep', msg = {'api': 'up_rpc_rep',
'type': 'process_exit',
'status': str(status), 'status': str(status),
'container_uuid': container.uuid, 'container_uuid': container.uuid,
} }
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_exit'](**msg), clientid)
# Remove the pid of process that is finished
container.processes.pop(pid, None)
self.container_manager.pids.pop(pid, None)
logger.info("Process %s in Container %s has finised.",
pid, container.uuid)
if not remaining_pids: # if this process was owner of the container,
msg['type'] = 'exit' # kill everything
msg['profile_data'] = dict() if self.container_owner[container.uuid] == clientid:
# deal with container exit
msg = {'api': 'up_rpc_rep',
'type': 'exit',
'container_uuid': container.uuid,
'profile_data': dict(),
}
pp = container.power pp = container.power
if pp['policy']: if pp['policy']:
pp['manager'].reset_all() pp['manager'].reset_all()
...@@ -237,16 +246,7 @@ class Daemon(object): ...@@ -237,16 +246,7 @@ class Daemon(object):
self.container_manager.delete(container.uuid) self.container_manager.delete(container.uuid)
self.upstream_rpc_server.sendmsg( self.upstream_rpc_server.sendmsg(
RPC_MSG['exit'](**msg), clientid) RPC_MSG['exit'](**msg), clientid)
else: del self.container_owner[container.uuid]
msg['type'] = 'process_exit'
# Remove the pid of process that is finished
container.processes.pop(pid, None)
self.container_manager.pids.pop(pid, None)
logger.info("Process %s in Container %s has finised.",
pid, container.uuid)
self.upstream_rpc_server.sendmsg(
RPC_MSG['process_exit'](**msg), clientid)
else: else:
logger.debug("child update ignored") logger.debug("child update ignored")
pass pass
......
...@@ -31,7 +31,6 @@ MSGFORMATS['up_rpc_req'] = {'list': {}, ...@@ -31,7 +31,6 @@ MSGFORMATS['up_rpc_req'] = {'list': {},
} }
MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring, MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'errno': int, 'errno': int,
'pid': int,
'power': dict}, 'power': dict},
'list': {'payload': list}, 'list': {'payload': list},
'stdout': {'container_uuid': basestring, 'stdout': {'container_uuid': basestring,
...@@ -39,9 +38,9 @@ MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring, ...@@ -39,9 +38,9 @@ MSGFORMATS['up_rpc_rep'] = {'start': {'container_uuid': basestring,
'stderr': {'container_uuid': basestring, 'stderr': {'container_uuid': basestring,
'payload': basestring}, 'payload': basestring},
'exit': {'container_uuid': basestring, 'exit': {'container_uuid': basestring,
'status': basestring,
'profile_data': dict}, 'profile_data': dict},
'process_start': {'container_uuid': basestring}, 'process_start': {'container_uuid': basestring,
'pid': int},
'process_exit': {'container_uuid': basestring, 'process_exit': {'container_uuid': basestring,
'status': basestring}, 'status': basestring},
'getpower': {'limit': basestring}, 'getpower': {'limit': basestring},
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment