Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
sds
HEP
HEPnOS
Commits
7ab03bd9
Commit
7ab03bd9
authored
Jun 08, 2018
by
Matthieu Dorier
Browse files
enabled bake
parent
a41f978f
Changes
9
Hide whitespace changes
Inline
Side-by-side
src/CMakeLists.txt
View file @
7ab03bd9
...
...
@@ -7,7 +7,8 @@ set(hepnos-src DataStore.cpp
Event.cpp
)
set
(
hepnos-service-src service/HEPnOSService.cpp
service/ServiceConfig.cpp
)
service/ServiceConfig.cpp
service/ConnectionInfoGenerator.cpp
)
# load package helper for generating cmake CONFIG packages
include
(
CMakePackageConfigHelpers
)
...
...
src/DataStore.cpp
View file @
7ab03bd9
...
...
@@ -163,7 +163,7 @@ DataSet DataStore::createDataSet(const std::string& name) {
void
DataStore
::
shutdown
()
{
for
(
auto
addr
:
m_impl
->
m_addrs
)
{
margo_shutdown_remote_instance
(
m_impl
->
m_mid
,
addr
);
margo_shutdown_remote_instance
(
m_impl
->
m_mid
,
addr
.
second
);
}
}
...
...
src/private/DataStoreImpl.hpp
View file @
7ab03bd9
...
...
@@ -8,6 +8,7 @@
#include <vector>
#include <unordered_set>
#include <unordered_map>
#include <functional>
#include <iostream>
#include <yaml-cpp/yaml.h>
...
...
@@ -15,6 +16,7 @@
#include <bake-client.h>
#include <ch-placement.h>
#include "KeyTypes.hpp"
#include "ValueTypes.hpp"
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
...
...
@@ -28,16 +30,17 @@ namespace hepnos {
class
DataStore
::
Impl
{
public:
margo_instance_id
m_mid
;
// Margo instance
std
::
unordered_set
<
hg_addr_t
>
m_addrs
;
// Addresses used by the service
sdskv_client_t
m_sdskv_client
;
// SDSKV client
bake_client_t
m_bake_client
;
// BAKE client
std
::
vector
<
sdskv_provider_handle_t
>
m_sdskv_ph
;
// list of SDSKV provider handlers
std
::
vector
<
sdskv_database_id_t
>
m_sdskv_db
;
// list of SDSKV database ids
struct
ch_placement_instance
*
m_chi_sdskv
;
// ch-placement instance for SDSKV
std
::
vector
<
bake_provider_handle_t
>
m_bake_ph
;
// list of BAKE provider handlers
struct
ch_placement_instance
*
m_chi_bake
;
// ch-placement instance for BAKE
const
DataStore
::
iterator
m_end
;
// iterator for the end() of the DataStore
margo_instance_id
m_mid
;
// Margo instance
std
::
unordered_map
<
std
::
string
,
hg_addr_t
>
m_addrs
;
// Addresses used by the service
sdskv_client_t
m_sdskv_client
;
// SDSKV client
bake_client_t
m_bake_client
;
// BAKE client
std
::
vector
<
sdskv_provider_handle_t
>
m_sdskv_ph
;
// list of SDSKV provider handlers
std
::
vector
<
sdskv_database_id_t
>
m_sdskv_db
;
// list of SDSKV database ids
struct
ch_placement_instance
*
m_chi_sdskv
;
// ch-placement instance for SDSKV
std
::
vector
<
bake_provider_handle_t
>
m_bake_ph
;
// list of BAKE provider handlers
std
::
vector
<
bake_target_id_t
>
m_bake_targets
;
// list of BAKE target ids
struct
ch_placement_instance
*
m_chi_bake
;
// ch-placement instance for BAKE
const
DataStore
::
iterator
m_end
;
// iterator for the end() of the DataStore
Impl
(
DataStore
*
parent
)
:
m_mid
(
MARGO_INSTANCE_NULL
)
...
...
@@ -77,18 +80,22 @@ class DataStore::Impl {
for
(
YAML
::
const_iterator
it
=
sdskv
.
begin
();
it
!=
sdskv
.
end
();
it
++
)
{
std
::
string
str_addr
=
it
->
first
.
as
<
std
::
string
>
();
hg_addr_t
addr
;
hret
=
margo_addr_lookup
(
m_mid
,
str_addr
.
c_str
(),
&
addr
);
if
(
hret
!=
HG_SUCCESS
)
{
margo_addr_free
(
m_mid
,
addr
);
cleanup
();
throw
Exception
(
"margo_addr_lookup failed"
);
if
(
m_addrs
.
count
(
str_addr
)
!=
0
)
{
addr
=
m_addrs
[
str_addr
];
}
else
{
hret
=
margo_addr_lookup
(
m_mid
,
str_addr
.
c_str
(),
&
addr
);
if
(
hret
!=
HG_SUCCESS
)
{
margo_addr_free
(
m_mid
,
addr
);
cleanup
();
throw
Exception
(
"margo_addr_lookup failed"
);
}
m_addrs
[
str_addr
]
=
addr
;
}
m_addrs
.
insert
(
addr
);
// get the provider id(s)
if
(
it
->
second
.
IsScalar
())
{
uint16_t
provider_id
=
it
->
second
.
as
<
uint16_t
>
();
sdskv_provider_handle_t
ph
;
ret
=
sdskv_provider_handle_create
(
m_sdskv_client
,
addr
,
provider_id
,
&
ph
);
margo_addr_free
(
m_mid
,
addr
);
if
(
ret
!=
SDSKV_SUCCESS
)
{
cleanup
();
throw
Exception
(
"sdskv_provider_handle_create failed"
);
...
...
@@ -99,7 +106,6 @@ class DataStore::Impl {
uint16_t
provider_id
=
pid
->
second
.
as
<
uint16_t
>
();
sdskv_provider_handle_t
ph
;
ret
=
sdskv_provider_handle_create
(
m_sdskv_client
,
addr
,
provider_id
,
&
ph
);
margo_addr_free
(
m_mid
,
addr
);
if
(
ret
!=
SDSKV_SUCCESS
)
{
cleanup
();
throw
Exception
(
"sdskv_provider_handle_create failed"
);
...
...
@@ -120,40 +126,60 @@ class DataStore::Impl {
}
// initialize ch-placement for the SDSKV providers
m_chi_sdskv
=
ch_placement_initialize
(
"hash_lookup3"
,
m_sdskv_ph
.
size
(),
4
,
0
);
// get list of bake provider handles
YAML
::
Node
bake
=
config
[
"hepnos"
][
"providers"
][
"bake"
];
for
(
YAML
::
const_iterator
it
=
bake
.
begin
();
it
!=
bake
.
end
();
it
++
)
{
std
::
string
str_addr
=
it
->
first
.
as
<
std
::
string
>
();
hg_addr_t
addr
;
hret
=
margo_addr_lookup
(
m_mid
,
str_addr
.
c_str
(),
&
addr
);
if
(
hret
!=
HG_SUCCESS
)
{
margo_addr_free
(
m_mid
,
addr
);
cleanup
();
throw
Exception
(
"margo_addr_lookup failed"
);
}
m_addrs
.
insert
(
addr
);
if
(
it
->
second
.
IsScalar
())
{
uint16_t
provider_id
=
it
->
second
.
as
<
uint16_t
>
();
bake_provider_handle_t
ph
;
ret
=
bake_provider_handle_create
(
m_bake_client
,
addr
,
provider_id
,
&
ph
);
margo_addr_free
(
m_mid
,
addr
);
if
(
ret
!=
0
)
{
cleanup
();
throw
Exception
(
"bake_provider_handle_create failed"
);
if
(
bake
)
{
for
(
YAML
::
const_iterator
it
=
bake
.
begin
();
it
!=
bake
.
end
();
it
++
)
{
// get the address of a bake provider
std
::
string
str_addr
=
it
->
first
.
as
<
std
::
string
>
();
hg_addr_t
addr
;
if
(
m_addrs
.
count
(
str_addr
)
!=
0
)
{
addr
=
m_addrs
[
str_addr
];
}
else
{
// lookup the address
hret
=
margo_addr_lookup
(
m_mid
,
str_addr
.
c_str
(),
&
addr
);
if
(
hret
!=
HG_SUCCESS
)
{
margo_addr_free
(
m_mid
,
addr
);
cleanup
();
throw
Exception
(
"margo_addr_lookup failed"
);
}
m_addrs
[
str_addr
]
=
addr
;
}
m_bake_ph
.
push_back
(
ph
);
}
else
if
(
it
->
second
.
IsSequence
())
{
for
(
YAML
::
const_iterator
pid
=
it
->
second
.
begin
();
pid
!=
it
->
second
.
end
();
pid
++
)
{
uint16_t
provider_id
=
pid
->
second
.
as
<
uint16_t
>
();
if
(
it
->
second
.
IsScalar
())
{
uint16_t
provider_id
=
it
->
second
.
as
<
uint16_t
>
();
bake_provider_handle_t
ph
;
ret
=
bake_provider_handle_create
(
m_bake_client
,
addr
,
provider_id
,
&
ph
);
margo_addr_free
(
m_mid
,
addr
);
if
(
ret
!=
0
)
{
cleanup
();
throw
Exception
(
"bake_provider_handle_create failed"
);
}
m_bake_ph
.
push_back
(
ph
);
}
else
if
(
it
->
second
.
IsSequence
())
{
for
(
YAML
::
const_iterator
pid
=
it
->
second
.
begin
();
pid
!=
it
->
second
.
end
();
pid
++
)
{
uint16_t
provider_id
=
pid
->
second
.
as
<
uint16_t
>
();
bake_provider_handle_t
ph
;
ret
=
bake_provider_handle_create
(
m_bake_client
,
addr
,
provider_id
,
&
ph
);
if
(
ret
!=
0
)
{
cleanup
();
throw
Exception
(
"bake_provider_handle_create failed"
);
}
m_bake_ph
.
push_back
(
ph
);
}
}
// if(it->second.IsSequence())
}
// for loop
// find out the bake targets at each bake provider
for
(
auto
&
bake_ph
:
m_bake_ph
)
{
bake_target_id_t
bti
;
uint64_t
num_targets
=
0
;
ret
=
bake_probe
(
bake_ph
,
1
,
&
bti
,
&
num_targets
);
if
(
ret
!=
BAKE_SUCCESS
)
{
throw
Exception
(
"bake_probe failed to retrieve targets"
);
}
if
(
num_targets
!=
1
)
{
throw
Exception
(
"bake_prove returned no target"
);
}
m_bake_targets
.
push_back
(
bti
);
}
}
// initialize ch-placement for the bake providers
...
...
@@ -175,45 +201,63 @@ class DataStore::Impl {
ch_placement_finalize
(
m_chi_sdskv
);
if
(
m_chi_bake
)
ch_placement_finalize
(
m_chi_bake
);
for
(
auto
&
addr
:
m_addrs
)
{
margo_addr_free
(
m_mid
,
addr
.
second
);
}
if
(
m_mid
)
margo_finalize
(
m_mid
);
}
private:
static
void
checkConfig
(
YAML
::
Node
&
config
)
{
// config file starts with hepnos entry
auto
hepnosNode
=
config
[
"hepnos"
];
if
(
!
hepnosNode
)
{
throw
Exception
(
"
\"
hepnos
\"
entry not found in YAML file"
);
}
// hepnos entry has client entry
auto
clientNode
=
hepnosNode
[
"client"
];
if
(
!
clientNode
)
{
throw
Exception
(
"
\"
client
\"
entry not found in
\"
hepnos
\"
section"
);
}
// client entry has protocol entry
auto
protoNode
=
clientNode
[
"protocol"
];
if
(
!
protoNode
)
{
throw
Exception
(
"
\"
protocol
\"
entry not found in
\"
client
\"
section"
);
}
// hepnos entry has providers entry
auto
providersNode
=
hepnosNode
[
"providers"
];
if
(
!
providersNode
)
{
throw
Exception
(
"
\"
providers
\"
entry not found in
\"
hepnos
\"
section"
);
}
// provider entry has sdskv entry
auto
sdskvNode
=
providersNode
[
"sdskv"
];
if
(
!
sdskvNode
)
{
throw
Exception
(
"
\"
sdskv
\"
entry not found in
\"
providers
\"
section"
);
}
// sdskv entry is not empty
if
(
sdskvNode
.
size
()
==
0
)
{
throw
Exception
(
"No provider found in
\"
sdskv
\"
section"
);
}
// for each sdskv entry
for
(
auto
it
=
sdskvNode
.
begin
();
it
!=
sdskvNode
.
end
();
it
++
)
{
if
(
it
->
second
.
IsScalar
())
continue
;
// one provider id given
if
(
it
->
second
.
IsSequence
())
{
// array of provider ids given
// the sequence is not empty
if
(
it
->
second
.
size
()
==
0
)
{
throw
Exception
(
"Empty array of provider ids encountered in
\"
sdskv
\"
section"
);
}
// all objects in the sequence are scalar and appear only once
std
::
unordered_set
<
uint16_t
>
ids
;
for
(
auto
pid
=
it
->
second
.
begin
();
pid
!=
it
->
second
.
end
();
pid
++
)
{
if
(
!
pid
->
second
.
IsScalar
())
{
throw
Exception
(
"Non-scalar provider id encountered in
\"
sdskv
\"
section"
);
}
uint16_t
pid_int
=
pid
->
as
<
uint16_t
>
();
if
(
ids
.
count
(
pid_int
)
!=
0
)
{
throw
Exception
(
"Provider id encountered twice in
\"
sdskv
\"
section"
);
}
ids
.
insert
(
pid_int
);
}
}
else
{
throw
Exception
(
"Invalid value type for provider in
\"
sdskv
\"
section"
);
...
...
@@ -230,10 +274,16 @@ class DataStore::Impl {
if
(
it
->
second
.
size
()
==
0
)
{
throw
Exception
(
"No provider found in
\"
bake
\"
section"
);
}
std
::
unordered_set
<
uint16_t
>
ids
;
for
(
auto
pid
=
it
->
second
.
begin
();
pid
!=
it
->
second
.
end
();
pid
++
)
{
if
(
!
pid
->
second
.
IsScalar
())
{
throw
Exception
(
"Non-scalar provider id encountered in
\"
bake
\"
section"
);
}
uint16_t
pid_int
=
pid
->
as
<
uint16_t
>
();
if
(
ids
.
count
(
pid_int
)
!=
0
)
{
throw
Exception
(
"Provider id encountered twice in
\"
bake
\"
section"
);
}
ids
.
insert
(
pid_int
);
}
}
else
{
throw
Exception
(
"Invalid value type for provider in
\"
bake
\"
section"
);
...
...
@@ -252,33 +302,68 @@ class DataStore::Impl {
ss
<<
containerName
<<
"/"
;
ss
<<
objectName
;
// hash the name to get the provider id
long
unsigned
provider_idx
=
0
;
long
unsigned
sdskv_provider_idx
=
0
;
uint64_t
name_hash
;
if
(
level
!=
0
)
{
uint64_t
h
=
std
::
hash
<
std
::
string
>
()(
containerName
);
ch_placement_find_closest
(
m_chi_sdskv
,
h
,
1
,
&
provider_idx
);
name_hash
=
std
::
hash
<
std
::
string
>
()(
containerName
);
}
else
{
// use the complete name for final objects (level 0)
uint64_t
h
=
std
::
hash
<
std
::
string
>
()(
ss
.
str
());
ch_placement_find_closest
(
m_chi_sdskv
,
h
,
1
,
&
provider_idx
);
name_hash
=
std
::
hash
<
std
::
string
>
()(
ss
.
str
());
}
ch_placement_find_closest
(
m_chi_sdskv
,
name_hash
,
1
,
&
sdskv_provider_idx
);
// make corresponding datastore entry
DataStoreEntryPtr
entry
=
make_datastore_entry
(
level
,
ss
.
str
());
auto
ph
=
m_sdskv_ph
[
provider_idx
];
auto
db_id
=
m_sdskv_db
[
provider_idx
];
// find the size of the value, as a way to check if the key exists
hg_size_t
vsize
;
ret
=
sdskv_length
(
ph
,
db_id
,
entry
->
raw
(),
entry
->
length
(),
&
vsize
);
if
(
ret
==
SDSKV_ERR_UNKNOWN_KEY
)
{
return
false
;
}
if
(
ret
!=
SDSKV_SUCCESS
)
{
throw
Exception
(
"Error occured when calling sdskv_length"
);
}
auto
sdskv_ph
=
m_sdskv_ph
[
sdskv_provider_idx
];
auto
db_id
=
m_sdskv_db
[
sdskv_provider_idx
];
// read the value
data
.
resize
(
vsize
);
ret
=
sdskv_get
(
ph
,
db_id
,
entry
->
raw
(),
entry
->
length
(),
data
.
data
(),
&
vsize
);
if
(
ret
!=
SDSKV_SUCCESS
)
{
throw
Exception
(
"Error occured when calling sdskv_get"
);
if
(
level
!=
0
||
m_bake_ph
.
empty
())
{
// read directly from sdskv
// find the size of the value, as a way to check if the key exists
hg_size_t
vsize
;
ret
=
sdskv_length
(
sdskv_ph
,
db_id
,
entry
->
raw
(),
entry
->
length
(),
&
vsize
);
if
(
ret
==
SDSKV_ERR_UNKNOWN_KEY
)
{
return
false
;
}
if
(
ret
!=
SDSKV_SUCCESS
)
{
throw
Exception
(
"Error occured when calling sdskv_length"
);
}
data
.
resize
(
vsize
);
ret
=
sdskv_get
(
sdskv_ph
,
db_id
,
entry
->
raw
(),
entry
->
length
(),
data
.
data
(),
&
vsize
);
if
(
ret
!=
SDSKV_SUCCESS
)
{
throw
Exception
(
"Error occured when calling sdskv_get"
);
}
}
else
{
// read from BAKE
// first get the key/val from sdskv
DataStoreValue
rid_info
;
hg_size_t
vsize
=
sizeof
(
rid_info
);
ret
=
sdskv_get
(
sdskv_ph
,
db_id
,
entry
->
raw
(),
entry
->
length
(),
(
void
*
)(
&
rid_info
),
&
vsize
);
if
(
ret
==
SDSKV_ERR_UNKNOWN_KEY
)
{
return
false
;
}
if
(
ret
!=
SDSKV_SUCCESS
)
{
throw
Exception
(
"Error occured when calling sdskv_get"
);
}
if
(
vsize
!=
sizeof
(
rid_info
))
{
throw
Exception
(
"Call to sdskv_get returned a value of unexpected size"
);
}
// now read the data from bake
data
.
resize
(
rid_info
.
getDataSize
());
if
(
data
.
size
()
==
0
)
return
true
;
long
unsigned
bake_provider_idx
=
0
;
ch_placement_find_closest
(
m_chi_bake
,
name_hash
,
1
,
&
bake_provider_idx
);
auto
bake_ph
=
m_bake_ph
[
bake_provider_idx
];
auto
target
=
m_bake_targets
[
bake_provider_idx
];
uint64_t
bytes_read
=
0
;
ret
=
bake_read
(
bake_ph
,
rid_info
.
getBakeRegionID
(),
0
,
data
.
data
(),
data
.
size
(),
&
bytes_read
);
if
(
ret
!=
BAKE_SUCCESS
)
{
throw
Exception
(
"Couldn't read region from BAKE"
);
}
if
(
bytes_read
!=
rid_info
.
getDataSize
())
{
throw
Exception
(
"Bytes read from BAKE did not match expected object size"
);
}
}
return
true
;
}
...
...
@@ -291,29 +376,52 @@ class DataStore::Impl {
ss
<<
containerName
<<
"/"
;
ss
<<
objectName
;
// hash the name to get the provider id
long
unsigned
provider_idx
=
0
;
long
unsigned
sdskv_provider_idx
=
0
;
uint64_t
name_hash
;
if
(
level
!=
0
)
{
uint64_t
h
=
std
::
hash
<
std
::
string
>
()(
containerName
);
ch_placement_find_closest
(
m_chi_sdskv
,
h
,
1
,
&
provider_idx
);
name_hash
=
std
::
hash
<
std
::
string
>
()(
containerName
);
}
else
{
// use the complete name for final objects (level 0)
uint64_t
h
=
std
::
hash
<
std
::
string
>
()(
ss
.
str
());
ch_placement_find_closest
(
m_chi_sdskv
,
h
,
1
,
&
provider_idx
);
name_hash
=
std
::
hash
<
std
::
string
>
()(
ss
.
str
());
}
// make corresponding datastore entry
ch_placement_find_closest
(
m_chi_sdskv
,
name_hash
,
1
,
&
sdskv_provider_idx
);
// make corresponding datastore entry key
DataStoreEntryPtr
entry
=
make_datastore_entry
(
level
,
ss
.
str
());
auto
ph
=
m_sdskv_ph
[
provider_idx
];
auto
db_id
=
m_sdskv_db
[
provider_idx
];
auto
sdskv_
ph
=
m_sdskv_ph
[
sdskv_
provider_idx
];
auto
db_id
=
m_sdskv_db
[
sdskv_
provider_idx
];
// check if the key exists
hg_size_t
vsize
;
int
ret
=
sdskv_length
(
ph
,
db_id
,
entry
->
raw
(),
entry
->
length
(),
&
vsize
);
int
ret
=
sdskv_length
(
sdskv_
ph
,
db_id
,
entry
->
raw
(),
entry
->
length
(),
&
vsize
);
if
(
ret
==
HG_SUCCESS
)
return
false
;
// key already exists
if
(
ret
!=
SDSKV_ERR_UNKNOWN_KEY
)
{
// there was a problem with sdskv
throw
Exception
(
"Could not check if key exists in SDSKV (sdskv_length error)"
);
}
ret
=
sdskv_put
(
ph
,
db_id
,
entry
->
raw
(),
entry
->
length
(),
data
.
data
(),
data
.
size
());
if
(
ret
!=
SDSKV_SUCCESS
)
{
throw
Exception
(
"Could not put key/value pair in SDSKV (sdskv_put error)"
);
// if it's not a last-level data entry (data product), store in sdskeyval
if
(
level
!=
0
||
m_bake_ph
.
empty
())
{
ret
=
sdskv_put
(
sdskv_ph
,
db_id
,
entry
->
raw
(),
entry
->
length
(),
data
.
data
(),
data
.
size
());
if
(
ret
!=
SDSKV_SUCCESS
)
{
throw
Exception
(
"Could not put key/value pair in SDSKV (sdskv_put error)"
);
}
}
else
{
// store data in bake
long
unsigned
bake_provider_idx
=
0
;
ch_placement_find_closest
(
m_chi_bake
,
name_hash
,
1
,
&
bake_provider_idx
);
auto
bake_ph
=
m_bake_ph
[
bake_provider_idx
];
auto
target
=
m_bake_targets
[
bake_provider_idx
];
bake_region_id_t
rid
;
ret
=
bake_create_write_persist
(
bake_ph
,
target
,
data
.
data
(),
data
.
size
(),
&
rid
);
if
(
ret
!=
BAKE_SUCCESS
)
{
throw
Exception
(
"Could not create bake region (bake_create_write_persist error)"
);
}
// create Value to put in SDSKV
DataStoreValue
value
(
data
.
size
(),
bake_provider_idx
,
rid
);
ret
=
sdskv_put
(
sdskv_ph
,
db_id
,
entry
->
raw
(),
entry
->
length
(),
(
void
*
)(
&
value
),
sizeof
(
value
));
if
(
ret
!=
SDSKV_SUCCESS
)
{
ret
=
bake_remove
(
bake_ph
,
rid
);
if
(
ret
!=
BAKE_SUCCESS
)
{
throw
Exception
(
"Dude, not only did SDSKV fail to put the key, but I couldn't cleanup BAKE. Is it Friday 13?"
);
}
throw
Exception
(
"Could not put key/value pair in SDSKV (sdskv_put error)"
);
}
}
return
true
;
}
...
...
src/private/ValueTypes.hpp
0 → 100644
View file @
7ab03bd9
/*
* (C) 2018 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __PRIVATE_VALUE_TYPES_H
#define __PRIVATE_VALUE_TYPES_H
#include <cstring>
#include <cstdlib>
#include <cstdint>
#include <memory>
namespace
hepnos
{
class
DataStoreValue
{
size_t
m_object_size
;
uint64_t
m_server_id
;
bake_region_id_t
m_region_id
;
public:
DataStoreValue
()
:
m_object_size
(
0
),
m_server_id
(
0
)
{}
DataStoreValue
(
size_t
object_size
,
uint64_t
bake_server_id
,
const
bake_region_id_t
&
region_id
)
:
m_object_size
(
object_size
),
m_server_id
(
bake_server_id
),
m_region_id
(
region_id
)
{}
size_t
getDataSize
()
const
{
return
m_object_size
;
}
const
bake_region_id_t
&
getBakeRegionID
()
const
{
return
m_region_id
;
}
const
uint64_t
&
getBakeServerID
()
const
{
return
m_server_id
;
}
};
}
#endif
src/service/ConnectionInfoGenerator.cpp
0 → 100644
View file @
7ab03bd9
#include <fstream>
#include <yaml-cpp/yaml.h>
#include "ConnectionInfoGenerator.hpp"
namespace
hepnos
{
struct
ConnectionInfoGenerator
::
Impl
{
std
::
string
m_addr
;
// address of this process
uint16_t
m_bake_id
;
// provider ids for BAKE
uint16_t
m_sdskv_id
;
// provider ids for SDSKV
};
ConnectionInfoGenerator
::
ConnectionInfoGenerator
(
const
std
::
string
&
address
,
uint16_t
sdskv_provider_id
,
uint16_t
bake_provider_id
)
:
m_impl
(
std
::
make_unique
<
Impl
>
())
{
m_impl
->
m_addr
=
address
;
m_impl
->
m_bake_id
=
bake_provider_id
;
m_impl
->
m_sdskv_id
=
sdskv_provider_id
;
}
ConnectionInfoGenerator
::~
ConnectionInfoGenerator
()
{}
void
ConnectionInfoGenerator
::
generateFile
(
MPI_Comm
comm
,
const
std
::
string
&
filename
)
const
{
int
rank
,
size
;
const
char
*
addr
=
m_impl
->
m_addr
.
c_str
();
MPI_Comm_rank
(
comm
,
&
rank
);
MPI_Comm_size
(
comm
,
&
size
);
unsigned
j
=
0
;
while
(
addr
[
j
]
!=
'\0'
&&
addr
[
j
]
!=
':'
)
j
++
;
std
::
string
proto
(
addr
,
j
);
// Exchange addresses
std
::
vector
<
char
>
addresses_buf
(
128
*
size
);
MPI_Gather
(
addr
,
128
,
MPI_BYTE
,
addresses_buf
.
data
(),
128
,
MPI_BYTE
,
0
,
comm
);
// Exchange bake providers info
std
::
vector
<
uint16_t
>
bake_pr_ids_buf
(
size
);
MPI_Gather
(
&
(
m_impl
->
m_bake_id
),
1
,
MPI_UNSIGNED_SHORT
,
bake_pr_ids_buf
.
data
(),
1
,
MPI_UNSIGNED_SHORT
,
0
,
comm
);
// Exchange sdskv providers info
std
::
vector
<
uint16_t
>
sdskv_pr_ids_buf
(
size
);
MPI_Gather
(
&
(
m_impl
->
m_sdskv_id
),
1
,
MPI_UNSIGNED_SHORT
,
sdskv_pr_ids_buf
.
data
(),
1
,
MPI_UNSIGNED_SHORT
,
0
,
comm
);
// After this line, the rest is executed only by rank 0
if
(
rank
!=
0
)
return
;
std
::
vector
<
std
::
string
>
addresses
;
for
(
unsigned
i
=
0
;
i
<
size
;
i
++
)
{
addresses
.
emplace_back
(
&
addresses_buf
[
128
*
i
]);
}
YAML
::
Node
config
;
config
[
"hepnos"
][
"client"
][
"protocol"
]
=
proto
;
YAML
::
Node
providers
=
config
[
"hepnos"
][
"providers"
];
for
(
unsigned
int
i
=
0
;
i
<
size
;
i
++
)
{
const
auto
&
provider_addr
=
addresses
[
i
];
if
(
sdskv_pr_ids_buf
[
i
])
{
providers
[
"sdskv"
][
provider_addr
]
=
sdskv_pr_ids_buf
[
i
];
}
if
(
bake_pr_ids_buf
[
i
])
{
providers
[
"bake"
][
provider_addr
]
=
bake_pr_ids_buf
[
i
];
}
}
std
::
ofstream
fout
(
filename
);
fout
<<
config
;
}
}
src/service/ConnectionInfoGenerator.hpp
0 → 100644
View file @
7ab03bd9
#ifndef __HEPNOS_CONNECTION_INFO_GENERATOR_H
#define __HEPNOS_CONNECTION_INFO_GENERATOR_H
#include <string>
#include <memory>
#include <mpi.h>
namespace
hepnos
{
class
ConnectionInfoGenerator
{
private:
class
Impl
;
std
::
unique_ptr
<
Impl
>
m_impl
;
public:
ConnectionInfoGenerator
(
const
std
::
string
&
address
,
uint16_t
sdskv_provider_id
,
uint16_t
bake_provider_id
);
ConnectionInfoGenerator
(
const
ConnectionInfoGenerator
&
)
=
delete
;
ConnectionInfoGenerator
(
ConnectionInfoGenerator
&&
)
=
delete
;
ConnectionInfoGenerator
&
operator
=
(
const
ConnectionInfoGenerator
&
)
=
delete
;
ConnectionInfoGenerator
&
operator
=
(
ConnectionInfoGenerator
&&
)
=
delete
;
~
ConnectionInfoGenerator
();
void
generateFile
(
MPI_Comm
comm
,
const
std
::
string
&
filename
)
const
;
};
}
#endif
src/service/HEPnOSService.cpp
View file @
7ab03bd9
...
...
@@ -12,13 +12,13 @@
#include <margo.h>
#include <bake-server.h>
#include <sdskv-server.h>
#include <yaml-cpp/yaml.h>
#include "ServiceConfig.hpp"
#include "ConnectionInfoGenerator.hpp"
#include "hepnos-service.h"
#define ASSERT(__cond, __msg, ...) { if(!(__cond)) { fprintf(stderr, "[%s:%d] " __msg, __FILE__, __LINE__, __VA_ARGS__); exit(-1); } }
static
void
generate_connection_file
(
MPI_Comm
comm
,
const
char
*
addr
,
const
char
*
filename
);
//
static void generate_connection_file(MPI_Comm comm, const char* addr, const char* filename);
void
hepnos_run_service
(
MPI_Comm
comm
,
const
char
*
config_file
,
const
char
*
connection_file
)
{
...
...
@@ -58,9 +58,10 @@ void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* conn
hg_size_t
self_addr_str_size
=
128
;
margo_addr_to_string
(
mid
,
self_addr_str
,
&
self_addr_str_size
,
self_addr
);