/*
 * ssg-token-ring2.c
 */
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>

#include <mpi.h>

#include <margo.h>
#include <ssg-mpi.h>

/* Value stored in the token that is passed around the ring */
#define TOKEN_VALUE 0xBEEF

/* token_t: input type of the "token_forward" RPC, holding a single uint32_t
 * field named `token`. MERCURY_GEN_PROC is a Mercury macro; presumably it also
 * generates the encode/decode routine used when the type is registered with
 * MARGO_REGISTER -- confirm against the Mercury documentation. */
MERCURY_GEN_PROC(token_t,
    ((uint32_t)(token)))

/* server state structure
 *
 * One instance lives on main()'s stack and is attached to both RPC ids with
 * margo_register_data(), so the RPC handlers can retrieve it again through
 * margo_registered_data().
 */
struct server_data
{
    margo_instance_id mid;            /* margo instance returned by margo_init() */
    ssg_group_id_t gid;               /* SSG group created by ssg_group_create_mpi() */
    int self_rank;                    /* this member's rank, from ssg_get_group_self_rank() */
    int group_size;                   /* member count, from ssg_get_group_size() */
    hg_id_t token_forward_rpc_id;     /* id registered for the "token_forward" RPC */
    hg_id_t shutdown_forward_rpc_id;  /* id registered for the "shutdown_forward" RPC */
};

/* token forward RPC
 * token_forward() sends the token to the next rank in the ring;
 * token_forward_recv() is the handler registered for the "token_forward" RPC. */
void token_forward(struct server_data *serv_data);
static void token_forward_recv(hg_handle_t handle);
DECLARE_MARGO_RPC_HANDLER(token_forward_recv)

/* shutdown forward RPC
 * shutdown_forward() sends a shutdown request to the previous rank in the ring;
 * shutdown_forward_recv() is the handler registered for the "shutdown_forward"
 * RPC. */
void shutdown_forward(struct server_data *serv_data);
static void shutdown_forward_recv(hg_handle_t handle);
DECLARE_MARGO_RPC_HANDLER(shutdown_forward_recv)

int main(int argc, char **argv)
{
    struct server_data serv_data;

    /**
     * Initialization
     **/
    MPI_Init(&argc, &argv);

    serv_data.mid = margo_init("na+sm", MARGO_SERVER_MODE, 0, -1);
    assert(serv_data.mid);

    ssg_init(serv_data.mid);

    serv_data.token_forward_rpc_id = MARGO_REGISTER(serv_data.mid, "token_forward",
        token_t, void, token_forward_recv);
    serv_data.shutdown_forward_rpc_id = MARGO_REGISTER(serv_data.mid, "shutdown_forward",
        void, void, shutdown_forward_recv);

    margo_registered_disable_response(serv_data.mid, serv_data.token_forward_rpc_id,
        HG_TRUE);
    margo_registered_disable_response(serv_data.mid, serv_data.shutdown_forward_rpc_id,
        HG_TRUE);
    margo_register_data(serv_data.mid, serv_data.token_forward_rpc_id, &serv_data, NULL);
    margo_register_data(serv_data.mid, serv_data.shutdown_forward_rpc_id, &serv_data, NULL);

    /**
     * SSG group creation and state query
     **/
    serv_data.gid = ssg_group_create_mpi("token-ring-group", MPI_COMM_WORLD, NULL, NULL);
    assert(serv_data.gid != SSG_GROUP_ID_INVALID);

    serv_data.self_rank = ssg_get_group_self_rank(serv_data.gid);
    assert(serv_data.self_rank >= 0);
    serv_data.group_size = ssg_get_group_size(serv_data.gid);
    assert(serv_data.group_size > 0);

    /**
     * Kick off token forward on rank 0
     **/
    if (serv_data.self_rank == 0)
        token_forward(&serv_data);

    /**
     * Wait for finalize signal
     **/
    margo_wait_for_finalize(serv_data.mid);

    MPI_Finalize();

    return 0;
}

void token_forward(struct server_data *serv_data)
{
    int target_rank;
    ssg_member_id_t target_id;
    hg_addr_t target_addr;
    token_t fwd_token;
    hg_handle_t h;

    /* Get target addr using rank, working forwards through ring */
    target_rank = (serv_data->self_rank + 1) % serv_data->group_size;
    target_id = ssg_get_group_member_id_from_rank(serv_data->gid, target_rank);
    target_addr = ssg_get_group_member_addr(serv_data->gid, target_id);

    margo_thread_sleep(serv_data->mid, 1000.0);

    fwd_token.token = TOKEN_VALUE;
    printf("Member %d forwarding token %u to %d\n", serv_data->self_rank, fwd_token.token, target_rank);
    margo_create(serv_data->mid, target_addr, serv_data->token_forward_rpc_id, &h);
    margo_forward(h, &fwd_token);

    margo_destroy(h);
    return;
}

/**
 * Handler for the "token_forward" RPC.
 *
 * Looks up the server state attached to the RPC id, decodes and prints the
 * received token, releases the input and the handle, and then either passes
 * the token on (non-zero ranks) or starts the shutdown sequence (rank 0, i.e.
 * the token has travelled the whole ring).
 *
 * If the server state cannot be found or the input cannot be decoded, the
 * problem is reported on stderr, the handle is destroyed and nothing is
 * forwarded.
 *
 * @param h handle of the incoming RPC; destroyed by this function
 */
static void token_forward_recv(hg_handle_t h)
{
    token_t fwd_token;

    margo_instance_id mid = margo_hg_handle_get_instance(h);
    const struct hg_info* info = margo_get_info(h);
    struct server_data* serv_data = NULL;

    if (info != NULL)
        serv_data = (struct server_data *)margo_registered_data(mid, info->id);
    if (serv_data == NULL)
    {
        fprintf(stderr, "token_forward_recv: no server state registered for this RPC\n");
        margo_destroy(h);
        return;
    }

    /* only read and free the input when decoding succeeded */
    if (margo_get_input(h, &fwd_token) != HG_SUCCESS)
    {
        fprintf(stderr, "Member %d failed to decode incoming token\n",
            serv_data->self_rank);
        margo_destroy(h);
        return;
    }
    /* cast so the argument matches %u regardless of how uint32_t is defined */
    printf("Member %d got token %u\n", serv_data->self_rank,
        (unsigned int)fwd_token.token);
    margo_free_input(h, &fwd_token);
    margo_destroy(h);

    /* non-zero ranks forward token, rank 0 initiates shutdown process */
    if (serv_data->self_rank > 0)
        token_forward(serv_data);
    else
        shutdown_forward(serv_data);

    return;
}
DEFINE_MARGO_RPC_HANDLER(token_forward_recv)

void shutdown_forward(struct server_data *serv_data)
{
    int target_rank;
    ssg_member_id_t target_id;
    hg_addr_t target_addr;
    token_t fwd_token;
    hg_handle_t h;

    /* Get target addr using rank, working backwards throug ring */
    target_rank = (serv_data->self_rank - 1 + serv_data->group_size) % serv_data->group_size;
    target_id = ssg_get_group_member_id_from_rank(serv_data->gid, target_rank);
    target_addr = ssg_get_group_member_addr(serv_data->gid, target_id);

    margo_thread_sleep(serv_data->mid, 1000.0);

    margo_create(serv_data->mid, target_addr, serv_data->shutdown_forward_rpc_id, &h);
    printf("Member %d forwarding shutdown to %d\n", serv_data->self_rank, target_rank);
    margo_forward(h, NULL);

    margo_destroy(h);
    return;
}

/**
 * Handler for the "shutdown_forward" RPC.
 *
 * Non-zero ranks first pass the shutdown request on to the previous rank;
 * every rank then destroys the RPC handle, destroys its SSG group, finalizes
 * SSG and finalizes margo, which releases main() from
 * margo_wait_for_finalize().
 *
 * If no server state is attached to the RPC id, the problem is reported on
 * stderr and only the handle is destroyed.
 *
 * @param h handle of the incoming RPC; destroyed by this function
 */
static void shutdown_forward_recv(hg_handle_t h)
{
    margo_instance_id mid = margo_hg_handle_get_instance(h);
    const struct hg_info* info = margo_get_info(h);
    struct server_data* serv_data = NULL;

    if (info != NULL)
        serv_data = (struct server_data *)margo_registered_data(mid, info->id);
    if (serv_data == NULL)
    {
        fprintf(stderr, "shutdown_forward_recv: no server state registered for this RPC\n");
        margo_destroy(h);
        return;
    }

    /* non-zero ranks continue forwarding shutdown request */
    if (serv_data->self_rank > 0)
        shutdown_forward(serv_data);

    printf("Member %d shutting down\n", serv_data->self_rank);

    /* shutdown signal */
    margo_destroy(h);
    ssg_group_destroy(serv_data->gid);
    ssg_finalize();
    margo_finalize(serv_data->mid);

    return;
}
DEFINE_MARGO_RPC_HANDLER(shutdown_forward_recv)