Commit a2274c7f authored by Swann Perarnau's avatar Swann Perarnau

Merge branch 'c_api_for_merge' into 'master'

Allow applications to transmit context information to NRM (daemon)

See merge request !17
parents c61e07ff 1c4e2816
Pipeline #3650 canceled with stages
......@@ -23,6 +23,13 @@
"value": {
"enabled": "1"
}
},
{
"name": "argo/powerpolicy",
"value": {
"enabled": "0",
"policy": "NONE"
}
}
]
}
......
/* Filename: downstream.c
*
* Description: This file contains the implementation of downstream API to
* transmit application context information to NRM.
*
* The application context information transmitted is used to monitor
* application progress and/or invoke power policies to improve energy
* efficiency at the node level.
*/
#include "downstream_api.h"
#include<zmq.h>
#include<stdio.h>
#include<unistd.h>
#include<string.h>
#include<stdlib.h>
#include<assert.h>
int nrm_init(struct nrm_context *ctxt, const char *uuid)
{
assert(ctxt != NULL);
assert(uuid != NULL);
const char *uri = getenv(NRM_ENV_URI);
if(uri == NULL)
uri = NRM_DEFAULT_URI;
ctxt->container_uuid = getenv("ARGO_CONTAINER_UUID");
assert(ctxt->container_uuid != NULL);
ctxt->app_uuid = (char *)uuid;
ctxt->context = zmq_ctx_new();
ctxt->socket = zmq_socket(ctxt->context, ZMQ_PUB);
int err = zmq_connect(ctxt->socket, uri);
assert(err == 0);
char buf[512];
snprintf(buf, 512, NRM_START_FORMAT, ctxt->container_uuid, ctxt->app_uuid);
sleep(1);
err = zmq_send(ctxt->socket, buf, strnlen(buf, 512), 0);
assert(err > 0);
assert(!clock_gettime(CLOCK_REALTIME, &ctxt->time));
ctxt->acc = 0;
return 0;
}
int nrm_fini(struct nrm_context *ctxt)
{
assert(ctxt != NULL);
char buf[512];
snprintf(buf, 512, NRM_EXIT_FORMAT, ctxt->app_uuid);
int err = zmq_send(ctxt->socket, buf, strnlen(buf, 512), 0);
assert(err > 0);
zmq_close(ctxt->socket);
zmq_ctx_destroy(ctxt->context);
return 0;
}
int nrm_send_progress(struct nrm_context *ctxt, unsigned long progress)
{
char buf[512];
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
long long int timediff = (now.tv_nsec - ctxt->time.tv_nsec) +
1e9* (now.tv_sec - ctxt->time.tv_sec);
ctxt->acc += progress;
if(timediff > NRM_RATELIMIT_THRESHOLD)
{
snprintf(buf, 512, NRM_PROGRESS_FORMAT, ctxt->acc, ctxt->app_uuid);
int err = zmq_send(ctxt->socket, buf, strnlen(buf, 512), 0);
assert(err > 0);
ctxt->acc = 0;
}
ctxt->time = now;
return 0;
}
int nrm_invoke_power_policy(struct nrm_context *ctxt, int cpu, double
startCompute, double endCompute, double startBarrier, double
endBarrier)
{
char buf[512];
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
long long int timediff = (now.tv_nsec - ctxt->time.tv_nsec) +
1e9* (now.tv_sec - ctxt->time.tv_sec);
if(timediff > NRM_RATELIMIT_THRESHOLD)
{
snprintf(buf, 512, NRM_POWER_POLICY_FORMAT, cpu, startCompute,
endCompute, startBarrier, endBarrier, ctxt->app_uuid);
int err = zmq_send(ctxt->socket, buf, strnlen(buf, 512), 0);
assert(err > 0);
}
ctxt->time = now;
return 0;
}
/* Filename: downstream_api.h
*
* Includes required headers, functions and parameters used by NRM downstream
* interface
*
*/
#ifndef NRM_H
#define NRM_H 1
#include<time.h>
/* min time in nsec between messages: necessary for rate-limiting progress
* report. For now, 10ms is the threashold. */
#define NRM_RATELIMIT_THRESHOLD (10000000LL)
struct nrm_context {
void *context;
void *socket;
char *container_uuid;
char *app_uuid;
struct timespec time;
unsigned long acc;
};
#define NRM_DEFAULT_URI "ipc:///tmp/nrm-downstream-in"
#define NRM_ENV_URI "ARGO_NRM_DOWNSTREAM_IN_URI"
#define NRM_START_FORMAT "{\"type\":\"application\", \"event\":\"start\", \"container\": \"%s\", \"uuid\": \"%s\", \"progress\": true, \"threads\": null}"
#define NRM_PROGRESS_FORMAT "{\"type\":\"application\", \"event\":\"progress\", \"payload\": \"%lu\", \"uuid\": \"%s\"}"
#define NRM_POWER_POLICY_FORMAT "{\"type\":\"application\", \"event\":\"power_policy\", \"cpu\": \"%d\", \"startcompute\": \"%lf\", \"endcompute\": \"%lf\", \"startbarrier\": \"%lf\", \"endbarrier\": \"%lf\", \"uuid\": \"%s\"}"
#define NRM_EXIT_FORMAT "{\"type\":\"application\", \"event\":\"exit\", \"uuid\": \"%s\"}"
int nrm_init(struct nrm_context *, const char *);
int nrm_fini(struct nrm_context *);
int nrm_send_progress(struct nrm_context *, unsigned long);
int nrm_invoke_power_policy(struct nrm_context *ctxt, int cpu, double
startCompute, double endCompute, double startBarrier, double
endBarrier);
#endif
......@@ -149,13 +149,44 @@ class PerfWrapper(SpecField):
return True
class PowerPolicy(SpecField):
"""Information on whether to use power policy for a container."""
policies = ['NONE', 'DDCM', 'DVFS', 'COMBINED']
fields = {"enabled": spec(unicode, False),
"policy": spec(unicode, False)
}
def __init__(self):
"""Create empty perf wrapper."""
pass
def load(self, data):
"""Load perf wrapper information."""
ret = super(PowerPolicy, self).load(data)
if not ret:
return ret
if self.enabled not in ["0", "False", "1", "True"]:
logger.error("Invalid value of powerpolicy enabled: %s",
self.enabled)
return False
if self.policy not in self.policies:
logger.error("Invalid value of powerpolicy policy: %s",
self.policy)
return False
return True
class IsolatorList(SpecField):
"""Represent the list of isolator in a Manifest."""
types = {"argo/scheduler": spec(Scheduler, False),
"argo/container": spec(Container, True),
"argo/perfwrapper": spec(PerfWrapper, False)
"argo/perfwrapper": spec(PerfWrapper, False),
"argo/powerpolicy": spec(PowerPolicy, False)
}
def __init__(self):
......
......@@ -45,6 +45,11 @@ class Daemon(object):
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
elif event == 'power_policy':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
# TODO: Invoke appropriate power policy
elif event == 'exit':
self.application_manager.delete(msg['uuid'])
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment