Commit 86409f88 authored by Swann Perarnau's avatar Swann Perarnau

[refactor] Python rewrite of the software

We chose to rewrite the entire thing in python. The language should make
it easy to interact will all the moving parts of the Argo landscape, and
easy to prototype various control schemes.

The communication protocol is exactly the same, but implemented with
ZeroMQ + tornado.

Power readings are not integrated yet, we are targeting using the Coolr
project for that.

This is a rough draft, all the code is in binary scripts instead of
the package, and there are no unit tests. Nevertheless, it should be
a decent starting point for future development.
parent ccb1cd16
include Makefile
include tox.ini
PYTHON:= $(shell which python2)
# the compiler: gcc for C program, define as g++ for C++
CC = gcc
# compiler flags:
CFLAGS = -g -I/nfs/beacon_inst/include -I.
LDFLAGS = -lm -lbeacon -lpthread
# the build target executable:
TARGET = powPerfController
powMon = RaplPowerMon
beacon = beacon_nrm
RM = rm
all: $(powMon) $(TARGET)
$(powMon): $(powMon).c
$(CC) -o $(powMon) $(powMon).c $(LDFLAGS)
$(TARGET): $(TARGET).c $(beacon).c
$(CC) $(CFLAGS) $(TARGET).c $(beacon).c -o $(TARGET) $(LDFLAGS)
clean:
$(RM) -f $(TARGET) $(powMon)
source:
$(PYTHON) setup.py sdist
install:
$(PYTHON) setup.py install --force
check:
tox
How to run:
1) $ make clean; make
2) $ source /nfs/beacon_inst/env.sh; ./powPerfController 1234
3)in another shell:
$ /nfs/argobots-tascel/argobots-review/examples/dynamic-es/dyn_app 48 1000 localhost 1234
4)in another shell:
$ source /nfs/beacon_inst/env.sh
$ arbitrary_pub BEACON_BROADCAST "message type=2 ; node=frontend ; target watts=190”
sdasd
# Argo Node Resource Manager
Resource management daemon using communication with clients to control
power usage of application.
This is a python rewrite of the original code developed for the Argo project
two years ago.
## Additional Info
| **Systemwide Power Management with Argo**
| Dan Ellsworth, Tapasya Patki, Swann Perarnau, Pete Beckman *et al*
| In *High-Performance, Power-Aware Computing (HPPAC)*, 2016.
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <inttypes.h>
#include <unistd.h>
#include <math.h>
#include <time.h>
#include <string.h>
//#include <asm/msr.h>
#define MSR_RAPL_POWER_UNIT 0x606
/*
* Platform specific RAPL Domains.
* Note that PP1 RAPL Domain is supported on 062A only
* And DRAM RAPL Domain is supported on 062D only
*/
/* Package RAPL Domain */
#define MSR_PKG_RAPL_POWER_LIMIT 0x610
#define MSR_PKG_ENERGY_STATUS 0x611
#define MSR_PKG_PERF_STATUS 0x613
#define MSR_PKG_POWER_INFO 0x614
/* PP0 RAPL Domain */
#define MSR_PP0_POWER_LIMIT 0x638
#define MSR_PP0_ENERGY_STATUS 0x639
#define MSR_PP0_POLICY 0x63A
#define MSR_PP0_PERF_STATUS 0x63B
/* PP1 RAPL Domain, may reflect to uncore devices */
#define MSR_PP1_POWER_LIMIT 0x640
#define MSR_PP1_ENERGY_STATUS 0x641
#define MSR_PP1_POLICY 0x642
/* DRAM RAPL Domain */
#define MSR_DRAM_POWER_LIMIT 0x618
#define MSR_DRAM_ENERGY_STATUS 0x619
#define MSR_DRAM_PERF_STATUS 0x61B
#define MSR_DRAM_POWER_INFO 0x61C
/* RAPL UNIT BITMASK */
#define POWER_UNIT_OFFSET 0
#define POWER_UNIT_MASK 0x0F
#define ENERGY_UNIT_OFFSET 0x08
#define ENERGY_UNIT_MASK 0x1F00
#define TIME_UNIT_OFFSET 0x10
#define TIME_UNIT_MASK 0xF000
#define PKG_POWER_LIMIT_LOCK_OFFSET 0x3F
#define PKG_POWER_LIMIT_LOCK_MASK 0x1
#define ENABLE_LIMIT_2_OFFSET 0x2F
#define PKG_POWER_LIMIT_2_MASK 0x7FFF
#define ENABLE_LIMIT_1_OFFSET 0xF
#define ENABLE_LIMIT_1_MASK 0x1
#define PKG_CLAMPING_LIMIT_1_OFFSET 0x10
#define PKG_CLAMPING_LIMIT_1_MASK 0x1
#define PKG_POWER_LIMIT_1_OFFSET 0x0
#define PKG_POWER_LIMIT_1_MASK 0x7FFF
#define TIME_WINDOW_POWER_LIMIT_1_OFFSET 0x11
#define TIME_WINDOW_POWER_LIMIT_1_MASK 0x7F
#define TIME_WINDOW_POWER_LIMIT_2_OFFSET 0x31
#define TIME_WINDOW_POWER_LIMIT_2_MASK 0x7F
int open_msr(int core) {
char msr_filename[BUFSIZ];
int fd;
sprintf(msr_filename, "/dev/cpu/%d/msr", core);
fd = open(msr_filename, O_RDWR);
if ( fd < 0 ) {
if ( errno == ENXIO ) {
fprintf(stderr, "rdmsr: No CPU %d\n", core);
exit(2);
} else if ( errno == EIO ) {
fprintf(stderr, "rdmsr: CPU %d doesn't support MSRs\n", core);
exit(3);
} else {
perror("rdmsr:open");
fprintf(stderr,"Trying to open %s\n",msr_filename);
exit(127);
}
}
return fd;
}
long long read_msr(int fd, int which) {
uint64_t data;
if ( pread(fd, &data, sizeof data, which) != sizeof data ) {
perror("rdmsr:pread");
exit(127);
}
return (long long)data;
}
void write_msr(int fd, int which, uint64_t data) {
if ( pwrite(fd, &data, sizeof data, which) != sizeof data ) {
perror("wrmsr:pwrite");
exit(127);
}
}
int32_t wrmsr(int fd, uint64_t msr_number, uint64_t value)
{
return pwrite(fd, (const void *)&value, sizeof(uint64_t), msr_number);
}
int32_t rdmsr(int fd, uint64_t msr_number, uint64_t * value)
{
return pread(fd, (void *)value, sizeof(uint64_t), msr_number);
}
void set_power_limit(int fd, int watts, double pu)
{
/*
uint32_t setpoint = (uint32_t) ((1 << pu) * watts);
uint64_t reg = 0;
rdmsr(fd, MSR_PKG_RAPL_POWER_LIMIT, &reg);
reg = (reg & 0xFFFFFFFFFFFF0000) | setpoint | 0x8000;
reg = (reg & 0xFFFFFFFF0000FFFF) | 0xD0000;
wrmsr(fd, MSR_PKG_RAPL_POWER_LIMIT, reg);
*/
uint32_t setpoint = (uint32_t) (watts/pu);
uint64_t reg = 0;
rdmsr(fd, MSR_PKG_RAPL_POWER_LIMIT, &reg);
reg = (reg & 0xFFFFFFFFFFFF0000) | setpoint | 0x8000;
reg = (reg & 0xFFFFFFFF0000FFFF) | 0xD0000;
wrmsr(fd, MSR_PKG_RAPL_POWER_LIMIT, reg);
}
#define CPU_SANDYBRIDGE 42
#define CPU_SANDYBRIDGE_EP 45
#define CPU_IVYBRIDGE 58
#define CPU_IVYBRIDGE_EP 62
int main( int argc, char **argv ) {
int fd1,fd2;
long long result1, result2;
double power_units,energy_units,time_units;
double package1_before,package1_after,package2_before,package2_after;
double pp0_before,pp0_after;
double pp1_before=0.0,pp1_after;
double dram_before=0.0,dram_after;
double thermal_spec_power,minimum_power,maximum_power,time_window;
int cpu_model;
struct timeval currentime1,currentime2,beginningtime;
struct timespec interval_1s,interval_1ms,interval_10ms, interval_100ms, interval_500ms;
long double nowtime, power1, power2;
float total_power;
// char *filename;
//char *W = "W";
//FILE *file;
gettimeofday(&beginningtime,NULL);
interval_500ms.tv_sec = 0;
interval_500ms.tv_nsec = 500000000;
interval_100ms.tv_sec = 0;
interval_100ms.tv_nsec = 100000000;
interval_1s.tv_sec = 1;
interval_1s.tv_nsec = 0;
interval_1ms.tv_sec = 0;
interval_1ms.tv_nsec = 1000000;
interval_10ms.tv_sec = 0;
interval_10ms.tv_nsec = 10000000;
const char *powercap = argv[1];
//char filename[20] = "PowerResultsOn";
// strcat(filename, powercap);
// strcat(filename, "W");
// printf("filename = %s",filename);
// file = fopen(filename,"a");
fd1=open_msr(0);
fd2=open_msr(10);
/* Calculate the units used */
result1=read_msr(fd1,MSR_RAPL_POWER_UNIT);
power_units=pow(0.5,(double)(result1&0xf));
energy_units=pow(0.5,(double)((result1>>8)&0x1f));
time_units=pow(0.5,(double)((result1>>16)&0xf));
uint64_t currentval, newval, mask = 0, offset = 0;
currentval=read_msr(fd1,MSR_PKG_RAPL_POWER_LIMIT);
currentval=read_msr(fd2,MSR_PKG_RAPL_POWER_LIMIT);
int i;
for (i= 0; i<1; i++){
result1=read_msr(fd1,MSR_PKG_ENERGY_STATUS);
result2=read_msr(fd2,MSR_PKG_ENERGY_STATUS);
package1_before=(double)result1*energy_units;
package2_before=(double)result2*energy_units;
gettimeofday(&currentime1, NULL);
//printf("\nSleeping 1 second\n\n");
nanosleep(&interval_100ms, NULL);
result1=read_msr(fd1,MSR_PKG_ENERGY_STATUS);
result2=read_msr(fd2,MSR_PKG_ENERGY_STATUS);
gettimeofday(&currentime2, NULL);
package1_after=(double)result1*energy_units;
package2_after=(double)result2*energy_units;
nowtime =((long) ((currentime2.tv_usec - beginningtime.tv_usec) + (currentime2.tv_sec - beginningtime.tv_sec)* 1000000))/1000000.000000;
power1 =((package1_after - package1_before) /((currentime2.tv_usec - currentime1.tv_usec) + (currentime2.tv_sec - currentime1.tv_sec)*1000000))*1000000;
power2 =((package2_after - package2_before) /((currentime2.tv_usec - currentime1.tv_usec) + (currentime2.tv_sec - currentime1.tv_sec)*1000000))*1000000;
//fprintf(file,"%LF %LF %LF\n", nowtime, power1, power2);
total_power = (float)power1 +(float)power2;
}
printf("%f\n", total_power);
return 1;
}
#include <stdio.h>
#include <stdlib.h>
#include <beacon.h>
#include <string.h>
#include <powPerfController.h>
/* Baseline example to receive ERM power settings in the NRM */
// We need the hostname for the message filter in my_beacon_handler
// but would like to set it only once when the callback is set
// via set_nrm_power_target
static char hostname[100];
// We need a function pointer for the function to apply each time a
// new setting is received. set_nrm_power_target will set the value
// and my_beacon_handler() will use it
void (*target_handler)(double watts);
void set_nrm_power_target(void (*handler)(double watts));
/* BEACON boilerplate */
static int SET_NODE_E=2;
BEACON_beep_t binfo;
BEACON_beep_handle_t handle;
BEACON_subscribe_handle_t shandle1;
BEACON_topic_info_t *topic_info;
BEACON_topic_properties_t *eprop;
char data_buf[100];
char beep_name[100];
char filter_string[100];
char topic_string[32];
int BEACON_bcast_init() {
eprop = (BEACON_topic_properties_t *) malloc(sizeof(BEACON_topic_properties_t));
topic_info = (BEACON_topic_info_t *)malloc(sizeof(BEACON_topic_info_t));
if(topic_info == NULL) {
fprintf(stderr, "Malloc error!\n");
exit(0);
}
strcpy(topic_info[0].topic_name, "BEACON_BROADCAST");
sprintf(topic_info[0].severity, "INFO");
printf("The %d topic is %s\n", 0, topic_info[0].topic_name);
memset(&binfo, 0, sizeof(binfo));
strcpy(binfo.beep_version, "1.0");
strcpy(binfo.beep_name, "beacon_test");
int ret = BEACON_Connect(&binfo, &handle);
if (ret != BEACON_SUCCESS) {
printf("BEACON_Connect is not successful ret=%d\n", ret);
exit(-1);
}
strcpy(eprop->topic_scope, "global");
return 1;
}
int is_SET_NODE(char* message, char* node, double* watts) {
int mtype;
int rc = sscanf(message, "message type=%d;", &mtype);
if(rc!=1) {
return 0;
}
if(mtype!=SET_NODE_E) {
return 0;
}
rc = sscanf(message, "message type=%d ; node=%s ; target watts=%lf",&mtype, node, watts);
if(rc!=3) {
printf("wrong arg count %d\n",rc);
return 0;
}
return 1;
}
pthread_t poll_thread;
void* poll_logic(void* args) {
void (*handler)(BEACON_receive_topic_t* caught_topic) = (void (*)(BEACON_receive_topic_t*))args;
while(1) {
BEACON_receive_topic_t caught_topic;
int ret = BEACON_Wait_topic(shandle1, &caught_topic, 5);
if (ret != BEACON_SUCCESS) {
continue;
}
handler(&caught_topic);
}
}
int BEACON_bcast_subscribe(void (*handler)()) {
char* caddr = getenv("BEACON_TOPOLOGY_SERVER_ADDR");
sprintf(filter_string, "cluster_addr=%s,cluster_port=10809,topic_scope=global,topic_name=%s", caddr, topic_info[0].topic_name);
int ret = BEACON_Subscribe(&shandle1, handle, 0, filter_string, NULL);
pthread_create(&poll_thread, NULL, poll_logic, handler);
}
// The callback to send with the subscription
void my_beacon_handler(BEACON_receive_topic_t* topic) {
char node[100];
double watts;
// parse string store parts in enclave and delta
if(is_SET_NODE(topic->topic_payload, node, &watts)) {
if(strcmp(node,hostname)==0) {
target_handler(watts);
}
}
}
/* End boilerplate */
// Test handler to show that we can receive readings
//void test_handler(double watts) {
// printf("got %lf watts\n",watts);
//}
// The function that connects to BEACON and sets up the message handling
void set_nrm_power_target(void (*handler)(double watts)) {
gethostname(hostname,100);
BEACON_bcast_init();
target_handler=handler;
BEACON_bcast_subscribe(my_beacon_handler);
}
// A simple main function that runs the process for about a minute
//int main(int argc, char** argv) {
// set_nrm_power_target(test_handler);
// sleep(30);
//}
#!/usr/bin/env python2
from __future__ import print_function
import argparse
import logging
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
class Client(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.buf = ''
self.nt = 16
self.max = 32
self.server = None
def setup_shutdown(self):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
def get_server_message(self):
buf = self.buf
begin = 0
ret = ''
while begin < len(buf):
if buf[begin] in ['d', 'i', 'n', 'q']:
ret = buf[begin]
off = 1
else:
break
begin = begin + off
yield ret
self.buf = buf[begin:]
return
def do_receive(self, parts):
self.logger.info("receive stream: " + repr(parts))
if len(parts[1]) == 0:
if self.server:
# server disconnect, lets quit
self.setup_shutdown()
return
else:
self.server = parts[0]
self.buf = self.buf + parts[1]
for m in self.get_server_message():
self.logger.info(m)
if m == 'd':
if self.nt == 1:
ret = "min"
else:
self.nt -= 1
ret = "done (%d)" % self.nt
elif m == 'i':
if self.nt == self.max:
ret = "max"
else:
self.nt += 1
ret = "done (%d)" % self.nt
elif m == 'n':
ret = "%d" % self.nt
elif m == 'q':
ret = ''
self.setup_shutdown()
self.stream.send(self.server, zmq.SNDMORE)
self.stream.send(ret)
def do_signal(self, signum, frame):
self.logger.critical("received signal: " + repr(signum))
self.setup_shutdown()
def do_shutdown(self):
ioloop.IOLoop.current().stop()
def main(self):
# command line options
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose",
help="verbose logging information",
action='store_true')
parser.add_argument("threads", help="starting number of threads",
type=int, default=16)
parser.add_argument("maxthreads", help="max number of threads",
type=int, default=32)
args = parser.parse_args()
# deal with logging
if args.verbose:
self.logger.setLevel(logging.DEBUG)
self.nt = args.threads
self.max = args.maxthreads
# read env variables for connection
connect_addr = "localhost"
connect_port = 1234
connect_param = "tcp://%s:%d" % (connect_addr, connect_port)
# create connection
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.connect(connect_param)
self.logger.info("connected to: " + connect_param)
self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_receive)
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
ioloop.IOLoop.current().start()
if __name__ == "__main__":
ioloop.install()
logging.basicConfig(level=logging.INFO)
client = Client()
client.main()
#!/usr/bin/env python2
from __future__ import print_function
import logging
import random
import re
import signal
import zmq
from zmq.eventloop import ioloop, zmqstream
client_fsm_table = {'stable': {'i': 's_ask_i', 'd': 's_ask_d'},
's_ask_i': {'done': 'stable', 'max': 'max'},
's_ask_d': {'done': 'stable', 'min': 'min'},
'max': {'d': 'max_ask_d'},
'min': {'i': 'min_ask_i'},
'max_ask_d': {'done': 'stable', 'min': 'nop'},
'min_ask_i': {'done': 'stable', 'max': 'nop'},
'nop': {}}
class Client(object):
def __init__(self, identity):
self.identity = identity
self.buf = ''
self.state = 'stable'
def append_buffer(self, msg):
self.buf = self.buf + msg
def do_transition(self, msg):
transitions = client_fsm_table[self.state]
if msg in transitions:
self.state = transitions[msg]
else:
pass
def get_allowed_requests(self):
return client_fsm_table[self.state].keys()
def get_messages(self):
buf = self.buf
begin = 0
off = 0
ret = ''
while begin < len(buf):
if buf.startswith('min', begin):
ret = 'min'
off = len(ret)
elif buf.startswith('max', begin):
ret = 'max'
off = len(ret)
elif buf.startswith('done (', begin):
n = re.split("done \((\d+)\)", buf[begin:])[1]
ret = 'done'
off = len('done ()') + len(n)
else:
m = re.match("\d+", buf[begin:])
if m:
ret = 'ok'
off = m.end()
else:
break
begin = begin + off
yield ret
self.buf = buf[begin:]
return
class Daemon(object):
def __init__(self):
self.clients = {}
self.buf = ''
self.logger = logging.getLogger(__name__)
self.current = 1
self.target = 1
def do_client_receive(self, parts):
self.logger.info("receiving client stream: " + repr(parts))
identity = parts[0]
if len(parts[1]) == 0:
# empty frame, indicate connect/disconnect
if identity in self.clients:
self.logger.info("known client disconnected")
del self.clients[identity]
else:
self.logger.info("new client: " + repr(identity))
self.clients[identity] = Client(identity)
else:
if identity in self.clients:
client = self.clients[identity]
# we need to unpack the stream into client messages
# messages can be: min, max, done (%d), %d
client.append_buffer(parts[1])
for m in client.get_messages():
client.do_transition(m)
self.logger.info("client now in state: " + client.state)
def do_sensor(self):
self.current = random.randrange(0, 34)
self.logger.info("current measure: " + str(self.current))
def do_control(self):
self.target = random.randrange(0, 34)
self.logger.info("target measure: " + str(self.target))
for identity, client in self.clients.iteritems():
if self.current < self.target:
if 'i' in client.get_allowed_requests():
self.stream.send_multipart([identity, 'i'])
client.do_transition('i')
elif self.current > self.target:
if 'd' in client.get_allowed_requests():
self.stream.send_multipart([identity, 'd'])
client.do_transition('d')
else:
pass
self.logger.info("client now in state: " + client.state)
def do_signal(self, signum, frame):
ioloop.IOLoop.current().add_callback_from_signal(self.do_shutdown)
def do_shutdown(self):
ioloop.IOLoop.current().stop()
def main(self):
# read config
bind_port = 1234
bind_address = '*'
# setup listening socket
context = zmq.Context()
socket = context.socket(zmq.STREAM)
bind_param = "tcp://%s:%d" % (bind_address, bind_port)
socket.bind(bind_param)
self.logger.info("socket bound to: " + bind_param)
self.stream = zmqstream.ZMQStream(socket)
self.stream.on_recv(self.do_client_receive)
self.sensor = ioloop.PeriodicCallback(self.do_sensor, 1000)
self.sensor.start()
self.control = ioloop.PeriodicCallback(self.do_control, 1000)
self.control.start()
# take care of signals
signal.signal(signal.SIGINT, self.do_signal)
ioloop.IOLoop.current().start()
if __name__ == "__main__":
ioloop.install()
logging.basicConfig(level=logging.DEBUG)
daemon = Daemon()