/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2006 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include <elan/elan.h>
#include <elan/capability.h>
#include <elan/elanctrl.h>
#include "mpidimpl.h"
#include "mpid_nem_impl.h"
#include "elan_impl.h"

MPID_nem_netmod_funcs_t MPIDI_nem_elan_funcs = {
    MPID_nem_elan_init,
    MPID_nem_elan_finalize,
    MPID_nem_elan_ckpt_shutdown,
    MPID_nem_elan_poll,
    MPID_nem_elan_get_business_card,
    MPID_nem_elan_connect_to_root,
    MPID_nem_elan_vc_init,
    MPID_nem_elan_vc_destroy,
24
25
    MPID_nem_elan_vc_terminate,
    NULL /* anysource iprobe */
26
27
};

/* Sizing constant (bytes) for the capability-string buffer in init_elan. */
#define MPID_NEM_ELAN_ALLOC_SIZE         16 
/* Business-card key under which the local ELAN_QUEUE pointer is published. */
#define MPIDI_CH3I_QUEUE_PTR_KEY         "q_ptr_val"
/* Business-card key under which this process' Elan vp is published. */
#define MPIDI_CH3I_ELAN_VPID_KEY         "elan_vpid"
/* First context id written into the generated capability string; the local
 * rank and (num_local - 1) are added to it to form the other two ids. */
#define MPID_NEM_ELAN_CONTEXT_ID_OFFSET  2

/* Per-rank queue handles, allocated in init_elan with num_procs entries.
 * The own-rank slot holds the result of elan_queueRxInit; peer slots are
 * filled by MPID_nem_elan_vc_init with the result of elan_queueTxInit. */
ELAN_QUEUE_TX     **rxq_ptr_array;
/* Copy of rxq_ptr_array[own rank], set in init_elan. */
ELAN_QUEUE_TX      *mpid_nem_elan_recv_queue_ptr;
/* Queue obtained from elan_allocQueue in init_elan. */
static ELAN_QUEUE  *localq_ptr; 
/* Heap cell holding a copy of localq_ptr; allocated in init_elan and
 * released in MPID_nem_elan_get_business_card. */
static ELAN_QUEUE **localq_ptr_val; 
/* Node ids of the external processes, fetched through the PMI KVS and then
 * sorted in init_elan. */
static int         *node_ids;  
/* Node id of this process, parsed from the /proc/qsnet position file. */
static int          my_node_id;
/* Smallest / largest node id among node_ids and my_node_id. */
static int          min_node_id;
static int          max_node_id;
/* NOTE(review): not referenced anywhere in the visible part of this file. */
static int          my_ctxt_id;

/* Set to 1 at the end of init_elan; its consumers are outside this file. */
int  MPID_nem_elan_freq = 0;
/* NOTE(review): not modified in this file; presumably a count of in-flight
 * sends maintained by the send/poll code -- confirm against its users. */
int  MPID_nem_module_elan_pendings_sends = 0;
/* Per-rank Elan vp, num_procs entries; -1 until learned from a business card
 * (the own-rank entry is filled in init_elan). */
int *MPID_nem_elan_vpids = NULL;

/* Backing storage for the event queues and the module free queue below. */
static MPID_nem_elan_event_queue_t _elan_free_event_q;
static MPID_nem_elan_event_queue_t _elan_pending_event_q;
static MPID_nem_queue_t            _free_queue;

/* Event queues; init_elan resets both to empty and then enqueues every cell
 * of MPID_nem_module_elan_cells on the free event queue. */
MPID_nem_elan_event_queue_ptr_t MPID_nem_module_elan_free_event_queue    = &_elan_free_event_q ;
MPID_nem_elan_event_queue_ptr_t MPID_nem_module_elan_pending_event_queue = &_elan_pending_event_q ;
/* Array of MPID_NEM_ELAN_NUM_SLOTS cells allocated in init_elan. */
MPID_nem_elan_cell_ptr_t        MPID_nem_module_elan_cells       = 0;
/* Points at _free_queue once MPID_nem_elan_init has run. */
MPID_nem_queue_ptr_t            MPID_nem_module_elan_free_queue  = 0;
/* Process-wide receive / free queues handed to MPID_nem_elan_init. */
MPID_nem_queue_ptr_t            MPID_nem_process_recv_queue      = 0;
MPID_nem_queue_ptr_t            MPID_nem_process_free_queue      = 0;

/*
 * qsort() comparator ordering two ints ascending.
 *
 * Returns a negative value, zero, or a positive value when *a is less than,
 * equal to, or greater than *b.  Equal elements must compare as 0: qsort
 * requires a consistent (antisymmetric) ordering, and reporting "less than"
 * for both (a,b) and (b,a) violates that contract.
 */
static
int my_compar(const void *a, const void *b)
{
   const int lhs = *(const int *)a;
   const int rhs = *(const int *)b;

   /* Branch-free three-way compare; avoids the overflow of (lhs - rhs). */
   return (lhs > rhs) - (lhs < rhs);
}

#undef FUNCNAME
#define FUNCNAME init_elan
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * init_elan - bring up the Elan library and this module's queues.
 *
 * Unless ELAN_AUTO or RMS_NPROCS is set in the environment, the function
 * builds an Elan capability itself:
 *   1. reads this process' node id from the elan3 (or, failing that, elan4)
 *      /proc/qsnet position file,
 *   2. exchanges node ids with every external process through the PMI KVS,
 *   3. derives the min/max node id and passes the resulting capability
 *      string to elan_generateCapability().
 * It then calls elan_baseInit(), enables the network, allocates the local
 * queue plus the per-rank queue/vp tables, and fills the free event queue.
 *
 * pg_p is currently unused.
 *
 * Returns MPI_SUCCESS, or an MPI error code if a PMI call, an allocation, or
 * reading the position file fails.
 */
int init_elan( MPIDI_PG_t *pg_p )
{
   /* Sized at 4x the base unit: with the base unit alone (16 bytes) the
      capability string is truncated as soon as any id needs two digits. */
   char            capability_str[4 * MPID_NEM_ELAN_ALLOC_SIZE];
   int             mpi_errno = MPI_SUCCESS;
   char            line[255];
   int             numprocs  = MPID_nem_mem_region.ext_procs;
   char           *key;
   char           *val;
   int             key_max_sz;
   int             val_max_sz;
   char           *kvs_name;
   FILE           *myfile;
   int             ncells;
   int             grank;
   int             index;
   int             pmi_errno;
   int             ret;
   ELAN_BASE      *base = NULL;
   ELAN_FLAGS      flags;
   MPIU_CHKLMEM_DECL(2);

   /* Allocate space for pmi keys and values */
   pmi_errno = PMI_KVS_Get_key_length_max(&key_max_sz);
   MPIU_ERR_CHKANDJUMP1(pmi_errno, mpi_errno, MPI_ERR_OTHER, "**fail", "**fail %d", pmi_errno);
   MPIU_CHKLMEM_MALLOC(key, char *, key_max_sz, mpi_errno, "key");

   pmi_errno = PMI_KVS_Get_value_length_max(&val_max_sz);
   MPIU_ERR_CHKANDJUMP1(pmi_errno, mpi_errno, MPI_ERR_OTHER, "**fail", "**fail %d", pmi_errno);
   MPIU_CHKLMEM_MALLOC(val, char *, val_max_sz, mpi_errno, "val");

   if ( !getenv("ELAN_AUTO") && !getenv("RMS_NPROCS") ) {
       /* Get My Node Id from relevant file */
       myfile = fopen("/proc/qsnet/elan3/device0/position","r");
       if (myfile == NULL)
       {
           myfile = fopen("/proc/qsnet/elan4/device0/position","r");
       }
       /* Without a node id the capability below would be wrong, so fail. */
       MPIU_ERR_CHKANDJUMP (myfile == NULL, mpi_errno, MPI_ERR_OTHER, "**fail");

       /* One whitespace-delimited token followed by the integer node id.
          The field width keeps the token inside line[]. */
       ret = fscanf(myfile, "%254s%i", line, &my_node_id);
       fclose(myfile);
       MPIU_ERR_CHKANDJUMP (ret != 2, mpi_errno, MPI_ERR_OTHER, "**fail");

       mpi_errno = MPIDI_PG_GetConnKVSname (&kvs_name);
       if (mpi_errno) MPIU_ERR_POP (mpi_errno);

       /* Put My Node Id */
       for (index = 0 ; index < numprocs ; index++)
       {
           grank = MPID_nem_mem_region.ext_ranks[index];
           /* val is val_max_sz bytes long, key is key_max_sz bytes long */
           MPIU_Snprintf (val, val_max_sz, "%i",my_node_id);
           MPIU_Snprintf (key, key_max_sz, "QsNetkey[%d:%d]", MPID_nem_mem_region.rank, grank);

           pmi_errno = PMI_KVS_Put (kvs_name, key, val);
           MPIU_ERR_CHKANDJUMP1 (pmi_errno != PMI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**pmi_kvs_put", "**pmi_kvs_put %d", pmi_errno);

           pmi_errno = PMI_KVS_Commit (kvs_name);
           MPIU_ERR_CHKANDJUMP1 (pmi_errno != PMI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**pmi_kvs_commit", "**pmi_kvs_commit %d", pmi_errno);
       }
       pmi_errno = PMI_Barrier();
       MPIU_ERR_CHKANDJUMP1 (pmi_errno != PMI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**pmi_barrier", "**pmi_barrier %d", pmi_errno);

       /* Get Node Ids from others */
       MPIU_Assert(numprocs > 0);   /* node_ids[numprocs - 1] is read below */
       node_ids = (int *)MPIU_Malloc(numprocs * sizeof(int));
       MPIU_ERR_CHKANDJUMP (node_ids == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
       for (index = 0 ; index < numprocs ; index++)
       {
           grank = MPID_nem_mem_region.ext_ranks[index];
           memset(val, 0, val_max_sz);
           MPIU_Snprintf (key, key_max_sz,"QsNetkey[%d:%d]", grank, MPID_nem_mem_region.rank);

           pmi_errno = PMI_KVS_Get (kvs_name, key, val, val_max_sz);
           MPIU_ERR_CHKANDJUMP1 (pmi_errno != PMI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**pmi_kvs_get", "**pmi_kvs_get %d", pmi_errno);

           ret = sscanf (val, "%i", &(node_ids[index]));
           MPIU_ERR_CHKANDJUMP1 (ret != 1, mpi_errno, MPI_ERR_OTHER, "**business_card", "**business_card %s", val);
       }
       pmi_errno = PMI_Barrier();
       MPIU_ERR_CHKANDJUMP1 (pmi_errno != PMI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**pmi_barrier", "**pmi_barrier %d", pmi_errno);

       /* Compute Min and Max  Ids*/
       qsort(node_ids, numprocs, sizeof(int), my_compar);

       if (node_ids[0] < my_node_id)
           min_node_id = node_ids[0] ;
       else
           min_node_id = my_node_id ;

       if (node_ids[numprocs - 1] > my_node_id)
           max_node_id = node_ids[numprocs - 1] ;
       else
           max_node_id = my_node_id;

       /* Generate capability string */
       MPIU_Snprintf(capability_str, sizeof(capability_str), "N%dC%d-%d-%dN%d-%dR1b",
                     my_node_id,
                     MPID_NEM_ELAN_CONTEXT_ID_OFFSET,
                     MPID_NEM_ELAN_CONTEXT_ID_OFFSET+MPID_nem_mem_region.local_rank,
                     MPID_NEM_ELAN_CONTEXT_ID_OFFSET+(MPID_nem_mem_region.num_local - 1),
                     min_node_id,max_node_id);
       elan_generateCapability (capability_str);
   }

   /* Init Elan */
   base = elan_baseInit(0);
   MPIU_ERR_CHKANDJUMP (base == NULL, mpi_errno, MPI_ERR_OTHER, "**fail");
   /* From this point, we can use elan_base pointer, which is not declared anywhere */

   MPID_nem_elan_vpids = (int *)MPIU_Malloc(MPID_nem_mem_region.num_procs*sizeof(int));
   MPIU_ERR_CHKANDJUMP (MPID_nem_elan_vpids == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
   for (index = 0 ; index < MPID_nem_mem_region.num_procs ; index++)
     MPID_nem_elan_vpids[index] = -1 ;
   MPID_nem_elan_vpids[MPID_nem_mem_region.rank] = elan_base->state->vp ;

   /* Enable the network */
   elan_enable_network(elan_base->state);

   /* Allocate more than needed */
   rxq_ptr_array  = (ELAN_QUEUE_TX **)MPIU_Malloc(MPID_nem_mem_region.num_procs*sizeof(ELAN_QUEUE_TX *));
   MPIU_ERR_CHKANDJUMP (rxq_ptr_array == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
   localq_ptr     = elan_allocQueue(elan_base->state);
   localq_ptr_val = (ELAN_QUEUE **)MPIU_Malloc(sizeof(ELAN_QUEUE *));
   MPIU_ERR_CHKANDJUMP (localq_ptr_val == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
   *localq_ptr_val = localq_ptr ;

   /* For now, one Quadrics'cell equals to one Nemesis'cell */
   MPIU_Assert( (MPID_NEM_ELAN_SLOT_SIZE) <= (elan_queueMaxSlotSize(elan_base->state)));

   for (index = 0 ; index < MPID_nem_mem_region.num_procs ; index++)
     rxq_ptr_array[index] = NULL ;

   ncells = MPID_NEM_ELAN_NUM_SLOTS*numprocs;
   if(ncells > MPID_NEM_ELAN_MAX_NUM_SLOTS)
     ncells = MPID_NEM_ELAN_MAX_NUM_SLOTS;

   /* Pass a defined value instead of an indeterminate one.
      NOTE(review): all-zero is assumed to mean "no optional flags" --
      confirm against the libelan documentation. */
   memset(&flags, 0, sizeof(flags));
   rxq_ptr_array[MPID_nem_mem_region.rank] = elan_queueRxInit(elan_base->state,
                                                              localq_ptr,
                                                              ncells,
                                                              MPID_NEM_ELAN_SLOT_SIZE,
                                                              MPID_NEM_ELAN_RAIL_NUM,
                                                              flags);
   mpid_nem_elan_recv_queue_ptr = rxq_ptr_array[MPID_nem_mem_region.rank] ;
   MPID_nem_elan_freq           = 1 ;
   MPID_nem_module_elan_cells   = (MPID_nem_elan_cell_ptr_t)MPIU_Calloc( MPID_NEM_ELAN_NUM_SLOTS, sizeof(MPID_nem_elan_cell_t));
   MPIU_ERR_CHKANDJUMP (MPID_nem_module_elan_cells == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
   MPID_nem_module_elan_free_event_queue->head    = NULL;
   MPID_nem_module_elan_free_event_queue->tail    = NULL;
   MPID_nem_module_elan_pending_event_queue->head = NULL;
   MPID_nem_module_elan_pending_event_queue->tail = NULL;
   for (index = 0; index < MPID_NEM_ELAN_NUM_SLOTS ; ++index)
     {
        MPID_nem_elan_event_queue_enqueue(MPID_nem_module_elan_free_event_queue,&MPID_nem_module_elan_cells[index]);
     }

   fn_exit:
     /* Releases the key/val scratch buffers on both success and failure. */
     MPIU_CHKLMEM_FREEALL();
     return mpi_errno;
   fn_fail:
     goto fn_exit;
}

/*
 int  
   MPID_nem_elan_init(MPID_nem_queue_ptr_t proc_recv_queue, MPID_nem_queue_ptr_t proc_free_queue, MPID_nem_cell_ptr_t proc_elements, int num_proc_elements,
	          MPID_nem_cell_ptr_t module_elements, int num_module_elements, 
		  MPID_nem_queue_ptr_t *module_free_queue)

   IN
       proc_recv_queue -- main recv queue for the process
       proc_free_queue -- main free queue for the process
       proc_elements -- pointer to the process' queue elements
       num_proc_elements -- number of process' queue elements
       module_elements -- pointer to queue elements to be used by this module
       num_module_elements -- number of queue elements for this module
   OUT
       module_free_queue -- pointer to the free queue for this module.  The process will return elements to
                            this queue
*/

#undef FUNCNAME
253
#define FUNCNAME MPID_nem_elan_init
254
255
256
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int
257
MPID_nem_elan_init (MPID_nem_queue_ptr_t proc_recv_queue, 
258
259
260
		MPID_nem_queue_ptr_t proc_free_queue, 
		MPID_nem_cell_ptr_t proc_elements,   int num_proc_elements,
		MPID_nem_cell_ptr_t module_elements, int num_module_elements, 
261
		MPID_nem_queue_ptr_t *module_free_queue,
262
263
264
265
266
267
268
		MPIDI_PG_t *pg_p, int pg_rank,
		char **bc_val_p, int *val_max_sz_p)
{   
   int mpi_errno = MPI_SUCCESS ;
   int index;
   
   /* first make sure that our private fields in the vc fit into the area provided  */
269
   MPIU_Assert(sizeof(MPID_nem_elan_vc_area) <= MPID_NEM_VC_NETMOD_AREA_LEN);
270
271
272
273

   if( MPID_nem_mem_region.ext_procs > 0)
     {
	init_elan(pg_p);
274
	mpi_errno = MPID_nem_elan_get_business_card (pg_rank, bc_val_p, val_max_sz_p);
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
	if (mpi_errno) MPIU_ERR_POP (mpi_errno);		
     }   
   
   MPID_nem_process_recv_queue = proc_recv_queue;
   MPID_nem_process_free_queue = proc_free_queue;   
   
   MPID_nem_module_elan_free_queue = &_free_queue;
   MPID_nem_queue_init (MPID_nem_module_elan_free_queue);
   for (index = 0; index < num_module_elements; ++index)
     {
	MPID_nem_queue_enqueue (MPID_nem_module_elan_free_queue, &module_elements[index]);
     }
   
   *module_free_queue = MPID_nem_module_elan_free_queue;

   fn_exit:
       return mpi_errno;
   fn_fail:
       goto fn_exit;
}

#undef FUNCNAME
297
#define FUNCNAME MPID_nem_elan_get_business_card
298
299
300
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int
301
  MPID_nem_elan_get_business_card (int my_rank, char **bc_val_p, int *val_max_sz_p)
302
303
{
   int mpi_errno = MPI_SUCCESS;
304
   int str_errno = MPIU_STR_SUCCESS;
305

306
307
308
309
310
   str_errno = MPIU_Str_add_int_arg (bc_val_p, val_max_sz_p, MPIDI_CH3I_ELAN_VPID_KEY, elan_base->state->vp);
   if (str_errno) {
        MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
    }
311
   
312
313
314
315
316
   str_errno = MPIU_Str_add_binary_arg (bc_val_p, val_max_sz_p, MPIDI_CH3I_QUEUE_PTR_KEY, (char *)&(*localq_ptr_val), sizeof(ELAN_QUEUE *));
   if (str_errno) {
        MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
    }
317
318
319
320
321
322
323
324
325
326

   MPIU_Free(localq_ptr_val);
   
   fn_exit:
       return mpi_errno;
   fn_fail:
       goto fn_exit;
}

#undef FUNCNAME
327
#define FUNCNAME MPID_nem_elan_get_from_bc
328
329
330
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int
331
MPID_nem_elan_get_from_bc (const char *business_card,ELAN_QUEUE **remoteq_ptr, int *vpid)
332
{
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
    int mpi_errno = MPI_SUCCESS;
    int str_errno = MPIU_STR_SUCCESS;
    int tmp_vpid;
    int len;
    
    str_errno = MPIU_Str_get_int_arg (business_card, MPIDI_CH3I_ELAN_VPID_KEY, &tmp_vpid);
    /* FIXME: create a real error string for this */
    MPIU_ERR_CHKANDJUMP(str_errno, mpi_errno, MPI_ERR_OTHER, "**argstr_hostd");
    
    *vpid = tmp_vpid;
    
    str_errno = MPIU_Str_get_binary_arg (business_card, MPIDI_CH3I_QUEUE_PTR_KEY,(char *)remoteq_ptr, sizeof(ELAN_QUEUE *), &len);
    /* FIXME: create a real error string for this */
    MPIU_ERR_CHKANDJUMP(str_errno || len != sizeof(ELAN_QUEUE *), mpi_errno, MPI_ERR_OTHER, "**argstr_hostd");
    
 fn_exit:
    return mpi_errno;
 fn_fail:
    goto fn_exit;
352
353
354
}

#undef FUNCNAME
355
#define FUNCNAME MPID_nem_elan_connect_to_root
356
357
358
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int
359
MPID_nem_elan_connect_to_root (const char *business_card, MPIDI_VC_t *new_vc)
360
361
362
363
364
365
366
367
368
{
   int mpi_errno = MPI_SUCCESS;
   fn_exit:
       return mpi_errno;
   fn_fail:
       goto fn_exit;
}

#undef FUNCNAME
369
#define FUNCNAME MPID_nem_elan_vc_init
370
371
372
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int
373
MPID_nem_elan_vc_init (MPIDI_VC_t *vc, const char *business_card)
374
375
376
377
378
379
380
381
{
   int mpi_errno = MPI_SUCCESS;   
   if( MPID_nem_mem_region.ext_procs > 0)
     {
	ELAN_QUEUE *remoteq_ptr ; 
	ELAN_FLAGS  flags;
        int         vpid;
	  
382
	mpi_errno = MPID_nem_elan_get_from_bc (business_card, &remoteq_ptr, &vpid);
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
	/* --BEGIN ERROR HANDLING-- */   
	if (mpi_errno) 
	  {	
	     MPIU_ERR_POP (mpi_errno);
	  }
	/* --END ERROR HANDLING-- */
	
	rxq_ptr_array[vc->pg_rank]       = elan_queueTxInit(elan_base->state,remoteq_ptr,MPID_NEM_ELAN_RAIL_NUM,flags);
	MPID_nem_elan_vpids[vc->pg_rank] = vpid;

	VC_FIELD(vc, rxq_ptr_array) = rxq_ptr_array;   
	VC_FIELD(vc, vpid)          = vpid;
     }   
   fn_exit:   
       return mpi_errno;
   fn_fail:
       goto fn_exit;
}

#undef FUNCNAME
403
#define FUNCNAME MPID_nem_elan_vc_destroy
404
405
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
406
int MPID_nem_elan_vc_destroy(MPIDI_VC_t *vc)
407
408
409
410
411
412
413
414
415
{
    int mpi_errno = MPI_SUCCESS;   
   fn_exit:   
       return mpi_errno;
   fn_fail:
       goto fn_exit;
}

#undef FUNCNAME
416
#define FUNCNAME MPID_nem_elan_vc_terminate
417
418
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
419
int MPID_nem_elan_vc_terminate (MPIDI_VC_t *vc)
420
421
422
{
    return MPI_SUCCESS;
}