Commit b66c88ec authored by Swann Perarnau's avatar Swann Perarnau

[refactor] Use globally configured logger

The logging module allow us to configure logging facilities once per
process using basicConfig, and then to use globally defined, named,
logger objects. This simplifies access to logger objects, their
configuration and remove pointers from all objects.

This patch refactor all the logging calls to use a single 'nrm' logger
object, using those facilities.
parent d5f88a14
......@@ -7,16 +7,18 @@ import uuid
import signal
import zmq
logger = logging.getLogger('nrm-cmd')
class CommandLineInterface(object):
"""Implements a command line interface to the NRM."""
def __init__(self):
self.logger = logging.getLogger(__name__)
pass
def do_signal(self, signum, stackframe):
self.logger.info("received signal %d, exiting", signum)
logger.info("received signal %d, exiting", signum)
exit(1)
def setup(self):
......@@ -38,17 +40,15 @@ class CommandLineInterface(object):
upstream_sub_filter = ""
self.upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)
self.logger.info("upstream pub socket bound to: %s",
upstream_pub_param)
self.logger.info("upstream sub socket connected to: %s",
upstream_sub_param)
logger.info("upstream pub socket bound to: %s", upstream_pub_param)
logger.info("upstream sub socket connected to: %s", upstream_sub_param)
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
# create a uuid for this client instance
self.uuid = str(uuid.uuid4())
self.logger.info("client uuid: %r", self.uuid)
logger.info("client uuid: %r", self.uuid)
def do_run(self, argv):
""" Connect to the NRM and ask to spawn a container and run a command
......@@ -71,11 +71,11 @@ class CommandLineInterface(object):
while(True):
self.upstream_pub_socket.send_json(command)
msg = self.upstream_sub_socket.recv_json()
self.logger.info("new message: %r", msg)
logger.info("new message: %r", msg)
# ignore other messages
if isinstance(msg, dict) and msg.get('type') == 'container':
if msg['uuid'] == containerid:
self.logger.info("container response: %r", msg)
logger.info("container response: %r", msg)
break
def do_setpower(self, argv):
......@@ -95,11 +95,11 @@ class CommandLineInterface(object):
while(True):
self.upstream_pub_socket.send_json(command)
msg = self.upstream_sub_socket.recv_json()
self.logger.info("new message: %r", msg)
logger.info("new message: %r", msg)
# ignore other messages
if isinstance(msg, dict) and msg.get('type') == 'power':
if msg['limit'] == argv.limit:
self.logger.info("command received by the daemon")
logger.info("command received by the daemon")
break
def main(self):
......@@ -128,7 +128,7 @@ class CommandLineInterface(object):
parser_setpower.set_defaults(func=self.do_setpower)
args = parser.parse_args()
if args.verbose:
self.logger.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
self.setup()
args.func(args)
......
......@@ -3,7 +3,7 @@ import collections
import logging
import json
logger = logging.getLogger('argus')
logger = logging.getLogger('nrm')
spec = collections.namedtuple('Field', ['cls', 'required'])
......
......@@ -6,10 +6,11 @@ import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
logger = logging.getLogger('nrm-client')
class Client(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.buf = ''
self.nt = 16
self.max = 32
......@@ -34,7 +35,7 @@ class Client(object):
return
def do_receive(self, parts):
self.logger.info("receive stream: " + repr(parts))
logger.info("receive stream: " + repr(parts))
if len(parts[1]) == 0:
if self.server:
......@@ -46,7 +47,7 @@ class Client(object):
self.buf = self.buf + parts[1]
for m in self.get_server_message():
self.logger.info(m)
logger.info(m)
if m == 'd':
if self.nt == 1:
ret = "min"
......@@ -68,7 +69,7 @@ class Client(object):
self.stream.send(ret)
def do_signal(self, signum, frame):
self.logger.critical("received signal: " + repr(signum))
logger.critical("received signal: " + repr(signum))
self.setup_shutdown()
def do_shutdown(self):
......@@ -88,7 +89,7 @@ class Client(object):
# deal with logging
if args.verbose:
self.logger.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
self.nt = args.threads
self.max = args.maxthreads
......@@ -101,7 +102,7 @@ class Client(object):
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.connect(connect_param)
self.logger.info("connected to: " + connect_param)
logger.info("connected to: " + connect_param)
self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_receive)
......
......@@ -7,6 +7,7 @@ import os
from subprograms import ChrtClient, NodeOSClient, resources
import sys
logger = logging.getLogger('nrm')
Container = namedtuple('Container', ['uuid', 'manifest', 'pid'])
......@@ -18,7 +19,6 @@ class ContainerManager(object):
def __init__(self, rm):
self.containers = dict()
self.pids = dict()
self.logger = logging.getLogger(__name__)
self.resourcemanager = rm
self.nodeos = NodeOSClient()
self.chrt = ChrtClient()
......@@ -30,19 +30,19 @@ class ContainerManager(object):
manifestfile = request['manifest']
command = request['file']
args = request['args']
self.logger.info("run: manifest file: %s", manifestfile)
self.logger.info("run: command: %s", command)
self.logger.info("run: args: %r", args)
logger.info("run: manifest file: %s", manifestfile)
logger.info("run: command: %s", command)
logger.info("run: args: %r", args)
manifest = ImageManifest()
if not manifest.load(manifestfile):
self.logger.error("Manifest is invalid")
logger.error("Manifest is invalid")
return -1
# ask the resource manager for resources
req = resources(int(manifest.app.isolators.container.cpus.value),
int(manifest.app.isolators.container.mems.value))
allocation = self.resourcemanager.schedule(request['uuid'], req)
self.logger.info("run: allocation: %r", allocation)
logger.info("run: allocation: %r", allocation)
# build context to execute
environ = os.environ
......@@ -51,21 +51,21 @@ class ContainerManager(object):
environ['AC_APP_NAME'] = manifest.name
environ['AC_METADATA_URL'] = "localhost"
environ['container'] = 'argo'
self.logger.info("run: environ: %r", environ)
logger.info("run: environ: %r", environ)
# create container
container_name = request['uuid']
self.logger.info("creating container %s", container_name)
logger.info("creating container %s", container_name)
self.nodeos.create(container_name, allocation)
self.logger.info("created container %s", container_name)
logger.info("created container %s", container_name)
newpid = os.fork()
self.logger.info("forked: new pid: %s", newpid)
logger.info("forked: new pid: %s", newpid)
if newpid == 0:
# move myself to that container
mypid = os.getpid()
self.nodeos.attach(container_name, mypid)
self.logger.info("child: attached to container %s", container_name)
logger.info("child: attached to container %s", container_name)
# run my command
if hasattr(manifest.app.isolators, 'scheduler'):
......@@ -76,7 +76,7 @@ class ContainerManager(object):
argv.append(command)
argv.extend(args)
self.logger.debug("execvpe %r", argv)
logger.debug("execvpe %r", argv)
os.execvpe(argv[0], argv, environ)
# should never happen
sys.exit(1)
......
......@@ -21,6 +21,8 @@ application_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
'min_ask_i': {'done': 'stable', 'max': 'nop'},
'nop': {}}
logger = logging.getLogger('nrm')
class Application(object):
def __init__(self, identity):
......@@ -75,20 +77,19 @@ class Daemon(object):
self.applications = {}
self.containerpids = {}
self.buf = ''
self.logger = logging.getLogger(__name__)
self.target = 1.0
def do_application_receive(self, parts):
self.logger.info("receiving application stream: %r", parts)
logger.info("receiving application stream: %r", parts)
identity = parts[0]
if len(parts[1]) == 0:
# empty frame, indicate connect/disconnect
if identity in self.applications:
self.logger.info("known client disconnected")
logger.info("known client disconnected")
del self.applications[identity]
else:
self.logger.info("new client: " + repr(identity))
logger.info("new client: " + repr(identity))
self.applications[identity] = Application(identity)
else:
if identity in self.applications:
......@@ -98,26 +99,26 @@ class Daemon(object):
application.append_buffer(parts[1])
for m in application.get_messages():
application.do_transition(m)
self.logger.info("application now in state: %s",
application.state)
logger.info("application now in state: %s",
application.state)
def do_upstream_receive(self, parts):
self.logger.info("receiving upstream message: %r", parts)
logger.info("receiving upstream message: %r", parts)
if len(parts) != 1:
self.logger.error("unexpected msg length, dropping it: %r", parts)
logger.error("unexpected msg length, dropping it: %r", parts)
return
msg = json.loads(parts[0])
if isinstance(msg, dict):
command = msg.get('command')
# TODO: switch to a dispatch dictionary
if command is None:
self.logger.error("missing command in message: %r", msg)
logger.error("missing command in message: %r", msg)
return
if command == 'setpower':
self.target = float(msg['limit'])
self.logger.info("new target measure: %g", self.target)
logger.info("new target measure: %g", self.target)
elif command == 'run':
self.logger.info("new container required: %r", msg)
logger.info("new container required: %r", msg)
pid = self.container_manager.create(msg)
if pid > 0:
self.containerpids[pid] = msg['uuid']
......@@ -135,18 +136,18 @@ class Daemon(object):
}
self.upstream_pub.send_json(update)
else:
self.logger.error("invalid command: %r", command)
logger.error("invalid command: %r", command)
def do_sensor(self):
self.machine_info = self.sensor.do_update()
self.logger.info("current state: %r", self.machine_info)
logger.info("current state: %r", self.machine_info)
total_power = self.machine_info['energy']['power']['total']
msg = {'type': 'power',
'total': total_power,
'limit': self.target
}
self.upstream_pub.send_json(msg)
self.logger.info("sending sensor message: %r", msg)
logger.info("sending sensor message: %r", msg)
def do_control(self):
total_power = self.machine_info['energy']['power']['total']
......@@ -162,7 +163,7 @@ class Daemon(object):
application.do_transition('d')
else:
pass
self.logger.info("application now in state: %s", application.state)
logger.info("application now in state: %s", application.state)
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
......@@ -170,7 +171,7 @@ class Daemon(object):
elif signum == signal.SIGCHLD:
ioloop.IOLoop.current().add_callback_from_signal(self.do_children)
else:
self.logger.error("wrong signal: %d", signum)
logger.error("wrong signal: %d", signum)
def do_children(self):
# find out if children have terminated
......@@ -182,7 +183,7 @@ class Daemon(object):
except OSError:
break
self.logger.info("child update %d: %r", pid, status)
logger.info("child update %d: %r", pid, status)
# check if its a pid we care about
if pid in self.containerpids:
# check if this is an exit
......@@ -196,7 +197,7 @@ class Daemon(object):
}
self.upstream_pub.send_json(msg)
else:
self.logger.debug("child update ignored")
logger.debug("child update ignored")
pass
def do_shutdown(self):
......@@ -229,12 +230,9 @@ class Daemon(object):
upstream_sub_filter = ""
upstream_sub_socket.setsockopt(zmq.SUBSCRIBE, upstream_sub_filter)
self.logger.info("downstream socket bound to: %s",
downstream_bind_param)
self.logger.info("upstream pub socket bound to: %s",
upstream_pub_param)
self.logger.info("upstream sub socket connected to: %s",
upstream_sub_param)
logger.info("downstream socket bound to: %s", downstream_bind_param)
logger.info("upstream pub socket bound to: %s", upstream_pub_param)
logger.info("upstream sub socket connected to: %s", upstream_sub_param)
# register socket triggers
self.downstream = zmqstream.ZMQStream(downstream_socket)
......
......@@ -3,6 +3,8 @@ from __future__ import print_function
import logging
from subprograms import HwlocClient, resources
logger = logging.getLogger('nrm')
class ResourceManager(object):
......@@ -10,12 +12,11 @@ class ResourceManager(object):
the scheduling of new containers according to partitioning rules."""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.hwloc = HwlocClient()
# query the node topo, keep track of the critical resources
self.allresources = self.hwloc.info()
self.logger.debug("resource info: %r", self.allresources)
logger.debug("resource info: %r", self.allresources)
self.available = self.allresources
self.allocations = {}
......@@ -52,13 +53,13 @@ class ResourceManager(object):
freed[attr] = set(val) - set(getattr(allocation, attr))
if allocation != resources([], []):
self.allocations[uuid] = allocation
self.logger.info("updated allocation for %r: %r", uuid,
self.available)
logger.info("updated allocation for %r: %r", uuid,
self.available)
else:
del self.allocations[uuid]
self.logger.info("deleted allocation for %r", uuid)
logger.info("deleted allocation for %r", uuid)
new = {}
for attr, val in self.available._asdict().items():
new[attr] = list(set(val) - set(added[attr]) | set(freed[attr]))
self.available = resources(**new)
self.logger.info("updated available resources: %r", self.available)
logger.info("updated available resources: %r", self.available)
......@@ -4,15 +4,16 @@ import collections
import logging
import xml.etree.ElementTree
logger = logging.getLogger('nrm')
resources = collections.namedtuple("Resources", ["cpus", "mems"])
def logpopen(p, args, stdout, stderr):
"""log popen cmd."""
logging.debug("popen cmd: %r", args)
logging.debug("popen return code: %s", p.returncode)
logging.debug("popen stdout: %r", stdout)
logging.debug("popen, stderr: %r", stderr)
logger.debug("popen cmd: %r", args)
logger.debug("popen return code: %s", p.returncode)
logger.debug("popen stdout: %r", stdout)
logger.debug("popen, stderr: %r", stderr)
def bitmask2list(mask):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment