Commit f3c53106 authored by Sridutt Bhalachandra

[fix] Aggregative downstream & new msg layer

Made the necessary fixes for the aggregative downstream API
integration to work with the new downstream messaging layer.

Also, fixed the case where the daemon crashed when an application message
(from libnrm using pmpi) was received after the container was killed.

run_policy on all containers removed as the controller no longer has
application manager info

Any other refactoring and fixes required (check merge request
discussion)

See Issues #13, #20 and Merge !41
parent 402a2524
Pipeline #4961 passed with stages
in 3 minutes and 6 seconds
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
{ {
"name": "argo/perfwrapper", "name": "argo/perfwrapper",
"value": { "value": {
"enabled": "0" "enabled": "1"
} }
}, },
{ {
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
"enabled": "1", "enabled": "1",
"profile": "1", "profile": "1",
"policy": "NONE", "policy": "NONE",
"damper": "1000000000", "damper": "1e9",
"slowdown": "1.1" "slowdown": "1.1"
} }
}, },
......
...@@ -64,9 +64,10 @@ class Application(object): ...@@ -64,9 +64,10 @@ class Application(object):
def update_phase_context(self, msg): def update_phase_context(self, msg):
"""Update the phase contextual information.""" """Update the phase contextual information."""
id = int(msg['cpu']) id = int(msg.cpu)
self.phase_contexts[id] = {k: int(msg[k]) for k in ('aggregation', self.phase_contexts[id] = {k: getattr(msg, k) for k in
'computetime', 'totaltime')} ('aggregation', 'computetime',
'totaltime')}
self.phase_contexts[id]['set'] = True self.phase_contexts[id]['set'] = True
......
...@@ -29,7 +29,6 @@ class ContainerManager(object): ...@@ -29,7 +29,6 @@ class ContainerManager(object):
self.pids = dict() self.pids = dict()
self.resourcemanager = rm self.resourcemanager = rm
self.hwloc = rm.hwloc self.hwloc = rm.hwloc
self.nodeos = NodeOSClient()
self.chrt = ChrtClient() self.chrt = ChrtClient()
self.pmpi_lib = pmpi_lib self.pmpi_lib = pmpi_lib
......
...@@ -60,9 +60,13 @@ class Daemon(object): ...@@ -60,9 +60,13 @@ class Daemon(object):
uuid = msg.application_uuid uuid = msg.application_uuid
if uuid in self.application_manager.applications: if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid] app = self.application_manager.applications[uuid]
c = self.container_manager.containers[app.cid] if bool(self.container_manager.containers):
if c.power['policy']: cid = app.container_uuid
app.update_phase_context(msg) c = self.container_manager.containers[cid]
if c.power['policy']:
app.update_phase_context(msg)
# Run container policy
self.controller.run_policy_container(c, app)
elif msg.type == 'application_exit': elif msg.type == 'application_exit':
uuid = msg.application_uuid uuid = msg.application_uuid
if uuid in self.application_manager.applications: if uuid in self.application_manager.applications:
...@@ -107,7 +111,7 @@ class Daemon(object): ...@@ -107,7 +111,7 @@ class Daemon(object):
'type': 'container_start', 'type': 'container_start',
'container_uuid': container_uuid, 'container_uuid': container_uuid,
'errno': 0 if container else -1, 'errno': 0 if container else -1,
'power': container.power['policy'] or dict() 'power': container.power['policy'] or str(None)
} }
self.upstream_pub_server.sendmsg( self.upstream_pub_server.sendmsg(
PUB_MSG['container_start'](**update)) PUB_MSG['container_start'](**update))
...@@ -179,8 +183,8 @@ class Daemon(object): ...@@ -179,8 +183,8 @@ class Daemon(object):
self.controller.execute(action, actuator) self.controller.execute(action, actuator)
self.controller.update(action, actuator) self.controller.update(action, actuator)
# Call policy only if there are containers # Call policy only if there are containers
if self.container_manager.containers: # if self.container_manager.containers:
self.controller.run_policy(self.container_manager.containers) # self.controller.run_policy(self.container_manager.containers)
def do_signal(self, signum, frame): def do_signal(self, signum, frame):
if signum == signal.SIGINT: if signum == signal.SIGINT:
...@@ -249,7 +253,7 @@ class Daemon(object): ...@@ -249,7 +253,7 @@ class Daemon(object):
if p['policy']: if p['policy']:
diff['damper'] = float(p['damper'])/1000000000 diff['damper'] = float(p['damper'])/1000000000
diff['slowdown'] = p['slowdown'] diff['slowdown'] = p['slowdown']
diff['nodename'] = socket.gethostname() diff['nodename'] = self.sensor_manager.nodename
logger.info("Container %r profile data: %r", logger.info("Container %r profile data: %r",
container.uuid, diff) container.uuid, diff)
msg['profile_data'] = diff msg['profile_data'] = diff
......
...@@ -43,7 +43,7 @@ MSGFORMATS['up_rpc_rep'] = {'list': {'payload': list}, ...@@ -43,7 +43,7 @@ MSGFORMATS['up_rpc_rep'] = {'list': {'payload': list},
MSGFORMATS['up_pub'] = {'power': {'total': float, 'limit': float}, MSGFORMATS['up_pub'] = {'power': {'total': float, 'limit': float},
'container_start': {'container_uuid': basestring, 'container_start': {'container_uuid': basestring,
'errno': int, 'errno': int,
'power': dict}, 'power': basestring},
'container_exit': {'container_uuid': basestring, 'container_exit': {'container_uuid': basestring,
'profile_data': dict}, 'profile_data': dict},
'performance': {'container_uuid': basestring, 'performance': {'container_uuid': basestring,
...@@ -63,10 +63,10 @@ MSGFORMATS['down_event'] = {'application_start': ...@@ -63,10 +63,10 @@ MSGFORMATS['down_event'] = {'application_start':
'application_uuid': basestring, 'application_uuid': basestring,
'container_uuid': basestring}, 'container_uuid': basestring},
'phase_context': {'cpu': int, 'phase_context': {'cpu': int,
'startcompute': int, 'aggregation': int,
'endcompute': int, 'computetime': int,
'startbarrier': int, 'totaltime': int,
'endbarrier': int}, 'application_uuid': basestring},
} }
# Mirror of the message formats, using namedtuples as the actual transport # Mirror of the message formats, using namedtuples as the actual transport
......
...@@ -20,6 +20,7 @@ class SensorManager: ...@@ -20,6 +20,7 @@ class SensorManager:
def __init__(self): def __init__(self):
self.nodeconfig = coolr.clr_nodeinfo.nodeconfig() self.nodeconfig = coolr.clr_nodeinfo.nodeconfig()
self.nodename = self.nodeconfig.nodename
self.cputopology = coolr.clr_nodeinfo.cputopology() self.cputopology = coolr.clr_nodeinfo.cputopology()
self.coretemp = coolr.clr_hwmon.coretemp_reader() self.coretemp = coolr.clr_hwmon.coretemp_reader()
self.rapl = coolr.clr_rapl.rapl_reader() self.rapl = coolr.clr_rapl.rapl_reader()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment