Commit 062b1995 authored by Valentin Reis's avatar Valentin Reis
Browse files

WIP: progress reporting to cmd.

parent b6297037
Pipeline #4661 passed with stages
in 2 minutes and 10 seconds
......@@ -5,6 +5,8 @@ from __future__ import print_function
import argparse
import logging
import zmq
import zmq.utils
import zmq.utils.monitor
import os
import tempfile
import subprocess
......@@ -30,9 +32,10 @@ class PerfWrapper(object):
def progress_report(self, progress):
update = {'type': 'application',
'event': 'progress',
'event': 'hardwareprogress',
'payload': progress,
'uuid': self.app_uuid,
'app_uuid': self.app_uuid,
'container_uuid': self.container_uuid,
}
self.downstream_pub_socket.send_json(update)
......@@ -44,6 +47,15 @@ class PerfWrapper(object):
self.downstream_pub_socket.connect(downstream_pub_param)
monitor = self.downstream_pub_socket.get_monitor_socket()
while True:
msg = zmq.utils.monitor.recv_monitor_message(monitor)
logger.debug("monitor message: %r", msg)
if int(msg['event']) == zmq.EVENT_CONNECTED:
logger.debug("downstream pub socket connected")
break
self.downstream_pub_socket.disable_monitor()
logger.info("downstream pub socket connected to: %s",
downstream_pub_param)
......@@ -67,19 +79,40 @@ class PerfWrapper(object):
def main(self):
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("-f", "--frequency",
help="sampling frequency in ms",
type=int, default=1000)
parser.add_argument("cmd", help="command and arguments",
nargs=argparse.REMAINDER)
parser.add_argument(
'--perf',
help="Path to the linux perf tool to use. This path can be"
"relative and makes uses of the $PATH if necessary."
"Override default with the PERF environment"
"variable.",
default=os.environ.get('PERF',
'perf'))
parser.add_argument(
"--log",
help="Main log file. Override default with the NRM_LOG."
"environment variable",
default=os.environ.get('PERFWRAPPER_LOG',
'/tmp/nrm_perfwrapper.log'))
parser.add_argument(
"-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument(
"-f", "--frequency",
help="sampling frequency in ms",
type=int, default=1000)
parser.add_argument(
"cmd",
help="command and arguments",
nargs=argparse.REMAINDER)
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
print("Logging to %s" % args.log)
logger.addHandler(logging.FileHandler(args.log))
logger.info("cmd: %r", args.cmd)
self.setup()
......@@ -92,7 +125,7 @@ class PerfWrapper(object):
logger.info("fifoname: %r", fifoname)
os.mkfifo(fifoname, 0o600)
perf_tool_path = os.environ.get('PERF', 'perf')
perf_tool_path = args.perf
argv = [perf_tool_path, 'stat', '-e', 'instructions', '-x', ',',
'-I', str(args.frequency), '-o', fifoname, '--']
argv.extend(args.cmd)
......
......@@ -55,8 +55,12 @@ class Application(object):
self.do_thread_transition('done')
self.threads['cur'] = newth
def update_progress(self, msg):
"""Update the progress tracking."""
def update_progress(self, msg, time):
"""Update the instrumented progress tracking."""
assert self.progress
def update_hardwareprogress(self, msg, time):
"""Update the hardware progress tracking."""
assert self.progress
def update_phase_context(self, msg):
......
......@@ -107,7 +107,10 @@ class ContainerManager(object):
manifest_perfwrapper = manifest.app.isolators.perfwrapper
if hasattr(manifest_perfwrapper, 'enabled'):
if manifest_perfwrapper.enabled in ["1", "True"]:
argv.append(self.perfwrapper)
argv.extend([self.perfwrapper,
"--perf",
self.linuxperf,
"-v"])
if hasattr(manifest.app.isolators, 'power'):
if hasattr(manifest.app.isolators.power, 'enabled'):
......@@ -127,7 +130,6 @@ class ContainerManager(object):
# environ['PATH'] = ("/usr/local/sbin:"
# "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
environ['ARGO_CONTAINER_UUID'] = container_name
environ['PERF'] = self.linuxperf
environ['AC_APP_NAME'] = manifest.name
environ['AC_METADATA_URL'] = "localhost"
......
......@@ -7,6 +7,7 @@ from powerpolicy import PowerPolicyManager
from functools import partial
import json
import logging
import time
import os
from resources import ResourceManager
from sensor import SensorManager
......@@ -50,10 +51,31 @@ class Daemon(object):
app = self.application_manager.applications[uuid]
app.update_threads(msg)
elif event == 'progress':
t = time.time()
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
app.update_progress(msg)
app = self.application_manager.applications[uuid]
upstream_msg = {'api': 'up_pub',
'type': 'progress',
'time': t,
'application_uuid': uuid,
'payload': msg['payload']}
self.upstream_pub_server.sendmsg(
PUB_MSG['progress'](**upstream_msg))
app.update_progress(msg, t)
elif event == 'hardwareprogress':
t = time.time()
app = self.application_manager.applications[msg['app_uuid']]
if app.container_uuid == msg['container_uuid']:
app.update_hardwareprogress(msg, t)
upstream_msg = {'api': 'up_pub',
'type': 'hardwareprogress',
'time': t,
'container_uuid': msg['container_uuid'],
'application_uuid': msg['container_uuid'],
'payload': msg['payload']}
self.upstream_pub_server.sendmsg(
PUB_MSG['hardwareprogress'](**upstream_msg))
app.update_hardwareprogress(msg, t)
elif event == 'phase_context':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
......
......@@ -41,6 +41,15 @@ MSGFORMATS['up_rpc_rep'] = {'list': {'payload': list},
'getpower': {'limit': basestring},
}
MSGFORMATS['up_pub'] = {'power': {'total': int, 'limit': float},
'progress': {'application_uuid': basestring,
'time': float,
'payload': float},
'hardwareprogress': {'container_uuid': basestring,
'application_uuid': basestring,
'time': float,
'payload': int},
'control': {'container_uuid': basestring,
'action': int},
'container_start': {'container_uuid': basestring,
'errno': int,
'power': dict},
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment