Commit 57840dc9 authored by Swann Perarnau

Merge branch 'master' of xgitlab.cels.anl.gov:argo/nrm into big-controller-work

parents cbbf2354 8fa7a4e1
#!/usr/bin/env python2
from __future__ import print_function
import argparse
import logging
import zmq
import os
import tempfile
import subprocess
import uuid
logger = logging.getLogger('perf-wrapper')
class PerfWrapper(object):

    """Implements middleware between the Linux perf and
    the NRM downstream interface.

    The wrapper runs the requested command under ``perf stat``, reads the
    periodic instruction counts that perf writes to a FIFO, and publishes
    them as 'progress' events on the downstream ZeroMQ PUB socket.
    """

    def __init__(self):
        pass

    def shutdown(self):
        """Publish the 'exit' event for this application."""
        update = {'type': 'application',
                  'event': 'exit',
                  'uuid': self.app_uuid,
                  }
        self.downstream_pub_socket.send_json(update)

    def progress_report(self, progress):
        """Publish a 'progress' event carrying `progress` as its payload."""
        update = {'type': 'application',
                  'event': 'progress',
                  'payload': progress,
                  'uuid': self.app_uuid,
                  }
        self.downstream_pub_socket.send_json(update)

    def setup(self):
        """Connect to the downstream socket and announce this application.

        Reads the container uuid from the ARGO_CONTAINER_UUID environment
        variable, generates a fresh application uuid and publishes the
        'start' event.

        Raises:
            SystemExit: with status 1 when ARGO_CONTAINER_UUID is not set.
        """
        context = zmq.Context()
        self.downstream_pub_socket = context.socket(zmq.PUB)

        downstream_pub_param = "ipc:///tmp/nrm-downstream-in"
        self.downstream_pub_socket.connect(downstream_pub_param)
        logger.info("downstream pub socket connected to: %s",
                    downstream_pub_param)

        # retrieve our container uuid
        self.container_uuid = os.environ.get('ARGO_CONTAINER_UUID')
        if self.container_uuid is None:
            logger.error("missing container uuid")
            # Raise SystemExit directly: the exit() builtin is injected by
            # the site module and is not meant to be used in programs.
            raise SystemExit(1)
        self.app_uuid = str(uuid.uuid4())
        logger.info("client uuid: %r", self.app_uuid)

        # send a hello to the daemon
        update = {'type': 'application',
                  'event': 'start',
                  'container': self.container_uuid,
                  'uuid': self.app_uuid,
                  'progress': True,
                  'threads': {'min': 1, 'cur': 1, 'max': 1},
                  }
        self.downstream_pub_socket.send_json(update)

    @staticmethod
    def _parse_sample(tokens, last_time):
        """Convert one split perf CSV record into (timestamp, ips).

        Args:
            tokens: the comma-separated fields of one perf output line;
                tokens[0] is the sample timestamp and tokens[1] the
                counter value (or the literal '<not counted>').
            last_time: timestamp of the previously accepted sample.

        Returns:
            A (timestamp, instructions_per_second) tuple, or None when the
            record is too short, is not numeric, or does not advance the
            timestamp (which would otherwise divide by zero).
        """
        if len(tokens) < 2:
            return None
        try:
            timestamp = float(tokens[0])
            if tokens[1] == '<not counted>':
                instructions = 0
            else:
                instructions = int(tokens[1])
        except ValueError:
            return None
        elapsed = timestamp - last_time
        if elapsed <= 0:
            return None
        return timestamp, int(instructions / elapsed)

    def _report_samples(self, fifo):
        """Read perf records from `fifo` until EOF, publishing each rate."""
        last_time = 0.0
        # "for line in fifo" idiom didn't work here -- Python was buffering
        # the output internally until perf was finished.  readline()
        # returns '' only at EOF, which ends the iteration.
        for line in iter(fifo.readline, ''):
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            tokens = line.split(',')
            logger.info("tokens: %r", tokens)

            sample = self._parse_sample(tokens, last_time)
            if sample is None:
                logger.warning("skipping unusable perf record: %r", line)
                continue
            last_time, ips = sample
            logger.info("instructions per second: %r", ips)
            self.progress_report(ips)

    def main(self):
        """Parse the command line, run the command under perf and report.

        Command-line options: -v/--verbose enables debug logging,
        -f/--frequency is the perf sampling interval in ms (default 1000),
        and the remaining arguments are the command to run.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("-v", "--verbose",
                            help="verbose logging information",
                            action='store_true')
        parser.add_argument("-f", "--frequency",
                            help="sampling frequency in ms",
                            type=int, default=1000)
        parser.add_argument("cmd", help="command and arguments",
                            nargs=argparse.REMAINDER)
        args = parser.parse_args()

        if args.verbose:
            logger.setLevel(logging.DEBUG)

        logger.info("cmd: %r", args.cmd)

        self.setup()

        # create a named pipe between us and the to-be-launched perf
        # There is no mkstemp for FIFOs but we can securely create a temporary
        # directory and then create a FIFO inside of it.
        tmpdir = tempfile.mkdtemp()
        fifoname = os.path.join(tmpdir, 'perf-fifo')
        logger.info("fifoname: %r", fifoname)
        try:
            os.mkfifo(fifoname, 0o600)

            argv = ['perf', 'stat', '-e', 'instructions', '-x', ',',
                    '-I', str(args.frequency), '-o', fifoname, '--']
            argv.extend(args.cmd)
            logger.info("argv: %r", argv)

            p = subprocess.Popen(argv, close_fds=True)

            # This blocks until the other end opens as well so we need to
            # invoke it after Popen.
            # FIXME: still blocks forever if perf exits without ever
            # opening the FIFO for writing.
            with open(fifoname, 'r') as fifo:
                self._report_samples(fifo)

            # The child should be dead by now so this should terminate
            # immediately.
            p.wait()
            self.shutdown()
        finally:
            # Always remove the FIFO and its directory, including when
            # launching perf or parsing its output raised.
            if os.path.exists(fifoname):
                os.remove(fifoname)
            os.rmdir(tmpdir)
if __name__ == "__main__":
    # Script entry point: configure root logging at WARNING, then hand
    # control to the wrapper's command-line driver.
    logging.basicConfig(level=logging.WARNING)
    PerfWrapper().main()
......@@ -17,6 +17,12 @@
"cpus": "4",
"mems": "1"
}
},
{
"name": "argo/perfwrapper",
"value": {
"enabled": "1"
}
}
]
}
......
......@@ -125,13 +125,35 @@ class Container(SpecField):
"""Load container information."""
return super(Container, self).load(data)
class PerfWrapper(SpecField):

    """Information on whether to use perf for a container."""

    fields = {"enabled": spec(unicode, False)
              }

    def __init__(self):
        """Create empty perf wrapper."""
        pass

    def load(self, data):
        """Load perf wrapper information.

        Returns the parent loader's result unchanged when it is falsy,
        False when 'enabled' is not one of "0", "False", "1" or "True",
        and True otherwise.
        """
        loaded = super(PerfWrapper, self).load(data)
        if loaded:
            # NOTE(review): assumes the parent load() always sets
            # self.enabled on success -- confirm for manifests that omit it.
            if self.enabled in ("0", "False", "1", "True"):
                return True
            logger.error("Invalid value of perfwrapper enabled: %s",
                         self.enabled)
            return False
        return loaded
class IsolatorList(SpecField):
"""Represent the list of isolator in a Manifest."""
types = {"argo/scheduler": spec(Scheduler, False),
"argo/container": spec(Container, True)
"argo/container": spec(Container, True),
"argo/perfwrapper": spec(PerfWrapper, False)
}
def __init__(self):
......
......@@ -65,6 +65,15 @@ class ContainerManager(object):
else:
argv = []
# It would've been better if argo-perf-wrapper wrapped around
# argo-nodeos-config and not the final command -- that way it would
# be running outside of the container. However, because
# argo-nodeos-config is suid root, perf can't monitor it.
if hasattr(manifest.app.isolators, 'perfwrapper'):
if hasattr(manifest.app.isolators.perfwrapper, 'enabled'):
if manifest.app.isolators.perfwrapper.enabled in ["1", "True"]:
argv.append('argo-perf-wrapper')
argv.append(command)
argv.extend(args)
process = self.nodeos.execute(container_name, argv, environ)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment