// test-mpi.cc: MPI-launched sds-keyval test (rank 0 runs the server, the other ranks run clients).
/*
 * Copyright (c) 2017, Los Alamos National Security, LLC.
 * All rights reserved.
 *
 */

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <mpi.h>

#include <margo.h>
#include <mercury.h>
#include <abt.h>

#include "sds-keyval.h"

#include <vector>

/*
 * DIE_IF(cond_expr, err_fmt, ...)
 *
 * Abort-on-error helper. When cond_expr evaluates to true, writes one line
 * to stderr containing the source file, the line number, the text of the
 * condition itself, and a printf-style message built from err_fmt and the
 * optional trailing arguments, then terminates the process with exit(1).
 * When cond_expr is false, nothing happens.
 *
 * err_fmt must be a string literal: it is pasted into the format string by
 * adjacent-literal concatenation. The do/while(0) wrapper lets the macro be
 * used like a single statement followed by a semicolon.
 */
#define DIE_IF(cond_expr, err_fmt, ...)                                      \
    do {                                                                     \
        if (cond_expr) {                                                     \
            fprintf(stderr,                                                  \
                    "ERROR at %s:%d (" #cond_expr "): " err_fmt "\n",        \
                    __FILE__, __LINE__, ##__VA_ARGS__);                      \
            exit(1);                                                         \
        }                                                                    \
    } while (0)

/* Print the one-line command synopsis to stderr. */
static void usage()
{
    fputs("Usage: test-mpi <addr>\n", stderr);
}

int main(int argc, char *argv[])
{
    int rank;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

42
43
    MPI_Comm clientComm;

44
    if (rank == 0) {
45
46
47
48
49
      char server_addr_str[128];
      hg_size_t server_addr_str_sz = 128;
      hg_addr_t server_addr;
      hg_return_t hret;
      
50
      MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED, rank, &clientComm);
51
      
52
53
      // kv-server
      kv_context *context = kv_server_register(argv[1]);
54
55
      hret = margo_addr_self(context->mid, &server_addr);
      DIE_IF(hret != HG_SUCCESS, "margo_addr_self");
56
57

      // get server address
58
59
60
61
      hret = margo_addr_to_string(context->mid, server_addr_str, &server_addr_str_sz, server_addr);
      DIE_IF(hret != HG_SUCCESS, "margo_addr_to_string");
      margo_addr_free(context->mid, server_addr);
      
62
      // broadcast (send) server address to all clients
63
      printf("server (rank %d): server addr_str: %s\n", rank, server_addr_str);
64
65
66
      MPI_Bcast(server_addr_str, 128, MPI_BYTE, 0, MPI_COMM_WORLD);

      // process requests until finalized
67
      kv_server_wait_for_shutdown(context);
68

69
      // now finish cleaning up
70
71
72
73
      kv_server_deregister(context);
      printf("rank %d: server deregistered\n", rank);
    }
    else {
74
75
      int sleep_time = 0;
      char server_addr_str[128];
76
77
78
79
      char client_addr_str_in[128];
      char client_addr_str_out[128];
      hg_size_t client_addr_str_sz = 128;
      hg_addr_t client_addr;
80
81
      hg_return_t hret;
      
82
      MPI_Comm_split(MPI_COMM_WORLD, 1, rank, &clientComm);
83
      
84
85
      // broadcast (recv) server address
      MPI_Bcast(server_addr_str, 128, MPI_BYTE, 0, MPI_COMM_WORLD);
David Rich's avatar
David Rich committed
86
      printf("client (rank %d): server addr_str: %s\n", rank, server_addr_str);
87
88

      // kv-client
89
90
91
92
93
94
95
96
97
98
      sprintf(client_addr_str_in, "cci+tcp://534%02d", rank);
      kv_context *context = kv_client_register(client_addr_str_in);
      hret = margo_addr_self(context->mid, &client_addr);
      DIE_IF(hret != HG_SUCCESS, "margo_addr_self");

      // get client address
      hret = margo_addr_to_string(context->mid, client_addr_str_out, &client_addr_str_sz, client_addr);
      DIE_IF(hret != HG_SUCCESS, "margo_addr_to_string");
      margo_addr_free(context->mid, client_addr);
      printf("client (rank %d): client addr_str: %s\n", rank, client_addr_str_out);
99
100
      
      // open specified "DB" (pass in the server's address)
David Rich's avatar
David Rich committed
101
      const char *db = "kv-test-db";
102
103
      hret = kv_open(context, server_addr_str, (char*)db, KV_UINT, KV_BULK);
      DIE_IF(hret != HG_SUCCESS, "kv_open");
104
      
105
      // do a set of puts/gets
David Rich's avatar
David Rich committed
106
      for (int i=0; i<rank; i++) {
107
108
109
110
111
112
	uint64_t key = rank+i;
	
	// put
	int put_val = rank+i;
	std::vector<char> put_data;
	put_data.resize(sizeof(put_val));
David Rich's avatar
David Rich committed
113
114
	uint64_t data_size = put_data.size();
	memcpy(put_data.data(), &put_val, data_size);
115

David Rich's avatar
David Rich committed
116
117
	hret = kv_bulk_put(context, (void*)&key, (void*)put_data.data(), &data_size);
	printf("(put) key %lu, size=%lu\n", key, data_size);
118
119
120
121
122
123
124
125
	DIE_IF(hret != HG_SUCCESS, "kv_bulk_put");

	sleep(2);

	// get
	int get_val;
	std::vector<char> get_data;
	get_data.resize(sizeof(get_val));
David Rich's avatar
David Rich committed
126
	data_size = get_data.size();
127
	printf("(get) key %lu, estimated size=%lu\n", key, data_size);
David Rich's avatar
David Rich committed
128
	hret = kv_bulk_get(context, (void*)&key, (void*)get_data.data(), &data_size);
129
130
131
132
	DIE_IF(hret != HG_SUCCESS, "kv_bulk_get");
	printf("(get) key %lu, actual size=%lu\n", key, data_size);

	get_data.resize(data_size);
David Rich's avatar
David Rich committed
133
	memcpy(&get_val, get_data.data(), data_size);
134
135
	printf("key: %lu in: %d out: %d\n", key, put_val, get_val);
      }
136
137

      // close
138
139
      hret = kv_close(context);
      DIE_IF(hret != HG_SUCCESS, "kv_close");
140

141
142
143
144
      // once all clients are done, one client can signal server
      MPI_Barrier(clientComm);
      if (rank==1) {
	printf("rank %d: sending server a shutdown request\n", rank);
David Rich's avatar
David Rich committed
145
	kv_client_signal_shutdown(context);
146
147
      }

148
      // now finish cleaning up
149
150
151
152
153
154
155
156
157
      kv_client_deregister(context);
      printf("rank %d: client deregistered\n", rank);
    }

    MPI_Finalize();

    printf("rank %d: finalized\n", rank);
    return 0;
}