Commits (15)
......@@ -7,9 +7,18 @@ py.test:
- pipenv install --dev
- pipenv run py.test
- /^wip\/.*/
- /^WIP\/.*/
- rapl
stage: style
- pipenv install --dev
- pipenv run flake8
- /^wip\/.*/
- /^WIP\/.*/
......@@ -25,10 +25,13 @@
"name": "argo/powerpolicy",
"name": "argo/power",
"value": {
"enabled": "0",
"policy": "NONE"
"enabled": "1",
"profile": "1",
"policy": "NONE",
"damper": "0.1",
"slowdown": "1.1"
......@@ -3,7 +3,7 @@
* Description: This file contains the implementation of downstream API to
* transmit application context information to NRM.
* The application context information transmitted is used to monitor
* The application context information transmitted can be used to monitor
* application progress and/or invoke power policies to improve energy
* efficiency at the node level.
......@@ -71,9 +71,9 @@ int nrm_send_progress(struct nrm_context *ctxt, unsigned long progress)
return 0;
int nrm_invoke_power_policy(struct nrm_context *ctxt, int cpu, double
startCompute, double endCompute, double startBarrier, double
int nrm_send_phase_context(struct nrm_context *ctxt, int cpu, unsigned long
long int startCompute, unsigned long long int endCompute, unsigned
long long int startBarrier, unsigned long long int endBarrier)
char buf[512];
struct timespec now;
......@@ -83,7 +83,7 @@ int nrm_invoke_power_policy(struct nrm_context *ctxt, int cpu, double
snprintf(buf, 512, NRM_POWER_POLICY_FORMAT, cpu, startCompute,
snprintf(buf, 512, NRM_PHASE_CONTEXT_FORMAT, cpu, startCompute,
endCompute, startBarrier, endBarrier, ctxt->app_uuid);
int err = zmq_send(ctxt->socket, buf, strnlen(buf, 512), 0);
assert(err > 0);
......@@ -28,15 +28,15 @@ struct nrm_context {
#define NRM_START_FORMAT "{\"type\":\"application\", \"event\":\"start\", \"container\": \"%s\", \"uuid\": \"%s\", \"progress\": true, \"threads\": null}"
#define NRM_PROGRESS_FORMAT "{\"type\":\"application\", \"event\":\"progress\", \"payload\": \"%lu\", \"uuid\": \"%s\"}"
#define NRM_POWER_POLICY_FORMAT "{\"type\":\"application\", \"event\":\"power_policy\", \"cpu\": \"%d\", \"startcompute\": \"%lf\", \"endcompute\": \"%lf\", \"startbarrier\": \"%lf\", \"endbarrier\": \"%lf\", \"uuid\": \"%s\"}"
#define NRM_PHASE_CONTEXT_FORMAT "{\"type\":\"application\", \"event\":\"phase_context\", \"cpu\": \"%d\", \"startcompute\": \"%llu\", \"endcompute\": \"%llu\", \"startbarrier\": \"%llu\", \"endbarrier\": \"%llu\", \"uuid\": \"%s\"}"
#define NRM_EXIT_FORMAT "{\"type\":\"application\", \"event\":\"exit\", \"uuid\": \"%s\"}"
int nrm_init(struct nrm_context *, const char *);
int nrm_fini(struct nrm_context *);
int nrm_send_progress(struct nrm_context *, unsigned long);
int nrm_invoke_power_policy(struct nrm_context *ctxt, int cpu, double
startCompute, double endCompute, double startBarrier, double
int nrm_send_phase_context(struct nrm_context *ctxt, int cpu, unsigned long
long int startCompute, unsigned long long int endCompute, unsigned
long long int startBarrier, unsigned long long int endBarrier);
......@@ -143,37 +143,52 @@ class PerfWrapper(SpecField):
if not ret:
return ret
if self.enabled not in ["0", "False", "1", "True"]:
logger.error("Invalid value of perfwrapper enabled: %s",
logger.error("Invalid value for perfwrapper enabled: %s",
return False
return True
class PowerPolicy(SpecField):
class Power(SpecField):
"""Information on whether to use power policy for a container."""
"""Power settings for a container."""
policies = ['NONE', 'DDCM', 'DVFS', 'COMBINED']
fields = {"enabled": spec(unicode, False),
"policy": spec(unicode, False)
"profile": spec(unicode, False),
"policy": spec(unicode, False),
"damper": spec(unicode, False),
"slowdown": spec(unicode, False)
def __init__(self):
"""Create empty perf wrapper."""
"""Create empty power settings object."""
def load(self, data):
"""Load perf wrapper information."""
ret = super(PowerPolicy, self).load(data)
"""Load power settings."""
ret = super(Power, self).load(data)
if not ret:
return ret
if self.enabled not in ["0", "False", "1", "True"]:
logger.error("Invalid value of powerpolicy enabled: %s",
logger.error("Invalid value for power enabled: %s",
return False
if self.profile not in ["0", "False", "1", "True"]:
logger.error("Invalid value for power profile: %s",
return False
if self.policy not in self.policies:
logger.error("Invalid value of powerpolicy policy: %s",
logger.error("Invalid value for power policy: %s",
return False
if self.damper < 0.0:
logger.error("Invalid value for power policy damper: %s",
return False
if self.slowdown < 1.0:
logger.error("Invalid value for power policy slowdown: %s",
return False
return True
......@@ -186,7 +201,7 @@ class IsolatorList(SpecField):
types = {"argo/scheduler": spec(Scheduler, False),
"argo/container": spec(Container, True),
"argo/perfwrapper": spec(PerfWrapper, False),
"argo/powerpolicy": spec(PowerPolicy, False)
"argo/power": spec(Power, False),
def __init__(self):
......@@ -18,12 +18,13 @@ class Application(object):
'min_ask_i': {'done': 'stable', 'noop': 'noop'},
'noop': {}}
def __init__(self, uuid, container, progress, threads):
def __init__(self, uuid, container, progress, threads, phase_contexts):
self.uuid = uuid
self.container_uuid = container
self.progress = progress
self.threads = threads
self.thread_state = 'stable'
self.phase_contexts = phase_contexts
def do_thread_transition(self, event):
"""Update the thread fsm state."""
......@@ -58,6 +59,13 @@ class Application(object):
"""Update the progress tracking."""
assert self.progress
def update_phase_context(self, msg):
"""Update the phase contextual information."""
id = int(msg['cpu'])
self.phase_contexts[id] = {k: int(msg[k]) for k in ('aggregation',
'computetime', 'totaltime')}
self.phase_contexts[id]['set'] = True
class ApplicationManager(object):
......@@ -66,15 +74,24 @@ class ApplicationManager(object):
def __init__(self):
self.applications = dict()
def register(self, msg):
def register(self, msg, container):
"""Register a new downstream application."""
uuid = msg['uuid']
container = msg['container']
container_uuid = msg['container']
progress = msg['progress']
threads = msg['threads']
self.applications[uuid] = Application(uuid, container, progress,
phase_contexts = dict()
phase_context_keys = ['set', 'aggregation', 'computetime', 'totaltime']
if container.power['policy']:
ids = container.resources['cpus']
for id in ids:
phase_contexts[id] = dict.fromkeys(phase_context_keys)
phase_contexts[id]['set'] = False
phase_contexts = None
self.applications[uuid] = Application(uuid, container_uuid, progress,
threads, phase_contexts)
def delete(self, uuid):
"""Delete an application from the register."""
......@@ -7,7 +7,8 @@ import os
from subprograms import ChrtClient, NodeOSClient, resources
logger = logging.getLogger('nrm')
Container = namedtuple('Container', ['uuid', 'manifest', 'process'])
Container = namedtuple('Container', ['uuid', 'manifest', 'resources',
'power', 'process'])
class ContainerManager(object):
......@@ -50,15 +51,24 @@ class ContainerManager(object):
environ['AC_APP_NAME'] = manifest.name
environ['AC_METADATA_URL'] = "localhost"
logger.info("run: environ: %r", environ)
# TODO: Application library to load must be set during configuration
applicationpreloadlibrary = '.so'
# create container
container_name = request['uuid']
environ['ARGO_CONTAINER_UUID'] = container_name
logger.info("creating container %s", container_name)
self.nodeos.create(container_name, allocation)
logger.info("created container %s", container_name)
container_resources = dict()
container_resources['cpus'], container_resources['mems'] = allocation
# Container power settings
container_power = dict()
container_power['profile'] = None
container_power['policy'] = None
container_power['damper'] = None
container_power['slowdown'] = None
container_power['manager'] = None
# TODO: Application library to load must be set during configuration
applicationpreloadlibrary = '/home/sriduttb/shared/argo/libnrm/libmpi_nrm.so'
# run my command
if hasattr(manifest.app.isolators, 'scheduler'):
......@@ -76,18 +86,30 @@ class ContainerManager(object):
if manifest.app.isolators.perfwrapper.enabled in ["1", "True"]:
if hasattr(manifest.app.isolators, 'powerpolicy'):
if hasattr(manifest.app.isolators.powerpolicy, 'enabled'):
if manifest.app.isolators.powerpolicy.enabled in ["1", "True"]:
if manifest.app.isolators.powerpolicy.policy != "NONE":
environ['LD_PRELOAD'] = applicationpreloadlibrary
if hasattr(manifest.app.isolators, 'power'):
if hasattr(manifest.app.isolators.power, 'enabled'):
pp = manifest.app.isolators.power
if pp.enabled in ["1", "True"]:
if pp.profile in ["1", "True"]:
container_power['profile'] = dict()
container_power['profile']['start'] = dict()
container_power['profile']['end'] = dict()
if pp.policy != "NONE":
container_power['policy'] = pp.policy
container_power['damper'] = pp.damper
container_power['slowdown'] = pp.slowdown
environ['LD_PRELOAD'] = applicationpreloadlibrary
environ['NRM_TRANSMIT'] = "1"
environ['NRM_DAMPER'] = pp.damper
process = self.nodeos.execute(container_name, argv, environ)
c = Container(container_name, manifest, process)
c = Container(container_name, manifest, container_resources,
container_power, process)
self.pids[process.pid] = c
self.containers[container_name] = c
logger.info("Container %s created and running : %r", container_name, c)
return c
def delete(self, uuid):
......@@ -95,6 +117,12 @@ class ContainerManager(object):
self.nodeos.delete(uuid, kill=True)
c = self.containers[uuid]
# TODO: Need to check if this is the correct approach even with
# multiple containers
if c.power['policy']:
del os.environ['LD_PRELOAD']
del os.environ['NRM_TRANSMIT']
del os.environ['NRM_DAMPER']
del self.containers[uuid]
del self.pids[c.process.pid]
......@@ -117,3 +117,32 @@ class Controller(object):
def update(self, action, actuator):
"""Update tracking across the board to reflect the last action."""
def run_policy_container(self, container, application):
"""Run policies on a container."""
ids = container.resources['cpus']
pcs = application.phase_contexts
# Run policy only if all phase contexts have been received
if not filter(lambda i: not pcs[i]['set'], ids):
# Only run policy if all phase contexts are an
# aggregation of same number of phases
aggs = [pcs[i]['aggregation'] for i in ids]
if aggs.count(aggs[0]) == len(aggs):
if filter(lambda i: pcs[i]['set'], ids):
logger.debug("Phase context not reset %r", application)
for i in ids:
pcs[i]['set'] = False
def run_policy(self, containers):
"""Run policies on containers with policies set."""
for container in containers:
p = containers[container].power
if p['policy']:
apps = self.actuators[0].application_manager.applications
if apps:
app = next(apps[a] for a in apps if apps[a].container_uuid
== container)
self.run_policy_container(containers[container], app)
......@@ -3,6 +3,7 @@ from __future__ import print_function
from applications import ApplicationManager
from containers import ContainerManager
from controller import Controller, ApplicationActuator, PowerActuator
from powerpolicy import PowerPolicyManager
from functools import partial
import json
import logging
......@@ -12,14 +13,15 @@ from sensor import SensorManager
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
import sys
logger = logging.getLogger('nrm')
class Daemon(object):
def __init__(self):
def __init__(self, statsFlag=False):
self.target = 100.0
self.statsFlag = statsFlag
def do_downstream_receive(self, parts):
logger.info("receiving downstream message: %r", parts)
......@@ -34,7 +36,9 @@ class Daemon(object):
logger.error("wrong message format: %r", msg)
if event == 'start':
container_uuid = msg['container']
container = self.container_manager.containers[container_uuid]
self.application_manager.register(msg, container)
elif event == 'threads':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
......@@ -45,13 +49,19 @@ class Daemon(object):
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
elif event == 'power_policy':
elif event == 'phase_context':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
app = self.application_manager.applications[uuid]
# TODO: Invoke appropriate power policy
c = self.container_manager.containers[app.container_uuid]
if c.power['policy']:
# Run container policy
self.controller.run_policy_container(c, app)
elif event == 'exit':
uuid = msg['uuid']
if uuid in self.application_manager.applications:
logger.error("unknown event: %r", event)
......@@ -80,12 +90,23 @@ class Daemon(object):
logger.info("new container required: %r", msg)
container = self.container_manager.create(msg)
if container.power['policy']:
container.power['manager'] = PowerPolicyManager(
if container.power['profile']:
p = container.power['profile']
p['start'] = self.machine_info['energy']['energy']
p['start']['time'] = self.machine_info['time']
# TODO: obviously we need to send more info than that
update = {'type': 'container',
'event': 'start',
'uuid': container_uuid,
'errno': 0 if container else -1,
'pid': container.process.pid,
'power': container.power['policy']
# setup io callbacks
......@@ -132,11 +153,15 @@ class Daemon(object):
logger.info("sending sensor message: %r", msg)
def do_control(self):
plan = self.controller.planify(self.target, self.machine_info)
action, actuator = plan
if action:
self.controller.execute(action, actuator)
self.controller.update(action, actuator)
# plan = self.controller.planify(self.target, self.machine_info)
# action, actuator = plan
# if action:
# self.controller.execute(action, actuator)
# self.controller.update(action, actuator)
# Call policy only if there are containers
# if self.container_manager.containers:
# self.controller.run_policy(self.container_manager.containers)
def do_signal(self, signum, frame):
if signum == signal.SIGINT:
......@@ -162,12 +187,36 @@ class Daemon(object):
# check if this is an exit
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
container = self.container_manager.pids[pid]
p = container.power
if p['policy']:
msg = {'type': 'container',
'event': 'exit',
'status': status,
'uuid': container.uuid,
if p['profile']:
e = p['profile']['end']
self.machine_info = self.sensor_manager.do_update()
e = self.machine_info['energy']['energy']
e['time'] = self.machine_info['time']
s = p['profile']['start']
# Calculate difference between the values
diff = self.sensor_manager.calc_difference(s, e)
# Get final package temperature
temp = self.machine_info['temperature']
diff['temp'] = map(lambda k: temp[k]['pkg'], temp)
diff['policy'] = p['policy']
if p['policy']:
diff['damper'] = float(p['damper'])/1000000000
diff['slowdown'] = p['slowdown']
if self.statsFlag:
diff['policy_statistics'] = (
logger.info("Container %r profile data: %r",
container.uuid, diff)
msg['profile_data'] = diff
logger.debug("child update ignored")
......@@ -250,5 +299,8 @@ class Daemon(object):
def runner():
daemon = Daemon()
if len(sys.argv) > 1:
daemon = Daemon(True if sys.argv[1] == 'True' else False)
daemon = Daemon()
""" DDCMPolicy Module:
This module contains the Dynamic Duty Cycle Modulation (DDCM) based policy
aimed at mitigating workload imbalance in parallel applications that use
barrier synchronizations. It reduces duty cycle of cpus not on the critical
path of execution thereby reducing energy with little or no adverse impact
on performance.
This implementation specifically targets Intel architecture.
Please check your architecture specification for supported power control
mechanisms and other information.
Additional information:
1. Bhalachandra, Sridutt, Allan Porterfield, and Jan F. Prins. "Using
dynamic duty cycle modulation to improve energy efficiency in high
performance computing." In Parallel and Distributed Processing Symposium
Workshop (IPDPSW), 2015 IEEE International, pp. 911-918. IEEE, 2015.
2. Porterfield, Allan, Rob Fowler, Sridutt Bhalachandra, Barry Rountree,
Diptorup Deb, and Rob Lewis. "Application runtime variability and power
optimization for exascale computers." In Proceedings of the 5th
International Workshop on Runtime and Operating Systems for Supercomputers,
p. 3. ACM, 2015.
from __future__ import division
import math
import coolr
import coolr.dutycycle
class DDCMPolicy:
""" Contains cpu-specific DDCM based power policy """
def __init__(self, maxlevel=16, minlevel=1):
self.maxdclevel = maxlevel
self.mindclevel = minlevel
# Relaxation factor
self.relaxation = 1
self.ddcmpolicyset = 0
self.ddcmpolicyreset = 0
self.dc = coolr.dutycycle.DutyCycle()
def print_stats(self, resetflag=False):
ddcmstats = dict()
ddcmstats['DDCMPolicySets'] = self.ddcmpolicyset
ddcmstats['DDCMPolicyResets'] = self.ddcmpolicyreset
if resetflag:
self.ddcmpolicyset = 0
self.ddcmpolicyreset = 0
return ddcmstats
def execute(self, cpu, currentdclevel, computetime, totalphasetime):
# Compute work done by cpu during current phase
work = computetime / totalphasetime
# Compute effective work based on current duty cycle(dc) level
effectivework = work * self.maxdclevel / currentdclevel
# Compute effective slow down in current phase
effectiveslowdown = work * self.mindclevel / currentdclevel
# Decrease or keep constant dc level in the next phase if the effective
# work done is equal or less than 1.0
if effectivework <= 1.0:
self.ddcmpolicyset += 1
# Compute by how many levels dc needs to decrease
dcreduction = math.floor(effectivework / 0.0625) - 15
# Compute new dc level for next phase
if -14 < dcreduction < 0:
# Note that dcreduction is a negative value
newdclevel = currentdclevel + dcreduction + self.relaxation
elif dcreduction < -13:
# Empirical observation shows reducing dc below 18.75% leads to
# excessive slowdown
newdclevel = currentdclevel - 13
# If reduction required is 0
newdclevel = currentdclevel
# Check if new dc level computed is not less than whats permissible
if newdclevel < self.mindclevel:
newdclevel = self.maxdclevel
# If there was a slowdown in the last phase, then increase the duty
# cycle level corresponding to the slowdown
self.ddcmpolicyreset += 1
# Compute by how many levels dc needs to increase
dcincrease = math.floor(effectiveslowdown / 0.0625)
newdclevel = currentdclevel + dcincrease
# Check if new dc level computed is not greater than whats
# permissible
if newdclevel > self.maxdclevel:
newdclevel = self.maxdclevel
# Set the duty cycle of cpu to the new value computed
self.dc.set(cpu, newdclevel)
return newdclevel
""" Power Policy Module:
This module provides the interfaces that enable use of policies to control
processor power using controls available in the processor.
E.g. Dynamic Duty Cycle Modulation (DDCM), Dynamic Voltage
and Frequency Scaling (DVFS) and Power capping
The policies target problems like workload imbalance, memory saturation
seen very often in parallel applications.
To mitigate workload imbalance the policies adapt core frequencies to
workload characteristics through use of core-specific power controls.
The user can choose from three policies - DDCM, DVFS and a combination of
DVFS and DDCM to mitiage workload imbalance in parallel applications that
use barrier synchronizations.
The effective frequency of cpus not on the critical path of execution is
reduced thereby lowering energy with little or no adverse impact on
Additional information:
Bhalachandra, Sridutt, Allan Porterfield, Stephen L. Olivier, and Jan F.
Prins. "An adaptive core-specific runtime for energy efficiency." In 2017
IEEE International Parallel and Distributed Processing Symposium (IPDPS),
pp. 947-956. 2017.
Note: Power controls (DVFS, DDCM and power capping) needs to be enabled
before using these interfaces. Please check your architecture specification
for supported power contols and related information.
import ddcmpolicy
import logging
logger = logging.getLogger('nrm')
class PowerPolicyManager:
""" Used for power policy application """
def __init__(self, cpus=None, policy=None, damper=1000000000,
self.cpus = cpus
self.policy = policy
self.damper = damper
self.slowdown = slowdown
# Intiliaze all power interfaces
self.ddcmpolicy = ddcmpolicy.DDCMPolicy()
# Power levels
self.maxdclevel = self.ddcmpolicy.maxdclevel
# TODO: Need to set this value when DVFS policies are added
self.maxfreqlevel = -1
self.dclevel = dict.fromkeys(self.cpus, self.maxdclevel)
self.freqlevel = dict.fromkeys(self.cpus, self.maxfreqlevel)
# Book-keeping
self.damperexits = 0
self.slowdownexits = 0
self.prevtolalphasetime = 1000000000000000 # Any large value
def run_policy(self, phase_contexts):
# Run only if policy is specified
if self.policy:
for id in phase_contexts:
if id not in self.cpus:
logger.info("""Attempt to change power of cpu not in container
: %r""", id)
# Select and invoke appropriate power policy
# TODO: Need to add a better policy selection logic in addition
# to user specified using manifest file
ret, value = self.execute(id, **phase_contexts[id])
if self.policy == 'DDCM' and ret in ['DDCM', 'SLOWDOWN']:
self.dclevel[id] = value
phase_contexts[id]['set'] = False
def execute(self, cpu, **kwargs):
computetime = kwargs['computetime']
totalphasetime = kwargs['totaltime']
# If the current phase length is less than the damper value, then do
# not use policy. This avoids use of policy during startup operation
# insignificant phases
if totalphasetime < self.damper:
self.damperexits += 1
return 'DAMPER', -1
# Reset value for next phase
self.prevtolalphasetime = totalphasetime
# If the current phase has slowed down beyond the threshold set, then
# reset power. This helps correct error in policy application or acts
# as a rudimentary way to detect phase change
if(self.dclevel[cpu] < self.ddcmpolicy.maxdclevel and totalphasetime >
self.slowdown * self.prevtolalphasetime):
newdclevel = self.ddcmpolicy.maxdclevel
return 'SLOWDOWN', newdclevel
# Invoke the correct policy based on operation module
if self.policy == "DDCM":
newdclevel = self.ddcmpolicy.execute(cpu, self.dclevel[cpu],
computetime, totalphasetime)
# TODO: Add DVFS and Combined policies
return 'DDCM', newdclevel
def print_policy_stats(self, resetflag=False):
# Get statistics for policy run
ppstats = dict()
ppstats['PowerPolicyDamperExits'] = self.damperexits
ppstats['PowerPolicySlowdownExits'] = self.slowdownexits
if resetflag:
self.damperexits = 0
self.slowdownexits = 0
return ppstats
def power_reset(self, cpu):
# Reset all power controls
self.dclevel[cpu] = self.maxdclevel
def power_check(self, cpu):
# Check status of all power controls
return self.ddcmpolicy.dc.check(cpu)
def reset_all(self):
# Reset all cpus
for cpu in self.cpus:
......@@ -6,6 +6,7 @@
This module should be the only one interfacing with coolr.
from __future__ import print_function
import time
import coolr
import coolr.clr_rapl
import coolr.clr_hwmon
......@@ -33,6 +34,7 @@ class SensorManager:
machine_info = dict()
machine_info['energy'] = self.rapl.sample(accflag=True)
machine_info['temperature'] = self.coretemp.sample()
machine_info['time'] = time.time()
return machine_info
def get_powerlimits(self):
......@@ -42,3 +44,30 @@ class SensorManager:
def set_powerlimit(self, domain, value):
self.rapl.set_powerlimit(value, domain)
def calc_difference(self, start, end):
diff = dict()
# lengthen the shortened input dictionary keys
for k in start.keys():
if k not in ['time']:
start[k.replace('p', 'package-')] = start[k]
end[k.replace('p', 'package-')] = end[k]
# Calculate energy difference
diff['energy'] = self.rapl.diffenergy(start, end)
# Update time elapsed
diff['time'] = diff['energy']['time']
# Remove 'time' field returned by function
# Convert uJ to J
diff['energy'] = {k: diff['energy'][k]/(1000000.0) for k in
# Calculate power difference
diff['power'] = self.rapl.calcpower(start, end)
# Remove 'delta' field returned by function
return diff
......@@ -114,11 +114,13 @@ class NodeOSClient(object):
# to escape spaces from arguments before.
argv = [s.replace(' ', r'\ ') for s in argv]
cmd += " argv:'"+" ".join(argv)+"'"
env = ['{0}={1}'.format(envname, envval.replace(' ', r'\ '))
for envname, envval in environ.items()]
cmd += " env:'"+" ".join(env)+"'"
return process.Subprocess(args, stdin=process.Subprocess.STREAM,
class ChrtClient(object):