Commit 5a41baba authored by Sridutt Bhalachandra

[Feature] Multi-node and process support

Added multi-node and process support that allows launching multiple
processes within a container. This is important for using NRM with MPI
applications that run multiple processes in a container, and thus
enables multi-node executions.

See Issue #17
parent aece6b21
@@ -6,6 +6,7 @@ import logging
import uuid
import signal
import zmq
import os
logger = logging.getLogger('nrm-cmd')
@@ -60,12 +61,14 @@ class CommandLineInterface(object):
# build the command as a JSON dict containing enough info. We add to
# the command a container uuid as a way to make sure that we can make
# the command idempotent.
containerid = str(uuid.uuid4())
command = {'command': 'run',
environ = os.environ
command = {'clientid': self.uuid,
'ucontainername': argv.ucontainername,
'command': 'run',
'manifest': argv.manifest,
'file': argv.command,
'args': argv.args,
'uuid': containerid,
'environ': dict(environ),
}
# command fsm
state = 'init'
@@ -76,7 +79,7 @@ class CommandLineInterface(object):
while(True):
msg = self.upstream_sub_socket.recv_json()
if isinstance(msg, dict) and msg.get('type') == 'container':
if msg['uuid'] == containerid:
if msg['clientid'] == self.uuid:
if msg['event'] == 'start':
if state == 'init':
state = 'started'
@@ -98,6 +101,17 @@ class CommandLineInterface(object):
exitmsg = msg
else:
logger.info("unexpected exit message: %r", msg)
elif msg['event'] == 'process_start':
if state == 'init':
state = 'started'
logger.info("process started in existing "
"container: %r""", msg)
else:
logger.info("unexpected start message: %r", state)
exit(1)
elif msg['event'] == 'process_exit':
logger.info("process ended: %r", msg)
break
if outeof and erreof and state == 'exiting':
state = 'exit'
logger.info("container ended: %r", exitmsg)
@@ -180,6 +194,9 @@ class CommandLineInterface(object):
parser_run.add_argument("command", help="command to execute")
parser_run.add_argument("args", help="command arguments",
nargs=argparse.REMAINDER)
parser_run.add_argument("-u", "--ucontainername", help="""user-specified
name for container used to attach processes""",
nargs='?', const=None, default=None)
parser_run.set_defaults(func=self.do_run)
# kill container
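The client-side hunks above replace the per-command container uuid with the client's own uuid plus an optional user-supplied container name, and forward the caller's environment to the daemon. The following is a minimal sketch, not part of this commit, of the new 'run' request shape and of how the CLI now filters replies by clientid; the 'mpi-job', 'app.manifest' and 'a.out' values are placeholders.

# Sketch only -- illustrates the new request/reply shape, not the real CLI code.
import os
import uuid

client_uuid = str(uuid.uuid4())            # stands in for self.uuid
command = {'clientid': client_uuid,
           'ucontainername': 'mpi-job',    # placeholder; None asks for a fresh container
           'command': 'run',
           'manifest': 'app.manifest',     # placeholder manifest path
           'file': 'a.out',                # placeholder executable
           'args': [],
           'environ': dict(os.environ),
           }

def reply_is_mine(msg):
    # Events are matched on the client uuid rather than on a per-command
    # container uuid, so several clients can share one container.
    return (isinstance(msg, dict) and msg.get('type') == 'container'
            and msg.get('clientid') == client_uuid)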
@@ -3,12 +3,12 @@ from __future__ import print_function
from aci import ImageManifest
from collections import namedtuple
import logging
import os
from subprograms import ChrtClient, NodeOSClient, resources
import uuid
logger = logging.getLogger('nrm')
Container = namedtuple('Container', ['uuid', 'manifest', 'resources',
'power', 'process'])
'power', 'processes', 'clientids'])
class ContainerManager(object):
@@ -27,88 +27,127 @@ class ContainerManager(object):
"""Create a container according to the request.
Returns the pid of the container or a negative number for errors."""
container = None
container_name = None
containerexistsflag = False
processes = None
clientids = None
manifestfile = request['manifest']
command = request['file']
args = request['args']
logger.info("run: manifest file: %s", manifestfile)
logger.info("run: command: %s", command)
logger.info("run: args: %r", args)
environ = request['environ']
ucontainername = request['ucontainername']
logger.info("run: manifest file: %s", manifestfile)
logger.info("run: command: %s", command)
logger.info("run: args: %r", args)
logger.info("run: ucontainername: %s", ucontainername)
# TODO: Application library to load must be set during configuration
apppreloadlibrary = ''
manifest = ImageManifest()
if not manifest.load(manifestfile):
logger.error("Manifest is invalid")
return None
# ask the resource manager for resources
req = resources(int(manifest.app.isolators.container.cpus.value),
int(manifest.app.isolators.container.mems.value))
allocation = self.resourcemanager.schedule(request['uuid'], req)
logger.info("run: allocation: %r", allocation)
if hasattr(manifest.app.isolators, 'scheduler'):
sched = manifest.app.isolators.scheduler
argv = self.chrt.getwrappedcmd(sched)
else:
argv = []
# Check if user-specified container exists else create it
if ucontainername in self.containers:
container_name = ucontainername
container = self.containers[ucontainername]
containerexistsflag = True
processes = container.processes
clientids = container.clientids
else:
processes = dict()
clientids = dict()
if ucontainername:
container_name = ucontainername
else:
# If no user-specified container name create one
container_name = str(uuid.uuid4())
# ask the resource manager for resources
req = resources(int(manifest.app.isolators.container.cpus.value),
int(manifest.app.isolators.container.mems.value))
alloc = self.resourcemanager.schedule(container_name, req)
logger.info("run: allocation: %r", alloc)
# create container
logger.info("creating container %s", container_name)
self.nodeos.create(container_name, alloc)
container_resources = dict()
container_resources['cpus'], container_resources['mems'] = alloc
# Container power settings
container_power = dict()
container_power['profile'] = None
container_power['policy'] = None
container_power['damper'] = None
container_power['slowdown'] = None
container_power['manager'] = None
# It would've been better if argo-perf-wrapper wrapped around
# argo-nodeos-config and not the final command -- that way it would
# be running outside of the container. However, because
# argo-nodeos-config is suid root, perf can't monitor it.
if hasattr(manifest.app.isolators, 'perfwrapper'):
manifest_perfwrapper = manifest.app.isolators.perfwrapper
if hasattr(manifest_perfwrapper, 'enabled'):
if manifest_perfwrapper.enabled in ["1", "True"]:
argv.append('argo-perf-wrapper')
if hasattr(manifest.app.isolators, 'power'):
if hasattr(manifest.app.isolators.power, 'enabled'):
pp = manifest.app.isolators.power
if pp.enabled in ["1", "True"]:
if pp.profile in ["1", "True"]:
container_power['profile'] = dict()
container_power['profile']['start'] = dict()
container_power['profile']['end'] = dict()
if pp.policy != "NONE":
container_power['policy'] = pp.policy
container_power['damper'] = pp.damper
container_power['slowdown'] = pp.slowdown
environ['LD_PRELOAD'] = apppreloadlibrary
# build context to execute
environ = os.environ
# environ['PATH'] = ("/usr/local/sbin:"
# "/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
environ['ARGO_CONTAINER_UUID'] = container_name
environ['AC_APP_NAME'] = manifest.name
environ['AC_METADATA_URL'] = "localhost"
logger.info("run: environ: %r", environ)
# create container
container_name = request['uuid']
environ['ARGO_CONTAINER_UUID'] = container_name
logger.info("creating container %s", container_name)
self.nodeos.create(container_name, allocation)
container_resources = dict()
container_resources['cpus'], container_resources['mems'] = allocation
# Container power settings
container_power = dict()
container_power['profile'] = None
container_power['policy'] = None
container_power['damper'] = None
container_power['slowdown'] = None
container_power['manager'] = None
# TODO: Application library to load must be set during configuration
applicationpreloadlibrary = ''
argv.append(command)
argv.extend(args)
# run my command
if hasattr(manifest.app.isolators, 'scheduler'):
sched = manifest.app.isolators.scheduler
argv = self.chrt.getwrappedcmd(sched)
process = self.nodeos.execute(container_name, argv, environ)
processes[process.pid] = process
clientids[process.pid] = request['clientid']
if containerexistsflag:
container.processes[process.pid] = process
self.pids[process.pid] = container
logger.info("Created process %s in container %s", process.pid,
container_name)
else:
argv = []
# It would've been better if argo-perf-wrapper wrapped around
# argo-nodeos-config and not the final command -- that way it would
# be running outside of the container. However, because
# argo-nodeos-config is suid root, perf can't monitor it.
if hasattr(manifest.app.isolators, 'perfwrapper'):
if hasattr(manifest.app.isolators.perfwrapper, 'enabled'):
if manifest.app.isolators.perfwrapper.enabled in ["1", "True"]:
argv.append('argo-perf-wrapper')
if hasattr(manifest.app.isolators, 'power'):
if hasattr(manifest.app.isolators.power, 'enabled'):
pp = manifest.app.isolators.power
if pp.enabled in ["1", "True"]:
if pp.profile in ["1", "True"]:
container_power['profile'] = dict()
container_power['profile']['start'] = dict()
container_power['profile']['end'] = dict()
if pp.policy != "NONE":
container_power['policy'] = pp.policy
container_power['damper'] = pp.damper
container_power['slowdown'] = pp.slowdown
environ['LD_PRELOAD'] = applicationpreloadlibrary
container = Container(container_name, manifest,
container_resources, container_power,
processes, clientids)
self.pids[process.pid] = container
self.containers[container_name] = container
logger.info("Container %s created and running : %r",
container_name, container)
argv.append(command)
argv.extend(args)
process = self.nodeos.execute(container_name, argv, environ)
c = Container(container_name, manifest, container_resources,
container_power, process)
self.pids[process.pid] = c
self.containers[container_name] = c
logger.info("Container %s created and running : %r", container_name, c)
return c
return process.pid, container
def delete(self, uuid):
"""Delete a container and kill all related processes."""
@@ -116,7 +155,7 @@ class ContainerManager(object):
self.resourcemanager.update(uuid)
c = self.containers[uuid]
del self.containers[uuid]
del self.pids[c.process.pid]
map(lambda i: self.pids.pop(c.processes[i].pid, None), c.processes)
def kill(self, uuid):
"""Kill all the processes of a container."""
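The ContainerManager changes above turn the single 'process' field into 'processes' and 'clientids' dictionaries keyed by pid, and either attach new processes to an existing user-named container or create a new one. Below is a condensed sketch of that attach-or-create bookkeeping; the SketchContainer type is a stand-in and the resource-manager/nodeos calls are deliberately omitted.

# Sketch only -- mirrors the new bookkeeping, omits scheduling and nodeos calls.
import uuid
from collections import namedtuple

SketchContainer = namedtuple('SketchContainer', ['uuid', 'processes', 'clientids'])

def get_or_create(containers, ucontainername):
    name = ucontainername or str(uuid.uuid4())
    if name not in containers:
        # The real code schedules resources and calls nodeos.create() here.
        containers[name] = SketchContainer(name, dict(), dict())
    return containers[name]

def track_process(container, pids, pid, clientid):
    # Each process is tracked per container and globally by pid, so the
    # daemon can find the owning container when a child exits.
    container.processes[pid] = 'process handle'
    container.clientids[pid] = clientid
    pids[pid] = container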
@@ -35,8 +35,8 @@ class Daemon(object):
logger.error("wrong message format: %r", msg)
return
if event == 'start':
container_uuid = msg['container']
container = self.container_manager.containers[container_uuid]
cid = msg['container']
container = self.container_manager.containers[cid]
self.application_manager.register(msg, container)
elif event == 'threads':
uuid = msg['uuid']
@@ -52,7 +52,7 @@ class Daemon(object):
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
c = self.container_manager.containers[app.container_uuid]
c = self.container_manager.containers[app.cid]
if c.power['policy']:
app.update_phase_context(msg)
elif event == 'exit':
@@ -79,38 +79,44 @@ class Daemon(object):
self.target = float(msg['limit'])
logger.info("new target measure: %g", self.target)
elif command == 'run':
container_uuid = msg['uuid']
if container_uuid in self.container_manager.containers:
logger.info("container already created: %r",
container_uuid)
return
logger.info("new container required: %r", msg)
container = self.container_manager.create(msg)
if container.power['policy']:
container.power['manager'] = PowerPolicyManager(
container.resources['cpus'],
container.power['policy'],
float(container.power['damper']),
float(container.power['slowdown']))
if container.power['profile']:
p = container.power['profile']
p['start'] = self.machine_info['energy']['energy']
p['start']['time'] = self.machine_info['time']
logger.info("new container will be created if it doesn't "
"exist: %r", msg)
pid, container = self.container_manager.create(msg)
cid = container.uuid
clientid = container.clientids[pid]
# TODO: obviously we need to send more info than that
update = {'type': 'container',
'event': 'start',
'uuid': container_uuid,
'uuid': cid,
'clientid': clientid,
'errno': 0 if container else -1,
'pid': container.process.pid,
'power': container.power['policy']
'pid': pid,
}
self.upstream_pub.send_json(update)
if len(container.processes.keys()) == 1:
update['event'] = 'start'
if container.power['policy']:
container.power['manager'] = PowerPolicyManager(
container.resources['cpus'],
container.power['policy'],
float(container.power['damper']),
float(container.power['slowdown']))
if container.power['profile']:
p = container.power['profile']
p['start'] = self.machine_info['energy']['energy']
p['start']['time'] = self.machine_info['time']
update['power'] = container.power['policy']
else:
update['event'] = 'process_start'
# setup io callbacks
outcb = partial(self.do_children_io, container_uuid, 'stdout')
errcb = partial(self.do_children_io, container_uuid, 'stderr')
container.process.stdout.read_until_close(outcb, outcb)
container.process.stderr.read_until_close(errcb, errcb)
outcb = partial(self.do_children_io, clientid, cid, 'stdout')
errcb = partial(self.do_children_io, clientid, cid, 'stderr')
container.processes[pid].stdout.read_until_close(outcb, outcb)
container.processes[pid].stderr.read_until_close(errcb, errcb)
self.upstream_pub.send_json(update)
elif command == 'kill':
logger.info("asked to kill container: %r", msg)
response = self.container_manager.kill(msg['uuid'])
@@ -126,7 +132,7 @@ class Daemon(object):
else:
logger.error("invalid command: %r", command)
def do_children_io(self, uuid, io, data):
def do_children_io(self, clientid, uuid, io, data):
"""Receive data from one of the children, and send it down the pipe.
Meant to be partially defined on a children basis."""
@@ -134,6 +140,7 @@ class Daemon(object):
update = {'type': 'container',
'event': io,
'uuid': uuid,
'clientid': clientid,
'payload': data or 'eof',
}
self.upstream_pub.send_json(update)
@@ -183,15 +190,21 @@ class Daemon(object):
# check if this is an exit
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
container = self.container_manager.pids[pid]
pp = container.power
if pp['policy']:
pp['manager'].reset_all()
clientid = container.clientids[pid]
remaining_pids = [p for p in container.processes.keys()
if p != pid]
msg = {'type': 'container',
'event': 'exit',
'status': status,
'uuid': container.uuid,
'clientid': clientid,
}
if pp['profile']:
if not remaining_pids:
msg['event'] = 'exit'
pp = container.power
if pp['policy']:
pp['manager'].reset_all()
if pp['profile']:
e = pp['profile']['end']
self.machine_info = self.sensor_manager.do_update()
e = self.machine_info['energy']['energy']
@@ -205,7 +218,15 @@ class Daemon(object):
logger.info("Container %r profile data: %r",
container.uuid, diff)
msg['profile_data'] = diff
self.container_manager.delete(container.uuid)
self.container_manager.delete(container.uuid)
else:
msg['event'] = 'process_exit'
# Remove the pid of process that is finished
container.processes.pop(pid, None)
self.container_manager.pids.pop(pid, None)
logger.info("Process %s in Container %s has finised.",
pid, container.uuid)
self.upstream_pub.send_json(msg)
else:
logger.debug("child update ignored")
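On the daemon side, the event published upstream now depends on how many processes the container holds: the first process produces 'start' and the last exit produces 'exit' (with power/profile handling and container deletion), while additional processes produce 'process_start' and 'process_exit'. A small sketch of that selection logic, assuming only the process counts are known:

# Sketch only -- event selection as described above, not the daemon code itself.
def start_event(process_count):
    # The first process announces the container itself.
    return 'start' if process_count == 1 else 'process_start'

def exit_event(remaining_pids):
    # Only the last exiting process tears the container down.
    return 'exit' if not remaining_pids else 'process_exit'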