Commit cdfe3c19 authored by Shane Snyder's avatar Shane Snyder

Minor enhancement to the darshan i/o workload API

Improve the performance of the darshan i/o workload by converting the
linked list of per-rank contexts to a hash table.
parent a1e0df0a
......@@ -12,19 +12,26 @@
#include <assert.h>
#include <sys/stat.h>
#include "ross.h"
#include "codes/codes-workload.h"
#include "codes-workload-method.h"
#include "darshan-io-events.h"
#include "codes/quickhash.h"
#define DARSHAN_MAX_EVENT_READ_COUNT 100
#define DARSHAN_NEGLIGIBLE_DELAY .001
#define RANK_HASH_TABLE_SIZE 397
/* structure for storing all context needed to retrieve events for this rank */
/* TODO: if many ross LPs are mapped to this PE, then memory usage is going to increase
* linearly, obviously due to the static events arrays used here to store darshan events.
* This really needs to be fixed by coming up with a more intelligent way of representing
* this workload (i.e. no intermediary format). Until then, you can lower the
* DARSHAN_MAX_EVENT_READ_COUNT to as low as 1 to save memory but restrict performance.
*/
struct rank_events_context
{
int rank;
struct rank_events_context *next;
int ind_file_fd;
int64_t ind_event_cnt;
int coll_file_fd;
......@@ -36,6 +43,7 @@ struct rank_events_context
int coll_list_ndx;
int coll_list_max;
double last_event_time;
struct qhash_head hash_link;
};
/* CODES workload API functions for workloads generated from darshan logs*/
......@@ -46,6 +54,7 @@ static void *darshan_io_workload_get_info(int rank);
/* helper functions for darshan workload CODES API */
static void darshan_read_events(struct rank_events_context *rank_context, int ind_flag);
static struct codes_workload_op darshan_event_to_codes_workload_op(struct darshan_event event);
static int hash_rank_compare(void *key, struct qhash_head *link);
/* workload method name and function pointers for the CODES workload API */
struct codes_workload_method darshan_io_workload_method =
......@@ -56,16 +65,18 @@ struct codes_workload_method darshan_io_workload_method =
.codes_workload_get_info= darshan_io_workload_get_info,
};
/* linked list of context structs for each rank participating in the workload */
static struct rank_events_context *rank_events = NULL;
static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;
/* info about this darshan workload group needed by bgp model */
/* TODO: is this needed for darshan workloads? */
/* TODO: does this need to be stored in the rank context to support multiple workloads? */
static struct codes_workload_info darshan_workload_info = {-1, -1, -1, -1, -1};
/* load the workload generator for this rank, given input params */
static int darshan_io_workload_load(const char *params, int rank)
{
const char *event_file = params; /* for now, params is just the input event file name */
const char *event_file = params; /* for now, params is just the input trace file name */
int event_file_fd;
int64_t nprocs;
ssize_t bytes_read;
......@@ -73,8 +84,8 @@ static int darshan_io_workload_load(const char *params, int rank)
int64_t off_beg = 0;
int64_t off_end = 0;
int64_t i;
struct stat stat_buf;
int ret;
struct stat stat_buf;
struct rank_events_context *new = NULL;
if (!event_file)
......@@ -91,21 +102,37 @@ static int darshan_io_workload_load(const char *params, int rank)
/* open the input events file */
event_file_fd = open(event_file, O_RDONLY);
if (event_file_fd < 0)
{
free(new);
return -1;
}
/* read the event file header to determine the number of original ranks */
bytes_read = read(event_file_fd, &nprocs, sizeof(int64_t));
if (bytes_read != sizeof(int64_t))
{
free(new);
close(event_file_fd);
return -1;
}
rank_offsets = malloc((nprocs + 1) * sizeof(int64_t));
if (!rank_offsets)
{
free(new);
close(event_file_fd);
return -1;
}
/* read the event list offset for all ranks from the event file header */
bytes_read = read(event_file_fd, rank_offsets, (nprocs + 1) * sizeof(int64_t));
if (bytes_read != ((nprocs + 1) * sizeof(int64_t)))
{
free(new);
free(rank_offsets);
close(event_file_fd);
return -1;
}
/* determine the beginning and ending offsets for this rank's independent events */
off_beg = rank_offsets[rank];
......@@ -128,7 +155,12 @@ static int darshan_io_workload_load(const char *params, int rank)
if (!ret)
off_end = stat_buf.st_size;
else
{
free(new);
free(rank_offsets);
close(event_file_fd);
return -1;
}
}
/* set the independent event count */
......@@ -139,7 +171,12 @@ static int darshan_io_workload_load(const char *params, int rank)
new->ind_file_fd = event_file_fd;
ret = lseek(new->ind_file_fd, (off_t)off_beg, SEEK_SET);
if (ret < 0)
{
free(new);
free(rank_offsets);
close(event_file_fd);
return -1;
}
}
else
{
......@@ -154,7 +191,12 @@ static int darshan_io_workload_load(const char *params, int rank)
/* use fstat to find the ending offset of the file */
ret = fstat(event_file_fd, &stat_buf);
if (ret < 0)
{
free(new);
free(rank_offsets);
close(event_file_fd);
return -1;
}
else
off_end = (int64_t)stat_buf.st_size;
......@@ -164,11 +206,14 @@ static int darshan_io_workload_load(const char *params, int rank)
/* open a new file descriptor for collective events, and seek to the beginning offset */
new->coll_file_fd = open(event_file, O_RDONLY);
if (new->coll_file_fd < 0)
return -1;
ret = lseek(new->coll_file_fd, (off_t)off_beg, SEEK_SET);
if (ret < 0)
{
free(new);
free(rank_offsets);
close(event_file_fd);
return -1;
}
}
else
{
......@@ -182,9 +227,23 @@ static int darshan_io_workload_load(const char *params, int rank)
new->coll_list_ndx = 0;
new->coll_list_max = 0;
/* add this stream of events to the head of our list of streams per rank */
new->next = rank_events;
rank_events = new;
/* initialize the hash table of rank contexts, if it has not been initialized */
if (!rank_tbl)
{
rank_tbl = qhash_init(hash_rank_compare, quickhash_32bit_hash, RANK_HASH_TABLE_SIZE);
if (!rank_tbl)
{
free(new);
free(rank_offsets);
close(event_file_fd);
close(new->coll_file_fd);
return -1;
}
}
/* add this rank context to the hash table */
qhash_add(rank_tbl, &(new->rank), &(new->hash_link));
rank_tbl_pop++;
/* fill out the info required for this workload group */
if (darshan_workload_info.group_id == -1)
......@@ -196,7 +255,6 @@ static int darshan_io_workload_load(const char *params, int rank)
}
free(rank_offsets);
return 0;
}
......@@ -205,23 +263,20 @@ static void darshan_io_workload_get_next(int rank, struct codes_workload_op *op)
{
struct darshan_event next_event;
int ind_event_flag;
struct rank_events_context *tmp = rank_events;
struct qhash_head *hash_link = NULL;
struct rank_events_context *tmp = NULL;
/* find event context for this rank */
while (tmp)
{
if (tmp->rank == rank)
break;
tmp = tmp->next;
}
/* find event context for this rank in the rank hash table */
hash_link = qhash_search(rank_tbl, &rank);
/* terminate the workload if there are no events for this rank */
if (!tmp)
/* terminate the workload if there is no valid rank context */
if (!hash_link)
{
op->op_type = CODES_WK_END;
return;
}
tmp = qhash_entry(hash_link, struct rank_events_context, hash_link);
assert(tmp->rank == rank);
/* read in more independent events if necessary */
......@@ -261,6 +316,11 @@ static void darshan_io_workload_get_next(int rank, struct codes_workload_op *op)
{
/* no more events -- just end the workload */
op->op_type = CODES_WK_END;
qhash_del(hash_link);
free(tmp);
rank_tbl_pop--;
if (!rank_tbl_pop)
qhash_finalize(rank_tbl);
}
else
{
......@@ -313,7 +373,8 @@ static void darshan_io_workload_get_next(int rank, struct codes_workload_op *op)
return;
}
/* for now, no info needed by the simulator regarding darshan workloads */
/* return the workload info needed by the bgp model */
/* TODO: do we really need this? */
static void *darshan_io_workload_get_info(int rank)
{
return &(darshan_workload_info);
......@@ -414,6 +475,18 @@ static struct codes_workload_op darshan_event_to_codes_workload_op(struct darsha
return codes_op;
}
static int hash_rank_compare(void *key, struct qhash_head *link)
{
int *in_rank = (int *)key;
struct rank_events_context *tmp;
tmp = qhash_entry(link, struct rank_events_context, hash_link);
if (tmp->rank == *in_rank)
return 1;
return 0;
}
/*
* Local variables:
* c-indent-level: 4
......
......@@ -22,7 +22,7 @@
#include "codes/codes-workload.h"
#include "codes/quickhash.h"
struct file_info_list
struct file_info
{
struct qlist_head hash_link;
uint64_t file_hash;
......@@ -177,7 +177,7 @@ int main(int argc, char *argv[])
}
/* destroy and finalize the file descriptor hash table */
qhash_destroy_and_finalize(fd_table, struct file_info_list, hash_link, free);
qhash_destroy_and_finalize(fd_table, struct file_info, hash_link, free);
error_exit:
......@@ -192,7 +192,7 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
int open_flags = O_RDWR;
char file_name[50];
int fildes;
struct file_info_list *tmp_list = NULL;
struct file_info *tmp_list = NULL;
struct qlist_head *hash_link = NULL;
char *buf = NULL;
int ret;
......@@ -270,7 +270,7 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
}
/* save the file descriptor for this file in a hash table to be retrieved later */
tmp_list = malloc(sizeof(struct file_info_list));
tmp_list = malloc(sizeof(struct file_info));
if (!tmp_list)
{
fprintf(stderr, "No memory available for file hash entry\n");
......@@ -292,7 +292,7 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
/* search for the corresponding file descriptor in the hash table */
hash_link = qhash_search_and_remove(fd_table, &(replay_op.u.close.file_id));
assert(hash_link);
tmp_list = qhash_entry(hash_link, struct file_info_list, hash_link);
tmp_list = qhash_entry(hash_link, struct file_info, hash_link);
fildes = tmp_list->file_descriptor;
free(tmp_list);
......@@ -318,7 +318,7 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
/* search for the corresponding file descriptor in the hash table */
hash_link = qhash_search(fd_table, &(replay_op.u.write.file_id));
assert(hash_link);
tmp_list = qhash_entry(hash_link, struct file_info_list, hash_link);
tmp_list = qhash_entry(hash_link, struct file_info, hash_link);
fildes = tmp_list->file_descriptor;
/* perform the write operation */
......@@ -349,7 +349,7 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
/* search for the corresponding file descriptor in the hash table */
hash_link = qhash_search(fd_table, &(replay_op.u.read.file_id));
assert(hash_link);
tmp_list = qhash_entry(hash_link, struct file_info_list, hash_link);
tmp_list = qhash_entry(hash_link, struct file_info, hash_link);
fildes = tmp_list->file_descriptor;
/* perform the write operation */
......@@ -379,9 +379,9 @@ int replay_workload_op(struct codes_workload_op replay_op, int rank, long long i
int hash_file_compare(void *key, struct qlist_head *link)
{
uint64_t *in_file_hash = (uint64_t *)key;
struct file_info_list *tmp_file;
struct file_info *tmp_file;
tmp_file = qlist_entry(link, struct file_info_list, hash_link);
tmp_file = qlist_entry(link, struct file_info, hash_link);
if (tmp_file->file_hash == *in_file_hash)
return 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment