/*
 * The primary goal of this commgraph test is to set up a communication framework between
 * two components, based only on a marker associated with the data being migrated between
 * the components.
 * One use case for us is the physics grid and the spectral dynamics grid in the atmosphere.
 * These could in theory live on different sets of PEs, although here the two models are on
 * the same PEs.
 *
 * In our case, spectral element data is hooked using the GLOBAL_DOFS tag of 4x4 ids,
 * associated with each element.
 * The phys grid comes from AtmPhys.h5m, a point cloud grid partitioned in 16 parts.
 * The spectral mesh is our regular wholeATM_T_01.h5m, which is the state after one time step.
 *
 * The phys grid is very scattered across the partition, while the spectral mesh is more
 * compact; for correct identification/correspondence in parallel, it would make sense to use
 * boxes for the spectral mesh.
 *
 * We employ the crystal router, with a rendezvous algorithm, to set up the communication
 * pattern.
 *
 * input: wholeATM_T.h5m, on 128 procs, the same file that is used by the imoab_coupler test
 * input: AtmPhys.h5m, which contains the physics grid, distributed on 16 processes
 * input 2: wholeLnd.h5m, the land mesh, also distributed on 16 processes
 *
 * The communication pattern is established using a rendezvous method, based on the marker
 * (in this case, GLOBAL_ID on the phys grid vertices and GLOBAL_DOFS on the spectral elements).
 *
 * In the end, we need tag migration to move data between these types of components, by ID.
 *
 * wholeLnd.h5m has holes in the ID space, which we need to account for.
 * Eventually, this could be used to send data directly from the dynamics atm to the land
 * model, or to the land model on the coupler PEs?
 */
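
// Overall flow of commgraphtest():
//   1) build an MPI group/communicator for each model, plus a joint communicator for their union
//   2) register one iMOAB application per model and load its mesh on its own communicator
//   3) iMOAB_ComputeCommGraph, on the joint communicator, computes the communication pattern
//      between the two distributions, matching entities by the marker (GLOBAL_DOFS / GLOBAL_ID)
//   4) send three tags from the first model to the second, then send one tag back into a new tag
//   5) write both meshes and clean up (deregister apps, free communicators and groups)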

#include "moab/Core.hpp"

// MPI includes
#include "moab_mpi.h"
#include "moab/ParallelComm.hpp"
#include "MBParallelConventions.h"

#include "moab/iMOAB.h"
#include "TestUtil.hpp"
#include "moab/CpuTimer.hpp"
#include "moab/ProgOptions.hpp"
#include <iostream>
#include <sstream>

#define CHECKIERR( rc, message )                        \
    if( 0 != ( rc ) )                                   \
    {                                                   \
        printf( "%s. ErrorCode = %d.\n", message, rc ); \
        CHECK( 0 );                                     \
    }

using namespace moab;

// #define VERBOSE

// declare some variables outside main method
// easier to pass them around to the test
int ierr;
int rankInGlobalComm, numProcesses;
MPI_Group jgroup;
std::string atmFilename = TestDir + "unittest/wholeATM_T.h5m";
// in a regular coupled case, the atm component id is 5
// cmpatm identifies the atm model on the atm PEs; it has the spectral mesh
// physatm identifies the atm physics on the phys PEs; it has the point cloud (phys grid)

int rankInAtmComm = -1;
// unique component id of the spectral mesh model
int cmpatm = 605;  // component ids are unique over all pes, and established in advance;

std::string atmPhysFilename    = TestDir + "unittest/AtmPhys.h5m";
std::string atmPhysOutFilename = "outPhys.h5m";
std::string atmFilename2       = "wholeATM_new.h5m";
int rankInPhysComm             = -1;
// this will be the physics atm component id; in a regular coupled case it is actually 5
int physatm = 5;  // component ids are unique over all pes, and established in advance;

// int rankInJointComm = -1;

int nghlay = 0;  // number of ghost layers for loading the file

std::vector< int > groupTasks;
int startG1 = 0, startG2 = 0, endG1 = numProcesses - 1, endG2 = numProcesses - 1;
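// note: numProcesses is still 0 at this point; the task ranges are reset in main() after MPI_Init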
int typeA = 1;  // spectral mesh, with GLOBAL_DOFS tags on cells
int typeB = 2;  // point cloud mesh, with GLOBAL_ID tag on vertices

std::string readopts( "PARALLEL=READ_PART;PARTITION=PARALLEL_PARTITION;PARALLEL_RESOLVE_SHARED_ENTS" );
std::string readoptsPC( "PARALLEL=READ_PART;PARTITION=PARALLEL_PARTITION" );
std::string fileWriteOptions( "PARALLEL=WRITE_PART" );
std::string tagT( "a2oTbot" );
std::string tagU( "a2oUbot" );
std::string tagV( "a2oVbot" );
std::string separ( ":" );
std::string tagT1( "a2oTbot_1" );
std::string tagU1( "a2oUbot_1" );
std::string tagV1( "a2oVbot_1" );
std::string tagT2( "a2oTbot_2" );  // tag that receives the single field sent back

int commgraphtest();

void testspectral_phys()
{
    // default configuration: spectral atm mesh (typeA = 1) to point cloud phys grid (typeB = 2)
    commgraphtest();
}

void testspectral_lnd()
{
    // first model is spectral, second is land
    atmPhysFilename    = TestDir + "unittest/wholeLnd.h5m";
    atmPhysOutFilename = std::string( "outLnd.h5m" );
    atmFilename2       = std::string( "wholeATM_lnd.h5m" );
    commgraphtest();
}

void testphysatm_lnd()
{
    // the first file is the output "outPhys.h5m" of the first test; the second is "outLnd.h5m"
    // from the second test
    atmFilename        = std::string( "outPhys.h5m" );
    atmPhysFilename    = std::string( "outLnd.h5m" );
    atmPhysOutFilename = std::string( "physAtm_lnd.h5m" );
    atmFilename2       = std::string( "physBack_lnd.h5m" );
    tagT               = tagT1;
    tagU               = tagU1;
    tagV               = tagV1;
    tagT1              = std::string( "newT" );
    tagT2              = std::string( "newT2" );
    typeA              = 2;
    commgraphtest();
}
int commgraphtest()
{

    if( !rankInGlobalComm )
    {
        std::cout << " first  file: " << atmFilename << "\n   on tasks : " << startG1 << ":" << endG1
                  << "\n second file: " << atmPhysFilename << "\n     on tasks : " << startG2 << ":" << endG2 << "\n  ";
    }

    // load the files on two different communicators / groups;
    // a joint communicator will be created for the rendezvous
    MPI_Group atmPEGroup;
    groupTasks.resize( numProcesses, 0 );
    for( int i = startG1; i <= endG1; i++ )
        groupTasks[i - startG1] = i;
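    // groupTasks[0 .. endG1-startG1] now holds the global ranks assigned to the first model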

    ierr = MPI_Group_incl( jgroup, endG1 - startG1 + 1, &groupTasks[0], &atmPEGroup );
    CHECKIERR( ierr, "Cannot create atmPEGroup" )

    groupTasks.clear();
    groupTasks.resize( numProcesses, 0 );
    MPI_Group atmPhysGroup;
    for( int i = startG2; i <= endG2; i++ )
        groupTasks[i - startG2] = i;

    ierr = MPI_Group_incl( jgroup, endG2 - startG2 + 1, &groupTasks[0], &atmPhysGroup );
    CHECKIERR( ierr, "Cannot create atmPhysGroup" )

    // create 2 communicators, one for each group
    int ATM_COMM_TAG = 1;
    MPI_Comm atmComm;
    // atmComm is for atmosphere app;
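    // tasks that are not part of atmPEGroup end up with atmComm == MPI_COMM_NULL,
    // which the blocks below check before touching the first model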
    ierr = MPI_Comm_create_group( MPI_COMM_WORLD, atmPEGroup, ATM_COMM_TAG, &atmComm );
    CHECKIERR( ierr, "Cannot create atmComm" )

    int PHYS_COMM_TAG = 2;
    MPI_Comm physComm;
    // physComm is for phys atm app
    ierr = MPI_Comm_create_group( MPI_COMM_WORLD, atmPhysGroup, PHYS_COMM_TAG, &physComm );
    CHECKIERR( ierr, "Cannot create physComm" )

    // now create the joint communicator for atm and phys atm
    MPI_Group joinAtmPhysAtmGroup;
    ierr = MPI_Group_union( atmPEGroup, atmPhysGroup, &joinAtmPhysAtmGroup );
    CHECKIERR( ierr, "Cannot create joint atm - phys atm group" )
    int JOIN_COMM_TAG = 5;
    MPI_Comm joinComm;
    ierr = MPI_Comm_create_group( MPI_COMM_WORLD, joinAtmPhysAtmGroup, JOIN_COMM_TAG, &joinComm );
    CHECKIERR( ierr, "Cannot create joint atm - phys atm communicator" )
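    // joinComm spans the union of the two PE groups; the rendezvous and all tag migration
    // below happen over this communicator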

    ierr = iMOAB_Initialize( 0, NULL );  // nothing from argc, argv is needed yet
    CHECKIERR( ierr, "Cannot initialize iMOAB" )

    int cmpAtmAppID        = -1;
    iMOAB_AppID cmpAtmPID  = &cmpAtmAppID;   // atm
    int physAtmAppID       = -1;             // -1 means it is not initialized
    iMOAB_AppID physAtmPID = &physAtmAppID;  // phys atm on phys pes

    // load atm mesh
    if( atmComm != MPI_COMM_NULL )
    {
        MPI_Comm_rank( atmComm, &rankInAtmComm );
        ierr = iMOAB_RegisterApplication( "ATM1", &atmComm, &cmpatm, cmpAtmPID );
        CHECKIERR( ierr, "Cannot register ATM App" )

        // load first model
        std::string rdopts = readopts;
        if( typeA == 2 ) rdopts = readoptsPC;  // point cloud
        ierr = iMOAB_LoadMesh( cmpAtmPID, atmFilename.c_str(), rdopts.c_str(), &nghlay );
        CHECKIERR( ierr, "Cannot load ATM mesh" )
    }

    // load atm phys mesh
    if( physComm != MPI_COMM_NULL )
    {
        MPI_Comm_rank( physComm, &rankInPhysComm );
        ierr = iMOAB_RegisterApplication( "PhysATM", &physComm, &physatm, physAtmPID );
        CHECKIERR( ierr, "Cannot register PHYS ATM App" )

        // load the second model mesh; in all tests it is a point cloud (PC)
        ierr = iMOAB_LoadMesh( physAtmPID, atmPhysFilename.c_str(), readoptsPC.c_str(), &nghlay );
        CHECKIERR( ierr, "Cannot load Phys ATM mesh" )
    }

    if( MPI_COMM_NULL != joinComm )
    {
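        // compute the communication pattern between the two distributions over the joint
        // communicator; entities are matched by their marker: type 1 uses the GLOBAL_DOFS tag
        // on cells, type 2 uses GLOBAL_ID on vertices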
        ierr = iMOAB_ComputeCommGraph( cmpAtmPID, physAtmPID, &joinComm, &atmPEGroup, &atmPhysGroup, &typeA, &typeB,
                                       &cmpatm, &physatm );
        // this generates the parallel comm graph between the atm and atmPhys models,
        // two meshes that are distributed in parallel
        CHECKIERR( ierr, "Cannot compute comm graph between the two apps " )
    }

    if( atmComm != MPI_COMM_NULL )
    {
        // send the tag values from the first model, following the comm graph
        std::string tags = tagT + separ + tagU + separ + tagV;
        ierr             = iMOAB_SendElementTag( cmpAtmPID, tags.c_str(), &joinComm, &physatm );
        CHECKIERR( ierr, "cannot send tag values" )
    }
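    // the send posts its data against the comm graph; the sender-side buffers are released
    // only later, by the iMOAB_FreeSenderBuffers call that follows the receive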

    if( physComm != MPI_COMM_NULL )
    {
        // need to define tag storage
        std::string tags1 = tagT1 + separ + tagU1 + separ + tagV1 + separ;
        int tagType       = DENSE_DOUBLE;
        int ndof          = 1;
        if( typeB == 1 ) ndof = 16;
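        // a spectral element (type 1) carries 4x4 = 16 degrees of freedom per tag,
        // while a point cloud vertex carries a single value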
        int tagIndex = 0;
        ierr         = iMOAB_DefineTagStorage( physAtmPID, tagT1.c_str(), &tagType, &ndof, &tagIndex );
        CHECKIERR( ierr, "failed to define the field tag a2oTbot" );

        ierr = iMOAB_DefineTagStorage( physAtmPID, tagU1.c_str(), &tagType, &ndof, &tagIndex );
        CHECKIERR( ierr, "failed to define the field tag a2oUbot" );

        ierr = iMOAB_DefineTagStorage( physAtmPID, tagV1.c_str(), &tagType, &ndof, &tagIndex );
        CHECKIERR( ierr, "failed to define the field tag a2oVbot" );

        ierr = iMOAB_ReceiveElementTag( physAtmPID, tags1.c_str(), &joinComm, &cmpatm );
        CHECKIERR( ierr, "cannot receive tag values" )
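        // the received values were routed by the comm graph and matched by the marker ids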
    }

    // we can now free the sender buffers
    if( atmComm != MPI_COMM_NULL )
    {
        ierr = iMOAB_FreeSenderBuffers( cmpAtmPID, &physatm );
        CHECKIERR( ierr, "cannot free buffers" )
    }

    if( physComm != MPI_COMM_NULL )
    {
        ierr = iMOAB_WriteMesh( physAtmPID, atmPhysOutFilename.c_str(), fileWriteOptions.c_str() );
        CHECKIERR( ierr, "cannot write second model mesh" )

        // send back first tag only
        ierr = iMOAB_SendElementTag( physAtmPID, tagT1.c_str(), &joinComm, &cmpatm );
        CHECKIERR( ierr, "cannot send tag values" )
    }
    // receive it in a different tag
    if( atmComm != MPI_COMM_NULL )
    {
        // need to define tag storage
        int tagType = DENSE_DOUBLE;
        int ndof    = 16;
        if( typeA == 2 ) ndof = 1;
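        // when the first model is itself a point cloud (typeA == 2, third test), there is
        // only one value per vertex; otherwise each spectral element holds 16 values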
        int tagIndex = 0;
        ierr         = iMOAB_DefineTagStorage( cmpAtmPID, tagT2.c_str(), &tagType, &ndof, &tagIndex );
        CHECKIERR( ierr, "failed to define the field tag a2oTbot_2" );

        ierr = iMOAB_ReceiveElementTag( cmpAtmPID, tagT2.c_str(), &joinComm, &physatm );
        CHECKIERR( ierr, "cannot receive tag values a2oTbot_2" )
    }
    // one tag was sent back into a different tag, so we can check that the same values arrive;
    // we can now free the sender buffers on the phys side
    if( physComm != MPI_COMM_NULL )
    {
        ierr = iMOAB_FreeSenderBuffers( physAtmPID, &cmpatm );
        CHECKIERR( ierr, "cannot free buffers " )
    }
    if( atmComm != MPI_COMM_NULL )
    {
        ierr = iMOAB_WriteMesh( cmpAtmPID, atmFilename2.c_str(), fileWriteOptions.c_str() );
        CHECKIERR( ierr, "cannot write first model mesh" )
    }

    // unregister in reverse order
    if( physComm != MPI_COMM_NULL )
    {
        ierr = iMOAB_DeregisterApplication( physAtmPID );
        CHECKIERR( ierr, "cannot deregister second app model" )
    }

    if( atmComm != MPI_COMM_NULL )
    {
        ierr = iMOAB_DeregisterApplication( cmpAtmPID );
        CHECKIERR( ierr, "cannot deregister first app model" )
    }

    ierr = iMOAB_Finalize();
    CHECKIERR( ierr, "did not finalize iMOAB" )

    // free atm group and comm
    if( MPI_COMM_NULL != atmComm ) MPI_Comm_free( &atmComm );
    MPI_Group_free( &atmPEGroup );

    // free atm phys group and comm
    if( MPI_COMM_NULL != physComm ) MPI_Comm_free( &physComm );
    MPI_Group_free( &atmPhysGroup );

    // free joint group and comm
    if( MPI_COMM_NULL != joinComm ) MPI_Comm_free( &joinComm );
    MPI_Group_free( &joinAtmPhysAtmGroup );

    return 0;
}
int main( int argc, char* argv[] )
{

    MPI_Init( &argc, &argv );
    MPI_Comm_rank( MPI_COMM_WORLD, &rankInGlobalComm );
    MPI_Comm_size( MPI_COMM_WORLD, &numProcesses );

    MPI_Comm_group( MPI_COMM_WORLD, &jgroup );  // all processes in global group

    // default: both models are laid out over all available tasks; establish the comm graph,
    // then migrate data from the atm PEs to the phys PEs, and back
    startG1 = 0, startG2 = 0, endG1 = numProcesses - 1, endG2 = numProcesses - 1;

    ProgOptions opts;
    opts.addOpt< std::string >( "modelA,m", "first model file", &atmFilename );
    opts.addOpt< int >( "typeA,t", "type of first model", &typeA );

    opts.addOpt< std::string >( "modelB,n", "second model file", &atmPhysFilename );
    opts.addOpt< int >( "typeB,v", "type of the second model", &typeB );

    opts.addOpt< int >( "startAtm,a", "start task for first model layout", &startG1 );
    opts.addOpt< int >( "endAtm,b", "end task for first model layout", &endG1 );

    opts.addOpt< int >( "startPhys,c", "start task for second model layout", &startG2 );
    opts.addOpt< int >( "endPhys,d", "end task for second model layout", &endG2 );

    opts.addOpt< std::string >( "output,o", "output filename", &atmPhysOutFilename );

    // separate option for the file written back by the first model (distinct from "output,o" above)
    opts.addOpt< std::string >( "output2,g", "output filename for the first model", &atmFilename2 );

    opts.parseCommandLine( argc, argv );

    int num_err = 0;
    num_err += RUN_TEST( testspectral_phys );

    // the last two tests reuse hard-coded file names (including outputs of the first test),
    // so run them only when no command line arguments were given
    if( argc == 1 )
    {
        num_err += RUN_TEST( testspectral_lnd );
        num_err += RUN_TEST( testphysatm_lnd );
    }
    MPI_Group_free( &jgroup );

    MPI_Finalize();

    return num_err;
}