MOAB: Mesh Oriented datABase  (version 5.2.1)
ParCommGraph.cpp
00001 /*
00002  * ParCommGraph.cpp
00003  *
00004  */
00005 
00006 #include "moab/ParCommGraph.hpp"
00007 // we need to recompute adjacencies for merging to work
00008 #include "moab/Core.hpp"
00009 #include "AEntityFactory.hpp"
00010 
00011 #ifdef MOAB_HAVE_ZOLTAN
00012 #include "moab/ZoltanPartitioner.hpp"
00013 #endif
00014 
00015 // #define VERBOSE
00016 // #define GRAPH_INFO
00017 
00018 namespace moab
00019 {
00020 ParCommGraph::ParCommGraph( MPI_Comm joincomm, MPI_Group group1, MPI_Group group2, int coid1, int coid2 )
00021     : comm( joincomm ), compid1( coid1 ), compid2( coid2 )
00022 {
00023     // find out the tasks from each group, in the joint communicator
00024     find_group_ranks( group1, comm, senderTasks );
00025     find_group_ranks( group2, comm, receiverTasks );
00026 
00027     rootSender = rootReceiver = false;
00028     rankInGroup1 = rankInGroup2 = rankInJoin = -1;  // not initialized, or not part of the group
00029 
00030     int mpierr = MPI_Group_rank( group1, &rankInGroup1 );
00031     if( MPI_SUCCESS != mpierr || rankInGroup1 == MPI_UNDEFINED ) rankInGroup1 = -1;
00032 
00033     mpierr = MPI_Group_rank( group2, &rankInGroup2 );
00034     if( MPI_SUCCESS != mpierr || rankInGroup2 == MPI_UNDEFINED ) rankInGroup2 = -1;
00035 
00036     mpierr = MPI_Comm_rank( comm, &rankInJoin );
00037     if( MPI_SUCCESS != mpierr )  // it should be a fatal error
00038         rankInJoin = -1;
00039 
00040     mpierr = MPI_Comm_size( comm, &joinSize );
00041     if( MPI_SUCCESS != mpierr )  // it should be a fatal error
00042         joinSize = -1;
00043 
00044     if( 0 == rankInGroup1 ) rootSender = true;
00045     if( 0 == rankInGroup2 ) rootReceiver = true;
00046     graph_type = INITIAL_MIGRATE;  // 0
00047     comm_graph = NULL;
00048     context_id = -1;
00049     cover_set  = 0;  // refers to nothing yet
00050 }
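/*
 * Illustrative usage sketch (not part of the library; names and ranks are hypothetical):
 * how a caller might carve the sender and receiver groups out of a joint communicator and
 * build the graph object; the component ids 5 and 6 are arbitrary.
 *
 *   MPI_Group joinGroup, senderGroup, receiverGroup;
 *   MPI_Comm_group( joinComm, &joinGroup );
 *   int senderRanks[2]   = { 0, 1 };     // joint ranks acting as senders
 *   int receiverRanks[3] = { 2, 3, 4 };  // joint ranks acting as receivers
 *   MPI_Group_incl( joinGroup, 2, senderRanks, &senderGroup );
 *   MPI_Group_incl( joinGroup, 3, receiverRanks, &receiverGroup );
 *   ParCommGraph cgraph( joinComm, senderGroup, receiverGroup, 5, 6 );
 */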
00051 
00052 // the copy constructor copies only a few basic members; split ranges are not copied
00053 ParCommGraph::ParCommGraph( const ParCommGraph& src )
00054 {
00055     comm          = src.comm;
00056     senderTasks   = src.senderTasks;    // these are the sender tasks in joint comm
00057     receiverTasks = src.receiverTasks;  // these are all the receiver tasks in joint comm
00058     rootSender    = src.rootSender;
00059     rootReceiver  = src.rootReceiver;
00060     rankInGroup1  = src.rankInGroup1;
00061     rankInGroup2  = src.rankInGroup2;  // group 1 is sender, 2 is receiver
00062     rankInJoin    = src.rankInJoin;
00063     joinSize      = src.joinSize;
00064     compid1       = src.compid1;
00065     compid2       = src.compid2;
00066     comm_graph    = NULL;
00067     graph_type    = src.graph_type;
00068     context_id    = src.context_id;
00069     cover_set     = src.cover_set;
00070     return;
00071 }
00072 
00073 ParCommGraph::~ParCommGraph()
00074 {
00075     // TODO Auto-generated destructor stub
00076 }
00077 
00078 // utility to find out the ranks of the processes of a group with respect to a joint comm,
00079 // which is guaranteed to span the group;
00080 // it is used locally (in the constructor), but it can also serve as a general utility
00081 void ParCommGraph::find_group_ranks( MPI_Group group, MPI_Comm joincomm, std::vector< int >& ranks )
00082 {
00083     MPI_Group global_grp;
00084     MPI_Comm_group( joincomm, &global_grp );
00085 
00086     int grp_size;
00087 
00088     MPI_Group_size( group, &grp_size );
00089     std::vector< int > rks( grp_size );
00090     ranks.resize( grp_size );
00091 
00092     for( int i = 0; i < grp_size; i++ )
00093         rks[i] = i;
00094 
00095     MPI_Group_translate_ranks( group, grp_size, &rks[0], global_grp, &ranks[0] );
00096     MPI_Group_free( &global_grp );
00097     return;
00098 }
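/*
 * Example (illustrative numbers): if the group contains three processes that are ranks
 * { 4, 5, 6 } of the joint communicator, MPI_Group_translate_ranks maps the local ranks
 * { 0, 1, 2 } to ranks = { 4, 5, 6 }, which is what senderTasks / receiverTasks end up holding.
 */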
00099 
00100 ErrorCode ParCommGraph::compute_trivial_partition( std::vector< int >& numElemsPerTaskInGroup1 )
00101 {
00102 
00103     recv_graph.clear();
00104     recv_sizes.clear();
00105     sender_graph.clear();
00106     sender_sizes.clear();
00107 
00108     if( numElemsPerTaskInGroup1.size() != senderTasks.size() )
00109         return MB_FAILURE;  // each sender has a number of elements that it owns
00110 
00111     // first find out total number of elements to be sent from all senders
00112     int total_elems = 0;
00113     std::vector< int > accum;
00114     accum.push_back( 0 );
00115 
00116     int num_senders = (int)senderTasks.size();
00117 
00118     for( size_t k = 0; k < numElemsPerTaskInGroup1.size(); k++ )
00119     {
00120         total_elems += numElemsPerTaskInGroup1[k];
00121         accum.push_back( total_elems );
00122     }
00123 
00124     int num_recv = ( (int)receiverTasks.size() );
00125     // in trivial partition, every receiver should get about total_elems/num_receivers elements
00126     int num_per_receiver = (int)( total_elems / num_recv );
00127     int leftover         = total_elems - num_per_receiver * num_recv;
00128 
00129     // so receiver k will receive  [starts[k], starts[k+1] ) interval
00130     std::vector< int > starts;
00131     starts.resize( num_recv + 1 );
00132     starts[0] = 0;
00133     for( int k = 0; k < num_recv; k++ )
00134     {
00135         starts[k + 1] = starts[k] + num_per_receiver;
00136         if( k < leftover ) starts[k + 1]++;
00137     }
00138 
00139     // each sender will send to a number of receivers, based on how the intervals
00140     // defined by starts[0:num_recv] and accum[0:num_senders] overlap
00141     int lastUsedReceiverRank = 0;  // first receiver was not treated yet
00142     for( int j = 0; j < num_senders; j++ )
00143     {
00144         // we could start the receiver loop with the latest receiver that received from previous
00145         // sender
00146         for( int k = lastUsedReceiverRank; k < num_recv; k++ )
00147         {
00148             // if overlap:
00149             if( starts[k] < accum[j + 1] && starts[k + 1] > accum[j] )
00150             {
00151                 recv_graph[receiverTasks[k]].push_back( senderTasks[j] );
00152                 sender_graph[senderTasks[j]].push_back( receiverTasks[k] );
00153 
00154                 // we still need to determine the size of the overlap
00155                 int sizeOverlap = 1;  // at least 1, for sure
00156
00157                 if( starts[k] >= accum[j] )  // one end is starts[k]
00158                 {
00159                     if( starts[k + 1] >= accum[j + 1] )  // the other end is accum[j+1]
00160                         sizeOverlap = accum[j + 1] - starts[k];
00161                     else  //
00162                         sizeOverlap = starts[k + 1] - starts[k];
00163                 }
00164                 else  // one end is accum[j]
00165                 {
00166                     if( starts[k + 1] >= accum[j + 1] )  // the other end is accum[j+1]
00167                         sizeOverlap = accum[j + 1] - accum[j];
00168                     else
00169                         sizeOverlap = starts[k + 1] - accum[j];
00170                 }
00171                 recv_sizes[receiverTasks[k]].push_back( sizeOverlap );  // basically, task k will receive from
00172                                                                         //   sender j, sizeOverlap elems
00173                 sender_sizes[senderTasks[j]].push_back( sizeOverlap );
00174                 if( starts[k] > accum[j + 1] )
00175                 {
00176                     lastUsedReceiverRank = k - 1;  // so next k loop will start a little higher, we
00177                                                    // probably finished with first few receivers (up
00178                                                    // to receiver lastUsedReceiverRank)
00179                     break;  // break the k loop, we distributed all elements from sender j to some
00180                             // receivers
00181                 }
00182             }
00183         }
00184     }
00185 
00186     return MB_SUCCESS;
00187 }
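/*
 * Worked example (illustrative numbers): with 2 senders owning { 5, 7 } elements and
 * 3 receivers, accum = [ 0, 5, 12 ], total_elems = 12, num_per_receiver = 4, leftover = 0,
 * so starts = [ 0, 4, 8, 12 ]. Overlapping the two sets of intervals gives
 *   receiver 0 <- sender 0 : 4 elements
 *   receiver 1 <- sender 0 : 1 element,  sender 1 : 3 elements
 *   receiver 2 <- sender 1 : 4 elements
 * which is what recv_graph / recv_sizes and sender_graph / sender_sizes record.
 */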
00188 
00189 ErrorCode ParCommGraph::pack_receivers_graph( std::vector< int >& packed_recv_array )
00190 {
00191     // pack the communication graph from local data; each receiver task will have to post
00192     // receives for each sender task that will send data to it; the array is communicated to
00193     // the root receiver and eventually distributed to all receiver tasks
00194 
00195     /*
00196      * packed_array will have receiver, number of senders, then senders, etc
00197      */
00198     if( recv_graph.size() < receiverTasks.size() )
00199     {
00200         // big problem, we have empty partitions in receive
00201         std::cout << " WARNING: empty partitions, some receiver tasks will receive nothing.\n";
00202     }
00203     for( std::map< int, std::vector< int > >::iterator it = recv_graph.begin(); it != recv_graph.end(); it++ )
00204     {
00205         int recv                    = it->first;
00206         std::vector< int >& senders = it->second;
00207         packed_recv_array.push_back( recv );
00208         packed_recv_array.push_back( (int)senders.size() );
00209 
00210         for( int k = 0; k < (int)senders.size(); k++ )
00211             packed_recv_array.push_back( senders[k] );
00212     }
00213 
00214     return MB_SUCCESS;
00215 }
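/*
 * Layout sketch (hypothetical ranks): if receiver task 2 gets data from sender 0,
 * receiver 3 from senders 0 and 1, and receiver 4 from sender 1, packed_recv_array is
 *   [ 2, 1, 0,   3, 2, 0, 1,   4, 1, 1 ]
 * i.e., for each receiver: its joint rank, the number of senders, then the sender ranks.
 */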
00216 
00217 ErrorCode ParCommGraph::split_owned_range( int sender_rank, Range& owned )
00218 {
00219     int senderTask                   = senderTasks[sender_rank];
00220     std::vector< int >& distribution = sender_sizes[senderTask];
00221     std::vector< int >& receivers    = sender_graph[senderTask];
00222     if( distribution.size() != receivers.size() )  //
00223         return MB_FAILURE;
00224 
00225     Range current = owned;  // start from the full range, then subtract each piece
00226     // as it is assigned to a receiver
00227 
00228     Range rleftover = current;
00229     for( size_t k = 0; k < receivers.size(); k++ )
00230     {
00231         Range newr;
00232         newr.insert( current.begin(), current.begin() + distribution[k] );
00233         split_ranges[receivers[k]] = newr;
00234 
00235         rleftover = subtract( current, newr );
00236         current   = rleftover;
00237     }
00238 
00239     return MB_SUCCESS;
00240 }
00241 
00242 // split using the corresponding tasks and sizes (corr_tasks / corr_sizes)
00243 ErrorCode ParCommGraph::split_owned_range( Range& owned )
00244 {
00245     if( corr_tasks.size() != corr_sizes.size() )  //
00246         return MB_FAILURE;
00247 
00248     Range current = owned;  // start from the full range, then subtract each piece
00249     // as it is assigned to a receiver
00250 
00251     Range rleftover = current;
00252     for( size_t k = 0; k < corr_tasks.size(); k++ )
00253     {
00254         Range newr;
00255         newr.insert( current.begin(), current.begin() + corr_sizes[k] );
00256         split_ranges[corr_tasks[k]] = newr;
00257 
00258         rleftover = subtract( current, newr );
00259         current   = rleftover;
00260     }
00261 
00262     return MB_SUCCESS;
00263 }
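/*
 * Sketch (illustrative): with corr_tasks = { 2, 3 } and corr_sizes = { 4, 1 }, an owned
 * range of 5 primary cells is split so that the first 4 handles (in Range order) go to
 * split_ranges[2] and the remaining handle to split_ranges[3], matching the sizes
 * negotiated by the trivial partition above.
 */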
00264 
00265 ErrorCode ParCommGraph::send_graph( MPI_Comm jcomm )
00266 {
00267     if( is_root_sender() )
00268     {
00269         int ierr;
00270         // we need to build a communication graph: each sender now knows to which receivers it
00271         // will send data, and the receivers need to post receives for each sender that will send
00272         // to them; gather on rank 0 of the sender comm the global ranks of the senders and their
00273         // receivers, building the communication matrix (which receiver receives from which sender)
00274 
00275         std::vector< int > packed_recv_array;
00276         ErrorCode rval = pack_receivers_graph( packed_recv_array );
00277         if( MB_SUCCESS != rval ) return rval;
00278 
00279         int size_pack_array = (int)packed_recv_array.size();
00280         comm_graph          = new int[size_pack_array + 1];
00281         comm_graph[0]       = size_pack_array;
00282         for( int k = 0; k < size_pack_array; k++ )
00283             comm_graph[k + 1] = packed_recv_array[k];
00284         // only one request is added now (the size is no longer sent separately,
00285         /// the receiver obtains it with MPI_Probe); tag 20 carries the packed array
00286         sendReqs.resize( 1 );
00287         // do not send the size in advance, because we use probe now
00288         /*ierr = MPI_Isend(&comm_graph[0], 1, MPI_INT, receiver(0), 10, jcomm, &sendReqs[0]); // we
00289         have to use global communicator if (ierr!=0) return MB_FAILURE;*/
00290         ierr = MPI_Isend( &comm_graph[1], size_pack_array, MPI_INT, receiver( 0 ), 20, jcomm,
00291                           &sendReqs[0] );  // we have to use global communicator
00292         if( ierr != 0 ) return MB_FAILURE;
00293     }
00294     return MB_SUCCESS;
00295 }
00296 
00297 // pco gives access to the MOAB instance through get_moab()
00298 // do we need to store "method" as a member variable ?
00299 ErrorCode ParCommGraph::send_mesh_parts( MPI_Comm jcomm, ParallelComm* pco, Range& owned )
00300 {
00301 
00302     ErrorCode rval;
00303     if( split_ranges.empty() )  // in trivial partition
00304     {
00305         rval = split_owned_range( rankInGroup1, owned );
00306         if( rval != MB_SUCCESS ) return rval;
00307         // we know this on the sender side:
00308         corr_tasks = sender_graph[senderTasks[rankInGroup1]];  // copy
00309         corr_sizes = sender_sizes[senderTasks[rankInGroup1]];  // another copy
00310     }
00311 
00312     int indexReq = 0;
00313     int ierr;                             // MPI error
00314     if( is_root_sender() ) indexReq = 1;  // for sendReqs
00315     sendReqs.resize( indexReq + split_ranges.size() );
00316     for( std::map< int, Range >::iterator it = split_ranges.begin(); it != split_ranges.end(); it++ )
00317     {
00318         int receiver_proc = it->first;
00319         Range ents        = it->second;
00320 
00321         // add necessary vertices too
00322         Range verts;
00323         rval = pco->get_moab()->get_adjacencies( ents, 0, false, verts, Interface::UNION );
00324         if( rval != MB_SUCCESS )
00325         {
00326             std::cout << " can't get adjacencies for entities to send\n";
00327             return rval;
00328         }
00329         ents.merge( verts );
00330         ParallelComm::Buffer* buffer = new ParallelComm::Buffer( ParallelComm::INITIAL_BUFF_SIZE );
00331         buffer->reset_ptr( sizeof( int ) );
00332         rval = pco->pack_buffer( ents, false, true, false, -1, buffer );
00333         if( rval != MB_SUCCESS )
00334         {
00335             std::cout << " can't pack buffer for entities to send\n";
00336             return rval;
00337         }
00338         int size_pack = buffer->get_current_size();
00339 
00340         // TODO there could be an issue with endianness; check !!!!!
00341         // we are no longer sending the size of the buffer first as an int;
00342         /// the receiver determines it with MPI_Probe / MPI_Get_count
00343         /* ierr = MPI_Isend(buffer->mem_ptr, 1, MPI_INT, receiver_proc, 1, jcomm,
00344          &sendReqs[indexReq]); // we have to use global communicator if (ierr!=0) return MB_FAILURE;
00345          indexReq++;*/
00346 
00347         ierr = MPI_Isend( buffer->mem_ptr, size_pack, MPI_CHAR, receiver_proc, 2, jcomm,
00348                           &sendReqs[indexReq] );  // we have to use global communicator
00349         if( ierr != 0 ) return MB_FAILURE;
00350         indexReq++;
00351         localSendBuffs.push_back( buffer );
00352     }
00353     return MB_SUCCESS;
00354 }
00355 
00356 // this is called on receiver side
00357 ErrorCode ParCommGraph::receive_comm_graph( MPI_Comm jcomm, ParallelComm* pco, std::vector< int >& pack_array )
00358 {
00359     // first, receive from sender_rank 0, the communication graph (matrix), so each receiver
00360     // knows what data to expect
00361     MPI_Comm receive = pco->comm();
00362     int size_pack_array, ierr;
00363     MPI_Status status;
00364     if( rootReceiver )
00365     {
00366         /*
00367          * MPI_Probe(
00368         int source,
00369         int tag,
00370         MPI_Comm comm,
00371         MPI_Status* status)
00372          *
00373          */
00374         ierr = MPI_Probe( sender( 0 ), 20, jcomm, &status );
00375         if( 0 != ierr )
00376         {
00377             std::cout << " MPI_Probe failure: " << ierr << "\n";
00378             return MB_FAILURE;
00379         }
00380         // get the count of data received from the MPI_Status structure
00381         ierr = MPI_Get_count( &status, MPI_INT, &size_pack_array );
00382         if( 0 != ierr )
00383         {
00384             std::cout << " MPI_Get_count failure: " << ierr << "\n";
00385             return MB_FAILURE;
00386         }
00387 #ifdef VERBOSE
00388         std::cout << " receive comm graph size: " << size_pack_array << "\n";
00389 #endif
00390         pack_array.resize( size_pack_array );
00391         ierr = MPI_Recv( &pack_array[0], size_pack_array, MPI_INT, sender( 0 ), 20, jcomm, &status );
00392         if( 0 != ierr ) return MB_FAILURE;
00393 #ifdef VERBOSE
00394         std::cout << " receive comm graph ";
00395         for( int k = 0; k < (int)pack_array.size(); k++ )
00396             std::cout << " " << pack_array[k];
00397         std::cout << "\n";
00398 #endif
00399     }
00400 
00401     // now broadcast this whole array to all receivers, so they know what to expect
00402     ierr = MPI_Bcast( &size_pack_array, 1, MPI_INT, 0, receive );
00403     if( 0 != ierr ) return MB_FAILURE;
00404     pack_array.resize( size_pack_array );
00405     ierr = MPI_Bcast( &pack_array[0], size_pack_array, MPI_INT, 0, receive );
00406     if( 0 != ierr ) return MB_FAILURE;
00407     return MB_SUCCESS;
00408 }
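/*
 * Sketch (illustrative, not the actual iMOAB consumer code): once pack_array is broadcast,
 * a receiver can scan it for the sender list addressed to its own joint rank, following the
 * layout produced by pack_receivers_graph() (receiver rank, number of senders, sender ranks):
 *
 *   std::vector< int > my_senders;
 *   for( size_t p = 0; p < pack_array.size(); )
 *   {
 *       int recv_rank = pack_array[p++];
 *       int nsenders  = pack_array[p++];
 *       for( int s = 0; s < nsenders; s++, p++ )
 *           if( recv_rank == rankInJoin ) my_senders.push_back( pack_array[p] );
 *   }
 */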
00409 
00410 ErrorCode ParCommGraph::receive_mesh( MPI_Comm jcomm, ParallelComm* pco, EntityHandle local_set,
00411                                       std::vector< int >& senders_local )
00412 {
00413     ErrorCode rval;
00414     int ierr;
00415     MPI_Status status;
00416     // we also need to fill corresponding mesh info on the other side
00417     corr_tasks = senders_local;
00418     Range newEnts;
00419 
00420     Tag orgSendProcTag;  // this tag is set on the received mesh, recording the task / PE
00421                          // (in the joint communicator) that each
00422     // primary element came from; this information will later be forwarded by the coverage
00423     // mesh
00424     int defaultInt = -1;  // no processor, so it was not migrated from somewhere else
00425     rval           = pco->get_moab()->tag_get_handle( "orig_sending_processor", 1, MB_TYPE_INTEGER, orgSendProcTag,
00426                                             MB_TAG_DENSE | MB_TAG_CREAT, &defaultInt );MB_CHK_SET_ERR( rval, "can't create original sending processor tag" );
00427     if( !senders_local.empty() )
00428     {
00429         for( size_t k = 0; k < senders_local.size(); k++ )
00430         {
00431             int sender1 = senders_local[k];
00432             // first receive the size of the buffer using probe
00433             /*
00434                  * MPI_Probe(
00435                 int source,
00436                 int tag,
00437                 MPI_Comm comm,
00438                 MPI_Status* status)
00439                  *
00440                  */
00441             ierr = MPI_Probe( sender1, 2, jcomm, &status );
00442             if( 0 != ierr )
00443             {
00444                 std::cout << " MPI_Probe failure in ParCommGraph::receive_mesh " << ierr << "\n";
00445                 return MB_FAILURE;
00446             }
00447             // get the count of data received from the MPI_Status structure
00448             int size_pack;
00449             ierr = MPI_Get_count( &status, MPI_CHAR, &size_pack );
00450             if( 0 != ierr )
00451             {
00452                 std::cout << " MPI_Get_count failure in ParCommGraph::receive_mesh  " << ierr << "\n";
00453                 return MB_FAILURE;
00454             }
00455 
00456             /* ierr = MPI_Recv (&size_pack, 1, MPI_INT, sender1, 1, jcomm, &status);
00457              if (0!=ierr) return MB_FAILURE;*/
00458             // now resize the buffer, then receive it
00459             ParallelComm::Buffer* buffer = new ParallelComm::Buffer( size_pack );
00460             // buffer->reserve(size_pack);
00461 
00462             ierr = MPI_Recv( buffer->mem_ptr, size_pack, MPI_CHAR, sender1, 2, jcomm, &status );
00463             if( 0 != ierr )
00464             {
00465                 std::cout << " MPI_Recv failure in ParCommGraph::receive_mesh " << ierr << "\n";
00466                 return MB_FAILURE;
00467             }
00468             // now unpack the buffer we just received
00469             Range entities;
00470             std::vector< std::vector< EntityHandle > > L1hloc, L1hrem;
00471             std::vector< std::vector< int > > L1p;
00472             std::vector< EntityHandle > L2hloc, L2hrem;
00473             std::vector< unsigned int > L2p;
00474 
00475             buffer->reset_ptr( sizeof( int ) );
00476             std::vector< EntityHandle > entities_vec( entities.size() );
00477             std::copy( entities.begin(), entities.end(), entities_vec.begin() );
00478             rval = pco->unpack_buffer( buffer->buff_ptr, false, -1, -1, L1hloc, L1hrem, L1p, L2hloc, L2hrem, L2p,
00479                                        entities_vec );
00480             delete buffer;
00481             if( MB_SUCCESS != rval ) return rval;
00482 
00483             std::copy( entities_vec.begin(), entities_vec.end(), range_inserter( entities ) );
00484             // we have to add them to the local set
00485             rval = pco->get_moab()->add_entities( local_set, entities );
00486             if( MB_SUCCESS != rval ) return rval;
00487             // corr_sizes stores the number of primary entities received from each sender
00488             Range verts              = entities.subset_by_dimension( 0 );
00489             Range local_primary_ents = subtract( entities, verts );
00490             if( local_primary_ents.empty() )
00491             {
00492                 // it is possible that all ents sent were vertices (point cloud)
00493                 // then consider primary entities the vertices
00494                 local_primary_ents = verts;
00495             }
00496             else
00497             {
00498                 // set a tag with the original sender for the primary entity
00499                 // will be used later for coverage mesh
00500                 std::vector< int > orig_senders( local_primary_ents.size(), sender1 );
00501                 rval = pco->get_moab()->tag_set_data( orgSendProcTag, local_primary_ents, &orig_senders[0] );
00502             }
00503             corr_sizes.push_back( (int)local_primary_ents.size() );
00504 
00505             newEnts.merge( entities );
00506             // make these in split ranges
00507             split_ranges[sender1] = local_primary_ents;
00508 
00509 #ifdef VERBOSE
00510             std::ostringstream partial_outFile;
00511 
00512             partial_outFile << "part_send_" << sender1 << "."
00513                             << "recv" << rankInJoin << ".vtk";
00514 
00515             // the mesh contains ghosts too, but they are not part of mat/neumann set
00516             // write in serial the file, to see what tags are missing
00517             std::cout << " writing from receiver " << rankInJoin << " from sender " << sender1
00518                       << " entities: " << entities.size() << std::endl;
00519             rval = pco->get_moab()->write_file( partial_outFile.str().c_str(), 0, 0, &local_set,
00520                                                 1 );  // everything on local set received
00521             if( MB_SUCCESS != rval ) return rval;
00522 #endif
00523         }
00524     }
00525     // make sure adjacencies are updated on the new elements
00526 
00527     if( newEnts.empty() ) { std::cout << " WARNING: this task did not receive any entities \n"; }
00528     // in order for the merging to work, we need to be sure that the adjacencies are updated
00529     // (created)
00530     Range local_verts        = newEnts.subset_by_type( MBVERTEX );
00531     newEnts                  = subtract( newEnts, local_verts );
00532     Core* mb                 = (Core*)pco->get_moab();
00533     AEntityFactory* adj_fact = mb->a_entity_factory();
00534     if( !adj_fact->vert_elem_adjacencies() )
00535         adj_fact->create_vert_elem_adjacencies();
00536     else
00537     {
00538         for( Range::iterator it = newEnts.begin(); it != newEnts.end(); it++ )
00539         {
00540             EntityHandle eh          = *it;
00541             const EntityHandle* conn = NULL;
00542             int num_nodes            = 0;
00543             rval                     = mb->get_connectivity( eh, conn, num_nodes );
00544             if( MB_SUCCESS != rval ) return rval;
00545             adj_fact->notify_create_entity( eh, conn, num_nodes );
00546         }
00547     }
00548 
00549     return MB_SUCCESS;
00550 }
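/*
 * Sketch (illustrative, hypothetical variable names): after receive_mesh(), each primary
 * entity carries the dense "orig_sending_processor" tag set above; a later step (e.g. the
 * coverage mesh) can read it back to learn which sender task produced the cell:
 *
 *   Tag sendProcTag;
 *   rval = mb->tag_get_handle( "orig_sending_processor", 1, MB_TYPE_INTEGER, sendProcTag );
 *   std::vector< int > origins( received_cells.size() );
 *   rval = mb->tag_get_data( sendProcTag, received_cells, &origins[0] );
 *   // origins[i] == -1 (the default) means the cell was not migrated from another task
 */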
00551 
00552 // VSM: Why is the communicator never used? Should the argument be removed?
00553 ErrorCode ParCommGraph::release_send_buffers()
00554 {
00555     int ierr, nsize = (int)sendReqs.size();
00556     std::vector< MPI_Status > mult_status;
00557     mult_status.resize( sendReqs.size() );
00558     ierr = MPI_Waitall( nsize, &sendReqs[0], &mult_status[0] );
00559 
00560     if( ierr != 0 ) return MB_FAILURE;
00561     // now we can free all buffers
00562     delete[] comm_graph;
00563     comm_graph = NULL;
00564     std::vector< ParallelComm::Buffer* >::iterator vit;
00565     for( vit = localSendBuffs.begin(); vit != localSendBuffs.end(); ++vit )
00566         delete( *vit );
00567     localSendBuffs.clear();
00568     return MB_SUCCESS;
00569 }
00570 
00571 // again, this uses the send buffers, for nonblocking sends;
00572 // should the receives be non-blocking too?
00573 ErrorCode ParCommGraph::send_tag_values( MPI_Comm jcomm, ParallelComm* pco, Range& owned,
00574                                          std::vector< Tag >& tag_handles )
00575 {
00576     // basically, owned.size() needs to be equal to sum(corr_sizes)
00577     // get info about the tag size, type, etc
00578     int ierr;
00579     Core* mb = (Core*)pco->get_moab();
00580     // get info about the tag
00581     //! Get the size of the specified tag in bytes
00582     int total_bytes_per_entity = 0;  // we need to know, to allocate buffers
00583     ErrorCode rval;
00584     std::vector< int > vect_bytes_per_tag;
00585 #ifdef VERBOSE
00586     std::vector< int > tag_sizes;
00587 #endif
00588     for( size_t i = 0; i < tag_handles.size(); i++ )
00589     {
00590         int bytes_per_tag;
00591         rval = mb->tag_get_bytes( tag_handles[i], bytes_per_tag );MB_CHK_ERR( rval );
00592         int tag_size1;  // length
00593         rval = mb->tag_get_length( tag_handles[i], tag_size1 );MB_CHK_ERR( rval );
00594         if( graph_type == DOF_BASED )
00595             bytes_per_tag = bytes_per_tag / tag_size1;  // we know we have one value per tag, per ID sent;
00596                                                         // could be 8 bytes for a double, 4 for an int, etc
00597         total_bytes_per_entity += bytes_per_tag;
00598         vect_bytes_per_tag.push_back( bytes_per_tag );
00599 #ifdef VERBOSE
00600         int tag_size;
00601         rval = mb->tag_get_length( tag_handles[i], tag_size );MB_CHK_ERR( rval );
00602         tag_sizes.push_back( tag_size );
00603 #endif
00604     }
00605 
00606     int indexReq = 0;
00607     if( graph_type == INITIAL_MIGRATE )  // original send
00608     {
00609         // use the buffers data structure to allocate memory for sending the tags
00610         sendReqs.resize( split_ranges.size() );
00611 
00612         for( std::map< int, Range >::iterator it = split_ranges.begin(); it != split_ranges.end(); it++ )
00613         {
00614             int receiver_proc = it->first;
00615             Range ents        = it->second;  // primary entities, with the tag data
00616             int size_buffer   = 4 + total_bytes_per_entity *
00617                                       (int)ents.size();  // hopefully, below 2B; if more, we have a big problem ...
00618             ParallelComm::Buffer* buffer = new ParallelComm::Buffer( size_buffer );
00619 
00620             buffer->reset_ptr( sizeof( int ) );
00621             for( size_t i = 0; i < tag_handles.size(); i++ )
00622             {
00623                 // copy tag data to buffer->buff_ptr, and send the buffer (we could have used
00624                 // regular char arrays)
00625                 rval = mb->tag_get_data( tag_handles[i], ents, (void*)( buffer->buff_ptr ) );MB_CHK_ERR( rval );
00626                 // advance the buffer pointer
00627                 buffer->buff_ptr += vect_bytes_per_tag[i] * ents.size();
00628             }
00629             *( (int*)buffer->mem_ptr ) = size_buffer;
00630             // int size_pack = buffer->get_current_size(); // debug check
00631             ierr = MPI_Isend( buffer->mem_ptr, size_buffer, MPI_CHAR, receiver_proc, 222, jcomm,
00632                               &sendReqs[indexReq] );  // we have to use global communicator
00633             if( ierr != 0 ) return MB_FAILURE;
00634             indexReq++;
00635             localSendBuffs.push_back( buffer );  // we will release them after nonblocking sends are completed
00636         }
00637     }
00638     else if( graph_type == COVERAGE )
00639     {
00640         // we know that we will need to send some tag data in a specific order
00641         // first, get the ids of the local elements, from owned Range; arrange the buffer in order
00642         // of increasing global id
00643         Tag gidTag;
00644         rval = mb->tag_get_handle( "GLOBAL_ID", gidTag );MB_CHK_ERR( rval );
00645         std::vector< int > gids;
00646         gids.resize( owned.size() );
00647         rval = mb->tag_get_data( gidTag, owned, &gids[0] );MB_CHK_ERR( rval );
00648         std::map< int, EntityHandle > gidToHandle;
00649         size_t i = 0;
00650         for( Range::iterator it = owned.begin(); it != owned.end(); it++ )
00651         {
00652             EntityHandle eh        = *it;
00653             gidToHandle[gids[i++]] = eh;
00654         }
00655         // now, pack the data and send it
00656         sendReqs.resize( involved_IDs_map.size() );
00657         for( std::map< int, std::vector< int > >::iterator mit = involved_IDs_map.begin();
00658              mit != involved_IDs_map.end(); mit++ )
00659         {
00660             int receiver_proc        = mit->first;
00661             std::vector< int >& eids = mit->second;
00662             int size_buffer          = 4 + total_bytes_per_entity *
00663                                       (int)eids.size();  // hopefully, below 2B; if more, we have a big problem ...
00664             ParallelComm::Buffer* buffer = new ParallelComm::Buffer( size_buffer );
00665             buffer->reset_ptr( sizeof( int ) );
00666 #ifdef VERBOSE
00667             std::ofstream dbfile;
00668             std::stringstream outf;
00669             outf << "from_" << rankInJoin << "_send_to_" << receiver_proc << ".txt";
00670             dbfile.open( outf.str().c_str() );
00671             dbfile << "from " << rankInJoin << " send to " << receiver_proc << "\n";
00672 #endif
00673             // copy tag data to buffer->buff_ptr, and send the buffer (we could have used regular
00674             // char arrays); pack data by tag, to be consistent with above, even though we loop
00675             // through the entities for each tag
00676 
00677             for( std::vector< int >::iterator it = eids.begin(); it != eids.end(); it++ )
00678             {
00679                 int eID         = *it;
00680                 EntityHandle eh = gidToHandle[eID];
00681                 for( i = 0; i < tag_handles.size(); i++ )
00682                 {
00683                     rval = mb->tag_get_data( tag_handles[i], &eh, 1, (void*)( buffer->buff_ptr ) );MB_CHK_ERR( rval );
00684 #ifdef VERBOSE
00685                     dbfile << "global ID " << eID << " local handle " << mb->id_from_handle( eh ) << " vals: ";
00686                     double* vals = (double*)( buffer->buff_ptr );
00687                     for( int kk = 0; kk < tag_sizes[i]; kk++ )
00688                     {
00689                         dbfile << " " << *vals;
00690                         vals++;
00691                     }
00692                     dbfile << "\n";
00693 #endif
00694                     buffer->buff_ptr += vect_bytes_per_tag[i];
00695                 }
00696             }
00697 
00698 #ifdef VERBOSE
00699             dbfile.close();
00700 #endif
00701             *( (int*)buffer->mem_ptr ) = size_buffer;
00702             // int size_pack = buffer->get_current_size(); // debug check
00703             ierr = MPI_Isend( buffer->mem_ptr, size_buffer, MPI_CHAR, receiver_proc, 222, jcomm,
00704                               &sendReqs[indexReq] );  // we have to use global communicator
00705             if( ierr != 0 ) return MB_FAILURE;
00706             indexReq++;
00707             localSendBuffs.push_back( buffer );  // we will release them after nonblocking sends are completed
00708         }
00709     }
00710     else if( graph_type == DOF_BASED )
00711     {
00712         // need to fill up the buffer, in the order desired, send it
00713         // get all the tags, for all owned entities, and pack the buffers accordingly
00714         // we do not want to get the tags by entity, it may be too expensive
00715         std::vector< std::vector< double > > valuesTags;
00716         valuesTags.resize( tag_handles.size() );
00717         for( size_t i = 0; i < tag_handles.size(); i++ )
00718         {
00719 
00720             int bytes_per_tag;
00721             rval = mb->tag_get_bytes( tag_handles[i], bytes_per_tag );MB_CHK_ERR( rval );
00722             valuesTags[i].resize( owned.size() * bytes_per_tag );
00723             // fill the whole array, we will pick up from here
00724             rval = mb->tag_get_data( tag_handles[i], owned, (void*)( &( valuesTags[i][0] ) ) );MB_CHK_ERR( rval );
00725         }
00726         // now, pack the data and send it
00727         sendReqs.resize( involved_IDs_map.size() );
00728         for( std::map< int, std::vector< int > >::iterator mit = involved_IDs_map.begin();
00729              mit != involved_IDs_map.end(); mit++ )
00730         {
00731             int receiver_proc                   = mit->first;
00732             std::vector< int >& eids            = mit->second;
00733             std::vector< int >& index_in_values = map_index[receiver_proc];
00734             std::vector< int >& index_ptr       = map_ptr[receiver_proc];  // this is eids.size()+1;
00735             int size_buffer                     = 4 + total_bytes_per_entity *
00736                                       (int)eids.size();  // hopefully, below 2B; if more, we have a big problem ...
00737             ParallelComm::Buffer* buffer = new ParallelComm::Buffer( size_buffer );
00738             buffer->reset_ptr( sizeof( int ) );
00739 #ifdef VERBOSE
00740             std::ofstream dbfile;
00741             std::stringstream outf;
00742             outf << "from_" << rankInJoin << "_send_to_" << receiver_proc << ".txt";
00743             dbfile.open( outf.str().c_str() );
00744             dbfile << "from " << rankInJoin << " send to " << receiver_proc << "\n";
00745 #endif
00746             // copy tag data to buffer->buff_ptr, and send the buffer
00747             // pack data by tag, to be consistent with above
00748             int j = 0;
00749             for( std::vector< int >::iterator it = eids.begin(); it != eids.end(); it++, j++ )
00750             {
00751                 int index_in_v = index_in_values[index_ptr[j]];
00752                 for( size_t i = 0; i < tag_handles.size(); i++ )
00753                 {
00754                     // right now, move just doubles; but it could be any type of tag
00755                     *( (double*)( buffer->buff_ptr ) ) = valuesTags[i][index_in_v];
00756                     buffer->buff_ptr += 8;  // we know we are working with doubles only !!!
00757                 }
00758             }
00759             *( (int*)buffer->mem_ptr ) = size_buffer;
00760             // int size_pack = buffer->get_current_size(); // debug check
00761             ierr = MPI_Isend( buffer->mem_ptr, size_buffer, MPI_CHAR, receiver_proc, 222, jcomm,
00762                               &sendReqs[indexReq] );  // we have to use global communicator
00763             if( ierr != 0 ) return MB_FAILURE;
00764             indexReq++;
00765             localSendBuffs.push_back( buffer );  // we will release them after nonblocking sends are completed
00766         }
00767     }
00768     return MB_SUCCESS;
00769 }
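/*
 * Buffer layout sketch for the COVERAGE branch (illustrative): for two double tags T0, T1
 * and entities with global ids { 7, 9 } going to one receiver, the message body after the
 * leading size int is
 *   [ T0(7) | T1(7) | T0(9) | T1(9) ]
 * i.e., entities in the order of involved_IDs_map[receiver_proc], and for each entity the
 * tag values in the order of tag_handles; receive_tag_values() unpacks in the same order.
 */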
00770 
00771 ErrorCode ParCommGraph::receive_tag_values( MPI_Comm jcomm, ParallelComm* pco, Range& owned,
00772                                             std::vector< Tag >& tag_handles )
00773 {
00774     // opposite to sending, we will use blocking receives
00775     int ierr;
00776     MPI_Status status;
00777     // basically, owned.size() needs to be equal to sum(corr_sizes)
00778     // get info about the tag size, type, etc
00779     Core* mb = (Core*)pco->get_moab();
00780     // get info about the tag
00781     //! Get the size of the specified tag in bytes
00782     ErrorCode rval;
00783     int total_bytes_per_entity = 0;
00784     std::vector< int > vect_bytes_per_tag;
00785 #ifdef VERBOSE
00786     std::vector< int > tag_sizes;
00787 #endif
00788     for( size_t i = 0; i < tag_handles.size(); i++ )
00789     {
00790         int bytes_per_tag;
00791         rval = mb->tag_get_bytes( tag_handles[i], bytes_per_tag );MB_CHK_ERR( rval );
00792         total_bytes_per_entity += bytes_per_tag;
00793         vect_bytes_per_tag.push_back( bytes_per_tag );
00794 #ifdef VERBOSE
00795         int tag_size;
00796         rval = mb->tag_get_length( tag_handles[i], tag_size );MB_CHK_ERR( rval );
00797         tag_sizes.push_back( tag_size );
00798 #endif
00799     }
00800 
00801     if( graph_type == INITIAL_MIGRATE )
00802     {
00803         // std::map<int, Range> split_ranges;
00804         // rval = split_owned_range ( owned);MB_CHK_ERR ( rval );
00805 
00806         // use the buffers data structure to allocate memory for receiving the tags
00807         for( std::map< int, Range >::iterator it = split_ranges.begin(); it != split_ranges.end(); it++ )
00808         {
00809             int sender_proc = it->first;
00810             Range ents      = it->second;  // primary entities, with the tag data, we will receive
00811             int size_buffer = 4 + total_bytes_per_entity *
00812                                       (int)ents.size();  // hopefully, below 2B; if more, we have a big problem ...
00813             ParallelComm::Buffer* buffer = new ParallelComm::Buffer( size_buffer );
00814 
00815             buffer->reset_ptr( sizeof( int ) );
00816 
00817             *( (int*)buffer->mem_ptr ) = size_buffer;
00818             // int size_pack = buffer->get_current_size(); // debug check
00819 
00820             ierr = MPI_Recv( buffer->mem_ptr, size_buffer, MPI_CHAR, sender_proc, 222, jcomm, &status );
00821             if( ierr != 0 ) return MB_FAILURE;
00822             // now set the tag
00823             // copy to tag
00824 
00825             for( size_t i = 0; i < tag_handles.size(); i++ )
00826             {
00827                 rval = mb->tag_set_data( tag_handles[i], ents, (void*)( buffer->buff_ptr ) );
00828                 buffer->buff_ptr += vect_bytes_per_tag[i] * ents.size();
00829             }
00830             delete buffer;  // no need for it afterwards
00831             MB_CHK_ERR( rval );
00832         }
00833     }
00834     else if( graph_type == COVERAGE )  // receive buffer, then extract tag data, in a loop
00835     {
00836         // we know that we will need to receive some tag data in a specific order (by ids stored)
00837         // first, get the ids of the local elements, from owned Range; unpack the buffer in order
00838         Tag gidTag;
00839         rval = mb->tag_get_handle( "GLOBAL_ID", gidTag );MB_CHK_ERR( rval );
00840         std::vector< int > gids;
00841         gids.resize( owned.size() );
00842         rval = mb->tag_get_data( gidTag, owned, &gids[0] );MB_CHK_ERR( rval );
00843         std::map< int, EntityHandle > gidToHandle;
00844         size_t i = 0;
00845         for( Range::iterator it = owned.begin(); it != owned.end(); it++ )
00846         {
00847             EntityHandle eh        = *it;
00848             gidToHandle[gids[i++]] = eh;
00849         }
00850         //
00851         // now, unpack the data and set it to the tag
00852         for( std::map< int, std::vector< int > >::iterator mit = involved_IDs_map.begin();
00853              mit != involved_IDs_map.end(); mit++ )
00854         {
00855             int sender_proc          = mit->first;
00856             std::vector< int >& eids = mit->second;
00857             int size_buffer          = 4 + total_bytes_per_entity *
00858                                       (int)eids.size();  // hopefully, below 2B; if more, we have a big problem ...
00859             ParallelComm::Buffer* buffer = new ParallelComm::Buffer( size_buffer );
00860             buffer->reset_ptr( sizeof( int ) );
00861             *( (int*)buffer->mem_ptr ) = size_buffer;  // not strictly necessary; the received buffer carries this size as well
00862 
00863             // receive the buffer
00864             ierr = MPI_Recv( buffer->mem_ptr, size_buffer, MPI_CHAR, sender_proc, 222, jcomm, &status );
00865             if( ierr != 0 ) return MB_FAILURE;
00866 // start copy
00867 #ifdef VERBOSE
00868             std::ofstream dbfile;
00869             std::stringstream outf;
00870             outf << "recvFrom_" << sender_proc << "_on_proc_" << rankInJoin << ".txt";
00871             dbfile.open( outf.str().c_str() );
00872             dbfile << "recvFrom_" << sender_proc << " on proc  " << rankInJoin << "\n";
00873 #endif
00874 
00875             // copy tag data from buffer->buff_ptr
00876             // data is arranged by tag, and the loop is repeated for each entity;
00877             // maybe it should be arranged by entity instead, not by tag (so one outer loop over
00878             // entities)
00879 
00880             for( std::vector< int >::iterator it = eids.begin(); it != eids.end(); it++ )
00881             {
00882                 int eID                                     = *it;
00883                 std::map< int, EntityHandle >::iterator mit = gidToHandle.find( eID );
00884                 if( mit == gidToHandle.end() )
00885                 {
00886                     std::cout << " on rank: " << rankInJoin << " cannot find entity handle with global ID " << eID
00887                               << "\n";
00888                     return MB_FAILURE;
00889                 }
00890                 EntityHandle eh = mit->second;
00891                 for( i = 0; i < tag_handles.size(); i++ )
00892                 {
00893                     rval = mb->tag_set_data( tag_handles[i], &eh, 1, (void*)( buffer->buff_ptr ) );MB_CHK_ERR( rval );
00894 #ifdef VERBOSE
00895                     dbfile << "global ID " << eID << " local handle " << mb->id_from_handle( eh ) << " vals: ";
00896                     double* vals = (double*)( buffer->buff_ptr );
00897                     for( int kk = 0; kk < tag_sizes[i]; kk++ )
00898                     {
00899                         dbfile << " " << *vals;
00900                         vals++;
00901                     }
00902                     dbfile << "\n";
00903 #endif
00904                     buffer->buff_ptr += vect_bytes_per_tag[i];
00905                 }
00906             }
00907 
00908             // delete receive buffer
00909             delete buffer;
00910 #ifdef VERBOSE
00911             dbfile.close();
00912 #endif
00913         }
00914     }
00915     else if( graph_type == DOF_BASED )
00916     {
00917         // need to fill up the values for each tag, in the order desired, from the buffer received
00918         //
00919         // get all the tags, for all owned entities, and pack the buffers accordingly
00920         // we do not want to get the tags by entity, it may be too expensive
00921         std::vector< std::vector< double > > valuesTags;
00922         valuesTags.resize( tag_handles.size() );
00923         for( size_t i = 0; i < tag_handles.size(); i++ )
00924         {
00925             int bytes_per_tag;
00926             rval = mb->tag_get_bytes( tag_handles[i], bytes_per_tag );MB_CHK_ERR( rval );
00927             valuesTags[i].resize( owned.size() * bytes_per_tag );
00928             // fill the whole array, we will pick up from here
00929             // we will fill this array, using data from received buffer
00930             // rval = mb->tag_get_data(owned, (void*)( &(valuesTags[i][0])) );MB_CHK_ERR ( rval );
00931         }
00932         // now, unpack the data and set the tags
00933         sendReqs.resize( involved_IDs_map.size() );
00934         for( std::map< int, std::vector< int > >::iterator mit = involved_IDs_map.begin();
00935              mit != involved_IDs_map.end(); mit++ )
00936         {
00937             int sender_proc                     = mit->first;
00938             std::vector< int >& eids            = mit->second;
00939             std::vector< int >& index_in_values = map_index[sender_proc];
00940             std::vector< int >& index_ptr       = map_ptr[sender_proc];  // this is eids.size()+1;
00941             int size_buffer                     = 4 + total_bytes_per_entity *
00942                                       (int)eids.size();  // hopefully, below 2B; if more, we have a big problem ...
00943             ParallelComm::Buffer* buffer = new ParallelComm::Buffer( size_buffer );
00944             buffer->reset_ptr( sizeof( int ) );
00945 
00946             // receive the buffer
00947             ierr = MPI_Recv( buffer->mem_ptr, size_buffer, MPI_CHAR, sender_proc, 222, jcomm, &status );
00948             if( ierr != 0 ) return MB_FAILURE;
00949             // use the values in buffer to populate valuesTag arrays, fill it up!
00950             int j = 0;
00951             for( std::vector< int >::iterator it = eids.begin(); it != eids.end(); it++, j++ )
00952             {
00953                 for( size_t i = 0; i < tag_handles.size(); i++ )
00954                 {
00955                     // right now, move just doubles; but it could be any type of tag
00956                     double val = *( (double*)( buffer->buff_ptr ) );
00957                     buffer->buff_ptr += 8;  // we know we are working with doubles only !!!
00958                     for( int k = index_ptr[j]; k < index_ptr[j + 1]; k++ )
00959                         valuesTags[i][index_in_values[k]] = val;
00960                 }
00961             }
00962             // we are done with the buffer in which we received tags, release / delete it
00963             delete buffer;
00964         }
00965         // now we populated the values for all tags; set now the tags!
00966         for( size_t i = 0; i < tag_handles.size(); i++ )
00967         {
00968             // we will fill this array, using data from received buffer
00969             rval = mb->tag_set_data( tag_handles[i], owned, (void*)( &( valuesTags[i][0] ) ) );MB_CHK_ERR( rval );
00970         }
00971     }
00972     return MB_SUCCESS;
00973 }
00974 /*
00975  * settle the send graph for the coverage case: fill involved_IDs_map with the element global ids to send to each receiving processor
00976  */
00977 ErrorCode ParCommGraph::settle_send_graph( TupleList& TLcovIDs )
00978 {
00979     // fill involved_IDs_map with data
00980     // will have "receiving proc" and global id of element
00981     int n      = TLcovIDs.get_n();
00982     graph_type = COVERAGE;  // do not rely only on involved_IDs_map.size(); this can be 0 in some cases
00983     for( int i = 0; i < n; i++ )
00984     {
00985         int to_proc      = TLcovIDs.vi_wr[2 * i];
00986         int globalIdElem = TLcovIDs.vi_wr[2 * i + 1];
00987         involved_IDs_map[to_proc].push_back( globalIdElem );
00988     }
00989 #ifdef VERBOSE
00990     for( std::map< int, std::vector< int > >::iterator mit = involved_IDs_map.begin(); mit != involved_IDs_map.end();
00991          mit++ )
00992     {
00993         std::cout << " towards task " << mit->first << " send: " << mit->second.size() << " cells " << std::endl;
00994         for( size_t i = 0; i < mit->second.size(); i++ )
00995         {
00996             std::cout << " " << mit->second[i];
00997         }
00998         std::cout << std::endl;
00999     }
01000 #endif
01001     return MB_SUCCESS;
01002 }
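/*
 * Sketch (illustrative): TLcovIDs holds ( to_proc, globalIdElem ) pairs in vi_wr; e.g. the
 * entries ( 3, 101 ), ( 3, 102 ), ( 4, 101 ) produce
 *   involved_IDs_map[3] = { 101, 102 }
 *   involved_IDs_map[4] = { 101 }
 * the per-receiver lists of element global ids for which this task will later send tag data.
 */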
01003 
01004 // this will set involved_IDs_map, which will store all ids to be received from each sender task
01005 void ParCommGraph::SetReceivingAfterCoverage(
01006     std::map< int, std::set< int > >& idsFromProcs )  // will make sense only on receivers, right now after cov
01007 {
01008     for( std::map< int, std::set< int > >::iterator mt = idsFromProcs.begin(); mt != idsFromProcs.end(); mt++ )
01009     {
01010         int fromProc            = mt->first;
01011         std::set< int >& setIds = mt->second;
01012         involved_IDs_map[fromProc].resize( setIds.size() );
01013         std::vector< int >& listIDs = involved_IDs_map[fromProc];
01014         size_t indx                 = 0;
01015         for( std::set< int >::iterator st = setIds.begin(); st != setIds.end(); st++ )
01016         {
01017             int valueID     = *st;
01018             listIDs[indx++] = valueID;
01019         }
01020     }
01021     graph_type = COVERAGE;
01022     return;
01023 }
01024 //#define VERBOSE
01025 void ParCommGraph::settle_comm_by_ids( int comp, TupleList& TLBackToComp, std::vector< int >& valuesComp )
01026 {
01027     // settle comm graph on comp
01028     if( rootSender || rootReceiver ) std::cout << " settle comm graph by id on component " << comp << "\n";
01029     int n = TLBackToComp.get_n();
01030     // third_method = true; // do not rely only on involved_IDs_map.size(); this can be 0 in some
01031     // cases
01032     std::map< int, std::set< int > > uniqueIDs;
01033 
01034     for( int i = 0; i < n; i++ )
01035     {
01036         int to_proc  = TLBackToComp.vi_wr[3 * i + 2];
01037         int globalId = TLBackToComp.vi_wr[3 * i + 1];
01038         uniqueIDs[to_proc].insert( globalId );
01039     }
01040 
01041     // Vector to store each value
01042     // together with its original index
01043     std::vector< std::pair< int, int > > vp;
01044     vp.reserve( valuesComp.size() );
01045 
01046     // Insert elements into the pair vector
01047     // to keep track of the original indexes in valuesComp
01048     for( int i = 0; i < (int)valuesComp.size(); ++i )
01049     {
01050         vp.push_back( std::make_pair( valuesComp[i], i ) );
01051     }
01052     // Sorting pair vector
01053     sort( vp.begin(), vp.end() );
01054 
01055     // vp[i].first is the value, vp[i].second its original index
01056 
01057     // now count how many times each value appears in the sorted array (i.e., in valuesComp)
01058     for( std::map< int, std::set< int > >::iterator it = uniqueIDs.begin(); it != uniqueIDs.end(); it++ )
01059     {
01060         int procId                  = it->first;
01061         std::set< int >& nums       = it->second;
01062         std::vector< int >& indx    = map_ptr[procId];
01063         std::vector< int >& indices = map_index[procId];
01064         indx.resize( nums.size() + 1 );
01065         int indexInVp = 0;
01066         int indexVal  = 0;
01067         indx[0]       = 0;  // start from 0
01068         for( std::set< int >::iterator sst = nums.begin(); sst != nums.end(); sst++, indexVal++ )
01069         {
01070             int val = *sst;
01071             involved_IDs_map[procId].push_back( val );
01072             indx[indexVal + 1] = indx[indexVal];
01073             while( ( indexInVp < (int)valuesComp.size() ) && ( vp[indexInVp].first <= val ) )  // should be equal !
01074             {
01075                 if( vp[indexInVp].first == val )
01076                 {
01077                     indx[indexVal + 1]++;
01078                     indices.push_back( vp[indexInVp].second );
01079                 }
01080                 indexInVp++;
01081             }
01082         }
01083     }
01084 #ifdef VERBOSE
01085     std::stringstream f1;
01086     std::ofstream dbfile;
01087     f1 << "Involve_" << comp << "_" << rankInJoin << ".txt";
01088     dbfile.open( f1.str().c_str() );
01089     for( std::map< int, std::vector< int > >::iterator mit = involved_IDs_map.begin(); mit != involved_IDs_map.end();
01090          mit++ )
01091     {
01092         int corrTask                = mit->first;
01093         std::vector< int >& corrIds = mit->second;
01094         std::vector< int >& indx    = map_ptr[corrTask];
01095         std::vector< int >& indices = map_index[corrTask];
01096 
01097         dbfile << " towards proc " << corrTask << " \n";
01098         for( int i = 0; i < (int)corrIds.size(); i++ )
01099         {
01100             dbfile << corrIds[i] << " [" << indx[i] << "," << indx[i + 1] << ")  : ";
01101             for( int j = indx[i]; j < indx[i + 1]; j++ )
01102                 dbfile << indices[j] << " ";
01103             dbfile << "\n";
01104         }
01105         dbfile << " \n";
01106     }
01107     dbfile.close();
01108 #endif
01109 
01110     graph_type = DOF_BASED;
01111     // now we need to fill back and forth information, needed to fill the arrays
01112     // for example, from spectral to involved_IDs_map, in case we want to send data from
01113     // spectral to phys
01114 }
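/*
 * Sketch of the CSR-like structures built above (illustrative numbers): suppose for proc 3
 * the unique ids are { 10, 20 } and valuesComp = { 20, 10, 20 }, so id 10 appears once (at
 * position 1) and id 20 twice (at positions 0 and 2). Then
 *   involved_IDs_map[3] = { 10, 20 }
 *   map_ptr[3]          = { 0, 1, 3 }   // prefix sums of the occurrence counts
 *   map_index[3]        = { 1, 0, 2 }   // positions in valuesComp, grouped by id
 * so the k-th id owns the index range [ map_ptr[3][k], map_ptr[3][k+1] ) of map_index[3];
 * send_tag_values() / receive_tag_values() (DOF_BASED) use these to gather / scatter values.
 */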
01115 //#undef VERBOSE
01116 // new partition calculation
01117 ErrorCode ParCommGraph::compute_partition( ParallelComm* pco, Range& owned, int met )
01118 {
01119     // we are on a sender task and need to compute a new partition;
01120     // primary cells need to be distributed to the nb of receiver tasks
01121     // first, we will use a graph partitioner, with zoltan;
01122     // in the graph that we need to build, the first layer of ghosts is needed;
01123     // can we avoid that ? For example, we can find out from each boundary edge/face what the
01124     // cell on the other side is, then form the global graph, and call zoltan in parallel;
01125     // met 1 is the graph partitioner and met 2 is the geometric partitioner;
01126     // for method 2 we do not need any ghost exchange
01127 
01128     // find first edges that are shared
01129     if( owned.empty() )
01130         return MB_SUCCESS;  // nothing to do? empty partition is not allowed, maybe we should return
01131                             // error?
01132     Core* mb = (Core*)pco->get_moab();
01133 
01134     double t1, t2, t3;
01135     t1               = MPI_Wtime();
01136     int primaryDim   = mb->dimension_from_handle( *owned.rbegin() );
01137     int interfaceDim = primaryDim - 1;  // should be 1 or 2
01138     Range sharedEdges;
01139     ErrorCode rval;
01140 
01141     std::vector< int > shprocs( MAX_SHARING_PROCS );
01142     std::vector< EntityHandle > shhandles( MAX_SHARING_PROCS );
01143 
01144     Tag gidTag;  //
01145     rval = mb->tag_get_handle( "GLOBAL_ID", gidTag );MB_CHK_ERR( rval );
01146     int np;
01147     unsigned char pstatus;
01148 
01149     std::multimap< int, int > extraGraphEdges;
01150     // std::map<int, int> adjCellsId;
01151     std::map< int, int > extraCellsProc;
01152     // if method is 2, no need to do the exchange for adjacent cells across partition boundary
01153     // these maps above will be empty for method 2 (geometry)
01154     if( 1 == met )
01155     {
01156         rval = pco->get_shared_entities( /*int other_proc*/ -1, sharedEdges, interfaceDim,
01157                                          /*const bool iface*/ true );MB_CHK_ERR( rval );
01158 
01159 #ifdef VERBOSE
01160         std::cout << " on sender task " << pco->rank() << " number of shared interface cells " << sharedEdges.size()
01161                   << "\n";
01162 #endif
01163         // find to which processors we need to send the ghost info about the edge
01164         // first determine the local graph: what elements are adjacent to each cell in the owned range;
01165         // cells that share a partition interface edge are identified first, and form a map
01166         TupleList TLe;                                     // tuple list for cells
01167         TLe.initialize( 2, 0, 1, 0, sharedEdges.size() );  // send to, id of adj cell, remote edge
01168         TLe.enableWriteAccess();
01169 
01170         std::map< EntityHandle, int > edgeToCell;  // from local boundary edge to adjacent cell id
01171         // used after the crystal router transfer, to recover the cell adjacent to a received edge
01172         for( Range::iterator eit = sharedEdges.begin(); eit != sharedEdges.end(); eit++ )
01173         {
01174             EntityHandle edge = *eit;
01175             // get the adjacent cell
01176             Range adjEnts;
01177             rval = mb->get_adjacencies( &edge, 1, primaryDim, false, adjEnts );MB_CHK_ERR( rval );
01178             if( adjEnts.size() > 0 )
01179             {
01180                 EntityHandle adjCell = adjEnts[0];
01181                 int gid;
01182                 rval = mb->tag_get_data( gidTag, &adjCell, 1, &gid );MB_CHK_ERR( rval );
01183                 rval = pco->get_sharing_data( edge, &shprocs[0], &shhandles[0], pstatus, np );MB_CHK_ERR( rval );
01184                 int n                = TLe.get_n();
01185                 TLe.vi_wr[2 * n]     = shprocs[0];
01186                 TLe.vi_wr[2 * n + 1] = gid;
01187                 TLe.vul_wr[n]        = shhandles[0];  // the remote edge corresponding to shared edge
01188                 edgeToCell[edge]     = gid;           // map from local edge to the global id of the adjacent cell
01189                 TLe.inc_n();
01190             }
01191         }
01192 
01193 #ifdef VERBOSE
01194         std::stringstream ff2;
01195         ff2 << "TLe_" << pco->rank() << ".txt";
01196         TLe.print_to_file( ff2.str().c_str() );
01197 #endif
01198         // send the data to the other processors:
01199         ( pco->proc_config().crystal_router() )->gs_transfer( 1, TLe, 0 );
01200         // on receiver side, each local edge will have the remote cell adjacent to it!
01201 
01202         int ne = TLe.get_n();
01203         for( int i = 0; i < ne; i++ )
01204         {
01205             int sharedProc         = TLe.vi_rd[2 * i];       // the task this tuple originally came from
01206             int remoteCellID       = TLe.vi_rd[2 * i + 1];   // global id of the remote cell, on sharedProc
01207             EntityHandle localCell = TLe.vul_rd[i];          // handle of the local edge/face on this proc
01208             int localCellId        = edgeToCell[localCell];  // global id of the local cell adjacent to that edge/face
01209             // now, we will need to add to the graph the pair <localCellId, remoteCellID>
01210             std::pair< int, int > extraAdj = std::make_pair( localCellId, remoteCellID );
01211             extraGraphEdges.insert( extraAdj );
01212             // adjCellsId [edgeToCell[localCell]] = remoteCellID;
01213             extraCellsProc[remoteCellID] = sharedProc;
01214 #ifdef VERBOSE
01215             std::cout << "local ID " << edgeToCell[localCell] << " remote cell ID: " << remoteCellID << "\n";
01216 #endif
01217         }
01218     }
01219     t2 = MPI_Wtime();
01220     if( rootSender ) std::cout << " time preparing the input for Zoltan: " << t2 - t1 << " seconds. \n";
01221     // the adjacent cell ids are now known; call zoltan to compute the parallel partition
01222 #ifdef MOAB_HAVE_ZOLTAN
01223     ZoltanPartitioner* mbZTool = new ZoltanPartitioner( mb );
01224     if( 1 <= met )  //  partition in zoltan, either graph or geometric partitioner
01225     {
01226         std::map< int, Range > distribution;  // how to distribute owned elements by processors in receiving groups
01227         // into how many tasks do we want the owned cells distributed?
01228         int numNewPartitions = (int)receiverTasks.size();
01229         Range primaryCells   = owned.subset_by_dimension( primaryDim );
01230         rval = mbZTool->partition_owned_cells( primaryCells, pco, extraGraphEdges, extraCellsProc, numNewPartitions,
01231                                                distribution, met );MB_CHK_ERR( rval );
01232         for( std::map< int, Range >::iterator mit = distribution.begin(); mit != distribution.end(); mit++ )
01233         {
01234             int part_index = mit->first;
01235             assert( part_index < numNewPartitions );
01236             split_ranges[receiverTasks[part_index]] = mit->second;
01237         }
01238     }
01239     // delete now the partitioner
01240     delete mbZTool;
01241 #endif
01242     t3 = MPI_Wtime();
01243     if( rootSender ) std::cout << " time spent by Zoltan " << t3 - t2 << " seconds. \n";
01244     return MB_SUCCESS;
01245 }
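// A minimal usage sketch (hypothetical, not part of the library): on a sender task that already
// has a ParCommGraph object, the ParallelComm and the Range of owned primary cells, the new
// partition would be computed as below; met = 1 selects the graph partitioner (Zoltan required),
// met = 2 the geometric one; the function and macro names are illustrative only.
#ifdef PARCOMMGRAPH_USAGE_EXAMPLE
static ErrorCode example_compute_new_partition( ParCommGraph* cgraph, ParallelComm* pco, Range& owned )
{
    int met        = 1;  // 1 = graph partitioner (needs the adjacency exchange), 2 = geometric
    ErrorCode rval = cgraph->compute_partition( pco, owned, met );MB_CHK_ERR( rval );
    return MB_SUCCESS;
}
#endif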
01246 // at this moment, each sender task has its split_ranges formed;
01247 // we need to aggregate that info on the root sender and send it to the root receiver
01248 ErrorCode ParCommGraph::send_graph_partition( ParallelComm* pco, MPI_Comm jcomm )
01249 {
01250     // first, accumulate the info on the root sender, using gather / gatherv:
01251     // gather the number of receivers of each sender task, then the actual receiver ids
01252     int numberReceivers =
01253         (int)split_ranges.size();            // split_ranges holds the ranges of elements to send to each receiver from this task
01254     int nSenders = (int)senderTasks.size();  // total number of sender tasks
01255     // this sender will have to send to this many receivers
01256     std::vector< int > displs( 1 );  // displacements for gatherv
01257     std::vector< int > counts( 1 );
01258     if( is_root_sender() )
01259     {
01260         displs.resize( nSenders + 1 );
01261         counts.resize( nSenders );
01262     }
01263 
01264     int ierr = MPI_Gather( &numberReceivers, 1, MPI_INT, &counts[0], 1, MPI_INT, 0, pco->comm() );
01265     if( ierr != MPI_SUCCESS ) return MB_FAILURE;
01266     // compute now displacements
01267     if( is_root_sender() )
01268     {
01269         displs[0] = 0;
01270         for( int k = 0; k < nSenders; k++ )
01271         {
01272             displs[k + 1] = displs[k] + counts[k];
01273         }
01274     }
01275     std::vector< int > buffer;
01276     if( is_root_sender() ) buffer.resize( displs[nSenders] );  // the last one will have the total count now
01277 
01278     std::vector< int > recvs;
01279     for( std::map< int, Range >::iterator mit = split_ranges.begin(); mit != split_ranges.end(); mit++ )
01280     {
01281         recvs.push_back( mit->first );
01282     }
01283     ierr =
01284         MPI_Gatherv( &recvs[0], numberReceivers, MPI_INT, &buffer[0], &counts[0], &displs[0], MPI_INT, 0, pco->comm() );
01285     if( ierr != MPI_SUCCESS ) return MB_FAILURE;
01286 
01287     // now form the recv_graph map: it points from each receiver rank to the list of sender
01288     // ranks that will send to it; the graph is formed only on the root and then sent to the other side
01289     if( is_root_sender() )
01290     {
01291 #ifdef GRAPH_INFO
01292         std::ofstream dbfileSender;
01293         std::stringstream outf;
01294         outf << "S_" << compid1 << "_R_" << compid2 << "_SenderGraph.txt";
01295         dbfileSender.open( outf.str().c_str() );
01296         dbfileSender << " number senders: " << nSenders << "\n";
01297         dbfileSender << " senderRank \treceivers \n";
01298         for( int k = 0; k < nSenders; k++ )
01299         {
01300             int indexInBuff = displs[k];
01301             int senderTask  = senderTasks[k];
01302             dbfileSender << senderTask << "\t\t";
01303             for( int j = 0; j < counts[k]; j++ )
01304             {
01305                 int recvTask = buffer[indexInBuff + j];
01306                 dbfileSender << recvTask << " ";
01307             }
01308             dbfileSender << "\n";
01309         }
01310         dbfileSender.close();
01311 #endif
01312         for( int k = 0; k < nSenders; k++ )
01313         {
01314             int indexInBuff = displs[k];
01315             int senderTask  = senderTasks[k];
01316             for( int j = 0; j < counts[k]; j++ )
01317             {
01318                 int recvTask = buffer[indexInBuff + j];
01319                 recv_graph[recvTask].push_back( senderTask );  // this will be packed and sent to root receiver, with
01320                                                                // nonblocking send
01321             }
01322         }
01323 #ifdef GRAPH_INFO
01324         std::ofstream dbfile;
01325         std::stringstream outf2;
01326         outf2 << "S_" << compid1 << "_R_" << compid2 << "_RecvGraph.txt";
01327         dbfile.open( outf2.str().c_str() );
01328         dbfile << " number receivers: " << recv_graph.size() << "\n";
01329         dbfile << " receiverRank \tsenders \n";
01330         for( std::map< int, std::vector< int > >::iterator mit = recv_graph.begin(); mit != recv_graph.end(); mit++ )
01331         {
01332             int recvTask                = mit->first;
01333             std::vector< int >& senders = mit->second;
01334             dbfile << recvTask << "\t\t";
01335             for( std::vector< int >::iterator vit = senders.begin(); vit != senders.end(); vit++ )
01336                 dbfile << *vit << " ";
01337             dbfile << "\n";
01338         }
01339         dbfile.close();
01340 #endif
01341         // from here on this is the same as for the trivial partition: send the graph to the receiver root
01342         ErrorCode rval = send_graph( jcomm );MB_CHK_ERR( rval );
01343     }
01344 
01345     return MB_SUCCESS;
01346 }
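// The aggregation above uses a common two-phase MPI pattern: gather the per-task counts at the
// root, build displacements by prefix sum, then collect the variable-length lists with MPI_Gatherv.
// Below is a minimal, self-contained sketch of that pattern (hypothetical helper, not part of
// ParCommGraph), assuming MPI has already been initialized.
#ifdef PARCOMMGRAPH_USAGE_EXAMPLE
static void example_gather_variable_lists( MPI_Comm comm, std::vector< int >& myList )
{
    int rank, size;
    MPI_Comm_rank( comm, &rank );
    MPI_Comm_size( comm, &size );
    int myCount = (int)myList.size();
    std::vector< int > counts( rank == 0 ? size : 1 );
    std::vector< int > displs( rank == 0 ? size + 1 : 1 );
    // phase 1: the root learns how many entries each task will contribute
    MPI_Gather( &myCount, 1, MPI_INT, &counts[0], 1, MPI_INT, 0, comm );
    std::vector< int > buffer( 1 );
    if( 0 == rank )
    {
        displs[0] = 0;
        for( int k = 0; k < size; k++ )
            displs[k + 1] = displs[k] + counts[k];  // prefix sum gives each task's offset in the flat buffer
        buffer.resize( displs[size] );              // total length of all lists concatenated
    }
    int* sendPtr = myCount ? &myList[0] : &myCount;  // dummy but valid pointer when the local list is empty
    // phase 2: concatenate the lists on the root, in task order
    MPI_Gatherv( sendPtr, myCount, MPI_INT, &buffer[0], &counts[0], &displs[0], MPI_INT, 0, comm );
}
#endif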
01347 // method to dump the local graph info: sender id, receiver id, and the sizes of the element lists
01348 // to be exchanged, before or after intersection
01349 ErrorCode ParCommGraph::dump_comm_information( std::string prefix, int is_send )
01350 {
01351     //
01352     if( -1 != rankInGroup1 && 1 == is_send )  // it is a sender task
01353     {
01354         std::ofstream dbfile;
01355         std::stringstream outf;
01356         outf << prefix << "_sender_" << rankInGroup1 << "_joint" << rankInJoin << "_type_" << (int)graph_type << ".txt";
01357         dbfile.open( outf.str().c_str() );
01358 
01359         if( graph_type == COVERAGE )
01360         {
01361             for( std::map< int, std::vector< int > >::iterator mit = involved_IDs_map.begin();
01362                  mit != involved_IDs_map.end(); mit++ )
01363             {
01364                 int receiver_proc        = mit->first;
01365                 std::vector< int >& eids = mit->second;
01366                 dbfile << "receiver: " << receiver_proc << " size:" << eids.size() << "\n";
01367             }
01368         }
01369         else if( graph_type == INITIAL_MIGRATE )  // just after migration
01370         {
01371             for( std::map< int, Range >::iterator mit = split_ranges.begin(); mit != split_ranges.end(); mit++ )
01372             {
01373                 int receiver_proc = mit->first;
01374                 Range& eids       = mit->second;
01375                 dbfile << "receiver: " << receiver_proc << " size:" << eids.size() << "\n";
01376             }
01377         }
01378         else if( graph_type == DOF_BASED )  // just after migration, or from computeGraph
01379         {
01380             for( std::map< int, std::vector< int > >::iterator mit = involved_IDs_map.begin();
01381                  mit != involved_IDs_map.end(); mit++ )
01382             {
01383                 int receiver_proc = mit->first;
01384                 dbfile << "receiver: " << receiver_proc << " size:" << mit->second.size() << "\n";
01385             }
01386         }
01387         dbfile.close();
01388     }
01389     if( -1 != rankInGroup2 && 0 == is_send )  // it is a receiver task
01390     {
01391         std::ofstream dbfile;
01392         std::stringstream outf;
01393         outf << prefix << "_receiver_" << rankInGroup2 << "_joint" << rankInJoin << "_type_" << (int)graph_type
01394              << ".txt";
01395         dbfile.open( outf.str().c_str() );
01396 
01397         if( graph_type == COVERAGE )
01398         {
01399             for( std::map< int, std::vector< int > >::iterator mit = involved_IDs_map.begin();
01400                  mit != involved_IDs_map.end(); mit++ )
01401             {
01402                 int sender_proc          = mit->first;
01403                 std::vector< int >& eids = mit->second;
01404                 dbfile << "sender: " << sender_proc << " size:" << eids.size() << "\n";
01405             }
01406         }
01407         else if( graph_type == INITIAL_MIGRATE )  // just after migration
01408         {
01409             for( std::map< int, Range >::iterator mit = split_ranges.begin(); mit != split_ranges.end(); mit++ )
01410             {
01411                 int sender_proc = mit->first;
01412                 Range& eids     = mit->second;
01413                 dbfile << "sender: " << sender_proc << " size:" << eids.size() << "\n";
01414             }
01415         }
01416         else if( graph_type == DOF_BASED )  // just after migration
01417         {
01418             for( std::map< int, std::vector< int > >::iterator mit = involved_IDs_map.begin();
01419                  mit != involved_IDs_map.end(); mit++ )
01420             {
01421                 int sender_proc = mit->first;
01422                 dbfile << "sender: " << sender_proc << " size:" << mit->second.size() << "\n";
01423             }
01424         }
01425         dbfile.close();
01426     }
01427     return MB_SUCCESS;
01428 }
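// A small usage sketch (hypothetical, not part of the library): once a graph is set up, each task
// can dump its local view of the communication pattern; sender tasks pass is_send = 1 and receiver
// tasks pass is_send = 0; the prefix "mig" is only an example file-name prefix.
#ifdef PARCOMMGRAPH_USAGE_EXAMPLE
static ErrorCode example_dump_graph( ParCommGraph* cgraph, bool onSenderSide )
{
    // writes one text file per task, named <prefix>_sender_*.txt or <prefix>_receiver_*.txt
    ErrorCode rval = cgraph->dump_comm_information( "mig", onSenderSide ? 1 : 0 );MB_CHK_ERR( rval );
    return MB_SUCCESS;
}
#endif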
01429 }  // namespace moab