ssg-token-ring1.c 3.58 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>

#include <mpi.h>

#include <margo.h>
#include <ssg-mpi.h>

#define TOKEN_VALUE 0xBEEF

MERCURY_GEN_PROC(token_t,
    ((uint32_t)(token)))

/* server state structure */
struct server_data
{
    margo_instance_id mid;
    ssg_group_id_t gid;
    int self_rank;
    int group_size;
    hg_id_t token_forward_rpc_id;
};

/* token forward RPC */
void token_forward(struct server_data *serv_data);
static void token_forward_recv(hg_handle_t handle);
DECLARE_MARGO_RPC_HANDLER(token_forward_recv)

/* XXX shutdown forward RPC declaration */

int main(int argc, char **argv)
{
    struct server_data serv_data;

    /**
     * Initialization
     **/
    MPI_Init(&argc, &argv);

    serv_data.mid = margo_init("na+sm", MARGO_SERVER_MODE, 0, -1);
    assert(serv_data.mid);

    ssg_init(serv_data.mid);

    /* XXX shutdown RPC registration */
    serv_data.token_forward_rpc_id = MARGO_REGISTER(serv_data.mid, "token_forward",
        token_t, void, token_forward_recv);

    margo_registered_disable_response(serv_data.mid, serv_data.token_forward_rpc_id,
        HG_TRUE);
    margo_register_data(serv_data.mid, serv_data.token_forward_rpc_id, &serv_data, NULL);

    /**
     * SSG group creation and state query
     **/
    serv_data.gid = ssg_group_create_mpi("token-ring-group", MPI_COMM_WORLD, NULL, NULL);
    assert(serv_data.gid != SSG_GROUP_ID_INVALID);

    serv_data.self_rank = ssg_get_group_self_rank(serv_data.gid);
    assert(serv_data.self_rank >= 0);
    serv_data.group_size = ssg_get_group_size(serv_data.gid);
    assert(serv_data.group_size > 0);

    /**
     * Kick off token forward on rank 0
     **/
    if (serv_data.self_rank == 0)
        token_forward(&serv_data);

    /**
     * Wait for finalize signal
     **/
    margo_wait_for_finalize(serv_data.mid);

    MPI_Finalize();

    return 0;
}

void token_forward(struct server_data *serv_data)
{
    int target_rank;
    ssg_member_id_t target_id;
    hg_addr_t target_addr;
    token_t fwd_token;
    hg_handle_t h;

    /* Get target addr using rank, working forwards through ring */
    target_rank = (serv_data->self_rank + 1) % serv_data->group_size;
    target_id = ssg_get_group_member_id_from_rank(serv_data->gid, target_rank);
    target_addr = ssg_get_group_member_addr(serv_data->gid, target_id);

    margo_thread_sleep(serv_data->mid, 1000.0);

    fwd_token.token = TOKEN_VALUE;
    printf("Member %d forwarding token %u to %d\n", serv_data->self_rank, fwd_token.token, target_rank);
    margo_create(serv_data->mid, target_addr, serv_data->token_forward_rpc_id, &h);
    margo_forward(h, &fwd_token);

    margo_destroy(h);
    return;
}

static void token_forward_recv(hg_handle_t h)
{
    token_t fwd_token;

    margo_instance_id mid = margo_hg_handle_get_instance(h);
    const struct hg_info* info = margo_get_info(h);
    struct server_data* serv_data = (struct server_data *)
        margo_registered_data(mid, info->id);

    margo_get_input(h, &fwd_token);
    printf("Member %d got token %u\n", serv_data->self_rank, fwd_token.token);
    margo_free_input(h, &fwd_token);
    margo_destroy(h);

    /* non-zero ranks forward token */
    if (serv_data->self_rank > 0)
        token_forward(serv_data);

Shane Snyder's avatar
Shane Snyder committed
123
124
    printf("Member %d shutting down\n", serv_data->self_rank);

Shane Snyder's avatar
Shane Snyder committed
125
126
127
128
129
    /* XXX rather than shutting down immediately, send RPCs back through
     * the ring to shutdown in reverse order (i.e., 3, 2, 1, 0) */
    ssg_group_destroy(serv_data->gid);
    ssg_finalize();
    margo_finalize(serv_data->mid);
130
131
132
133
134
135

    return;
}
DEFINE_MARGO_RPC_HANDLER(token_forward_recv)

/* XXX shutdown RPC definitions */