Commit 8145492d authored by Matthieu Dorier

added xfer ults in bake_write

parent 0f9d4a1f
@@ -571,6 +571,8 @@ static void bake_write_ult(hg_handle_t handle)
char* memory;
char* buffer = NULL;
size_t xfer_buf_size = 0;
size_t xfer_buf_count = 0;
uint32_t max_num_threads = 0;
hg_bulk_t bulk_handle = HG_BULK_NULL;
const struct hg_info *hgi;
margo_instance_id mid;
@@ -581,6 +583,7 @@ static void bake_write_ult(hg_handle_t handle)
mid = margo_hg_handle_get_instance(handle);
assert(mid);
ABT_pool handler_pool = margo_hg_handle_get_handler_pool(handle);
hgi = margo_get_info(handle);
bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
if(!svr_ctx) {
@@ -627,7 +630,9 @@ static void bake_write_ult(hg_handle_t handle)
out.ret = BAKE_ERR_UNKNOWN_TARGET;
goto finish;
}
xfer_buf_size = entry->xfer_buffer_size;
xfer_buf_count = entry->xfer_buffer_count;
max_num_threads = entry->xfer_concurrency;
memory = region->data + in.region_offset;
@@ -671,7 +676,7 @@ static void bake_write_ult(hg_handle_t handle)
}
} else { // multiple transfers using intermediate buffer
#if 0
buffer = calloc(xfer_buf_size,1);
/* create bulk handle for local side of transfer */
@@ -706,6 +711,67 @@ static void bake_write_ult(hg_handle_t handle)
remaining_size -= current_size;
current_offset += current_size;
}
#endif
/* Definition of xfer_args:
typedef struct xfer_args {
margo_instance_id mid;
size_t size;
char* target;
size_t buf_size;
margo_bulk_pool_t buf_pool;
hg_addr_t remote_addr;
hg_bulk_t remote_bulk;
size_t remote_offset;
int32_t op_type;
int32_t ret;
} xfer_args;
*/
// (1) compute the maximum number of ULTs that can handle this transfer
// as well as the number of individual transfers needed given the buffer sizes
// number of xfers of up to xfer_buf_size needed in total
size_t num_xfers_needed = in.bulk_size / xfer_buf_size;
if(num_xfers_needed * xfer_buf_size < in.bulk_size) num_xfers_needed += 1;
// number of threads that will be spawned
uint32_t num_threads = num_xfers_needed;
num_threads = num_threads < max_num_threads ? num_threads : max_num_threads;
// maximum number of xfers per thread
size_t xfer_per_thread = num_xfers_needed / num_threads;
if(xfer_per_thread * num_threads < num_xfers_needed) xfer_per_thread += 1;
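// Example (hypothetical numbers, for illustration only): with
// in.bulk_size = 10 MiB, xfer_buf_size = 1 MiB and max_num_threads = 4,
// num_xfers_needed = 10, num_threads = 4 and xfer_per_thread = ceil(10/4) = 3,
// so three ULTs move 3 MiB each and the last one moves the remaining 1 MiB.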
// (2) create the array of arguments and ULTs
xfer_args* args = alloca(sizeof(*args)*num_threads);
ABT_thread* ults = alloca(sizeof(*ults)*num_threads);
unsigned int i;
size_t current_offset = 0;
size_t remaining_size = in.bulk_size;
size_t current_size = xfer_per_thread * xfer_buf_size;
for(i=0; i < num_threads; i++) {
current_size = current_size > remaining_size ? remaining_size : current_size;
args[i].mid = mid;
args[i].size = current_size;
args[i].target = memory + current_offset;
args[i].buf_size = xfer_buf_size;
args[i].buf_pool = entry->xfer_bulk_pool;
args[i].remote_addr = src_addr;
args[i].remote_bulk = in.bulk_handle;
args[i].remote_offset = current_offset;
args[i].op_type = HG_BULK_PULL;
args[i].ret = 0;
ABT_thread_create(handler_pool, (void (*)(void*))xfer_ult, args+i, ABT_THREAD_ATTR_NULL, ults+i);
current_offset += current_size;
remaining_size -= current_size;
}
// (3) join and free the ULTs
ABT_thread_join_many(num_threads, ults);
ABT_thread_free_many(num_threads, ults);
}
......@@ -1918,9 +1984,6 @@ static void xfer_ult(xfer_args* args)
*/
hg_bulk_t local_bulk = HG_BULK_NULL;
fprintf(stderr,"xfer_ult: size=%zu, remote_offset=%zu, buf_size=%zu\n", args->size, args->remote_offset, args->buf_size);
ABT_thread_yield();
// (1) compute how many iterations must be done
size_t num_xfers = args->size / args->buf_size;
if(num_xfers * args->buf_size < args->size)
......
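The hunk above is truncated. For context, below is a minimal sketch of how such a per-ULT pipelined transfer loop could look, assuming margo's bulk pool API (margo_bulk_pool_get / margo_bulk_pool_release), margo_bulk_transfer, and Mercury's HG_Bulk_access. It is a hypothetical illustration of the staging technique, not the committed body of xfer_ult, and its error handling is simplified.

/* Hypothetical sketch (not the committed xfer_ult): pull the remote slice
 * in chunks of at most buf_size bytes, staging each chunk through an
 * intermediate buffer taken from the margo bulk pool. */
#include <string.h>
#include <margo.h>
#include <margo-bulk-pool.h>

typedef struct xfer_args {           /* mirrors the definition quoted above */
    margo_instance_id mid;
    size_t            size;          /* bytes this ULT must move            */
    char*             target;        /* destination inside the region       */
    size_t            buf_size;      /* size of one intermediate buffer     */
    margo_bulk_pool_t buf_pool;
    hg_addr_t         remote_addr;
    hg_bulk_t         remote_bulk;
    size_t            remote_offset; /* this ULT's offset in remote_bulk    */
    int32_t           op_type;       /* HG_BULK_PULL for bake_write         */
    int32_t           ret;
} xfer_args;

static void xfer_ult(xfer_args* args)
{
    size_t remaining = args->size;
    size_t offset    = 0;

    while(remaining > 0 && args->ret == 0) {
        size_t this_size = remaining < args->buf_size ? remaining : args->buf_size;

        /* take an intermediate buffer from the pool */
        hg_bulk_t local_bulk = HG_BULK_NULL;
        if(margo_bulk_pool_get(args->buf_pool, &local_bulk) != 0) {
            args->ret = -1; /* the real code presumably sets a BAKE_ERR_* code */
            break;
        }

        /* pull this chunk from the client into the intermediate buffer */
        hg_return_t hret = margo_bulk_transfer(args->mid,
                (hg_bulk_op_t)args->op_type,
                args->remote_addr, args->remote_bulk,
                args->remote_offset + offset,
                local_bulk, 0, this_size);
        if(hret != HG_SUCCESS) {
            args->ret = -1;
        } else {
            /* copy from the intermediate buffer into the target region */
            void*       buf_ptr = NULL;
            hg_size_t   buf_len = 0;
            hg_uint32_t actual  = 0;
            HG_Bulk_access(local_bulk, 0, this_size, HG_BULK_READWRITE, 1,
                           &buf_ptr, &buf_len, &actual);
            memcpy(args->target + offset, buf_ptr, this_size);
        }

        /* return the buffer so other ULTs can reuse it */
        margo_bulk_pool_release(args->buf_pool, local_bulk);
        remaining -= this_size;
        offset    += this_size;
    }
}

Staging each chunk through a pooled intermediate buffer bounds per-ULT memory use while letting several ULTs overlap their bulk pulls with the copies into the target region.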