Commit c2f6be3c authored by Jason Cope's avatar Jason Cope
Browse files

updated a years worth of improvements / bug fixes to the BMI portals layer

- runs stable on XT4 and XT5
- tested up to 200,000 cores with IOFSL
parent b7a189c3
...@@ -13,7 +13,7 @@ ifneq (,$(BUILD_PORTALS)) ...@@ -13,7 +13,7 @@ ifneq (,$(BUILD_PORTALS))
# Local definitions. # Local definitions.
# #
DIR := src/io/bmi/bmi_portals DIR := src/io/bmi/bmi_portals
cfiles := portals.c dlmalloc.c portals_conn.c portals_helpers.c portals_wrappers.c cfiles := portals.c dlmalloc.c portals_comm.c portals_conn.c portals_helpers.c portals_wrappers.c portals_trace.c
# #
# Export these to the top Makefile to tell it what to build. # Export these to the top Makefile to tell it what to build.
......
...@@ -4,13 +4,16 @@ ...@@ -4,13 +4,16 @@
#include <signal.h> #include <signal.h>
#include "portals_conn.h" #include "portals_conn.h"
#include "portals_comm.h"
#include "portals_helpers.h" #include "portals_helpers.h"
#include "portals_trace.h"
#include "src/common/quicklist/quicklist.h" #include "src/common/quicklist/quicklist.h"
#include "src/common/gen-locks/gen-locks.h" #include "src/common/gen-locks/gen-locks.h"
#include "src/common/id-generator/id-generator.h" #include "src/common/id-generator/id-generator.h"
#include "src/io/bmi/bmi.h" #include "src/io/bmi/bmi.h"
#include "src/io/bmi/bmi-method-support.h" #include "src/io/bmi/bmi-method-support.h"
#include "src/io/bmi/bmi-method-callback.h"
/* bmi mode */ /* bmi mode */
#define CLIENT 0 #define CLIENT 0
...@@ -88,6 +91,30 @@ static int bmip_is_clone = 0; ...@@ -88,6 +91,30 @@ static int bmip_is_clone = 0;
pthread_barrier_t bmip_comm_bar; pthread_barrier_t bmip_comm_bar;
pthread_barrierattr_t bmi_comm_bar_attr; pthread_barrierattr_t bmi_comm_bar_attr;
static void * bmiptl_safe_malloc(size_t size, char * fl, char * fn, int l)
{
void * m = NULL;
m = malloc(size);
if(!m)
{
fprintf(stderr, "%s:%i malloc failed. size = %lu\n", __func__, __LINE__, size);
assert(m != NULL);
}
return m;
}
static void bmiptl_safe_free(void * m, char * fl, char * fn, int l)
{
if(!m)
{
fprintf(stderr, "%s:%i free failed. NULL buffer detected\n", __func__, __LINE__);
assert(m != NULL);
}
free(m);
}
int bmip_comm_barrier_init(void) int bmip_comm_barrier_init(void)
{ {
int ret = 0; int ret = 0;
...@@ -196,6 +223,16 @@ BMI_portals_initialize(bmi_method_addr_p listen_addr, int method_id, ...@@ -196,6 +223,16 @@ BMI_portals_initialize(bmi_method_addr_p listen_addr, int method_id,
portals_node_type = (init_flags & BMI_INIT_SERVER) ? SERVER : CLIENT; portals_node_type = (init_flags & BMI_INIT_SERVER) ? SERVER : CLIENT;
portals_method_id = method_id; portals_method_id = method_id;
/* dynamically allocate the correct mem */
if(portals_node_type == SERVER)
{
bmip_allocate_server_mem();
}
else if(portals_node_type == CLIENT)
{
bmip_allocate_client_mem();
}
if(portals_node_type == SERVER || (!client_clone_mode && portals_node_type == CLIENT) ) if(portals_node_type == SERVER || (!client_clone_mode && portals_node_type == CLIENT) )
{ {
/* if we have the addr */ /* if we have the addr */
...@@ -227,17 +264,11 @@ BMI_portals_initialize(bmi_method_addr_p listen_addr, int method_id, ...@@ -227,17 +264,11 @@ BMI_portals_initialize(bmi_method_addr_p listen_addr, int method_id,
{ {
bmip_comm_barrier_init(); bmip_comm_barrier_init();
#ifdef BMIP_CLONE #ifdef BMIP_CLONE
//client_clone_pid = clone(bmip_client_clone_init, clone_stack_top, CLONE_THREAD|CLONE_FILES|CLONE_SIGHAND|CLONE_VM, listen_addr);
client_clone_pid = clone(bmip_client_clone_init, clone_stack_top, CLONE_THREAD|CLONE_SIGHAND|CLONE_VM, listen_addr); client_clone_pid = clone(bmip_client_clone_init, clone_stack_top, CLONE_THREAD|CLONE_SIGHAND|CLONE_VM, listen_addr);
bmip_comm_barrier(); bmip_comm_barrier();
#endif #endif
} }
if(portals_node_type == SERVER || (!client_clone_mode && portals_node_type == CLIENT))
{
fprintf(stderr, "%s:%i nid = %i pid = %i\n", __func__, __LINE__, bmip_get_ptl_nid(), bmip_get_ptl_pid());
}
return 0; return 0;
} }
...@@ -245,6 +276,7 @@ BMI_portals_initialize(bmi_method_addr_p listen_addr, int method_id, ...@@ -245,6 +276,7 @@ BMI_portals_initialize(bmi_method_addr_p listen_addr, int method_id,
static int static int
BMI_portals_finalize(void) BMI_portals_finalize(void)
{ {
bmip_trace_dump_list();
if(portals_node_type == SERVER) if(portals_node_type == SERVER)
{ {
bmip_dest_eqs(); bmip_dest_eqs();
...@@ -287,16 +319,10 @@ BMI_portals_set_info(int option, void* inout_parameter) ...@@ -287,16 +319,10 @@ BMI_portals_set_info(int option, void* inout_parameter)
bmi_method_addr_p addr = (bmi_method_addr_p)inout_parameter; bmi_method_addr_p addr = (bmi_method_addr_p)inout_parameter;
portals_addr_t * a = (portals_addr_t *)addr->method_data; portals_addr_t * a = (portals_addr_t *)addr->method_data;
fprintf(stderr, "%s:%i drop portals addr = %s\n", __func__, __LINE__, a->hostname);
/* remove from the list */ /* remove from the list */
qlist_del(&a->list); qlist_del(&a->list);
gen_mutex_unlock(&addr_lock); gen_mutex_unlock(&addr_lock);
/* cleanup */
//free(a->hostname);
//free(a);
break; break;
} }
case BMI_OPTIMISTIC_BUFFER_REG: case BMI_OPTIMISTIC_BUFFER_REG:
...@@ -344,7 +370,7 @@ BMI_portals_memalloc(bmi_size_t size, enum bmi_op_type send_recv) ...@@ -344,7 +370,7 @@ BMI_portals_memalloc(bmi_size_t size, enum bmi_op_type send_recv)
} }
else else
{ {
return malloc(size); return bmiptl_safe_malloc(size, __FILE__, __func__, __LINE__);
} }
return NULL; return NULL;
} }
...@@ -360,7 +386,7 @@ BMI_portals_memfree(void* buffer, bmi_size_t size, enum bmi_op_type send_recv) ...@@ -360,7 +386,7 @@ BMI_portals_memfree(void* buffer, bmi_size_t size, enum bmi_op_type send_recv)
} }
else else
{ {
free(buffer); bmiptl_safe_free(buffer, __FILE__, __func__, __LINE__);
} }
return ret; return ret;
} }
...@@ -369,7 +395,7 @@ BMI_portals_memfree(void* buffer, bmi_size_t size, enum bmi_op_type send_recv) ...@@ -369,7 +395,7 @@ BMI_portals_memfree(void* buffer, bmi_size_t size, enum bmi_op_type send_recv)
static int static int
BMI_portals_unexpected_free(void* buffer) BMI_portals_unexpected_free(void* buffer)
{ {
free(buffer); bmiptl_safe_free(buffer, __FILE__, __func__, __LINE__);
return 0; return 0;
} }
...@@ -390,7 +416,7 @@ BMI_portals_post_send(bmi_op_id_t* id, bmi_method_addr_p dest, ...@@ -390,7 +416,7 @@ BMI_portals_post_send(bmi_op_id_t* id, bmi_method_addr_p dest,
mop->context_id = context_id; mop->context_id = context_id;
*id = mop->op_id; *id = mop->op_id;
bmip_server_post_send(((portals_addr_t *)dest->method_data)->pid, (int64_t)tag, 1, &buffer, (size_t *)&size, BMIP_USE_CVTEST, user_ptr, *id); bmip_server_post_send(((portals_addr_t *)dest->method_data)->pid, (int64_t)tag, 1, (void **)&buffer, (size_t *)&size, BMIP_USE_CVTEST, user_ptr, *id);
return 0; return 0;
} }
else else
...@@ -441,6 +467,18 @@ BMI_portals_post_sendunexpected(bmi_op_id_t* id, bmi_method_addr_p dest, ...@@ -441,6 +467,18 @@ BMI_portals_post_sendunexpected(bmi_op_id_t* id, bmi_method_addr_p dest,
return bmip_delegate_ret; return bmip_delegate_ret;
} }
else
{
method_op_p mop = bmi_alloc_method_op(0);
mop->addr = dest;
mop->method_data = NULL;
mop->user_ptr = user_ptr;
mop->context_id = context_id;
*id = mop->op_id;
return bmip_server_post_unex_send(((portals_addr_t *)dest->method_data)->pid,
1, buffer, size, tag, user_ptr, *id);
}
return -1; return -1;
} }
...@@ -459,6 +497,7 @@ BMI_portals_post_recv(bmi_op_id_t* id, bmi_method_addr_p src, void* buffer, ...@@ -459,6 +497,7 @@ BMI_portals_post_recv(bmi_op_id_t* id, bmi_method_addr_p src, void* buffer,
void* user_ptr, bmi_context_id context_id, PVFS_hint hints) void* user_ptr, bmi_context_id context_id, PVFS_hint hints)
{ {
int ret = 0; int ret = 0;
if(portals_node_type == SERVER) if(portals_node_type == SERVER)
{ {
method_op_p mop = bmi_alloc_method_op(0); method_op_p mop = bmi_alloc_method_op(0);
...@@ -508,7 +547,7 @@ BMI_portals_test(bmi_op_id_t id, int* outcount, bmi_error_code_t* error_code, ...@@ -508,7 +547,7 @@ BMI_portals_test(bmi_op_id_t id, int* outcount, bmi_error_code_t* error_code,
if(portals_node_type == SERVER) if(portals_node_type == SERVER)
{ {
method_op_p op = (method_op_p)id_gen_fast_lookup(id); method_op_p op = (method_op_p)id_gen_fast_lookup(id);
*outcount = bmip_server_test_event_id(max_idle_time_ms, 1, user_ptr, actual_size, id); *outcount = bmip_server_test_event_id(max_idle_time_ms, 1, user_ptr, (size_t *)actual_size, id);
*error_code = 0; *error_code = 0;
if(*outcount > 0) if(*outcount > 0)
...@@ -549,8 +588,8 @@ BMI_portals_testcontext(int incount, bmi_op_id_t* out_id_array, int* outcount, ...@@ -549,8 +588,8 @@ BMI_portals_testcontext(int incount, bmi_op_id_t* out_id_array, int* outcount,
{ {
if(portals_node_type == SERVER) if(portals_node_type == SERVER)
{ {
int64_t * opids = (int64_t *)malloc(sizeof(int64_t) * incount); int64_t * opids = (int64_t *)bmiptl_safe_malloc(sizeof(int64_t) * incount, __FILE__, __func__, __LINE__);
size_t * sizes = (size_t *)malloc(sizeof(size_t) * incount); size_t * sizes = (size_t *)bmiptl_safe_malloc(sizeof(size_t) * incount, __FILE__, __func__, __LINE__);
*outcount = bmip_server_test_events(max_idle_time_ms, incount, user_ptr_array, sizes, opids); *outcount = bmip_server_test_events(max_idle_time_ms, incount, user_ptr_array, sizes, opids);
...@@ -571,8 +610,8 @@ BMI_portals_testcontext(int incount, bmi_op_id_t* out_id_array, int* outcount, ...@@ -571,8 +610,8 @@ BMI_portals_testcontext(int incount, bmi_op_id_t* out_id_array, int* outcount,
} }
} }
free(opids); bmiptl_safe_free(opids, __FILE__, __func__, __LINE__);
free(sizes); bmiptl_safe_free(sizes, __FILE__, __func__, __LINE__);
return 0; return 0;
} }
...@@ -652,10 +691,10 @@ BMI_portals_testunexpected(int incount, int* outcount, ...@@ -652,10 +691,10 @@ BMI_portals_testunexpected(int incount, int* outcount,
{ {
int i = 0; int i = 0;
/* TODO make these global per server... prevent alloc / dealloc */ /* TODO make these global per server... prevent alloc / dealloc */
void ** buffers = (void **)malloc(sizeof(void *) * incount); void ** buffers = (void **)bmiptl_safe_malloc(sizeof(void *) * incount, __FILE__, __func__, __LINE__);
size_t * sizes = (size_t *)malloc(sizeof(size_t) * incount); size_t * sizes = (size_t *)bmiptl_safe_malloc(sizeof(size_t) * incount, __FILE__, __func__, __LINE__);
int64_t * tags = (int64_t *)malloc(sizeof(int64_t) * incount); int64_t * tags = (int64_t *)bmiptl_safe_malloc(sizeof(int64_t) * incount, __FILE__, __func__, __LINE__);
ptl_process_id_t * addrs = (ptl_process_id_t *)malloc(sizeof(ptl_process_id_t) * incount); ptl_process_id_t * addrs = (ptl_process_id_t *)bmiptl_safe_malloc(sizeof(ptl_process_id_t) * incount, __FILE__, __func__, __LINE__);
*outcount = bmip_server_test_unex_events(max_idle_time_ms, incount, buffers, sizes, tags, addrs); *outcount = bmip_server_test_unex_events(max_idle_time_ms, incount, buffers, sizes, tags, addrs);
...@@ -671,10 +710,10 @@ BMI_portals_testunexpected(int incount, int* outcount, ...@@ -671,10 +710,10 @@ BMI_portals_testunexpected(int incount, int* outcount,
/* TODO make these global per server... prevent alloc / dealloc */ /* TODO make these global per server... prevent alloc / dealloc */
/* cleanup */ /* cleanup */
free(buffers); bmiptl_safe_free(buffers, __FILE__, __func__, __LINE__);
free(sizes); bmiptl_safe_free(sizes, __FILE__, __func__, __LINE__);
free(tags); bmiptl_safe_free(tags, __FILE__, __func__, __LINE__);
free(addrs); bmiptl_safe_free(addrs, __FILE__, __func__, __LINE__);
return 0; return 0;
} }
...@@ -710,6 +749,7 @@ BMI_portals_method_addr_lookup(const char * id) ...@@ -710,6 +749,7 @@ BMI_portals_method_addr_lookup(const char * id)
if(strcmp(id, a->hostname) == 0) if(strcmp(id, a->hostname) == 0)
{ {
found = 1; found = 1;
addr = a->p_addr; /* store the address */
break; break;
} }
} }
...@@ -746,10 +786,6 @@ BMI_portals_method_addr_lookup(const char * id) ...@@ -746,10 +786,6 @@ BMI_portals_method_addr_lookup(const char * id)
/* add it to the list */ /* add it to the list */
qlist_add_tail(&((portals_addr_t *)addr->method_data)->list, &bmip_addr_list); qlist_add_tail(&((portals_addr_t *)addr->method_data)->list, &bmip_addr_list);
} }
else
{
fprintf(stderr, "%s:%i not found\n", __func__, __LINE__);
}
gen_mutex_unlock(&addr_lock); gen_mutex_unlock(&addr_lock);
return addr; return addr;
...@@ -769,7 +805,7 @@ BMI_portals_post_send_list(bmi_op_id_t* id, bmi_method_addr_p dest, ...@@ -769,7 +805,7 @@ BMI_portals_post_send_list(bmi_op_id_t* id, bmi_method_addr_p dest,
{ {
method_op_p mop = bmi_alloc_method_op(0); method_op_p mop = bmi_alloc_method_op(0);
*id = mop->op_id; *id = mop->op_id;
bmip_server_post_send(((portals_addr_t *)dest->method_data)->pid, (int64_t)tag, list_count, (const void **)buffer_list, (size_t *)size_list, BMIP_USE_CVTEST, user_ptr, *id); bmip_server_post_send(((portals_addr_t *)dest->method_data)->pid, (int64_t)tag, list_count, (void **)buffer_list, (size_t *)size_list, BMIP_USE_CVTEST, user_ptr, *id);
return 0; return 0;
} }
else else
......
#include "portals_comm.h"
#include "portals_wrappers.h"
#include <stdio.h>
const char * bmip_ptl_ev_type(ptl_event_t * ev)
{
switch(ev->type)
{
case PTL_EVENT_SEND_START:
return "PTL_EVENT_SEND_START";
case PTL_EVENT_SEND_END:
return "PTL_EVENT_SEND_END";
case PTL_EVENT_PUT_START:
return "PTL_EVENT_PUT_START";
case PTL_EVENT_PUT_END:
return "PTL_EVENT_PUT_END";
case PTL_EVENT_ACK:
return "PTL_EVENT_ACK";
case PTL_EVENT_GET_START:
return "PTL_EVENT_GET_START";
case PTL_EVENT_GET_END:
return "PTL_EVENT_GET_END";
case PTL_EVENT_REPLY_START:
return "PTL_EVENT_REPLY_START";
case PTL_EVENT_REPLY_END:
return "PTL_EVENT_REPLY_END";
case PTL_EVENT_UNLINK:
return "PTL_EVENT_UNLINK";
default:
return "UNKNOWN";
};
out:
return NULL;
}
int bmip_unex_handler(ptl_event_t * ev)
{
int ret = ev->type;
switch(ev->type)
{
case PTL_EVENT_SEND_START:
break;
case PTL_EVENT_SEND_END:
break;
case PTL_EVENT_PUT_START:
break;
case PTL_EVENT_PUT_END:
break;
case PTL_EVENT_ACK:
break;
case PTL_EVENT_GET_START:
break;
case PTL_EVENT_GET_END:
break;
case PTL_EVENT_REPLY_START:
break;
case PTL_EVENT_REPLY_END:
break;
case PTL_EVENT_UNLINK:
break;
default:
ret = -1;
break;
};
out:
return ret;
}
int bmip_wait_event(int timeout, ptl_handle_eq_t * eq, ptl_event_t * ev)
{
int ret = -1;
int i = 0;
const int numhandles = 1;
ptl_event_t sev;
ptl_event_t * lev;
/* detect if we want a copy of the event data or not */
if(ev == NULL)
{
lev = &sev;
}
else
{
lev = ev;
}
/* wait for an unexpected message */
#ifndef BMIP_USE_TIMEOUT
ret = bmip_ptl_eq_wait(*eq, lev);
#else
ret = bmip_ptl_eq_poll(eq, numhandles, timeout, lev, &i);
#endif
if(ret != PTL_EQ_EMPTY)
{
if(ret != PTL_OK)
{
fprintf(stderr, "eq wait failure\n");
ret = -1;
goto out;
}
else
{
ret = bmip_unex_handler(lev);
if(ret == -1)
{
ret = -1;
fprintf(stderr, "ev handler failure\n");
goto out;
}
}
}
else
{
ret = -2;
}
out:
return ret;
}
#ifndef PORTALS_COMM_H
#define PORTALS_COMM_H
#include <portals/portals3.h>
#include <sys/utsname.h>
int bmip_wait_event(int timeout, ptl_handle_eq_t * eq, ptl_event_t * ev);
#endif
This diff is collapsed.
...@@ -13,6 +13,14 @@ ...@@ -13,6 +13,14 @@
#define BMIP_USE_CVTEST 0 #define BMIP_USE_CVTEST 0
#define BMIP_USE_BARRIER 1 #define BMIP_USE_BARRIER 1
#define BMIP_EX_SEND 1
#define BMIP_UNEX_SEND 2
#define BMIP_EX_RECV 3
#define BMIP_UNEX_RECV 4
#define BMIP_UNEX_SS_LOCK_FREE 10
#define BMIP_UNEX_SS_LOCK_HELD 11
#define BMIP_MAX_LISTIO 1025 #define BMIP_MAX_LISTIO 1025
#define BMIP_EV_LIMIT 128 #define BMIP_EV_LIMIT 128
...@@ -22,6 +30,7 @@ typedef struct bmip_seq ...@@ -22,6 +30,7 @@ typedef struct bmip_seq
struct qlist_head list; struct qlist_head list;
ptl_process_id_t target; ptl_process_id_t target;
unsigned int counter; unsigned int counter;
char server;
} bmip_seq_t; } bmip_seq_t;
typedef struct bmip_pending_event typedef struct bmip_pending_event
...@@ -46,10 +55,12 @@ typedef struct bmip_portals_conn_op ...@@ -46,10 +55,12 @@ typedef struct bmip_portals_conn_op
/* tree data */ /* tree data */
int32_t key; int32_t key;
size_t ev_alength;
/* op data */ /* op data */
int8_t op_type; int8_t op_type;
const void ** buffers; void ** buffers;
const void ** user_buffers; void ** user_buffers;
size_t * lengths; size_t * lengths;
size_t * alengths; size_t * alengths;
size_t * offsets; size_t * offsets;
...@@ -81,9 +92,23 @@ typedef struct bmip_portals_conn_op ...@@ -81,9 +92,23 @@ typedef struct bmip_portals_conn_op
int get_remote_get_wait_counter; int get_remote_get_wait_counter;
int unex_wait_counter; int unex_wait_counter;
} bmip_portals_conn_op_t; int cur_ss_counter;
int cur_ss_counter_limit;
size_t cur_ss_offset;
char ss_mode;
void * cur_ss_buffer;
size_t cur_ss_buffer_inc;
int put_remote_get_wait_counter;
char * unex_buffer;
size_t unex_buffer_length;
ptl_match_bits_t unexmb;
int bmip_wait_event(int timeout, ptl_handle_eq_t * eq, ptl_event_t * ev); /* trace events */
uint64_t csid;
uint64_t ptlid;
} bmip_portals_conn_op_t;
/* connection setup and shutdown */ /* connection setup and shutdown */
int bmip_init(int pid); int bmip_init(int pid);
...@@ -120,11 +145,17 @@ int bmip_server_get_cleanup(void * op_, int etype); ...@@ -120,11 +145,17 @@ int bmip_server_get_cleanup(void * op_, int etype);
int bmip_server_get_pending(void * op_, int etype); int bmip_server_get_pending(void * op_, int etype);
int bmip_server_get_init(void * op_, int etype, ptl_process_id_t op_pid); int bmip_server_get_init(void * op_, int etype, ptl_process_id_t op_pid);
int bmip_server_post_unex_send(ptl_process_id_t target, int num, void * buffer, size_t length, int tag, void * user_ptr, int64_t comm_id);
int bmip_server_unex_send_put_local_put_wait(void * op_, int etype);
int bmip_server_unex_send_get_remote_get_wait(void * op_, int etype);
int bmip_server_unex_send_put_remote_put(bmip_portals_conn_op_t * cur_op, int etype);
int bmip_server_unex_send_cleanup(void * op_, int etype);
void * bmip_server_monitor(void * args); void * bmip_server_monitor(void * args);
bmip_context_t * bmip_server_post_recv(ptl_process_id_t target, int64_t match_bits, int num, void ** buffers, size_t * lengths, int use_barrier, void * user_ptr, int64_t comm_id); int bmip_server_post_recv(ptl_process_id_t target, int64_t match_bits, int num, void ** buffers, size_t * lengths, int use_barrier, void * user_ptr, int64_t comm_id);
void bmip_server_wait_recv(bmip_context_t * context); void bmip_server_wait_recv(bmip_context_t * context);
bmip_context_t * bmip_server_post_send(ptl_process_id_t target, int64_t match_bits, int num, const void ** buffers, size_t * lengths, int use_barrier, void * user_ptr, int64_t comm_id); int bmip_server_post_send(ptl_process_id_t target, int64_t match_bits, int num, void ** buffers, size_t * lengths, int use_barrier, void * user_ptr, int64_t comm_id);
void bmip_server_wait_send(bmip_context_t * context); void bmip_server_wait_send(bmip_context_t * context);
void * bmip_new_malloc(size_t len); void * bmip_new_malloc(size_t len);
...@@ -149,4 +180,20 @@ int bmip_server_test_event_id(int ms_timeout, int nums, void ** user_ptrs, size_ ...@@ -149,4 +180,20 @@ int bmip_server_test_event_id(int ms_timeout, int nums, void ** user_ptrs, size_