Commit 27d0aaf2 authored by Philip Carns's avatar Philip Carns

Merge branch 'master' into 'master'

Add breadcrumb profiling logic

See merge request !18
parents ea6ff6b0 7fb90671
@@ -3,6 +3,9 @@ ACLOCAL_AMFLAGS = -I m4
bin_PROGRAMS =
bin_SCRIPTS =
dist_bin_SCRIPTS = scripts/margo-gen-profile
dist_noinst_SCRIPTS = scripts/margo-gen-profile
noinst_PROGRAMS =
noinst_HEADERS =
TESTS =
@@ -15,7 +18,8 @@ EXTRA_DIST =
BUILT_SOURCES =
include_HEADERS = \
include/margo.h \
include/margo-bulk-pool.h \
include/margo-diag.h
TESTS_ENVIRONMENT =
......
# Margo instrumentation
This file documents instrumentation and profiling capabilities that are built into the
margo library. See the [top level README.md](../README.md) for general
information about margo.
Margo includes two levels of instrumentation and profiling. The first
(diagnostics) measures time spent executing key Mercury functions within the
communication progress loop. The second (breadcrumb profiling) measures time
spent invoking remote procedure calls.
## Usage
Diagnostics can be enabled in two ways:
* At program startup, using the environment variable `MARGO_ENABLE_DIAGNOSTICS`.
* At run time, by calling `margo_diag_start()` any
time after `margo_init()` on the process that you wish to instrument.

Statistics from mercury diagnostics can then be emitted at any time prior to
`margo_finalize()` by calling the `margo_diag_dump()` function. Diagnostics
can be stopped by calling `margo_diag_stop()` on the process.

Similarly, profiling can be enabled/disabled either by use of the environment
variable `MARGO_ENABLE_PROFILING` or by using the `margo_profile_start()`/`margo_profile_stop()`
functions. Statistics from profiling can be output by invoking `margo_profile_dump()`
on the process.

The arguments to `margo_diag_dump()` and `margo_profile_dump()` are as follows:
* `mid`: the margo instance to retrieve instrumentation from
* `file`: name of the file to write the (text) data to. If the "-" string
is used, then data will be written to `STDOUT`.
* `uniquify`: flag indicating that the file name should be suffixed with
additional characters to make it unique from other files emitted
on the same node.
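
For example, a minimal sketch of instrumenting a process (error handling
omitted; assumes a margo instance initialized with `margo_init()`):

```
margo_instance_id mid = margo_init("tcp", MARGO_SERVER_MODE, 0, -1);

margo_diag_start(mid);     /* begin collecting Mercury diagnostics */
margo_profile_start(mid);  /* begin collecting breadcrumb profiles */

/* ... issue or service RPCs here ... */

/* emit statistics before finalizing; "-" writes to stdout instead of a file */
margo_diag_dump(mid, "-", 0);
margo_profile_dump(mid, "my-profile", 1); /* uniquify the output file name */

margo_diag_stop(mid);
margo_profile_stop(mid);
margo_finalize(mid);
```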
## Output format
* Diagnostic files have the *.diag suffix for the file name, and
profile files have the *.csv suffix.
* If the environment variable is used to control diagnostics/profiling,
all the corresponding files have the default "profile" prefix in the name.
Additionally, the `uniquify` flag is set, causing the generation of one profile
file for every margo process instance.
## Diagnostics Output Format
Example output from `margo_diag_dump()` will look like this for a given
process:
```
# Margo diagnostics
# Addr Hash and Address Name: 18446744035473536664, ofi+sockets://10.3.1.23:46282
# Tue Oct 8 20:20:08 2019
# Function Name, Average Time Per Call, Cumulative Time, Highwatermark, Lowwatermark, Call Count
trigger_elapsed,0.000000047,2.650168180,0.000000238,0.010999918,56241640
progress_elapsed_zero_timeout,0.000000755,42.434520245,0.000000477,0.129006147,56173943
progress_elapsed_nonzero_timeout,0.000000000,0.000000000,0.000000000,0.000000000,0
```
Key components of the output are:
* The assigned unique network address of the margo instance and its 64-bit hash.
* A set of basic statistics for Mercury functions used to drive communication and
completion progress. There are counters and elapsed time measurements for
the `HG_Trigger()` function and the `HG_Progress()` function (when called with
or without a timeout value, as Margo varies its polling strategy). There
is also a category that records statistics about the actual timeout values
used.
* This file is intended to be read by the end-user directly.
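
Since each statistics line is plain comma-separated text, it is also easy to
post-process. The following is a minimal, hypothetical C sketch (not part of
margo) that parses one such record:

```
#include <stdio.h>

int main(void)
{
    /* a sample record copied from the example output above */
    const char *line =
        "trigger_elapsed,0.000000047,2.650168180,0.000000238,0.010999918,56241640";
    char name[64];
    double avg, cumulative, hwm, lwm;
    unsigned long count;

    if (sscanf(line, "%63[^,],%lf,%lf,%lf,%lf,%lu",
               name, &avg, &cumulative, &hwm, &lwm, &count) == 6)
        printf("%s: %lu calls, %.9f s cumulative\n", name, count, cumulative);
    return 0;
}
```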
## Profiling Output Format
```
90
18446744035473536664,ofi+sockets://10.3.1.23:46282
0xdea7,mobject_server_stat
0x9166,mobject_server_clean
0x8bbe,mobject_read_op
0x45b1,mobject_write_op
0x03cb,sdskv_migrate_database_rpc
0xf8d6,sdskv_migrate_all_keys_rpc
0x70d7,sdskv_migrate_keys_prefixed_rpc
0x49e1,sdskv_migrate_key_range_rpc
0x9ce7,sdskv_migrate_keys_rpc
0x2cb7,sdskv_list_keyvals_rpc
0x3598,sdskv_list_keys_rpc
0xf4dc,sdskv_bulk_get_rpc
0x0afa,sdskv_length_multi_rpc
0x5518,sdskv_length_rpc
0x1e99,sdskv_exists_rpc
0xc2bd,sdskv_erase_multi_rpc
0xcaf8,sdskv_erase_rpc
0x98d0,sdskv_get_multi_rpc
0x6488,sdskv_get_rpc
0x8cc0,sdskv_bulk_put_rpc
0xc083,sdskv_put_multi_rpc
0x9695,sdskv_put_rpc
0x4482,sdskv_list_databases_rpc
0x2154,sdskv_count_databases_rpc
0x89b3,sdskv_open_rpc
0xec2c,remi_migrate_end
0x3be8,bake_probe_rpc
0x098f,bake_persist_rpc
...
...
...
0x3be8 ,0.000013113,15336,18446744035473536664,1,0.000013113,0.000013113,0.000013113,1,18446744073709551615,0,0,18446744073709551615,0,0
0x3be8 ,1;0.000013113,1.000000000, 0;0.000000000,0.000000000, 1;0.000013113,1.000000000, 2;0.000000000,0.000000000, 3;0.000013113,1.000000000, 4
0x098f 0x45b1 ,0.011572483,1169230223,18446744035473536664,0,4.397543430,0.008075237,0.020334244,380,18446744073709551615,286331153,0,18446744073709551615,286331153,0
0x098f 0x45b1 ,0;0.000000000,0.000000000, 0;0.000000000,0.000000000, 1;0.000000000,0.000000000, 2;0.000000000,0.000000000, 3;0.000000000,0.000000000, 4
...
...
```
Key components of the output are:
* The first line always contains the number of RPC names registered on that process.
* The second line is the assigned unique network address of the margo instance and its 64-bit hash.
* Next, a table of RPC names registered on that process. Each has a 16-bit
hexadecimal identifier and a string name. There may be duplicates in the
table if the same RPC is registered more than once on the process.
* A set of statistics for each RPC that was issued or received by that margo instance.
Each RPC is identified by a set of up to 4 hexadecimal identifiers.
The set of identifiers represents a stack that shows the heritage of up to 4 chained RPCs that led to this
measurement. Each identifier will match a name in the table at the top.
In the above example, statistics are shown for two RPCs (among others in the
profile): the "bake_probe_rpc" was executed by this server margo process, and
the "bake_persist_rpc" was issued by this server margo process as a side
effect of receiving and executing a "mobject_write_op" request from a client.
* RPC calls made from the margo instance in question to different margo server instances, or issued by different margo client instances
to the margo instance in question, are distinguished in the profile.
Additionally, each RPC statistic indicates whether it represents a client making a call or a server executing a call.
* Each RPC statistic is associated with 2 consecutive lines in the profile: one containing profiling statistics such as timing information,
and the other containing the information used to generate a sparkline.
* Keep in mind that the profiles represent only aggregate RPC statistics. This is not the same as request tracing.
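
Internally, a breadcrumb callpath is carried as a single 64-bit value (the
`rpc_breadcrumb` field declared in `margo-diag.h` below). A minimal sketch of
unpacking such a value into the four 16-bit identifiers shown in the profile,
assuming the most recent RPC occupies the high-order 16 bits (the exact
packing order is an assumption here, for illustration only):

```
#include <stdint.h>
#include <stdio.h>

int main(void)
{
    /* corresponds to the "0x098f 0x45b1" breadcrumb in the example above */
    uint64_t breadcrumb = 0x098f45b100000000ULL;
    for (int i = 3; i >= 0; i--)
        printf("0x%04x ", (unsigned)((breadcrumb >> (16 * i)) & 0xffff));
    printf("\n"); /* prints: 0x098f 0x45b1 0x0000 0x0000 */
    return 0;
}
```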
## Generating a Profile and Topology Graph
While the diagnostic files are simple enough to be read and understood by the user,
the profile files, although readable, represent too much information to be processed
manually. Thus, we have developed a "profile generator" that reads the *.csv files
in the current working directory and generates a PDF file summarizing important
performance statistics that can help in detecting load imbalance among margo instances,
relative call-counts and calltimes for various RPC breadcrumbs, and so on.
In order to generate the PDF summarizing performance, follow these steps:
* Enable profiling in your margo instances (easiest way is to use the `MARGO_ENABLE_PROFILING`
environment variable).
* Add `$MARGO_INSTALL/bin` to your path, and run the MOCHI program.
* After the program executes, verify that the current directory contains the *.csv files.
* Invoke the `margo-gen-profile` program in the directory containing the *.csv files.
This will generate a `profile.pdf` and a `graph.gv` file.
* The `profile.pdf` contains a list of graphs summarizing various performance statistics.
For example, the following is a graph representing the top-5 breadcrumbs
sorted by cumulative call times on the origin (client) and the target (server): ![](fig/profile.png)
* The `graph.gv` file is a `graphViz` file that represents a topology graph
of the MOCHI service setup. This is a visual representation of the RPC calls
and location of various margo processes mapped onto the physical system.
If the user has `graphViz` installed, a PNG file can be generated using
the command `dot -Tpng graph.gv -o gg.png`, and the resulting image would
look like: ![](fig/gg.png)
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h> /* defines printf for tests */
#include <time.h> /* defines time_t for timings in the test */
#include <stdint.h> /* defines uint32_t etc */
#include <sys/param.h> /* attempt to define endianness */
#ifdef linux
# include <endian.h> /* attempt to define endianness */
#endif
#ifndef __MARGO_DIAG
#define __MARGO_DIAG
#ifdef __cplusplus
extern "C" {
#endif
#define GET_SELF_ADDR_STR(__mid, __addr_str) do { \
hg_addr_t __self_addr; \
hg_size_t __size; \
__addr_str = NULL; \
if (margo_addr_self(__mid, &__self_addr) != HG_SUCCESS) break; \
if (margo_addr_to_string(__mid, NULL, &__size, __self_addr) != HG_SUCCESS) { \
margo_addr_free(__mid, __self_addr); \
break; \
} \
if ((__addr_str = malloc(__size)) == NULL) { \
margo_addr_free(__mid, __self_addr); \
break; \
} \
if (margo_addr_to_string(__mid, __addr_str, &__size, __self_addr) != HG_SUCCESS) { \
free(__addr_str); \
__addr_str = NULL; \
margo_addr_free(__mid, __self_addr); \
break; \
} \
margo_addr_free(__mid, __self_addr); \
} while(0)
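/* usage sketch (illustrative, not part of the public API): the macro leaves
 * __addr_str NULL on any failure, and the caller owns the returned string:
 *
 *   char *addr_str;
 *   GET_SELF_ADDR_STR(mid, addr_str);
 *   if (addr_str) {
 *       printf("self address: %s\n", addr_str);
 *       free(addr_str);
 *   }
 */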
/******************************************************************************************************/
/* used to identify a globally unique breadcrumb */
struct global_breadcrumb_key
{
uint64_t rpc_breadcrumb; /* a.k.a RPC callpath */
uint64_t addr_hash; /* hash of server addr */
uint16_t provider_id; /* provider_id within a server. NOT a globally unique identifier */
};
enum breadcrumb_type
{
origin, target
};
typedef enum breadcrumb_type breadcrumb_type;
struct breadcrumb_stats
{
/* stats for breadcrumb call times */
double min;
double max;
double cumulative;
/* stats for RPC handler pool sizes */
/* Total pool size = Total number of runnable items + items waiting on a lock */
unsigned long abt_pool_total_size_lwm; /* low watermark */
unsigned long abt_pool_total_size_hwm; /* high watermark */
unsigned long abt_pool_total_size_cumulative;
unsigned long abt_pool_size_lwm; /* low watermark */
unsigned long abt_pool_size_hwm; /* high watermark */
unsigned long abt_pool_size_cumulative;
/* count of occurrences of breadcrumb */
unsigned long count;
};
typedef struct breadcrumb_stats breadcrumb_stats;
/* structure to store breadcrumb snapshot, for consumption outside of margo.
reflects the margo-internal structure used to hold diagnostic data */
struct margo_breadcrumb
{
breadcrumb_stats stats;
/* 0 if this is an origin-side breadcrumb, 1 if this is a target-side breadcrumb */
breadcrumb_type type;
struct global_breadcrumb_key key;
struct margo_breadcrumb* next;
};
/* snapshot contains linked list of breadcrumb data */
struct margo_breadcrumb_snapshot
{
struct margo_breadcrumb* ptr;
};
#ifdef __cplusplus
}
#endif
#endif /* __MARGO_DIAG */
@@ -17,6 +17,8 @@ extern "C" {
#include <mercury_macros.h>
#include <abt.h>
#include "margo-diag.h"
/* determine how much of the Mercury ID space to use for Margo provider IDs */
#define __MARGO_PROVIDER_ID_SIZE (sizeof(hg_id_t)/4)
#define __MARGO_RPC_HASH_SIZE (__MARGO_PROVIDER_ID_SIZE * 3)
@@ -41,6 +43,8 @@ typedef struct margo_request_struct* margo_request;
#define MARGO_MAX_PROVIDER_ID ((1 << (8*__MARGO_PROVIDER_ID_SIZE))-1)
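/* worked example (assuming an 8-byte hg_id_t): provider IDs occupy
 * sizeof(hg_id_t)/4 = 2 bytes of the Mercury ID space, the RPC hash occupies
 * 2*3 = 6 bytes, and MARGO_MAX_PROVIDER_ID above evaluates to
 * (1 << 16) - 1 = 65535 */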
#define MARGO_PARAM_PROGRESS_TIMEOUT_UB 1
#define MARGO_PARAM_ENABLE_PROFILING 2
#define MARGO_PARAM_ENABLE_DIAGNOSTICS 3
/**
* Initializes margo library.
@@ -900,6 +904,30 @@ margo_instance_id margo_hg_info_get_instance(const struct hg_info *info);
*/
void margo_diag_start(margo_instance_id mid);
/**
* Enables profile data collection on specified Margo instance
*
* @param [in] mid Margo instance
* @returns void
*/
void margo_profile_start(margo_instance_id mid);
/**
* Disables diagnostic collection on specified Margo instance
*
* @param [in] mid Margo instance
* @returns void
*/
void margo_diag_stop(margo_instance_id mid);
/**
* Disables profile data collection on specified Margo instance
*
* @param [in] mid Margo instance
* @returns void
*/
void margo_profile_stop(margo_instance_id mid);
/**
* Appends diagnostic statistics (enabled via margo_diag_start()) to specified
* output file.
@@ -912,6 +940,27 @@ void margo_diag_start(margo_instance_id mid);
*/
void margo_diag_dump(margo_instance_id mid, const char* file, int uniquify);
/**
* Appends profile statistics (enabled via margo_profile_start()) to specified
* output file.
*
* @param [in] mid Margo instance
* @param [in] file output file ("-" for stdout)
* @param [in] uniquify flag indicating if file name should have additional
* information added to it to make output from different processes unique
* @returns void
*/
void margo_profile_dump(margo_instance_id mid, const char* file, int uniquify);
/**
* Grabs a snapshot of the current state of diagnostics within the margo instance
*
* @param [in] mid Margo instance
* @param [out] snap Margo diagnostics snapshot
* @returns void
*/
void margo_breadcrumb_snapshot(margo_instance_id mid, struct margo_breadcrumb_snapshot* snap);
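/* usage sketch (illustrative): walk the snapshot's linked list of
 * breadcrumbs; ownership/cleanup of the list is not specified here, so
 * freeing is omitted from this sketch:
 *
 *   struct margo_breadcrumb_snapshot snap;
 *   margo_breadcrumb_snapshot(mid, &snap);
 *   for (struct margo_breadcrumb *bc = snap.ptr; bc != NULL; bc = bc->next)
 *       printf("breadcrumb 0x%lx: count=%lu, cumulative=%.9f s\n",
 *              (unsigned long)bc->key.rpc_breadcrumb,
 *              bc->stats.count, bc->stats.cumulative);
 */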
/**
* Sets configurable parameters/hints
*
......
#!/usr/bin/env python
# (C) 2015 The University of Chicago
#
# See COPYRIGHT in top-level directory.
#
import matplotlib
matplotlib.use("Agg")
import base64
import matplotlib.pyplot as plt; plt.rcdefaults()
import numpy as np
from matplotlib.backends.backend_pdf import PdfPages
import operator
import sys
import os
import glob, re
from collections import defaultdict, OrderedDict
class ProfileGenerator:
    def __init__(self):
        self.name = "MargoProfileGenerator"
        self.cumulative = [dict(), dict()] #Separate dictionaries for origin and target in the form <KEY = RPC breadcrumb name, VALUE = cumulative time>
        self.count = dict() #Dictionary in the form <KEY = RPC breadcrumb name, VALUE = cumulative count>
        self.cumulativestat = [defaultdict(list), defaultdict(list)] #Separate dictionaries for origin and target in the form <KEY = RPC breadcrumb name, VALUE = [list of individual times]>
        self.countstat = [defaultdict(list), defaultdict(list)] #Separate dictionaries for origin and target in the form <KEY = RPC breadcrumb name, VALUE = [list of individual counts]>
        self.poolsizehwm = defaultdict(list) #RPC handler pool size high water mark on the target, in the form <KEY = RPC breadcrumb name, VALUE = [list of individual high watermarks]>
        self.poolsizelwm = defaultdict(list) #RPC handler pool size low water mark on the target, in the form <KEY = RPC breadcrumb name, VALUE = [list of individual low watermarks]>
        self.pooltotalsizehwm = defaultdict(list) #RPC handler pool total size high water mark on the target, in the form <KEY = RPC breadcrumb name, VALUE = [list of individual high watermarks]>
        self.pooltotalsizelwm = defaultdict(list) #RPC handler pool total size low water mark on the target, in the form <KEY = RPC breadcrumb name, VALUE = [list of individual low watermarks]>
        self.countsparklines = defaultdict(list) #Sparkline data for RPC breadcrumb counts, in the form <KEY = (RPC breadcrumb name, sparkline_index), VALUE = [list of individual counts for target instances]>
        self.timesparklines = [defaultdict(list), defaultdict(list)] #Sparkline data for RPC cumulative time, in the form <KEY = (RPC breadcrumb name, sparkline_index), VALUE = [list of individual times for origin or target instances]>
        self.nodetomid = defaultdict(list) #List of margo instances on a given node
        self.midtoprovider = defaultdict(list) #List of registered provider types on a given margo instance
        self.provider_id = dict() #Used to generate a unique global ID for every provider. This is necessary in order to properly generate the graphViz file
        self.edgelistcount = dict() #Count information in the form <KEY = (source instance, source call, dest instance, destination call), VALUE = cumulative count>
        self.edgelisttime = dict() #Time information in the form <KEY = (source instance, source call, dest instance, destination call), VALUE = cumulative time>
        self.hash64tomid = dict() #Map a 64-bit hash to a margo instance address string
        self.rpc2name = dict() #Map the RPC ID to a proper registered name
        self.pp = PdfPages('profile.pdf')
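    # Note on breadcrumb rendering (hypothetical example using names from the
    # sample profile): __getrpc2name below turns "0x098f 0x45b1 " into
    # "mobject_write_op-> bake_persist_rpc", i.e. the outermost caller first.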
    # Takes in a list of hexadecimal RPC breadcrumbs and returns a list
    # of breadcrumb names
    def __getrpc2name(self, o):
        output = []
        for i in o:
            l = list(i.split(' '))
            tmp = ""
            for j in (l[::-1])[1:]:
                if tmp != '':
                    tmp = tmp+"->"+self.rpc2name.get(j, "UNKNOWN_RPC_ID")
                else:
                    tmp = self.rpc2name.get(j, "UNKNOWN_RPC_ID")
            output.append((re.sub("(->)", "\\1 ", tmp, 0, re.DOTALL)).strip('\n'))
        return output
    # Boilerplate for graph generation
    # x_pos = positions on the x-axis
    # perf_arr = performance numbers to plot
    # is_stat_graph = whether this graph is a statistics graph or not
    # objects = list of stuff to plot on the x-axis
    def __gengraph(self, x_pos, perf_arr, xlabel, ylabel, title, txt=None, is_stat_graph=False, use_x_ticks=False, objects=None, labels=[None, None]):
        fig = plt.figure(num=None, figsize=(15, 10), dpi=80, facecolor='w', edgecolor='k')
        ax = fig.add_subplot(111)
        width = 0.35
        if(is_stat_graph):
            ax.boxplot(perf_arr)
            ax.set_ylim(ymin=0)
        else:
            for i in range(0, len(perf_arr)):
                ax.bar(x_pos+i*width, perf_arr[i], width=width, align='center', alpha=0.5, label=labels[i])
            ax.legend(loc='best')
        if(use_x_ticks):
            ax.set_xticklabels(objects)
        if(not is_stat_graph):
            ax.set_xticks(x_pos+((width/2)*(len(perf_arr)-1)))
        else:
            plt.xticks([])
        ax.set_xlabel(xlabel, fontsize=16)
        ax.set_ylabel(ylabel, fontsize=16)
        ax.set_title(txt)
        fig.suptitle(title, fontsize=20, fontweight='bold')
        fig.savefig(self.pp, format='pdf')
        plt.close()
    # Boilerplate for sparkline graph generation
    # objects = list of stuff to plot on the x-axis
    # num_subplots = number of sparklines to draw
    # perf_arr = sparkline data to use for plotting
    def __gensparklinegraph(self, objects, title, num_subplots, perf_arr, txt=None):
        fig, ax = plt.subplots(num_subplots+1, figsize=(15, 10))
        plt.subplots_adjust(hspace=0.3)
        fig.suptitle(title, fontsize=20, fontweight='bold')
        ax[0].text(0, 0.5, txt, style='italic',
                   bbox={'facecolor': 'red', 'alpha': 0.5, 'pad': 10}, horizontalalignment='left')
        ax[0].set_xticks([])
        ax[0].set_yticks([])
        for k1, v1 in ax[0].spines.items():
            v1.set_visible(False)
        for i in range(0, num_subplots):
            v = perf_arr[i]
            ax[i+1].plot(v)
            for k1, v1 in ax[i+1].spines.items():
                v1.set_visible(False)
            ax[i+1].set_xticks([])
            ax[i+1].set_yticks([])
            ax[i+1].plot(len(v) - 1, v[len(v) - 1], 'r.')
            ax[i+1].fill_between(range(len(v)), v, len(v)*[min(v)], alpha=0.1)
            ax[i+1].set_title((objects[i]).strip('\n'), fontsize=8)
        plt.savefig(self.pp, format='pdf')
        plt.close()
    #Add to the edgelist that collects count information in the form <KEY = (source instance, source call, dest instance, destination call), VALUE = cumulative count>
    #Add to the edgelist that collects time information in the form <KEY = (source instance, source call, dest instance, destination call), VALUE = cumulative time>
    def __addtoedgelist(self, name, mid, addr_hash, cumulative, count):
        raw_rpc_calls = list(name.split(" "))
        del raw_rpc_calls[len(raw_rpc_calls) - 1]
        if(len(raw_rpc_calls) > 1):
            self.edgelistcount[(mid, raw_rpc_calls[1], addr_hash, raw_rpc_calls[0])] = self.edgelistcount.get((mid, raw_rpc_calls[1], addr_hash, raw_rpc_calls[0]), 0) + int(count)
            self.edgelisttime[(mid, raw_rpc_calls[1], addr_hash, raw_rpc_calls[0])] = self.edgelisttime.get((mid, raw_rpc_calls[1], addr_hash, raw_rpc_calls[0]), 0) + float(cumulative)
        else:
            self.edgelistcount[(mid, raw_rpc_calls[0], addr_hash, raw_rpc_calls[0])] = self.edgelistcount.get((mid, raw_rpc_calls[0], addr_hash, raw_rpc_calls[0]), 0) + int(count)
            self.edgelisttime[(mid, raw_rpc_calls[0], addr_hash, raw_rpc_calls[0])] = self.edgelisttime.get((mid, raw_rpc_calls[0], addr_hash, raw_rpc_calls[0]), 0) + float(cumulative)
    #Generate the nodes of the graph
    #Grey: Nodes
    #Blue: Margo instances
    #White: Providers
    def __gengraphnodes(self):
        node_string = "/* First describe and generate the nodes and entities in the graph */\n"
        for (node, midlist) in self.nodetomid.items():
            node_string += ("subgraph cluster" + node + " {\n")
            node_string += ("node [style=filled, color=white];\n style=filled;\n color=lightgrey;\n")
            for mid in midlist:
                node_string += ("subgraph cluster" + mid.replace("+", "_").replace(":", "_").replace(".", "_").replace("//", "_").replace("/", "_") + " {\n")
                node_string += ("node [style=filled];\n color=lightblue;\n")
                for (provider_type, provider_id) in self.midtoprovider[mid]:
                    node_string += (str(provider_id) + ";\n")
                node_string += ("label = \"" + mid + "\";\n")
                node_string += ("}\n")
            node_string += ("label = \"" + node + "\";\n")
            node_string += ("}\n")
        return node_string
    #Generate weighted edges of the form (cumulative time, cumulative count)
    #(source_provider_id (unique), source_call, destination_provider_id (globally unique), destination_call)
    #IMPT: We derive the "provider type" from the RPC call by assuming that every RPC call has the form <provider_type>_<RPC_call>
    #For instance, the call sdskv_get_keys has the provider type "sdskv" and RPC call "get_keys"
    def __gengraphedges(self):
        edge_string = "/* Generate a list of weighted edges for the graph in the form (count, time) */\n"
        for edge_info, count in self.edgelistcount.items():
            cumulative_time = self.edgelisttime[edge_info]
            source_provider_id = ""
            dest_provider_id = ""
            #IMPT: We only have source and destination margo instances. We need to derive the id's for the providers inside the margo instance that made
            #the call.
            #Assumption: We make an assumption that there is only one provider of a given type inside a margo instance. If there are multiple, then
            #the edge represents the cumulative counts or time values for all providers of a given type.
            (source, source_call, destination, dest_call) = edge_info
            destination = self.hash64tomid[destination]
            source_provider = ((self.rpc2name.get(str(source_call), "UNKNOWN")).split("_"))[0]
            dest_provider = ((self.rpc2name.get(str(dest_call), "UNKNOWN")).split("_"))[0]
            if source_provider != "UNKNOWN" and dest_provider != "UNKNOWN":
                for (provider_type, provider_id) in self.midtoprovider[source]:
                    if source_provider == provider_type:
                        source_provider_id = provider_id
                for (provider_type, provider_id) in self.midtoprovider[destination]:
                    if dest_provider == provider_type:
                        dest_provider_id = provider_id
                dest_call = self.rpc2name[str(dest_call)]
                edge_string += (str(source_provider_id) + " -> " + str(dest_provider_id) + "[label=\""+dest_call+","+str(count)+","+str(cumulative_time)+"\", weight=\""+str(dest_call)+","+str(count)+","+str(cumulative_time)+"\"];\n")
        return edge_string
    # Read the current working directory for profile*.csv files and populate the relevant data structures
    # Profile files are expected to be in the following format:
    # N = num RPC's registered on this instance
    # Followed by N lines of <RPC ID>,<RPC NAME>
    # 3 lines for Margo internal routines: trigger_elapsed, progress_elapsed_zero_timeout, progress_elapsed_nonzero_timeout
    # Followed by actual breadcrumb data in the form <name, avg, rpc_breadcrumb, addr_hash, origin_or_target, cumulative, _min, _max, count, handler_max, handler_min, handler_cumulative>
    def readfiles(self):
        files = glob.glob(str(os.getcwd())+"/*.csv") #Read all *.csv files in the current working directory
        for f in files:
            f1 = open(f, "r")
            contents = f1.readlines()
            num_registered_rpcs = int(contents[0]) #First line is always the number of RPC's registered with the margo instance generating this particular profile file
            self_hash, mid = ((str(contents[1])).strip("\n")).split(',') #Second line is always the 64-bit hash of the margo instance address, followed by the address string itself
            self.hash64tomid[long(self_hash)] = mid
            node = (((os.path.basename(f1.name)).split('-')[1]).replace(".", ""))
            self.nodetomid[node].append(mid) #Collect list of margo instances on this node
            if num_registered_rpcs > 0: