Commit 23510e58 authored by Matthieu Dorier's avatar Matthieu Dorier Committed by Philip Carns

Added margo_bulk_parallel_transfer

parent 490c0c15
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MARGO_BULK_UTIL
#define __MARGO_BULK_UTIL
#ifdef __cplusplus
extern "C" {
#endif
#include <margo.h>
/**
* Perform a bulk transfer by submitting multiple margo_bulk_transfer
* in parallel.
*
* @param [in] mid Margo instance
* @param [in] op type of operation to perform
* @param [in] origin_addr remote Mercury address
* @param [in] origin_handle remote Mercury bulk memory handle
* @param [in] origin_offset offset into remote bulk memory to access
* @param [in] local_handle local bulk memory handle
* @param [in] local_offset offset into local bulk memory to access
* @param [in] size size (in bytes) of transfer
* @param [in] chunk_size size to by transferred by each operation
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_bulk_parallel_transfer(
margo_instance_id mid,
hg_bulk_op_t op,
hg_addr_t origin_addr,
hg_bulk_t origin_handle,
size_t origin_offset,
hg_bulk_t local_handle,
size_t local_offset,
size_t size,
size_t chunk_size);
#ifdef __cplusplus
}
#endif
#endif /* __MARGO */
...@@ -17,7 +17,7 @@ extern "C" { ...@@ -17,7 +17,7 @@ extern "C" {
#include <mercury_macros.h> #include <mercury_macros.h>
#include <abt.h> #include <abt.h>
#include "margo-diag.h" #include <margo-diag.h>
/* determine how much of the Mercury ID space to use for Margo provider IDs */ /* determine how much of the Mercury ID space to use for Margo provider IDs */
#define __MARGO_PROVIDER_ID_SIZE (sizeof(hg_id_t)/4) #define __MARGO_PROVIDER_ID_SIZE (sizeof(hg_id_t)/4)
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <math.h> #include <math.h>
#include "margo.h" #include "margo.h"
#include "margo-bulk-util.h"
#include "margo-timer.h" #include "margo-timer.h"
#include "utlist.h" #include "utlist.h"
#include "uthash.h" #include "uthash.h"
...@@ -1466,6 +1467,56 @@ hg_return_t margo_bulk_transfer( ...@@ -1466,6 +1467,56 @@ hg_return_t margo_bulk_transfer(
return margo_wait_internal(&reqs); return margo_wait_internal(&reqs);
} }
hg_return_t margo_bulk_parallel_transfer(
margo_instance_id mid,
hg_bulk_op_t op,
hg_addr_t origin_addr,
hg_bulk_t origin_handle,
size_t origin_offset,
hg_bulk_t local_handle,
size_t local_offset,
size_t size,
size_t chunk_size)
{
unsigned i, j;
hg_return_t hret = HG_SUCCESS;
hg_return_t hret_wait = HG_SUCCESS;
hg_return_t hret_xfer = HG_SUCCESS;
size_t remaining_size = size;
if(chunk_size == 0)
return HG_INVALID_PARAM;
size_t count = size/chunk_size;
if(count*chunk_size < size) count += 1;
struct margo_request_struct* reqs = calloc(count, sizeof(*reqs));
for(i = 0; i < count; i++) {
if(remaining_size < chunk_size) chunk_size = remaining_size;
hret = margo_bulk_itransfer_internal(mid, op, origin_addr,
origin_handle, origin_offset, local_handle,
local_offset, chunk_size, reqs+i);
if(hret_xfer != HG_SUCCESS) {
hret = hret_xfer;
goto wait;
}
origin_offset += chunk_size;
local_offset += chunk_size;
}
wait:
for(j = 0; j < i; j++) {
hret_wait = margo_wait_internal(reqs + j);
if(hret == HG_SUCCESS && hret_wait != HG_SUCCESS) {
hret = hret_wait;
goto finish;
}
}
finish:
free(reqs);
return hret;
}
hg_return_t margo_bulk_itransfer( hg_return_t margo_bulk_itransfer(
margo_instance_id mid, margo_instance_id mid,
hg_bulk_op_t op, hg_bulk_op_t op,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment