Commit d373a502 authored by Swann Perarnau's avatar Swann Perarnau

Merge branch 'code-cleanup-improvements' into 'master'

Code cleanup improvements

See merge request !5
parents 346eed43 95b0b149
......@@ -7,16 +7,18 @@ import uuid
import signal
import zmq
logger = logging.getLogger('nrm-cmd')
class CommandLineInterface(object):
"""Implements a command line interface to the NRM."""
def __init__(self):
self.logger = logging.getLogger(__name__)
pass
def do_signal(self, signum, stackframe):
self.logger.info("received signal %d, exiting", signum)
logger.info("received signal %d, exiting", signum)
exit(1)
def setup(self):
......@@ -38,17 +40,15 @@ class CommandLineInterface(object):
upstream_sub_filter = ""
self.upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)
self.logger.info("upstream pub socket bound to: %s",
upstream_pub_param)
self.logger.info("upstream sub socket connected to: %s",
upstream_sub_param)
logger.info("upstream pub socket bound to: %s", upstream_pub_param)
logger.info("upstream sub socket connected to: %s", upstream_sub_param)
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
# create a uuid for this client instance
self.uuid = str(uuid.uuid4())
self.logger.info("client uuid: %r", self.uuid)
logger.info("client uuid: %r", self.uuid)
def do_run(self, argv):
""" Connect to the NRM and ask to spawn a container and run a command
......@@ -71,11 +71,11 @@ class CommandLineInterface(object):
while(True):
self.upstream_pub_socket.send_json(command)
msg = self.upstream_sub_socket.recv_json()
self.logger.info("new message: %r", msg)
logger.info("new message: %r", msg)
# ignore other messages
if isinstance(msg, dict) and msg.get('type') == 'container':
if msg['uuid'] == containerid:
self.logger.info("container response: %r", msg)
logger.info("container response: %r", msg)
break
def do_list(self, argv):
......@@ -135,11 +135,11 @@ class CommandLineInterface(object):
while(True):
self.upstream_pub_socket.send_json(command)
msg = self.upstream_sub_socket.recv_json()
self.logger.info("new message: %r", msg)
logger.info("new message: %r", msg)
# ignore other messages
if isinstance(msg, dict) and msg.get('type') == 'power':
if msg['limit'] == argv.limit:
self.logger.info("command received by the daemon")
logger.info("command received by the daemon")
break
def main(self):
......@@ -178,7 +178,7 @@ class CommandLineInterface(object):
args = parser.parse_args()
if args.verbose:
self.logger.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
self.setup()
args.func(args)
......
#!/usr/bin/env python2
from __future__ import print_function
import nrm
import nrm.daemon
......
......@@ -3,7 +3,7 @@ import collections
import logging
import json
logger = logging.getLogger('argus')
logger = logging.getLogger('nrm')
spec = collections.namedtuple('Field', ['cls', 'required'])
......@@ -57,7 +57,7 @@ class Scheduler(SpecField):
classes = ['SCHED_FIFO', 'SCHED_HPC', 'SCHED_OTHER']
fields = {"policy": spec(unicode, True),
"priority": spec(unicode, False)
"priority": spec(unicode, False),
}
def __init__(self):
......@@ -180,7 +180,7 @@ class ImageManifest(SpecField):
fields = {"acKind": spec(unicode, True),
"acVersion": spec(unicode, True),
"name": spec(unicode, True),
"app": spec(App, True)
"app": spec(App, True),
}
def __init__(self):
......
......@@ -6,10 +6,11 @@ import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
logger = logging.getLogger('nrm-client')
class Client(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.buf = ''
self.nt = 16
self.max = 32
......@@ -34,7 +35,7 @@ class Client(object):
return
def do_receive(self, parts):
self.logger.info("receive stream: " + repr(parts))
logger.info("receive stream: " + repr(parts))
if len(parts[1]) == 0:
if self.server:
......@@ -46,7 +47,7 @@ class Client(object):
self.buf = self.buf + parts[1]
for m in self.get_server_message():
self.logger.info(m)
logger.info(m)
if m == 'd':
if self.nt == 1:
ret = "min"
......@@ -68,7 +69,7 @@ class Client(object):
self.stream.send(ret)
def do_signal(self, signum, frame):
self.logger.critical("received signal: " + repr(signum))
logger.critical("received signal: " + repr(signum))
self.setup_shutdown()
def do_shutdown(self):
......@@ -88,7 +89,7 @@ class Client(object):
# deal with logging
if args.verbose:
self.logger.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
self.nt = args.threads
self.max = args.maxthreads
......@@ -101,7 +102,7 @@ class Client(object):
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.connect(connect_param)
self.logger.info("connected to: " + connect_param)
logger.info("connected to: " + connect_param)
self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_receive)
......
......@@ -8,7 +8,7 @@ import signal
from subprograms import ChrtClient, NodeOSClient, resources
import sys
logger = logging.getLogger('nrm')
Container = namedtuple('Container', ['uuid', 'manifest', 'pid'])
......@@ -20,7 +20,6 @@ class ContainerManager(object):
def __init__(self, rm):
self.containers = dict()
self.pids = dict()
self.logger = logging.getLogger(__name__)
self.resourcemanager = rm
self.nodeos = NodeOSClient()
self.chrt = ChrtClient()
......@@ -32,19 +31,19 @@ class ContainerManager(object):
manifestfile = request['manifest']
command = request['file']
args = request['args']
self.logger.info("run: manifest file: %s", manifestfile)
self.logger.info("run: command: %s", command)
self.logger.info("run: args: %r", args)
logger.info("run: manifest file: %s", manifestfile)
logger.info("run: command: %s", command)
logger.info("run: args: %r", args)
manifest = ImageManifest()
if not manifest.load(manifestfile):
self.logger.error("Manifest is invalid")
logger.error("Manifest is invalid")
return -1
# ask the resource manager for resources
req = resources(int(manifest.app.isolators.container.cpus.value),
int(manifest.app.isolators.container.mems.value))
allocation = self.resourcemanager.schedule(request['uuid'], req)
self.logger.info("run: allocation: %r", allocation)
logger.info("run: allocation: %r", allocation)
# build context to execute
environ = os.environ
......@@ -53,21 +52,21 @@ class ContainerManager(object):
environ['AC_APP_NAME'] = manifest.name
environ['AC_METADATA_URL'] = "localhost"
environ['container'] = 'argo'
self.logger.info("run: environ: %r", environ)
logger.info("run: environ: %r", environ)
# create container
container_name = request['uuid']
self.logger.info("creating container %s", container_name)
logger.info("creating container %s", container_name)
self.nodeos.create(container_name, allocation)
self.logger.info("created container %s", container_name)
logger.info("created container %s", container_name)
newpid = os.fork()
self.logger.info("forked: new pid: %s", newpid)
logger.info("forked: new pid: %s", newpid)
if newpid == 0:
# move myself to that container
mypid = os.getpid()
self.nodeos.attach(container_name, mypid)
self.logger.info("child: attached to container %s", container_name)
logger.info("child: attached to container %s", container_name)
# run my command
if hasattr(manifest.app.isolators, 'scheduler'):
......@@ -78,7 +77,7 @@ class ContainerManager(object):
argv.append(command)
argv.extend(args)
self.logger.debug("execvpe %r", argv)
logger.debug("execvpe %r", argv)
os.execvpe(argv[0], argv, environ)
# should never happen
sys.exit(1)
......@@ -91,6 +90,7 @@ class ContainerManager(object):
def delete(self, uuid):
"""Delete a container and kill all related processes."""
self.nodeos.delete(uuid, kill=True)
self.resourcemanager.update(uuid)
c = self.containers[uuid]
del self.containers[uuid]
del self.pids[c.pid]
......
......@@ -21,6 +21,8 @@ application_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
'min_ask_i': {'done': 'stable', 'max': 'nop'},
'nop': {}}
logger = logging.getLogger('nrm')
class Application(object):
def __init__(self, identity):
......@@ -75,20 +77,19 @@ class Daemon(object):
self.applications = {}
self.containerpids = {}
self.buf = ''
self.logger = logging.getLogger(__name__)
self.target = 1.0
def do_application_receive(self, parts):
self.logger.info("receiving application stream: %r", parts)
logger.info("receiving application stream: %r", parts)
identity = parts[0]
if len(parts[1]) == 0:
# empty frame, indicate connect/disconnect
if identity in self.applications:
self.logger.info("known client disconnected")
logger.info("known client disconnected")
del self.applications[identity]
else:
self.logger.info("new client: " + repr(identity))
logger.info("new client: " + repr(identity))
self.applications[identity] = Application(identity)
else:
if identity in self.applications:
......@@ -98,26 +99,26 @@ class Daemon(object):
application.append_buffer(parts[1])
for m in application.get_messages():
application.do_transition(m)
self.logger.info("application now in state: %s",
application.state)
logger.info("application now in state: %s",
application.state)
def do_upstream_receive(self, parts):
self.logger.info("receiving upstream message: %r", parts)
logger.info("receiving upstream message: %r", parts)
if len(parts) != 1:
self.logger.error("unexpected msg length, dropping it: %r", parts)
logger.error("unexpected msg length, dropping it: %r", parts)
return
msg = json.loads(parts[0])
if isinstance(msg, dict):
command = msg.get('command')
# TODO: switch to a dispatch dictionary
if command is None:
self.logger.error("missing command in message: %r", msg)
logger.error("missing command in message: %r", msg)
return
if command == 'setpower':
self.target = float(msg['limit'])
self.logger.info("new target measure: %g", self.target)
logger.info("new target measure: %g", self.target)
elif command == 'run':
self.logger.info("new container required: %r", msg)
logger.info("new container required: %r", msg)
pid = self.container_manager.create(msg)
if pid > 0:
self.containerpids[pid] = msg['uuid']
......@@ -149,18 +150,18 @@ class Daemon(object):
}
self.upstream_pub.send_json(update)
else:
self.logger.error("invalid command: %r", command)
logger.error("invalid command: %r", command)
def do_sensor(self):
self.machine_info = self.sensor.do_update()
self.logger.info("current state: %r", self.machine_info)
logger.info("current state: %r", self.machine_info)
total_power = self.machine_info['energy']['power']['total']
msg = {'type': 'power',
'total': total_power,
'limit': self.target
}
self.upstream_pub.send_json(msg)
self.logger.info("sending sensor message: %r", msg)
logger.info("sending sensor message: %r", msg)
def do_control(self):
total_power = self.machine_info['energy']['power']['total']
......@@ -176,7 +177,7 @@ class Daemon(object):
application.do_transition('d')
else:
pass
self.logger.info("application now in state: %s", application.state)
logger.info("application now in state: %s", application.state)
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
......@@ -184,7 +185,7 @@ class Daemon(object):
elif signum == signal.SIGCHLD:
ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
else:
self.logger.error("wrong signal: %d", signum)
logger.error("wrong signal: %d", signum)
def do_children(self):
# find out if children have terminated
......@@ -196,7 +197,7 @@ class Daemon(object):
except OSError:
break
self.logger.info("child update %d: %r", pid, status)
logger.info("child update %d: %r", pid, status)
# check if its a pid we care about
if pid in self.containerpids:
# check if this is an exit
......@@ -210,7 +211,7 @@ class Daemon(object):
}
self.upstream_pub.send_json(msg)
else:
self.logger.debug("child update ignored")
logger.debug("child update ignored")
pass
def do_shutdown(self):
......@@ -243,12 +244,9 @@ class Daemon(object):
upstream_sub_filter = ""
upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)
self.logger.info("downstream socket bound to: %s",
downstream_bind_param)
self.logger.info("upstream pub socket bound to: %s",
upstream_pub_param)
self.logger.info("upstream sub socket connected to: %s",
upstream_sub_param)
logger.info("downstream socket bound to: %s", downstream_bind_param)
logger.info("upstream pub socket bound to: %s", upstream_pub_param)
logger.info("upstream sub socket connected to: %s", upstream_sub_param)
# register socket triggers
self.downstream = zmqstream.ZMQStream(downstream_socket)
......
......@@ -3,6 +3,8 @@ from __future__ import print_function
import logging
from subprograms import HwlocClient, resources
logger = logging.getLogger('nrm')
class ResourceManager(object):
......@@ -10,13 +12,13 @@ class ResourceManager(object):
the scheduling of new containers according to partitioning rules."""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.hwloc = HwlocClient()
# query the node topo, keep track of the critical resources
self.allresources = self.hwloc.info()
self.logger.debug("resource info: %r", self.allresources)
logger.debug("resource info: %r", self.allresources)
self.available = self.allresources
self.allocations = {}
def schedule(self, uuid, request):
"""Schedule a resource request on the available resources.
......@@ -27,43 +29,37 @@ class ResourceManager(object):
# - memories exclusive if more than one left
if len(self.available.cpus) >= request.cpus:
retcpus = self.available.cpus[:request.cpus]
availcpus = self.available.cpus[request.cpus:]
else:
retcpus = []
availcpus = self.available.cpus
if len(self.available.mems) > 1:
retmems = self.available.mems[:request.mems]
availmems = self.available.mems[request.mems:]
else:
retmems = self.available.mems
availmems = self.available.mems
self.available = resources(availcpus, availmems)
return resources(retcpus, retmems)
ret = resources(retcpus, retmems)
# make sure we don't remember an error
if ret.cpus:
self.update(uuid, ret)
return ret
def remove(self, uuid):
"""Free the resources associated with request uuid."""
pass
def update(self, uuid, allocation=resources([], [])):
"""Update resource tracking according to new allocation.
# def oldcode(self):
# numcpus = int(manifest.app.isolators.container.cpus.value)
#
# allresources = hwloc.info()
# self.logger.debug("resource info: %r", allresources)
# ncontainers = len(allresources.cpus) // numcpus
# self.logger.debug("will support %s containers", ncontainers)
# cur = nodeos.getavailable()
# self.logger.debug("%r are available", cur)
# sets = hwloc.distrib(ncontainers, restrict=cur, fake=allresources)
# self.logger.info("asking for %s cores", numcpus)
# self.logger.debug("will search in one of these: %r", sets)
# # find a free set
# avail = set(cur.cpus)
# for s in sets:
# cpuset = set(s.cpus)
# if cpuset.issubset(avail):
# alloc = s
# break
# else:
# self.logger.error("no exclusive cpuset found among %r", avail)
# return -2
#
The new allocation is saved, and available resources updated."""
added = {}
freed = {}
prev = self.allocations.get(uuid, resources([], []))
for attr, val in prev._asdict().items():
added[attr] = set(getattr(allocation, attr)) - set(val)
freed[attr] = set(val) - set(getattr(allocation, attr))
if allocation != resources([], []):
self.allocations[uuid] = allocation
logger.info("updated allocation for %r: %r", uuid,
self.available)
else:
del self.allocations[uuid]
logger.info("deleted allocation for %r", uuid)
new = {}
for attr, val in self.available._asdict().items():
new[attr] = list(set(val) - set(added[attr]) | set(freed[attr]))
self.available = resources(**new)
logger.info("updated available resources: %r", self.available)
......@@ -4,15 +4,16 @@ import collections
import logging
import xml.etree.ElementTree
logger = logging.getLogger('nrm')
resources = collections.namedtuple("Resources", ["cpus", "mems"])
def logpopen(p, args, stdout, stderr):
"""log popen cmd."""
logging.debug("popen cmd: %r", args)
logging.debug("popen return code: %s", p.returncode)
logging.debug("popen stdout: %r", stdout)
logging.debug("popen, stderr: %r", stderr)
logger.debug("popen cmd: %r", args)
logger.debug("popen return code: %s", p.returncode)
logger.debug("popen stdout: %r", stdout)
logger.debug("popen, stderr: %r", stderr)
def bitmask2list(mask):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment