Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
B
bake
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
7
Issues
7
List
Boards
Labels
Milestones
Merge Requests
2
Merge Requests
2
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sds
bake
Compare Revisions
master...carns/dev-thread-pipeline
Source
carns/dev-thread-pipeline
Select Git revision
...
Target
master
Select Git revision
Compare
Commits (4)
simple experimental code path for pipelining
· 4f5f516f
Philip Carns
authored
Feb 06, 2019
4f5f516f
experimenting with bulk pool per ES
· ea51f251
Philip Carns
authored
Feb 06, 2019
ea51f251
remember to free thread
· b625c58b
Philip Carns
authored
Feb 13, 2019
b625c58b
bug fix
· fe7045b5
Philip Carns
authored
Feb 13, 2019
fe7045b5
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
176 additions
and
4 deletions
+176
-4
src/bake-server.c
src/bake-server.c
+176
-4
No files found.
src/bake-server.c
View file @
fe7045b5
...
...
@@ -82,6 +82,22 @@ typedef struct bake_server_context_t
remi_provider_t
remi_provider
;
}
bake_server_context_t
;
struct
xfer_slot
;
struct
pipeline_ult_arg
{
void
*
local_buf_ptr
;
unsigned
long
local_buf_size
;
hg_addr_t
remote_addr
;
// remote address
hg_bulk_t
remote_bulk
;
// remote bulk handle for transfers
size_t
remote_offset
;
// remote offset at which to take the data
int
ret
;
// return value of the xfer_ult function
struct
xfer_slot
*
xfer_slot
;
int
last
;
region_content_t
*
region
;
size_t
content_size
;
PMEMobjpool
*
pmem_pool
;
};
typedef
struct
xfer_args
{
margo_instance_id
mid
;
// margo instance
size_t
size
;
// size of data to transfer
...
...
@@ -99,6 +115,7 @@ static void bake_server_finalize_cb(void *data);
static
int
bake_target_post_migration_callback
(
remi_fileset_t
fileset
,
void
*
provider
);
static
void
pipeline_ult
(
void
*
_args
);
static
void
xfer_ult
(
xfer_args
*
args
);
int
bake_makepool
(
...
...
@@ -417,6 +434,7 @@ int bake_provider_list_storage_targets(
return
BAKE_SUCCESS
;
}
#if 0
int bake_provider_set_target_xfer_buffer(
bake_provider_t provider,
bake_target_id_t target_id,
...
...
@@ -451,6 +469,69 @@ finish:
ABT_rwlock_unlock(provider->lock);
return ret;
}
#else
struct
xfer_slot
{
margo_instance_id
mid
;
ABT_pool
pool
;
ABT_sched
sched
;
ABT_xstream
xstream
;
margo_bulk_pool_t
bulk_pool
;
int
bulk_count
;
unsigned
long
bulk_size
;
};
#define XFER_SLOTS 4
struct
xfer_slot
xfer_slot_array
[
XFER_SLOTS
];
int
xfer_slot_array_idx
=
-
1
;
static
void
bulk_pool_maker_ult
(
void
*
_arg
)
{
struct
xfer_slot
*
slot
=
_arg
;
int
ret
;
ret
=
margo_bulk_pool_create
(
slot
->
mid
,
slot
->
bulk_count
,
slot
->
bulk_size
,
HG_BULK_READWRITE
,
&
slot
->
bulk_pool
);
assert
(
ret
==
0
);
return
;
}
int
bake_provider_set_target_xfer_buffer
(
bake_provider_t
provider
,
bake_target_id_t
target_id
,
size_t
count
,
size_t
size
)
{
int
i
;
int
ret
;
ABT_thread
tid
;
for
(
i
=
0
;
i
<
XFER_SLOTS
;
i
++
)
{
ret
=
ABT_pool_create_basic
(
ABT_POOL_FIFO_WAIT
,
ABT_POOL_ACCESS_MPMC
,
ABT_TRUE
,
&
xfer_slot_array
[
i
].
pool
);
assert
(
ret
==
0
);
ret
=
ABT_sched_create_basic
(
ABT_SCHED_BASIC_WAIT
,
1
,
&
xfer_slot_array
[
i
].
pool
,
ABT_SCHED_CONFIG_NULL
,
&
xfer_slot_array
[
i
].
sched
);
assert
(
ret
==
0
);
ret
=
ABT_xstream_create
(
xfer_slot_array
[
i
].
sched
,
&
xfer_slot_array
[
i
].
xstream
);
assert
(
ret
==
0
);
xfer_slot_array
[
i
].
bulk_size
=
size
;
xfer_slot_array
[
i
].
bulk_count
=
count
;
xfer_slot_array
[
i
].
mid
=
provider
->
mid
;
/* run one thread on the new pool and sequentially wait for it to
* complete; the only thing this thread will do is allocate a margo
* bulk pool local to that particular ES.
*/
ABT_thread_create
(
xfer_slot_array
[
i
].
pool
,
bulk_pool_maker_ult
,
&
xfer_slot_array
[
i
],
ABT_THREAD_ATTR_NULL
,
&
tid
);
ABT_thread_join
(
tid
);
}
xfer_slot_array_idx
=
0
;
return
(
0
);
}
#endif
int
bake_provider_set_target_xfer_concurrency
(
bake_provider_t
provider
,
...
...
@@ -990,10 +1071,13 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
src_addr
=
hgi
->
addr
;
}
#if 0
if(xfer_buf_size == 0
|| xfer_buf_count == 0
|| xfer_buf_size > in.bulk_size) { // don't use an intermediate buffer
#else
if
(
xfer_slot_array_idx
<
0
)
{
/* don't use intermediate buffer */
#endif
/* create bulk handle for local side of transfer */
hret
=
margo_bulk_create
(
mid
,
1
,
(
void
**
)(
&
memory
),
&
in
.
bulk_size
,
HG_BULK_WRITE_ONLY
,
&
bulk_handle
);
...
...
@@ -1015,6 +1099,7 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
}
else
{
#if 0
// (1) compute the maximum number of ULTs that can handle this transfer
// as well as the number of individual transfers needed given the buffer sizes
...
...
@@ -1060,13 +1145,58 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
// (3) join and free the ULTs
ABT_thread_join_many(num_threads, ults);
ABT_thread_free_many(num_threads, ults);
#else
/* experimental pipelining implementation */
int
i
=
0
;
int
j
=
0
;
ABT_thread
tid_array
[
16
];
/* TODO: dynamic, or reuse as completed */
struct
pipeline_ult_arg
arg_array
[
16
];
unsigned
long
issued
=
0
;
while
(
issued
<
in
.
bulk_size
)
{
assert
(
i
<
16
);
/* TODO: dynamic, or reuse as completed */
arg_array
[
i
].
local_buf_ptr
=
memory
+
issued
;
arg_array
[
i
].
local_buf_size
=
xfer_slot_array
[
0
].
bulk_size
;
if
(
arg_array
[
i
].
local_buf_size
>
(
in
.
bulk_size
-
issued
))
arg_array
[
i
].
local_buf_size
=
in
.
bulk_size
-
issued
;
arg_array
[
i
].
remote_addr
=
src_addr
;
arg_array
[
i
].
remote_bulk
=
in
.
bulk_handle
;
arg_array
[
i
].
remote_offset
=
issued
;
//fprintf(stderr, "FOO: using xfer slot %d\n", xfer_slot_array_idx);
arg_array
[
i
].
xfer_slot
=
&
xfer_slot_array
[
xfer_slot_array_idx
];
xfer_slot_array_idx
++
;
/* TODO: protect with an abt mutex */
if
(
xfer_slot_array_idx
>=
XFER_SLOTS
)
xfer_slot_array_idx
=
0
;
arg_array
[
i
].
ret
=
0
;
/* TODO: use handler pool or a dedicated pool elsewhere? */
ABT_thread_create
(
arg_array
[
i
].
xfer_slot
->
pool
,
pipeline_ult
,
&
arg_array
[
i
],
ABT_THREAD_ATTR_NULL
,
&
tid_array
[
i
]);
issued
+=
arg_array
[
i
].
local_buf_size
;
i
++
;
if
(
issued
<
in
.
bulk_size
)
arg_array
[
i
].
last
=
0
;
else
{
arg_array
[
i
].
last
=
1
;
arg_array
[
i
].
pmem_pool
=
entry
->
pmem_pool
;
arg_array
[
i
].
region
=
region
;
arg_array
[
i
].
content_size
=
content_size
;
}
}
while
(
j
<
i
)
{
ABT_thread_join
(
tid_array
[
j
]);
ABT_thread_free
(
&
tid_array
[
j
]);
j
++
;
}
TIMERS_END_STEP
(
3
);
#endif
}
/* TODO: should this have an abt shim in case it blocks? */
pmemobj_persist
(
entry
->
pmem_pool
,
region
,
content_size
);
TIMERS_END_STEP
(
3
);
out
.
ret
=
BAKE_SUCCESS
;
...
...
@@ -1901,6 +2031,48 @@ static int bake_target_post_migration_callback(remi_fileset_t fileset, void* uar
return
0
;
}
static
void
pipeline_ult
(
void
*
_arg
)
{
struct
pipeline_ult_arg
*
arg
=
_arg
;
int
ret
;
void
*
local_buf_ptr
;
size_t
tmp_buf_size
;
int
tmp_count
;
hg_bulk_t
local_bulk
=
HG_BULK_NULL
;
ret
=
margo_bulk_pool_get
(
arg
->
xfer_slot
->
bulk_pool
,
&
local_bulk
);
assert
(
ret
==
0
);
ret
=
margo_bulk_access
(
local_bulk
,
0
,
arg
->
local_buf_size
,
HG_BULK_READWRITE
,
1
,
&
local_buf_ptr
,
&
tmp_buf_size
,
&
tmp_count
);
assert
(
ret
==
0
);
ret
=
margo_bulk_transfer
(
arg
->
xfer_slot
->
mid
,
HG_BULK_PULL
,
arg
->
remote_addr
,
arg
->
remote_bulk
,
arg
->
remote_offset
,
local_bulk
,
0
,
arg
->
local_buf_size
);
assert
(
ret
==
0
);
/* NOTE: this is the one line of code with more performance impact than
* any other under concurrent load. Need to refactor in a way that
* allocates intermediate buffer on same ES where this memcpy is
* executed to get better locality? The destination is typically going
* to a kernel space so we can't control that one easily, but we should
* be able to get the intermediate buffer right here.
*/
memcpy
(
arg
->
local_buf_ptr
,
local_buf_ptr
,
arg
->
local_buf_size
);
ret
=
margo_bulk_pool_release
(
arg
->
xfer_slot
->
bulk_pool
,
local_bulk
);
if
(
arg
->
last
)
pmemobj_persist
(
arg
->
pmem_pool
,
arg
->
region
,
arg
->
content_size
);
assert
(
ret
==
0
);
return
;
}
static
void
xfer_ult
(
xfer_args
*
args
)
{
/*
...
...