Mesh Oriented datABase
(version 5.4.1)
Array-based unstructured mesh datastructure
|
Write MOAB HDF5 file in parallel. More...
#include <WriteHDF5Parallel.hpp>
Classes | |
struct | DataSetCreator |
Argument ot create_dataset. More... | |
struct | NoopDescCreator |
Public Member Functions | |
WriteHDF5Parallel (Interface *iface) | |
virtual | ~WriteHDF5Parallel () |
Static Public Member Functions | |
static WriterIface * | factory (Interface *) |
Protected Member Functions | |
virtual void | debug_barrier_line (int lineno) |
virtual void | print_times (const double *times) const |
virtual ErrorCode | parallel_create_file (const char *filename, bool overwrite, const std::vector< std::string > &qa_records, const FileOptions &opts, const Tag *user_tag_list=0, int user_tag_count=0, int dimension=3, double *times=0) |
Called by normal (non-parallel) writer. Sets up necessary data for parallel write. | |
ErrorCode | gather_interface_meshes (Range &non_local_ents) |
Figure out which mesh local mesh is duplicated on remote processors and which processor will write that mesh. | |
ErrorCode | exchange_file_ids (const Range &non_local_ents) |
For entities that will be written by another processor but are referenced by entities on this processor, get the file Ids that will be assigned to those so they can be referenced by entities to be written on this processor. | |
ErrorCode | communicate_shared_set_ids (const Range &owned, const Range &remote) |
Get remote ids for shared sets. | |
ErrorCode | pack_set (Range::const_iterator set, unsigned long *set_data, size_t set_data_length) |
Pack set data for communication. | |
ErrorCode | unpack_set (EntityHandle set, const unsigned long *set_data, size_t set_data_length) |
Unpack set data from communication. | |
ErrorCode | communicate_shared_set_data (const Range &owned, const Range &remote) |
Communicate set contents between processors such that each owner knows the contents, parents, & child lists from all processors that have a copy of the set. | |
ErrorCode | create_node_table (int dimension) |
Create the node table in the file. | |
ErrorCode | negotiate_type_list () |
Communicate with other processors to negotiate the types of elements that will be written (the union of the types defined on each proc.) | |
ErrorCode | create_element_tables () |
Create tables to hold element connectivity. | |
ErrorCode | create_adjacency_tables () |
Create tables to hold element adjacencies. | |
ErrorCode | create_meshset_tables (double *times) |
Create tables for mesh sets. | |
ErrorCode | create_tag_tables () |
Write tag descriptions and create tables to hold tag data. | |
void | remove_remote_entities (EntityHandle relative, Range &range) |
Remove any remote mesh entities from the passed range. | |
void | remove_remote_entities (EntityHandle relative, std::vector< EntityHandle > &vect) |
void | remove_remote_sets (EntityHandle relative, Range &range) |
void | remove_remote_sets (EntityHandle relative, std::vector< EntityHandle > &vect) |
ErrorCode | get_sharedset_tags () |
get any existing tags which aren't excluded and add to shared set tags | |
ErrorCode | append_serial_tag_data (std::vector< unsigned char > &buffer, const WriteHDF5::TagDesc &tag) |
ErrorCode | check_serial_tag_data (const std::vector< unsigned char > &buffer, std::vector< TagDesc * > *missing=0, std::vector< TagDesc * > *newlist=0) |
helper function for create_tag_tables | |
ErrorCode | create_dataset (int num_datasets, const long *num_owned_entities, long *offsets_out, long *max_proc_ents_out, long *total_ents_out, const DataSetCreator &creator=NoopDescCreator(), ExportSet *groups[]=0, wid_t *first_ids_out=NULL) |
Do typical communication for dataset creation. | |
void | print_shared_sets () |
void | print_set_sharing_data (const Range &range, const char *label, Tag idt) |
Private Attributes | |
ParallelComm * | myPcomm |
pcomm controlling parallel nature of mesh | |
bool | pcommAllocated |
whether this instance allocated (and dtor should delete) the pcomm | |
H5S_seloper_t | hslabOp |
Operation to use to append hyperslab selections. |
Write MOAB HDF5 file in parallel.
Definition at line 20 of file WriteHDF5Parallel.hpp.
Consturctor
Definition at line 279 of file WriteHDF5Parallel.cpp.
Referenced by factory().
: WriteHDF5( iface ), myPcomm( NULL ), pcommAllocated( false ), hslabOp( H5S_SELECT_OR ) { }
moab::WriteHDF5Parallel::~WriteHDF5Parallel | ( | ) | [virtual] |
Definition at line 284 of file WriteHDF5Parallel.cpp.
References myPcomm, and pcommAllocated.
{ if( pcommAllocated && myPcomm ) delete myPcomm; }
ErrorCode moab::WriteHDF5Parallel::append_serial_tag_data | ( | std::vector< unsigned char > & | buffer, |
const WriteHDF5::TagDesc & | tag | ||
) | [protected] |
Definition at line 582 of file WriteHDF5Parallel.cpp.
References moab::serial_tag_data::def_val_len, moab::error(), ErrorCode, moab::WriteHDF5::iFace, moab::serial_tag_data::len(), MB_SUCCESS, MB_VARIABLE_DATA_LENGTH, MB_VARIABLE_LENGTH, moab::serial_tag_data::name, moab::serial_tag_data::name_len, moab::serial_tag_data::set_default_value(), moab::serial_tag_data::size, moab::serial_tag_data::storage, moab::Interface::tag_get_data_type(), moab::Interface::tag_get_default_value(), moab::Interface::tag_get_length(), moab::Interface::tag_get_name(), moab::Interface::tag_get_type(), moab::WriteHDF5::TagDesc::tag_id, and moab::serial_tag_data::type.
Referenced by create_tag_tables().
{ ErrorCode rval; std::string name; rval = iFace->tag_get_name( tag.tag_id, name ); if( MB_SUCCESS != rval ) return error( rval ); // Get name length, including space for null char size_t name_len = name.size() + 1; if( name_len == 1 ) return MB_SUCCESS; // Skip tags with no name DataType data_type; rval = iFace->tag_get_data_type( tag.tag_id, data_type ); if( MB_SUCCESS != rval ) return error( rval ); // Get default value int def_val_len; const void* def_val; if( MB_SUCCESS != iFace->tag_get_default_value( tag.tag_id, def_val, def_val_len ) ) { def_val_len = 0; def_val = 0; } // Allocate struct within buffer size_t init_size = buffer.size(); buffer.resize( init_size + serial_tag_data::len( name_len, def_val_len, data_type ) ); serial_tag_data* ptr = reinterpret_cast< serial_tag_data* >( &buffer[init_size] ); // Populate struct rval = iFace->tag_get_type( tag.tag_id, ptr->storage ); if( MB_SUCCESS != rval ) return error( rval ); ptr->type = data_type; rval = iFace->tag_get_length( tag.tag_id, ptr->size ); if( MB_VARIABLE_DATA_LENGTH == rval ) ptr->size = MB_VARIABLE_LENGTH; else if( MB_SUCCESS != rval ) return error( rval ); ptr->name_len = name_len; Range range; memset( ptr->name, 0, ptr->name_len ); memcpy( ptr->name, name.data(), name.size() ); ptr->def_val_len = def_val_len; ptr->set_default_value( def_val ); return MB_SUCCESS; }
ErrorCode moab::WriteHDF5Parallel::check_serial_tag_data | ( | const std::vector< unsigned char > & | buffer, |
std::vector< TagDesc * > * | missing = 0 , |
||
std::vector< TagDesc * > * | newlist = 0 |
||
) | [protected] |
helper function for create_tag_tables
Definition at line 632 of file WriteHDF5Parallel.cpp.
References moab::serial_tag_data::def_val_len, moab::serial_tag_data::default_value(), moab::error(), ErrorCode, moab::WriteHDF5::iFace, moab::serial_tag_data::len(), moab::WriteHDF5::TagDesc::max_num_ents, moab::WriteHDF5::TagDesc::max_num_vals, MB_SET_ERR, MB_SUCCESS, MB_TAG_CREAT, MB_TAG_VARLEN, MB_VARIABLE_LENGTH, moab::serial_tag_data::name, size, moab::serial_tag_data::size, moab::WriteHDF5::TagDesc::sparse_offset, moab::serial_tag_data::storage, moab::Interface::tag_get_data_type(), moab::Interface::tag_get_handle(), moab::Interface::tag_get_length(), moab::Interface::tag_get_name(), moab::WriteHDF5::TagDesc::tag_id, moab::WriteHDF5::tagList, moab::serial_tag_data::type, moab::WriteHDF5::TagDesc::var_data_offset, and moab::WriteHDF5::TagDesc::write_sparse.
Referenced by create_tag_tables().
{ ErrorCode rval; // Use 'write_sparse' field as a 'visited' mark std::list< TagDesc >::iterator tag_iter; if( missing ) for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) tag_iter->write_sparse = true; // Use a set as a temporary for what will ultimately go in // newlist because we need to pass back newlist in the order // of the tagList member. std::set< TagDesc* > newset; // Iterate over data from, updating the local list of tags. // Be careful to keep tagList sorted such that in the end all // procs have the same list in the same order. std::vector< unsigned char >::const_iterator diter = buffer.begin(); tag_iter = tagList.begin(); while( diter < buffer.end() ) { // Get struct from buffer const serial_tag_data* ptr = reinterpret_cast< const serial_tag_data* >( &*diter ); // Find local struct for tag std::string name( ptr->name ); std::string n; iFace->tag_get_name( tag_iter->tag_id, n ); // Second time we've called, so shouldn't fail if( n > name ) { tag_iter = tagList.begin(); // New proc, start search from beginning } iFace->tag_get_name( tag_iter->tag_id, n ); while( n < name ) { ++tag_iter; if( tag_iter == tagList.end() ) break; iFace->tag_get_name( tag_iter->tag_id, n ); } if( tag_iter == tagList.end() || n != name ) { // New tag TagDesc newtag; if( ptr->size == MB_VARIABLE_LENGTH ) rval = iFace->tag_get_handle( name.c_str(), ptr->def_val_len, ptr->type, newtag.tag_id, MB_TAG_VARLEN | MB_TAG_CREAT | ptr->storage, ptr->default_value() ); else rval = iFace->tag_get_handle( name.c_str(), ptr->size, ptr->type, newtag.tag_id, MB_TAG_CREAT | ptr->storage, ptr->default_value() ); if( MB_SUCCESS != rval ) return error( rval ); newtag.sparse_offset = 0; newtag.var_data_offset = 0; newtag.write_sparse = false; newtag.max_num_ents = 0; newtag.max_num_vals = 0; tag_iter = tagList.insert( tag_iter, newtag ); if( newlist ) newset.insert( &*tag_iter ); } else { // Check that tag is as expected DataType type; iFace->tag_get_data_type( tag_iter->tag_id, type ); if( type != ptr->type ) { MB_SET_ERR( MB_FAILURE, "Processes have inconsistent data type for tag \"" << name << "\"" ); } int size; iFace->tag_get_length( tag_iter->tag_id, size ); if( size != ptr->size ) { MB_SET_ERR( MB_FAILURE, "Processes have inconsistent size for tag \"" << name << "\"" ); } tag_iter->write_sparse = false; } // Step to next variable-length struct. diter += ptr->len(); } // Now pass back any local tags that weren't in the buffer if( missing ) { for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) { if( tag_iter->write_sparse ) { tag_iter->write_sparse = false; missing->push_back( &*tag_iter ); } } } // Be careful to populate newlist in the same, sorted, order as tagList if( newlist ) { for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) if( newset.find( &*tag_iter ) != newset.end() ) newlist->push_back( &*tag_iter ); } return MB_SUCCESS; }
ErrorCode moab::WriteHDF5Parallel::communicate_shared_set_data | ( | const Range & | owned, |
const Range & | remote | ||
) | [protected] |
Communicate set contents between processors such that each owner knows the contents, parents, & child lists from all processors that have a copy of the set.
Definition at line 1860 of file WriteHDF5Parallel.cpp.
References moab::Range::begin(), CHECK_MB, CHECK_MPI, moab::CREATE_HANDLE(), moab::WriteHDF5::dbgOut, moab::Range::end(), ErrorCode, moab::Interface::get_child_meshsets(), moab::Interface::get_entities_by_handle(), moab::ParallelComm::get_entityset_owner(), moab::ParallelComm::get_entityset_procs(), moab::Interface::get_meshset_options(), moab::Interface::get_parent_meshsets(), moab::ID_FROM_HANDLE(), moab::WriteHDF5::iFace, MB_SUCCESS, MBENTITYSET, moab::Range::merge(), mhdf_SET_RANGE_BIT, myPcomm, pack_set(), moab::DebugOutput::printf(), moab::ProcConfig::proc_comm(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::WriteHDF5::range_to_blocked_list(), moab::WriteHDF5::SpecialSetData::setFlags, moab::WriteHDF5::SpecialSetData::setHandle, size, moab::Range::size(), moab::WriteHDF5::specialSets, moab::DebugOutput::tprintf(), unpack_set(), and moab::WriteHDF5::vector_to_id_list().
Referenced by create_meshset_tables().
{ ErrorCode rval; int mperr; const unsigned rank = myPcomm->proc_config().proc_rank(); const MPI_Comm comm = myPcomm->proc_config().proc_comm(); dbgOut.tprintf( 1, "COMMUNICATING SHARED SET DATA (%lu owned & %lu remote)\n", (unsigned long)owned.size(), (unsigned long)remote.size() ); // Calculate the total number of messages to be in transit (send and receive) size_t nummess = 0; std::vector< unsigned > procs; ; Range shared( owned ); shared.merge( remote ); for( Range::iterator i = shared.begin(); i != shared.end(); ++i ) { procs.clear(); rval = myPcomm->get_entityset_procs( *i, procs ); CHECK_MB( rval ); nummess += procs.size(); } // Choose a receive buffer size. We need 4*sizeof(long) minimum, // but that is almost useless so use 16*sizeof(long) as the minimum // instead. Choose an upper limit such that we don't exceed 32 MB // of allocated memory (unless we absolutely must to meet the minimum.) // Also, don't initially choose buffers larger than 128*sizeof(long). const size_t MAX_BUFFER_MEM = 32 * 1024 * 1024 / sizeof( long ); // const size_t INIT_BUFFER_SIZE = 128; const size_t INIT_BUFFER_SIZE = 1024; const size_t MIN_BUFFER_SIZE = 16; size_t init_buff_size = INIT_BUFFER_SIZE; if( init_buff_size * nummess > MAX_BUFFER_MEM ) init_buff_size = MAX_BUFFER_MEM / nummess; if( init_buff_size < MIN_BUFFER_SIZE ) init_buff_size = MIN_BUFFER_SIZE; dbgOut.printf( 2, "Using buffer size of %lu for an expected message count of %lu\n", (unsigned long)init_buff_size, (unsigned long)nummess ); // Count number of recvs size_t numrecv = 0; for( Range::iterator i = owned.begin(); i != owned.end(); ++i ) { procs.clear(); rval = myPcomm->get_entityset_procs( *i, procs ); CHECK_MB( rval ); numrecv += procs.size(); if( std::find( procs.begin(), procs.end(), rank ) != procs.end() ) --numrecv; } // Post receive buffers for all owned sets for all sharing procs std::vector< MPI_Request > recv_req( numrecv, MPI_REQUEST_NULL ); std::vector< MPI_Request > lrecv_req( numrecv, MPI_REQUEST_NULL ); std::vector< std::vector< unsigned long > > recv_buf( numrecv, std::vector< unsigned long >( init_buff_size ) ); int idx = 0; for( Range::iterator i = owned.begin(); i != owned.end(); ++i ) { procs.clear(); rval = myPcomm->get_entityset_procs( *i, procs ); CHECK_MB( rval ); for( size_t j = 0; j < procs.size(); ++j ) { if( procs[j] == rank ) continue; int tag = ID_FROM_HANDLE( *i ); if( *i != CREATE_HANDLE( MBENTITYSET, tag ) ) { #ifndef NDEBUG abort(); #endif CHECK_MB( MB_FAILURE ); } dbgOut.printf( 5, "Posting buffer to receive set %d from proc %u\n", tag, procs[j] ); mperr = MPI_Irecv( &recv_buf[idx][0], init_buff_size, MPI_UNSIGNED_LONG, procs[j], tag, comm, &recv_req[idx] ); CHECK_MPI( mperr ); ++idx; } } assert( (size_t)idx == numrecv ); // Now send set data for all remote sets that I know about std::vector< MPI_Request > send_req( remote.size() ); std::vector< std::vector< unsigned long > > send_buf( remote.size() ); idx = 0; for( Range::iterator i = remote.begin(); i != remote.end(); ++i, ++idx ) { send_buf[idx].resize( init_buff_size ); rval = pack_set( i, &send_buf[idx][0], init_buff_size ); CHECK_MB( rval ); EntityHandle remote_handle; unsigned owner; rval = myPcomm->get_entityset_owner( *i, owner, &remote_handle ); CHECK_MB( rval ); int tag = ID_FROM_HANDLE( remote_handle ); assert( remote_handle == CREATE_HANDLE( MBENTITYSET, tag ) ); dbgOut.printf( 5, "Sending %lu values for set %d to proc %u\n", send_buf[idx][1] + send_buf[idx][2] + send_buf[idx][3] + 4, tag, owner ); mperr = MPI_Isend( &send_buf[idx][0], init_buff_size, MPI_UNSIGNED_LONG, owner, tag, comm, &send_req[idx] ); CHECK_MPI( mperr ); } // Tag mattag; // iFace->tag_get_handle(MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag); // Now initialize local data for managing contents of owned, shared sets assert( specialSets.empty() ); specialSets.clear(); specialSets.reserve( owned.size() ); for( Range::iterator i = owned.begin(); i != owned.end(); ++i ) { // int block; // if (MB_SUCCESS != iFace->tag_get_data(mattag, &*i, 1, &block)) // block = 0; // std::vector<int> ids; SpecialSetData data; data.setHandle = *i; rval = iFace->get_meshset_options( *i, data.setFlags ); CHECK_MB( rval ); specialSets.push_back( data ); std::vector< EntityHandle > list; if( data.setFlags & MESHSET_ORDERED ) { list.clear(); rval = iFace->get_entities_by_handle( *i, list ); CHECK_MB( rval ); rval = vector_to_id_list( list, specialSets.back().contentIds, true ); CHECK_MB( rval ); // if (block) // get_global_ids(iFace, &list[0], list.size(), MESHSET_ORDERED, ids); } else { Range range; rval = iFace->get_entities_by_handle( *i, range ); CHECK_MB( rval ); bool ranged; rval = range_to_blocked_list( range, specialSets.back().contentIds, ranged ); if( ranged ) specialSets.back().setFlags |= mhdf_SET_RANGE_BIT; // if (block) { // std::vector<EntityHandle> tmp; // for (Range::const_pair_iterator pi = range.const_pair_begin(); pi != // range.const_pair_end(); ++pi) { // tmp.push_back(pi->first); // tmp.push_back(pi->second); // } // get_global_ids(iFace, &tmp[0], tmp.size(), ranged ? 0 : MESHSET_ORDERED, ids); //} } list.clear(); rval = iFace->get_parent_meshsets( *i, list ); CHECK_MB( rval ); rval = vector_to_id_list( list, specialSets.back().parentIds, true ); CHECK_MB( rval ); rval = iFace->get_child_meshsets( *i, list ); CHECK_MB( rval ); rval = vector_to_id_list( list, specialSets.back().childIds, true ); CHECK_MB( rval ); } // Process received buffers, repost larger buffers where necessary size_t remaining = numrecv; numrecv = 0; while( remaining-- ) { std::vector< unsigned long > dead; MPI_Status status; mperr = MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status ); CHECK_MPI( mperr ); EntityHandle handle = CREATE_HANDLE( MBENTITYSET, status.MPI_TAG ); std::vector< unsigned long >& buff = recv_buf[idx]; size_t size = buff[1] + buff[2] + buff[3] + 4; dbgOut.printf( 5, "Received %lu values for set %d from proc %d\n", (unsigned long)size, status.MPI_TAG, status.MPI_SOURCE ); if( size <= init_buff_size ) { rval = unpack_set( handle, &buff[0], init_buff_size ); CHECK_MB( rval ); dead.swap( buff ); // Release memory } else { // Data was too big for init_buff_size // repost with larger buffer buff.resize( size ); dbgOut.printf( 5, "Re-Posting buffer to receive set %d from proc %d with size %lu\n", status.MPI_TAG, status.MPI_SOURCE, (unsigned long)size ); mperr = MPI_Irecv( &buff[0], size, MPI_UNSIGNED_LONG, status.MPI_SOURCE, status.MPI_TAG, comm, &lrecv_req[idx] ); CHECK_MPI( mperr ); ++numrecv; } recv_req[idx] = MPI_REQUEST_NULL; } // Wait for sends to complete MPI_Waitall( send_req.size(), &send_req[0], MPI_STATUSES_IGNORE ); // Re-send sets that didn't fit initial buffer size idx = 0; for( Range::iterator i = remote.begin(); i != remote.end(); ++i, ++idx ) { std::vector< unsigned long >& buff = send_buf[idx]; size_t size = buff[1] + buff[2] + buff[3] + 4; if( size <= init_buff_size ) continue; buff.resize( size ); rval = pack_set( i, &buff[0], size ); CHECK_MB( rval ); EntityHandle remote_handle; unsigned owner; rval = myPcomm->get_entityset_owner( *i, owner, &remote_handle ); CHECK_MB( rval ); int tag = ID_FROM_HANDLE( remote_handle ); assert( remote_handle == CREATE_HANDLE( MBENTITYSET, tag ) ); dbgOut.printf( 5, "Sending %lu values for set %d to proc %u\n", (unsigned long)size, tag, owner ); mperr = MPI_Isend( &buff[0], size, MPI_UNSIGNED_LONG, owner, tag, comm, &send_req[idx] ); CHECK_MPI( mperr ); } // Process received buffers remaining = numrecv; while( remaining-- ) { std::vector< unsigned long > dead; MPI_Status status; mperr = MPI_Waitany( lrecv_req.size(), &lrecv_req[0], &idx, &status ); CHECK_MPI( mperr ); EntityHandle handle = CREATE_HANDLE( MBENTITYSET, status.MPI_TAG ); std::vector< unsigned long >& buff = recv_buf[idx]; dbgOut.printf( 5, "Received %lu values for set %d from proc %d\n", 4 + buff[1] + buff[2] + buff[3], status.MPI_TAG, status.MPI_SOURCE ); rval = unpack_set( handle, &buff[0], buff.size() ); CHECK_MB( rval ); dead.swap( buff ); // Release memory lrecv_req[idx] = MPI_REQUEST_NULL; } // Wait for sends to complete MPI_Waitall( send_req.size(), &send_req[0], MPI_STATUSES_IGNORE ); return MB_SUCCESS; }
ErrorCode moab::WriteHDF5Parallel::communicate_shared_set_ids | ( | const Range & | owned, |
const Range & | remote | ||
) | [protected] |
Get remote ids for shared sets.
Definition at line 1511 of file WriteHDF5Parallel.cpp.
References moab::Range::begin(), CHECK_MB, CHECK_MPI, moab::WriteHDF5::dbgOut, moab::Range::end(), moab::error(), ErrorCode, moab::RangeMap< KeyType, ValType, NullVal >::find(), moab::WriteHDF5::find(), moab::ParallelComm::get_entityset_local_handle(), moab::ParallelComm::get_entityset_owner(), moab::ParallelComm::get_entityset_owners(), moab::ParallelComm::get_entityset_procs(), moab::ParallelComm::get_owned_sets(), moab::DebugOutput::get_verbosity(), moab::WriteHDF5::idMap, moab::RangeMap< KeyType, ValType, NullVal >::insert(), moab::Range::insert(), moab::intersect(), MB_SUCCESS, myPcomm, moab::DebugOutput::print(), print_shared_sets(), moab::DebugOutput::printf(), moab::ProcConfig::proc_comm(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::Range::rbegin(), moab::Range::rend(), size, moab::Range::size(), moab::SSVB, and moab::DebugOutput::tprint().
Referenced by create_meshset_tables().
{ ErrorCode rval; int mperr; const int TAG = 0xD0E; // const unsigned rank = myPcomm->proc_config().proc_rank(); const MPI_Comm comm = myPcomm->proc_config().proc_comm(); dbgOut.tprint( 1, "COMMUNICATING SHARED SET IDS\n" ); dbgOut.print( 6, "Owned, shared sets: ", owned ); // Post receive buffers for all procs for which we share sets std::vector< unsigned > procs; rval = myPcomm->get_entityset_owners( procs ); CHECK_MB( rval ); std::vector< unsigned >::iterator it = std::find( procs.begin(), procs.end(), myPcomm->proc_config().proc_rank() ); if( it != procs.end() ) procs.erase( it ); std::vector< MPI_Request > recv_req( procs.size(), MPI_REQUEST_NULL ); std::vector< std::vector< unsigned long > > recv_buf( procs.size() ); size_t recv_count = 0; for( size_t i = 0; i < procs.size(); ++i ) { Range tmp; rval = myPcomm->get_owned_sets( procs[i], tmp ); CHECK_MB( rval ); size_t count = intersect( tmp, remote ).size(); // Necessary because we might not be writing all of the database if( count ) { dbgOut.printf( 6, "Sets owned by proc %u (remote handles): ", procs[i] ); if( dbgOut.get_verbosity() >= 6 ) { Range remote_handles; tmp = intersect( tmp, remote ); for( Range::iterator j = tmp.begin(); j != tmp.end(); ++j ) { unsigned r; EntityHandle h; myPcomm->get_entityset_owner( *j, r, &h ); assert( r == procs[i] ); remote_handles.insert( h ); } dbgOut.print( 6, remote_handles ); } recv_count++; recv_buf[i].resize( 2 * count + 1 ); dbgOut.printf( 5, "Posting receive buffer of size %lu for proc %u (%lu of %lu owned sets)\n", (unsigned long)recv_buf[i].size(), procs[i], count, tmp.size() ); mperr = MPI_Irecv( &recv_buf[i][0], recv_buf[i].size(), MPI_UNSIGNED_LONG, procs[i], TAG, comm, &recv_req[i] ); CHECK_MPI( mperr ); } } // Send set ids to all procs with which we share them // First build per-process lists of sets for which we need to send data std::map< unsigned, Range > send_sets; std::vector< unsigned > set_procs; for( Range::reverse_iterator i = owned.rbegin(); i != owned.rend(); ++i ) { set_procs.clear(); rval = myPcomm->get_entityset_procs( *i, set_procs ); CHECK_MB( rval ); for( size_t j = 0; j < set_procs.size(); ++j ) if( set_procs[j] != myPcomm->proc_config().proc_rank() ) send_sets[set_procs[j]].insert( *i ); } assert( send_sets.find( myPcomm->proc_config().proc_rank() ) == send_sets.end() ); // Now send the data std::vector< std::vector< unsigned long > > send_buf( send_sets.size() ); std::vector< MPI_Request > send_req( send_sets.size() ); std::map< unsigned, Range >::iterator si = send_sets.begin(); for( size_t i = 0; si != send_sets.end(); ++si, ++i ) { dbgOut.printf( 6, "Sending data for shared sets to proc %u: ", si->first ); dbgOut.print( 6, si->second ); send_buf[i].reserve( 2 * si->second.size() + 1 ); send_buf[i].push_back( si->second.size() ); for( Range::iterator j = si->second.begin(); j != si->second.end(); ++j ) { send_buf[i].push_back( *j ); send_buf[i].push_back( idMap.find( *j ) ); } dbgOut.printf( 5, "Sending buffer of size %lu to proc %u (%lu of %lu owned sets)\n", (unsigned long)send_buf[i].size(), si->first, si->second.size(), owned.size() ); mperr = MPI_Isend( &send_buf[i][0], send_buf[i].size(), MPI_UNSIGNED_LONG, si->first, TAG, comm, &send_req[i] ); } // Process received data MPI_Status status; int idx; while( recv_count-- ) { mperr = MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status ); CHECK_MPI( mperr ); assert( (unsigned)status.MPI_SOURCE == procs[idx] ); assert( 2 * recv_buf[idx].front() + 1 == recv_buf[idx].size() ); const size_t n = std::min< size_t >( recv_buf[idx].front(), ( recv_buf[idx].size() - 1 ) / 2 ); dbgOut.printf( 5, "Received buffer of size %lu from proc %d\n", (unsigned long)( 2 * n + 1 ), (int)status.MPI_SOURCE ); for( size_t i = 0; i < n; ++i ) { EntityHandle handle = 0; rval = myPcomm->get_entityset_local_handle( procs[idx], recv_buf[idx][2 * i + 1], handle ); CHECK_MB( rval ); assert( handle != 0 ); if( !idMap.insert( handle, recv_buf[idx][2 * i + 2], 1 ).second ) error( MB_FAILURE ); // Conflicting IDs?????? } recv_req[idx] = MPI_REQUEST_NULL; } assert( MPI_SUCCESS == MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status ) && MPI_UNDEFINED == idx ); // Check that we got them all // Wait for all sends to complete before we release send // buffers (implicitly releases when we return from this function) std::vector< MPI_Status > stats( send_req.size() ); mperr = MPI_Waitall( send_req.size(), &send_req[0], &stats[0] ); CHECK_MPI( mperr ); if( dbgOut.get_verbosity() >= SSVB ) print_shared_sets(); return MB_SUCCESS; }
ErrorCode moab::WriteHDF5Parallel::create_adjacency_tables | ( | ) | [protected] |
Create tables to hold element adjacencies.
Definition at line 1402 of file WriteHDF5Parallel.cpp.
References CHECK_HDFN, CHECK_MB, moab::WriteHDF5::count_adjacencies(), create_dataset(), ErrorCode, moab::WriteHDF5::exportList, moab::WriteHDF5::file_ptr(), MB_SUCCESS, mhdf_closeData(), mhdf_createAdjacency(), moab::WriteHDF5::ExportSet::name(), moab::WriteHDF5::nodeSet, and size.
Referenced by parallel_create_file().
{ struct AdjSetCreator : public DataSetCreator { ErrorCode operator()( WriteHDF5* file, long size, const ExportSet* ex, long& start_id ) const { mhdf_Status status; hid_t handle = mhdf_createAdjacency( file->file_ptr(), ex->name(), size, &status ); CHECK_HDFN( status ); mhdf_closeData( file->file_ptr(), handle, &status ); CHECK_HDFN( status ); start_id = -1; return MB_SUCCESS; } }; std::vector< ExportSet* > groups; #ifdef WRITE_NODE_ADJACENCIES groups.push_back( &nodeSet ); #endif for( std::list< ExportSet >::iterator ex_iter = exportList.begin(); ex_iter != exportList.end(); ++ex_iter ) groups.push_back( &*ex_iter ); ErrorCode rval; const int numtypes = groups.size(); std::vector< long > counts( numtypes ); std::vector< long > offsets( numtypes ); std::vector< long > max_ents( numtypes ); std::vector< long > totals( numtypes ); for( int i = 0; i < numtypes; ++i ) { wid_t count; rval = count_adjacencies( groups[i]->range, count ); CHECK_MB( rval ); counts[i] = count; } rval = create_dataset( numtypes, &counts[0], &offsets[0], &max_ents[0], &totals[0], AdjSetCreator(), &groups[0] ); CHECK_MB( rval ); // Cppcheck warning (false positive): variable groups is assigned a value that is never used for( int i = 0; i < numtypes; ++i ) { groups[i]->max_num_adjs = max_ents[i]; groups[i]->adj_offset = offsets[i]; } return MB_SUCCESS; }
ErrorCode moab::WriteHDF5Parallel::create_dataset | ( | int | num_datasets, |
const long * | num_owned_entities, | ||
long * | offsets_out, | ||
long * | max_proc_ents_out, | ||
long * | total_ents_out, | ||
const DataSetCreator & | creator = NoopDescCreator() , |
||
ExportSet * | groups[] = 0 , |
||
wid_t * | first_ids_out = NULL |
||
) | [protected] |
Do typical communication for dataset creation.
Given the number of entities each processor intends to write, do necessary communication and create dataset on root, passing back misc info to each proc.
creator | Functor to do actual dataset creation. Used only on root process. |
num_datasets | The number of datasets to create. |
groups | Third argument passed to DataSetCreator. Array of length num_datasets pr NULL. |
num_owned_entities | The number of entities this proc will write. Array of length num_datasets . |
offsets_out | Output: The offset in the dataset at which this process should write. Array of length num_datasets . |
max_proc_ents_out | Output: The maximun number of entities that any proc will write Array of length num_datasets . |
total_ents_out | Output: The size of the created dataset (sum of counts over all procs) Array of length num_datasets . |
first_ids_out | Output: The first ID of the first entity in the data set. First ID for this proc's entities is first_id_out+offset_out Array of length num_datasets or NULL. |
Definition at line 1080 of file WriteHDF5Parallel.cpp.
References CHECK_MB, CHECK_MPI, ErrorCode, MB_SUCCESS, myPcomm, moab::ProcConfig::proc_comm(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::ProcConfig::proc_size(), and VALGRIND_CHECK_MEM_IS_DEFINED.
Referenced by create_adjacency_tables(), create_element_tables(), create_meshset_tables(), create_node_table(), and create_tag_tables().
{ int result; ErrorCode rval; const unsigned rank = myPcomm->proc_config().proc_rank(); const unsigned nproc = myPcomm->proc_config().proc_size(); const MPI_Comm comm = myPcomm->proc_config().proc_comm(); // Gather entity counts for each processor on root std::vector< long > counts( rank ? 0 : nproc * num_datasets ); (void)VALGRIND_CHECK_MEM_IS_DEFINED( &num_owned, sizeof( long ) ); result = MPI_Gather( const_cast< long* >( num_owned ), num_datasets, MPI_LONG, &counts[0], num_datasets, MPI_LONG, 0, comm ); CHECK_MPI( result ); // Create node data in file DatasetVals zero_val = { 0, 0, 0 }; std::vector< DatasetVals > cumulative( num_datasets, zero_val ); if( rank == 0 ) { for( unsigned i = 0; i < nproc; i++ ) { const long* proc_data = &counts[i * num_datasets]; for( int index = 0; index < num_datasets; ++index ) { cumulative[index].total += proc_data[index]; if( proc_data[index] > cumulative[index].max_count ) cumulative[index].max_count = proc_data[index]; } } for( int index = 0; index < num_datasets; ++index ) { if( cumulative[index].total ) { rval = creator( this, cumulative[index].total, groups ? groups[index] : 0, cumulative[index].start_id ); CHECK_MB( rval ); } else { cumulative[index].start_id = -1; } } } // Send id offset to every proc result = MPI_Bcast( (void*)&cumulative[0], 3 * num_datasets, MPI_LONG, 0, comm ); CHECK_MPI( result ); for( int index = 0; index < num_datasets; ++index ) { if( first_ids_out ) first_ids_out[index] = (wid_t)cumulative[index].start_id; max_proc_entities[index] = cumulative[index].max_count; total_entities[index] = cumulative[index].total; } // Convert array of per-process counts to per-process offsets if( rank == 0 ) { // Initialize prev_size with data sizes for root process std::vector< long > prev_size( counts.begin(), counts.begin() + num_datasets ); // Root process gets offset zero std::fill( counts.begin(), counts.begin() + num_datasets, 0L ); // For each proc other than this one (root) for( unsigned i = 1; i < nproc; ++i ) { // Get pointer to offsets for previous process in list long* prev_data = &counts[( i - 1 ) * num_datasets]; // Get pointer to offsets for this process in list long* proc_data = &counts[i * num_datasets]; // For each data set for( int j = 0; j < num_datasets; ++j ) { // Get size of data in dataset from process i long mysize = proc_data[j]; // Offset for process i is offset of previous process plus // number of values previous process will write proc_data[j] = prev_data[j] + prev_size[j]; // Store my size, as it is no longer available in 'counts' prev_size[j] = mysize; } } } // Send each proc it's offset in the table if( rank == 0 ) { (void)VALGRIND_CHECK_MEM_IS_DEFINED( &counts[0], num_datasets * nproc * sizeof( long ) ); } result = MPI_Scatter( &counts[0], num_datasets, MPI_LONG, offsets_out, num_datasets, MPI_LONG, 0, comm ); CHECK_MPI( result ); return MB_SUCCESS; }
ErrorCode moab::WriteHDF5Parallel::create_element_tables | ( | ) | [protected] |
Create tables to hold element connectivity.
Definition at line 1363 of file WriteHDF5Parallel.cpp.
References moab::WriteHDF5::assign_ids(), CHECK_MB, create_dataset(), moab::WriteHDF5::create_elem_table(), ErrorCode, moab::WriteHDF5::exportList, MB_SUCCESS, and size.
Referenced by parallel_create_file().
{ struct ElemSetCreator : public DataSetCreator { ErrorCode operator()( WriteHDF5* file, long size, const ExportSet* ex, long& start_id ) const { return file->create_elem_table( *ex, size, start_id ); } }; const int numtypes = exportList.size(); std::vector< ExportSet* > groups( numtypes ); std::vector< long > counts( numtypes ), offsets( numtypes ), max_ents( numtypes ), total_ents( numtypes ); std::vector< wid_t > start_ids( numtypes ); size_t idx = 0; std::list< ExportSet >::iterator ex_iter; for( ex_iter = exportList.begin(); ex_iter != exportList.end(); ++ex_iter, ++idx ) { groups[idx] = &*ex_iter; counts[idx] = ex_iter->range.size(); } ErrorCode rval = create_dataset( numtypes, &counts[0], &offsets[0], &max_ents[0], &total_ents[0], ElemSetCreator(), &groups[0], &start_ids[0] ); CHECK_MB( rval ); for( idx = 0, ex_iter = exportList.begin(); ex_iter != exportList.end(); ++ex_iter, ++idx ) { ex_iter->first_id = start_ids[idx]; ex_iter->offset = offsets[idx]; ex_iter->max_num_ents = max_ents[idx]; ex_iter->total_num_ents = total_ents[idx]; rval = assign_ids( ex_iter->range, ex_iter->first_id + ex_iter->offset ); CHECK_MB( rval ); } return MB_SUCCESS; }
ErrorCode moab::WriteHDF5Parallel::create_meshset_tables | ( | double * | times | ) | [protected] |
Create tables for mesh sets.
Definition at line 2110 of file WriteHDF5Parallel.cpp.
References moab::WriteHDF5::assign_ids(), CHECK_MB, communicate_shared_set_data(), communicate_shared_set_ids(), moab::WriteHDF5::count_set_size(), create_dataset(), moab::WriteHDF5::create_set_meta(), moab::WriteHDF5::create_set_tables(), moab::WriteHDF5::dbgOut, END_SERIAL, ErrorCode, moab::WriteHDF5::ExportSet::first_id, moab::ParallelComm::get_owned_sets(), moab::ParallelComm::get_shared_sets(), moab::WriteHDF5::iFace, moab::intersect(), moab::WriteHDF5::ExportSet::max_num_ents, moab::WriteHDF5::maxNumSetChildren, moab::WriteHDF5::maxNumSetContents, moab::WriteHDF5::maxNumSetParents, MB_SUCCESS, myPcomm, moab::WriteHDF5::ExportSet::offset, moab::print_type_sets(), moab::DebugOutput::printf(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::WriteHDF5::ExportSet::range, moab::WriteHDF5::SET_OFFSET_TIME, moab::WriteHDF5::setChildrenOffset, moab::WriteHDF5::setContentsOffset, moab::WriteHDF5::setParentsOffset, moab::WriteHDF5::setSet, moab::WriteHDF5::SHARED_SET_CONTENTS, moab::WriteHDF5::SHARED_SET_IDS, size, moab::Range::size(), START_SERIAL, moab::subtract(), moab::CpuTimer::time_elapsed(), moab::WriteHDF5::ExportSet::total_num_ents, moab::WriteHDF5::writeSetChildren, moab::WriteHDF5::writeSetContents, moab::WriteHDF5::writeSetParents, and moab::WriteHDF5::writeSets.
Referenced by parallel_create_file().
{ Range::const_iterator riter; const unsigned rank = myPcomm->proc_config().proc_rank(); START_SERIAL; print_type_sets( iFace, &dbgOut, setSet.range ); END_SERIAL; CpuTimer timer; // Remove remote sets from setSets Range shared, owned, remote; ErrorCode rval = myPcomm->get_shared_sets( shared ); CHECK_MB( rval ); shared = intersect( shared, setSet.range ); rval = myPcomm->get_owned_sets( rank, owned ); CHECK_MB( rval ); owned = intersect( owned, setSet.range ); remote = subtract( shared, owned ); setSet.range = subtract( setSet.range, remote ); // Create set meta table struct SetDescCreator : public DataSetCreator { ErrorCode operator()( WriteHDF5* writer, long size, const ExportSet*, long& start_id ) const { return writer->create_set_meta( size, start_id ); } }; long count = setSet.range.size(); rval = create_dataset( 1, &count, &setSet.offset, &setSet.max_num_ents, &setSet.total_num_ents, SetDescCreator(), NULL, &setSet.first_id ); CHECK_MB( rval ); writeSets = setSet.max_num_ents > 0; rval = assign_ids( setSet.range, setSet.first_id + setSet.offset ); CHECK_MB( rval ); if( times ) times[SET_OFFSET_TIME] = timer.time_elapsed(); // Exchange file IDS for sets between all procs rval = communicate_shared_set_ids( owned, remote ); CHECK_MB( rval ); if( times ) times[SHARED_SET_IDS] = timer.time_elapsed(); // Communicate remote set contents, children, etc. rval = communicate_shared_set_data( owned, remote ); CHECK_MB( rval ); if( times ) times[SHARED_SET_CONTENTS] = timer.time_elapsed(); // Communicate counts for owned sets long data_counts[3]; // { #contents, #children, #parents } rval = count_set_size( setSet.range, data_counts[0], data_counts[1], data_counts[2] ); CHECK_MB( rval ); if( times ) times[SET_OFFSET_TIME] += timer.time_elapsed(); long offsets[3], max_counts[3], totals[3]; rval = create_dataset( 3, data_counts, offsets, max_counts, totals ); CHECK_MB( rval ); // Create the datasets if( 0 == myPcomm->proc_config().proc_rank() ) { rval = create_set_tables( totals[0], totals[1], totals[2] ); CHECK_MB( rval ); } // Store communicated global data setContentsOffset = offsets[0]; setChildrenOffset = offsets[1]; setParentsOffset = offsets[2]; maxNumSetContents = max_counts[0]; maxNumSetChildren = max_counts[1]; maxNumSetParents = max_counts[2]; writeSetContents = totals[0] > 0; writeSetChildren = totals[1] > 0; writeSetParents = totals[2] > 0; dbgOut.printf( 2, "set contents: %ld local, %ld global, offset = %ld\n", data_counts[0], totals[0], offsets[0] ); dbgOut.printf( 2, "set children: %ld local, %ld global, offset = %ld\n", data_counts[1], totals[1], offsets[1] ); dbgOut.printf( 2, "set parents: %ld local, %ld global, offset = %ld\n", data_counts[2], totals[2], offsets[2] ); return MB_SUCCESS; }
ErrorCode moab::WriteHDF5Parallel::create_node_table | ( | int | dimension | ) | [protected] |
Create the node table in the file.
Definition at line 1180 of file WriteHDF5Parallel.cpp.
References moab::WriteHDF5::assign_ids(), CHECK_HDFN, CHECK_MB, create_dataset(), ErrorCode, moab::WriteHDF5::file_ptr(), moab::WriteHDF5::ExportSet::first_id, moab::WriteHDF5::ExportSet::max_num_ents, MB_SUCCESS, mhdf_closeData(), mhdf_createNodeCoords(), moab::WriteHDF5::nodeSet, moab::WriteHDF5::ExportType::num_nodes, moab::WriteHDF5::ExportSet::offset, moab::WriteHDF5::ExportSet::range, moab::Range::size(), and moab::WriteHDF5::ExportSet::total_num_ents.
Referenced by parallel_create_file().
{ nodeSet.num_nodes = dimension; // Put it here so NodeSetCreator can access it struct NodeSetCreator : public DataSetCreator { ErrorCode operator()( WriteHDF5* file, long count, const ExportSet* group, long& start_id ) const { mhdf_Status status; hid_t handle = mhdf_createNodeCoords( file->file_ptr(), group->num_nodes, count, &start_id, &status ); CHECK_HDFN( status ); mhdf_closeData( file->file_ptr(), handle, &status ); CHECK_HDFN( status ); return MB_SUCCESS; } }; const long count = nodeSet.range.size(); ExportSet* array[] = { &nodeSet }; ErrorCode rval = create_dataset( 1, &count, &nodeSet.offset, &nodeSet.max_num_ents, &nodeSet.total_num_ents, NodeSetCreator(), array, &nodeSet.first_id ); CHECK_MB( rval ); return assign_ids( nodeSet.range, nodeSet.first_id + nodeSet.offset ); }
ErrorCode moab::WriteHDF5Parallel::create_tag_tables | ( | ) | [protected] |
Write tag descriptions and create tables to hold tag data.
Definition at line 753 of file WriteHDF5Parallel.cpp.
References append_serial_tag_data(), moab::WriteHDF5::check_dense_format_tag(), CHECK_MB, CHECK_MPI, check_serial_tag_data(), create_dataset(), moab::WriteHDF5::create_tag(), moab::WriteHDF5::dbgOut, moab::error(), ErrorCode, moab::WriteHDF5::exportList, moab::get_bit(), moab::WriteHDF5::get_num_sparse_tagged_entities(), moab::WriteHDF5::get_sparse_tagged_entities(), moab::WriteHDF5::get_tag_data_length(), moab::DebugOutput::get_verbosity(), moab::WriteHDF5::iFace, MB_SET_ERR_CONT, MB_SUCCESS, MB_TAG_DENSE, MB_VARIABLE_DATA_LENGTH, moab::my_Gatherv(), myPcomm, moab::WriteHDF5::nodeSet, moab::DebugOutput::print(), moab::DebugOutput::printf(), moab::ProcConfig::proc_comm(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::WriteHDF5::ExportSet::range, moab::set_bit(), moab::WriteHDF5::setSet, size, moab::Range::size(), moab::WriteHDF5::subState, moab::Interface::tag_get_default_value(), moab::Interface::tag_get_length(), moab::Interface::tag_get_name(), moab::Interface::tag_get_type(), moab::WriteHDF5::tagList, TagType, moab::DebugOutput::tprint(), and moab::WriteHDF5::writeTagDense.
Referenced by parallel_create_file().
{ std::list< TagDesc >::iterator tag_iter; ErrorCode rval; int err; const int rank = myPcomm->proc_config().proc_rank(); const MPI_Comm comm = myPcomm->proc_config().proc_comm(); subState.start( "negotiating tag list" ); dbgOut.tprint( 1, "communicating tag metadata\n" ); dbgOut.printf( 2, "Exchanging tag data for %d tags.\n", (int)tagList.size() ); // Sort tagList contents in alphabetical order by tag name tagList.sort( TagNameCompare( iFace ) ); // Negotiate total list of tags to write // Build concatenated list of all tag data std::vector< unsigned char > tag_buffer; for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) { rval = append_serial_tag_data( tag_buffer, *tag_iter ); CHECK_MB( rval ); } // Broadcast list from root to all other procs unsigned long size = tag_buffer.size(); err = MPI_Bcast( &size, 1, MPI_UNSIGNED_LONG, 0, comm ); CHECK_MPI( err ); tag_buffer.resize( size ); err = MPI_Bcast( &tag_buffer[0], size, MPI_UNSIGNED_CHAR, 0, comm ); CHECK_MPI( err ); // Update local tag list std::vector< TagDesc* > missing; rval = check_serial_tag_data( tag_buffer, &missing, 0 ); CHECK_MB( rval ); // Check if we're done (0->done, 1->more, 2+->error) int code, lcode = ( MB_SUCCESS != rval ) ? rval + 2 : missing.empty() ? 0 : 1; err = MPI_Allreduce( &lcode, &code, 1, MPI_INT, MPI_MAX, comm ); CHECK_MPI( err ); if( code > 1 ) { MB_SET_ERR_CONT( "Inconsistent tag definitions between procs" ); return error( (ErrorCode)( code - 2 ) ); } // If not done... if( code ) { dbgOut.print( 1, "Not all procs had same tag definitions, negotiating...\n" ); // Get tags defined on this proc but not on root proc tag_buffer.clear(); for( size_t i = 0; i < missing.size(); ++i ) { rval = append_serial_tag_data( tag_buffer, *missing[i] ); CHECK_MB( rval ); } // Gather extra tag definitions on root processor std::vector< int > junk; // don't care how many from each proc assert( rank || tag_buffer.empty() ); // must be empty on root err = my_Gatherv( &tag_buffer[0], tag_buffer.size(), MPI_UNSIGNED_CHAR, tag_buffer, junk, 0, comm ); CHECK_MPI( err ); // Process serialized tag descriptions on root, and rval = MB_SUCCESS; if( 0 == rank ) { // Process serialized tag descriptions on root, and std::vector< TagDesc* > newlist; rval = check_serial_tag_data( tag_buffer, 0, &newlist ); tag_buffer.clear(); // re-serialize a unique list of new tag definitions for( size_t i = 0; MB_SUCCESS == rval && i != newlist.size(); ++i ) { rval = append_serial_tag_data( tag_buffer, *newlist[i] ); CHECK_MB( rval ); } } // Broadcast any new tag definitions from root to other procs long this_size = tag_buffer.size(); if( MB_SUCCESS != rval ) this_size = -rval; err = MPI_Bcast( &this_size, 1, MPI_LONG, 0, comm ); CHECK_MPI( err ); if( this_size < 0 ) { MB_SET_ERR_CONT( "Inconsistent tag definitions between procs" ); return error( (ErrorCode)-this_size ); } tag_buffer.resize( this_size ); err = MPI_Bcast( &tag_buffer[0], this_size, MPI_UNSIGNED_CHAR, 0, comm ); CHECK_MPI( err ); // Process new tag definitions rval = check_serial_tag_data( tag_buffer, 0, 0 ); CHECK_MB( rval ); } subState.end(); subState.start( "negotiate which element/tag combinations are dense" ); // Figure out for which tag/element combinations we can // write dense tag data. // Construct a table of bits, // where each row of the table corresponds to a tag // and each column to an element group. // Two extra, because first is nodes and last is sets. // (n+7)/8 is ceil(n/8) const int bytes_per_tag = ( exportList.size() + 9 ) / 8; std::vector< unsigned char > data( bytes_per_tag * tagList.size(), 0 ); std::vector< unsigned char > recv( data.size(), 0 ); unsigned char* iter = &data[0]; if( writeTagDense && !data.empty() ) { for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, iter += bytes_per_tag ) { Range tagged; rval = get_sparse_tagged_entities( *tag_iter, tagged ); CHECK_MB( rval ); int s; if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) continue; std::string n; iFace->tag_get_name( tag_iter->tag_id, n ); // Second time we've called, so shouldn't fail // Check if we want to write this tag in dense format even if not // all of the entities have a tag value. The criterion of this // is that the tag be dense, have a default value, and have at // least 2/3 of the entities tagged. bool prefer_dense = false; TagType type; rval = iFace->tag_get_type( tag_iter->tag_id, type ); CHECK_MB( rval ); if( MB_TAG_DENSE == type ) { const void* defval = 0; rval = iFace->tag_get_default_value( tag_iter->tag_id, defval, s ); if( MB_SUCCESS == rval ) prefer_dense = true; } int i = 0; if( check_dense_format_tag( nodeSet, tagged, prefer_dense ) ) { set_bit( i, iter ); dbgOut.printf( 2, "Can write dense data for \"%s\"/Nodes\n", n.c_str() ); } std::list< ExportSet >::const_iterator ex_iter = exportList.begin(); for( ++i; ex_iter != exportList.end(); ++i, ++ex_iter ) { // when writing in parallel, on some partitions, some of these element ranges might // be empty so do not turn this tag as sparse, just because of that, leave it dense, // if we prefer dense if( ( prefer_dense && ex_iter->range.empty() ) || check_dense_format_tag( *ex_iter, tagged, prefer_dense ) ) { set_bit( i, iter ); dbgOut.printf( 2, "Can write dense data for \"%s\"/%s\n", n.c_str(), ex_iter->name() ); } } if( check_dense_format_tag( setSet, tagged, prefer_dense ) ) { set_bit( i, iter ); dbgOut.printf( 2, "Can write dense data for \"%s\"/Sets\n", n.c_str() ); } } // Do bit-wise AND of list over all processors (only write dense format // if all proccesses want dense format for this group of entities). err = MPI_Allreduce( &data[0], &recv[0], data.size(), MPI_UNSIGNED_CHAR, MPI_BAND, myPcomm->proc_config().proc_comm() ); CHECK_MPI( err ); } // if (writeTagDense) // Store initial counts for sparse-formatted tag data. // The total number of values to send and receive will be the number of // tags plus the number of var-len tags because we need to negotiate // offsets into two different tables for the var-len tags. std::vector< long > counts; // Record dense tag/element combinations iter = &recv[0]; const unsigned char* iter2 = &data[0]; for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, iter += bytes_per_tag, iter2 += bytes_per_tag ) { Range tagged; rval = get_sparse_tagged_entities( *tag_iter, tagged ); CHECK_MB( rval ); std::string n; iFace->tag_get_name( tag_iter->tag_id, n ); // Second time we've called, so shouldn't fail int i = 0; if( get_bit( i, iter ) ) { assert( get_bit( i, iter2 ) ); tag_iter->dense_list.push_back( nodeSet ); tagged -= nodeSet.range; dbgOut.printf( 2, "Will write dense data for \"%s\"/Nodes\n", n.c_str() ); } std::list< ExportSet >::const_iterator ex_iter = exportList.begin(); for( ++i; ex_iter != exportList.end(); ++i, ++ex_iter ) { if( get_bit( i, iter ) ) { assert( get_bit( i, iter2 ) ); tag_iter->dense_list.push_back( *ex_iter ); dbgOut.printf( 2, "WIll write dense data for \"%s\"/%s\n", n.c_str(), ex_iter->name() ); tagged -= ex_iter->range; } } if( get_bit( i, iter ) ) { assert( get_bit( i, iter2 ) ); tag_iter->dense_list.push_back( setSet ); dbgOut.printf( 2, "Will write dense data for \"%s\"/Sets\n", n.c_str() ); tagged -= setSet.range; } counts.push_back( tagged.size() ); int s; if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) { unsigned long data_len; rval = get_tag_data_length( *tag_iter, tagged, data_len ); CHECK_MB( rval ); counts.push_back( data_len ); } } subState.end(); subState.start( "Negotiate offsets for sparse tag info" ); std::vector< long > offsets( counts.size() ), maxima( counts.size() ), totals( counts.size() ); rval = create_dataset( counts.size(), &counts[0], &offsets[0], &maxima[0], &totals[0] ); CHECK_MB( rval ); // Copy values into local structs and if root then create tables size_t idx = 0; for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, ++idx ) { assert( idx < counts.size() ); tag_iter->sparse_offset = offsets[idx]; tag_iter->max_num_ents = maxima[idx]; tag_iter->write_sparse = ( 0 != totals[idx] ); int s; if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) { ++idx; assert( idx < counts.size() ); tag_iter->var_data_offset = offsets[idx]; tag_iter->max_num_vals = maxima[idx]; } else { tag_iter->var_data_offset = 0; tag_iter->max_num_vals = 0; } } subState.end(); // Create tag tables on root process if( 0 == myPcomm->proc_config().proc_rank() ) { size_t iidx = 0; for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, ++iidx ) { assert( iidx < totals.size() ); unsigned long num_ents = totals[iidx]; unsigned long num_val = 0; int s; if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) { ++iidx; assert( iidx < totals.size() ); num_val = totals[iidx]; } dbgOut.printf( 2, "Writing tag description for tag 0x%lx with %lu values\n", (unsigned long)tag_iter->tag_id, num_val ? num_val : num_ents ); rval = create_tag( *tag_iter, num_ents, num_val ); if( MB_SUCCESS != rval ) return error( rval ); } } if( dbgOut.get_verbosity() > 1 ) { dbgOut.printf( 2, "Tags: %12s %8s %8s %8s %8s %8s\n", "Name", "Count", "Offset", "Var Off", "Max Ent", "Handle" ); for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) { std::string name; iFace->tag_get_name( tag_iter->tag_id, name ); size_t this_size; get_num_sparse_tagged_entities( *tag_iter, this_size ); dbgOut.printf( 2, "%18s %8lu %8lu %8lu %8lu 0x%7lx\n", name.c_str(), (unsigned long)this_size, (unsigned long)tag_iter->sparse_offset, (unsigned long)tag_iter->var_data_offset, (unsigned long)tag_iter->max_num_ents, (unsigned long)tag_iter->tag_id ); } } return MB_SUCCESS; }
void moab::WriteHDF5Parallel::debug_barrier_line | ( | int | lineno | ) | [protected, virtual] |
Reimplemented from moab::WriteHDF5.
Definition at line 263 of file WriteHDF5Parallel.cpp.
References moab::WriteHDF5::dbgOut, moab::DebugOutput::get_verbosity(), myPcomm, moab::DebugOutput::printf(), moab::ProcConfig::proc_comm(), and moab::ParallelComm::proc_config().
{ const unsigned threshold = 2; static unsigned long count = 0; if( dbgOut.get_verbosity() >= threshold && myPcomm ) { dbgOut.printf( threshold, "*********** Debug Barrier %lu (@%d)***********\n", ++count, lineno ); MPI_Barrier( myPcomm->proc_config().proc_comm() ); } }
ErrorCode moab::WriteHDF5Parallel::exchange_file_ids | ( | const Range & | non_local_ents | ) | [protected] |
For entities that will be written by another processor but are referenced by entities on this processor, get the file Ids that will be assigned to those so they can be referenced by entities to be written on this processor.
non_local_ents | List of entities that are not to be written by this processor but are referenced by other entities that are to be written. |
Definition at line 2257 of file WriteHDF5Parallel.cpp.
References moab::Range::begin(), moab::Range::clear(), moab::WriteHDF5::dbgOut, moab::Range::end(), moab::CN::EntityTypeName(), moab::error(), ErrorCode, moab::ParallelComm::exchange_tags(), moab::WriteHDF5::exportList, moab::ParallelComm::filter_pstatus(), moab::RangeMap< KeyType, ValType, NullVal >::find(), moab::ParallelComm::get_owner(), moab::ID_FROM_HANDLE(), moab::WriteHDF5::idMap, moab::WriteHDF5::iFace, moab::RangeMap< KeyType, ValType, NullVal >::insert(), MB_SET_ERR, MB_SET_ERR_CONT, MB_SUCCESS, MB_TAG_CREAT, MB_TAG_DENSE, MB_TYPE_HANDLE, moab::Range::merge(), myPcomm, moab::WriteHDF5::nodeSet, moab::DebugOutput::printf(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), PSTATUS_AND, PSTATUS_SHARED, moab::WriteHDF5::ExportSet::range, moab::Range::size(), moab::Interface::tag_delete(), moab::Interface::tag_get_data(), moab::Interface::tag_get_handle(), moab::Interface::tag_set_data(), and moab::TYPE_FROM_HANDLE().
Referenced by parallel_create_file().
{ ErrorCode rval; // For each entity owned on the interface, write its file id to // a tag. The sets of entities to be written should already contain // only owned entities so by intersecting with them we not only // filter by entities to be written, but also restrict to entities // owned by the proc // Get list of interface entities Range imesh, tmp; for( std::list< ExportSet >::reverse_iterator i = exportList.rbegin(); i != exportList.rend(); ++i ) { tmp.clear(); rval = myPcomm->filter_pstatus( i->range, PSTATUS_SHARED, PSTATUS_AND, -1, &tmp ); if( MB_SUCCESS != rval ) return error( rval ); imesh.merge( tmp ); } tmp.clear(); rval = myPcomm->filter_pstatus( nodeSet.range, PSTATUS_SHARED, PSTATUS_AND, -1, &tmp ); if( MB_SUCCESS != rval ) return error( rval ); imesh.merge( tmp ); // Create tag to store file IDs EntityHandle default_val = 0; Tag file_id_tag = 0; rval = iFace->tag_get_handle( "__hdf5_ll_fileid", 1, MB_TYPE_HANDLE, file_id_tag, MB_TAG_DENSE | MB_TAG_CREAT, &default_val ); if( MB_SUCCESS != rval ) return error( rval ); // Copy file IDs into tag std::vector< EntityHandle > file_id_vect( imesh.size() ); Range::const_iterator i; std::vector< EntityHandle >::iterator j = file_id_vect.begin(); for( i = imesh.begin(); i != imesh.end(); ++i, ++j ) { *j = idMap.find( *i ); if( !*j ) { iFace->tag_delete( file_id_tag ); return error( MB_FAILURE ); } } rval = iFace->tag_set_data( file_id_tag, imesh, &file_id_vect[0] ); if( MB_SUCCESS != rval ) { iFace->tag_delete( file_id_tag ); return error( rval ); } // Do communication rval = myPcomm->exchange_tags( file_id_tag, imesh ); if( MB_SUCCESS != rval ) { iFace->tag_delete( file_id_tag ); return error( rval ); } // Copy file IDs from tag into idMap for remote entities file_id_vect.resize( nonlocal.size() ); rval = iFace->tag_get_data( file_id_tag, nonlocal, &file_id_vect[0] ); if( MB_SUCCESS != rval ) { iFace->tag_delete( file_id_tag ); return error( rval ); } j = file_id_vect.begin(); for( i = nonlocal.begin(); i != nonlocal.end(); ++i, ++j ) { if( *j == 0 ) { int owner = -1; myPcomm->get_owner( *i, owner ); const char* name = CN::EntityTypeName( TYPE_FROM_HANDLE( *i ) ); int id = ID_FROM_HANDLE( *i ); MB_SET_ERR_CONT( "Process " << myPcomm->proc_config().proc_rank() << " did not receive valid id handle for shared " << name << " " << id << " owned by process " << owner ); dbgOut.printf( 1, "Did not receive valid remote id for " "shared %s %d owned by process %d", name, id, owner ); iFace->tag_delete( file_id_tag ); return error( MB_FAILURE ); } else { if( !idMap.insert( *i, *j, 1 ).second ) { iFace->tag_delete( file_id_tag ); return error( MB_FAILURE ); } } } #ifndef NDEBUG // Check that writer is correct with regards to which entities // that it owns by verifying that the file ids that we thought // we were sending where not received instead file_id_vect.resize( imesh.size() ); rval = iFace->tag_get_data( file_id_tag, imesh, &file_id_vect[0] ); if( MB_SUCCESS != rval ) { iFace->tag_delete( file_id_tag ); return error( rval ); } int invalid_count = 0; j = file_id_vect.begin(); for( i = imesh.begin(); i != imesh.end(); ++i, ++j ) { EntityHandle h = idMap.find( *i ); if( *j != h ) { ++invalid_count; dbgOut.printf( 1, "Conflicting ownership for %s %ld\n", CN::EntityTypeName( TYPE_FROM_HANDLE( *i ) ), (long)ID_FROM_HANDLE( *i ) ); } } if( invalid_count ) { iFace->tag_delete( file_id_tag ); MB_SET_ERR( MB_FAILURE, invalid_count << " entities with conflicting ownership found by process " << myPcomm->proc_config().proc_rank() << ". This will result in duplicate entities written to file" ); } #endif return iFace->tag_delete( file_id_tag ); }
WriterIface * moab::WriteHDF5Parallel::factory | ( | Interface * | iface | ) | [static] |
Reimplemented from moab::WriteHDF5.
Definition at line 274 of file WriteHDF5Parallel.cpp.
References WriteHDF5Parallel().
Referenced by moab::ReaderWriterSet::ReaderWriterSet().
{ return new WriteHDF5Parallel( iface ); }
ErrorCode moab::WriteHDF5Parallel::gather_interface_meshes | ( | Range & | non_local_ents | ) | [protected] |
Figure out which mesh local mesh is duplicated on remote processors and which processor will write that mesh.
non_local_ents | Output list of entities that are not to be written by this processor but are referenced by other entities that are to be written. |
Definition at line 295 of file WriteHDF5Parallel.cpp.
References moab::Range::clear(), moab::WriteHDF5::dbgOut, moab::error(), ErrorCode, moab::WriteHDF5::exportList, moab::ParallelComm::filter_pstatus(), MB_SUCCESS, moab::Range::merge(), myPcomm, moab::WriteHDF5::nodeSet, moab::DebugOutput::print(), PSTATUS_AND, PSTATUS_NOT_OWNED, moab::WriteHDF5::ExportSet::range, moab::WriteHDF5::setSet, and moab::subtract().
Referenced by parallel_create_file().
{ ErrorCode result; // START_SERIAL; dbgOut.print( 3, "Pre-interface mesh:\n" ); dbgOut.print( 3, nodeSet.range ); for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) dbgOut.print( 3, eiter->range ); dbgOut.print( 3, setSet.range ); // Move handles of non-owned entities from lists of entities // that this processor will write to the 'nonowned' list. nonowned.clear(); result = myPcomm->filter_pstatus( nodeSet.range, PSTATUS_NOT_OWNED, PSTATUS_AND, -1, &nonowned ); if( MB_SUCCESS != result ) return error( result ); nodeSet.range = subtract( nodeSet.range, nonowned ); for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) { Range tmpset; result = myPcomm->filter_pstatus( eiter->range, PSTATUS_NOT_OWNED, PSTATUS_AND, -1, &tmpset ); if( MB_SUCCESS != result ) return error( result ); eiter->range = subtract( eiter->range, tmpset ); nonowned.merge( tmpset ); } dbgOut.print( 3, "Post-interface mesh:\n" ); dbgOut.print( 3, nodeSet.range ); for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) dbgOut.print( 3, eiter->range ); dbgOut.print( 3, setSet.range ); // END_SERIAL; return MB_SUCCESS; }
ErrorCode moab::WriteHDF5Parallel::get_sharedset_tags | ( | ) | [protected] |
get any existing tags which aren't excluded and add to shared set tags
ErrorCode moab::WriteHDF5Parallel::negotiate_type_list | ( | ) | [protected] |
Communicate with other processors to negotiate the types of elements that will be written (the union of the types defined on each proc.)
Definition at line 1228 of file WriteHDF5Parallel.cpp.
References moab::WriteHDF5::ExportSet::adj_offset, CHECK_MPI, moab::WriteHDF5::dbgOut, moab::CN::EntityTypeName(), moab::WriteHDF5::exportList, moab::WriteHDF5::ExportSet::first_id, MB_SUCCESS, myPcomm, moab::WriteHDF5::ExportType::num_nodes, moab::WriteHDF5::ExportSet::offset, moab::DebugOutput::print(), moab::DebugOutput::printf(), moab::ProcConfig::proc_comm(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::ProcConfig::proc_size(), moab::WriteHDF5::ExportType::type, VALGRIND_CHECK_MEM_IS_DEFINED, and moab::VALGRIND_MAKE_VEC_UNDEFINED().
Referenced by parallel_create_file().
{ int result; const MPI_Comm comm = myPcomm->proc_config().proc_comm(); exportList.sort(); int num_types = exportList.size(); // Get list of types on this processor typedef std::vector< std::pair< int, int > > typelist; typelist my_types( num_types ); (void)VALGRIND_MAKE_VEC_UNDEFINED( my_types ); typelist::iterator viter = my_types.begin(); for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) { viter->first = eiter->type; viter->second = eiter->num_nodes; ++viter; } dbgOut.print( 2, "Local Element Types:\n" ); for( viter = my_types.begin(); viter != my_types.end(); ++viter ) { int type = viter->first; int count = viter->second; dbgOut.printf( 2, " %s : %d\n", CN::EntityTypeName( (EntityType)type ), count ); } // Broadcast number of types from root to all nodes int num_types0 = num_types; result = MPI_Bcast( &num_types0, 1, MPI_INT, 0, comm ); CHECK_MPI( result ); // Broadcast type list from root to all nodes typelist root_types( num_types0 ); if( 0 == myPcomm->proc_config().proc_rank() ) root_types = my_types; result = MPI_Bcast( (void*)&root_types[0], 2 * num_types0, MPI_INT, 0, comm ); CHECK_MPI( result ); // Build local list of any types that root did not know about typelist non_root_types; viter = root_types.begin(); for( typelist::iterator iter = my_types.begin(); iter != my_types.end(); ++iter ) { if( viter == root_types.end() || *viter != *iter ) non_root_types.push_back( *iter ); else ++viter; } // Determine if any process had types not defined on the root int non_root_count = non_root_types.size(); int not_done; result = MPI_Allreduce( &non_root_count, ¬_done, 1, MPI_INT, MPI_LOR, comm ); CHECK_MPI( result ); if( not_done ) { // Get number of types each processor has that root does not std::vector< int > counts( myPcomm->proc_config().proc_size() ); int two_count = 2 * non_root_count; result = MPI_Gather( &two_count, 1, MPI_INT, &counts[0], 1, MPI_INT, 0, comm ); CHECK_MPI( result ); // Get list of types from each processor std::vector< int > displs( myPcomm->proc_config().proc_size() + 1 ); (void)VALGRIND_MAKE_VEC_UNDEFINED( displs ); displs[0] = 0; for( unsigned long i = 1; i <= myPcomm->proc_config().proc_size(); ++i ) displs[i] = displs[i - 1] + counts[i - 1]; int total = displs[myPcomm->proc_config().proc_size()]; typelist alltypes( total / 2 ); (void)VALGRIND_MAKE_VEC_UNDEFINED( alltypes ); (void)VALGRIND_CHECK_MEM_IS_DEFINED( &non_root_types[0], non_root_types.size() * sizeof( int ) ); result = MPI_Gatherv( (void*)&non_root_types[0], 2 * non_root_count, MPI_INT, (int*)&alltypes[0], &counts[0], &displs[0], MPI_INT, 0, comm ); CHECK_MPI( result ); // Merge type lists. // Prefer O(n) insertions with O(ln n) search time because // we expect data from a potentially large number of processes, // but with a small total number of element types. if( 0 == myPcomm->proc_config().proc_rank() ) { for( viter = alltypes.begin(); viter != alltypes.end(); ++viter ) { typelist::iterator titer = std::lower_bound( my_types.begin(), my_types.end(), *viter ); if( titer == my_types.end() || *titer != *viter ) my_types.insert( titer, *viter ); } dbgOut.print( 2, "Global Element Types:\n" ); for( viter = my_types.begin(); viter != my_types.end(); ++viter ) dbgOut.printf( 2, " %s : %d\n", CN::EntityTypeName( (EntityType)viter->first ), viter->second ); } // Send total number of types to each processor total = my_types.size(); result = MPI_Bcast( &total, 1, MPI_INT, 0, comm ); CHECK_MPI( result ); // Send list of types to each processor my_types.resize( total ); result = MPI_Bcast( (void*)&my_types[0], 2 * total, MPI_INT, 0, comm ); CHECK_MPI( result ); } else { // Special case: if root had types but some subset of procs did not // have those types, but there are no types that the root doesn't // know about then we still need to update processes that are missing // types. my_types.swap( root_types ); } // Insert missing types into exportList, with an empty // range of entities to export. std::list< ExportSet >::iterator ex_iter = exportList.begin(); for( viter = my_types.begin(); viter != my_types.end(); ++viter ) { while( ex_iter != exportList.end() && *ex_iter < *viter ) ++ex_iter; if( ex_iter == exportList.end() || !( *ex_iter == *viter ) ) { ExportSet insert; insert.type = (EntityType)viter->first; insert.num_nodes = viter->second; insert.first_id = 0; insert.offset = 0; insert.adj_offset = 0; ex_iter = exportList.insert( ex_iter, insert ); } } return MB_SUCCESS; }
ErrorCode moab::WriteHDF5Parallel::pack_set | ( | Range::const_iterator | set, |
unsigned long * | set_data, | ||
size_t | set_data_length | ||
) | [protected] |
Pack set data for communication.
If set_data_length is insufficient for the set data, the length entries at indices 1, 2, and 3 of set_data will be set with the necessary lengths, but no data will be written to set_data beyond that.
Definition at line 1669 of file WriteHDF5Parallel.cpp.
References CHECK_MB, moab::WriteUtilIface::CHILDREN, moab::WriteUtilIface::CONTENTS, ErrorCode, moab::WriteUtilIface::get_entity_list_pointers(), MB_SUCCESS, mhdf_SET_RANGE_BIT, moab::WriteUtilIface::PARENTS, moab::WriteHDF5::range_to_blocked_list(), moab::WriteHDF5::vector_to_id_list(), and moab::WriteHDF5::writeUtil.
Referenced by communicate_shared_set_data().
{ ErrorCode rval; const EntityHandle* ptr; int len; unsigned char flags; std::vector< wid_t > tmp; size_t newlen; // Buffer must always contain at least flags and desired sizes assert( buffer_size >= 4 ); buffer_size -= 4; Range::const_iterator nd = it; ++nd; rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, WriteUtilIface::CONTENTS, &len, &flags ); CHECK_MB( rval ); // Tag mattag; // iFace->tag_get_handle(MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag); // int block; // if (MB_SUCCESS != iFace->tag_get_data(mattag, &*it, 1, &block)) // block = 0; // // if (block) { // std::vector<int> ids; // get_global_ids(iFace, ptr, len, flags, ids); //} if( len && !( flags & MESHSET_ORDERED ) ) { tmp.clear(); bool blocked = false; assert( ( 0 == len % 2 ) ); rval = range_to_blocked_list( ptr, len / 2, tmp, blocked ); CHECK_MB( rval ); if( blocked ) flags |= mhdf_SET_RANGE_BIT; } else { tmp.resize( len ); rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true ); CHECK_MB( rval ); tmp.resize( newlen ); } buffer[0] = flags; buffer[1] = tmp.size(); if( tmp.size() <= buffer_size ) std::copy( tmp.begin(), tmp.end(), buffer + 4 ); rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, WriteUtilIface::CHILDREN, &len ); CHECK_MB( rval ); tmp.resize( len ); rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true ); tmp.resize( newlen ); buffer[2] = tmp.size(); if( tmp.size() <= buffer_size - buffer[1] ) std::copy( tmp.begin(), tmp.end(), buffer + 4 + buffer[1] ); rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, WriteUtilIface::PARENTS, &len ); CHECK_MB( rval ); tmp.resize( len ); rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true ); tmp.resize( newlen ); buffer[3] = tmp.size(); if( tmp.size() <= buffer_size - buffer[1] - buffer[2] ) std::copy( tmp.begin(), tmp.end(), buffer + 4 + buffer[1] + buffer[2] ); return MB_SUCCESS; }
ErrorCode moab::WriteHDF5Parallel::parallel_create_file | ( | const char * | filename, |
bool | overwrite, | ||
const std::vector< std::string > & | qa_records, | ||
const FileOptions & | opts, | ||
const Tag * | user_tag_list = 0 , |
||
int | user_tag_count = 0 , |
||
int | dimension = 3 , |
||
double * | times = 0 |
||
) | [protected, virtual] |
Called by normal (non-parallel) writer. Sets up necessary data for parallel write.
Reimplemented from moab::WriteHDF5.
Definition at line 334 of file WriteHDF5Parallel.cpp.
References moab::WriteHDF5::collectiveIO, moab::WriteHDF5::CREATE_ADJ_TIME, create_adjacency_tables(), moab::WriteHDF5::CREATE_ELEM_TIME, create_element_tables(), create_meshset_tables(), create_node_table(), moab::WriteHDF5::CREATE_NODE_TIME, moab::WriteHDF5::CREATE_SET_TIME, create_tag_tables(), moab::WriteHDF5::CREATE_TAG_TIME, moab::WriteHDF5::dbgOut, debug_barrier, moab::CN::EntityTypeName(), moab::error(), ErrorCode, exchange_file_ids(), moab::WriteHDF5::FILEID_EXCHANGE_TIME, moab::WriteHDF5::filePtr, gather_interface_meshes(), moab::WriteHDF5::gather_tags(), moab::FileOptions::get_int_option(), moab::FileOptions::get_null_option(), moab::ParallelComm::get_pcomm(), moab::FileOptions::get_str_option(), moab::HDF5_can_append_hyperslabs(), hslabOp, moab::WriteHDF5::id_type, moab::WriteHDF5::iFace, moab::DebugOutput::limit_output_to_first_N_procs(), MB_SET_ERR, MB_SUCCESS, MBEDGE, MBENTITYSET, MBMAXTYPE, mhdf_closeFile(), mhdf_createFile(), mhdf_message(), mhdf_openFileWithOpt(), myPcomm, negotiate_type_list(), moab::WriteHDF5::NEGOTIATE_TYPES_TIME, pcommAllocated, moab::DebugOutput::print(), moab::ProcConfig::proc_comm(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::DebugOutput::set_rank(), moab::CpuTimer::time_elapsed(), moab::WriteHDF5::topState, moab::DebugOutput::tprint(), moab::DebugOutput::tprintf(), moab::WriteHDF5::write_qa(), and moab::WriteHDF5::writeProp.
{ ErrorCode rval; mhdf_Status status; int pcomm_no = 0; opts.get_int_option( "PARALLEL_COMM", pcomm_no ); myPcomm = ParallelComm::get_pcomm( iFace, pcomm_no ); if( 0 == myPcomm ) { myPcomm = new ParallelComm( iFace, MPI_COMM_WORLD ); pcommAllocated = true; } MPI_Info info = MPI_INFO_NULL; std::string cb_size; rval = opts.get_str_option( "CB_BUFFER_SIZE", cb_size ); if( MB_SUCCESS == rval ) { MPI_Info_create( &info ); MPI_Info_set( info, const_cast< char* >( "cb_buffer_size" ), const_cast< char* >( cb_size.c_str() ) ); } dbgOut.set_rank( myPcomm->proc_config().proc_rank() ); dbgOut.limit_output_to_first_N_procs( 32 ); Range nonlocal; debug_barrier(); dbgOut.tprint( 1, "Gathering interface meshes\n" ); rval = gather_interface_meshes( nonlocal ); if( MB_SUCCESS != rval ) return error( rval ); /**************** Get tag names for sets likely to be shared ***********/ // debug_barrier(); // dbgOut.tprint(1, "Getting shared entity sets\n"); // rval = get_sharedset_tags(); // if (MB_SUCCESS != rval) return error(rval); /**************** Create actual file and write meta info ***************/ debug_barrier(); if( myPcomm->proc_config().proc_rank() == 0 ) { dbgOut.tprintf( 1, "Creating file: %s\n", filename ); // Create the file const char* type_names[MBMAXTYPE]; memset( type_names, 0, MBMAXTYPE * sizeof( char* ) ); for( EntityType i = MBEDGE; i < MBENTITYSET; ++i ) type_names[i] = CN::EntityTypeName( i ); dbgOut.tprint( 1, "call mhdf_createFile\n" ); filePtr = mhdf_createFile( filename, overwrite, type_names, MBMAXTYPE, id_type, &status ); if( !filePtr ) { MB_SET_ERR( MB_FAILURE, mhdf_message( &status ) ); } dbgOut.tprint( 1, "call write_qa\n" ); rval = write_qa( qa_records ); if( MB_SUCCESS != rval ) return error( rval ); } /**************** Create node coordinate table ***************/ CpuTimer timer; debug_barrier(); dbgOut.tprint( 1, "creating node table\n" ); topState.start( "creating node table" ); rval = create_node_table( dimension ); topState.end( rval ); if( MB_SUCCESS != rval ) return error( rval ); if( times ) times[CREATE_NODE_TIME] = timer.time_elapsed(); /**************** Create element tables ***************/ debug_barrier(); dbgOut.tprint( 1, "negotiating element types\n" ); topState.start( "negotiating element types" ); rval = negotiate_type_list(); topState.end( rval ); if( MB_SUCCESS != rval ) return error( rval ); if( times ) times[NEGOTIATE_TYPES_TIME] = timer.time_elapsed(); dbgOut.tprint( 1, "creating element table\n" ); topState.start( "creating element tables" ); rval = create_element_tables(); topState.end( rval ); if( MB_SUCCESS != rval ) return error( rval ); if( times ) times[CREATE_ELEM_TIME] = timer.time_elapsed(); /*************** Exchange file IDs *****************/ debug_barrier(); dbgOut.tprint( 1, "communicating file ids\n" ); topState.start( "communicating file ids" ); rval = exchange_file_ids( nonlocal ); topState.end( rval ); if( MB_SUCCESS != rval ) return error( rval ); if( times ) times[FILEID_EXCHANGE_TIME] = timer.time_elapsed(); /**************** Create meshset tables *********************/ debug_barrier(); dbgOut.tprint( 1, "creating meshset table\n" ); topState.start( "creating meshset tables" ); rval = create_meshset_tables( times ); topState.end( rval ); if( MB_SUCCESS != rval ) return error( rval ); if( times ) times[CREATE_SET_TIME] = timer.time_elapsed(); /**************** Create adjacency tables *********************/ debug_barrier(); dbgOut.tprint( 1, "creating adjacency table\n" ); topState.start( "creating adjacency tables" ); rval = create_adjacency_tables(); topState.end( rval ); if( MB_SUCCESS != rval ) return error( rval ); if( times ) times[CREATE_ADJ_TIME] = timer.time_elapsed(); /**************** Create tag data *********************/ debug_barrier(); dbgOut.tprint( 1, "creating tag tables\n" ); topState.start( "creating tag tables" ); rval = gather_tags( user_tag_list, user_tag_count ); if( MB_SUCCESS != rval ) return error( rval ); rval = create_tag_tables(); topState.end( rval ); if( MB_SUCCESS != rval ) return error( rval ); if( times ) times[CREATE_TAG_TIME] = timer.time_elapsed(); /************** Close serial file and reopen parallel *****************/ if( 0 == myPcomm->proc_config().proc_rank() ) mhdf_closeFile( filePtr, &status ); MPI_Barrier( myPcomm->proc_config().proc_comm() ); dbgOut.tprint( 1, "(re)opening file in parallel mode\n" ); unsigned long junk; hid_t hdf_opt = H5Pcreate( H5P_FILE_ACCESS ); H5Pset_fapl_mpio( hdf_opt, myPcomm->proc_config().proc_comm(), info ); filePtr = mhdf_openFileWithOpt( filename, 1, &junk, id_type, hdf_opt, &status ); H5Pclose( hdf_opt ); if( !filePtr ) { MB_SET_ERR( MB_FAILURE, mhdf_message( &status ) ); } if( collectiveIO ) { dbgOut.print( 1, "USING COLLECTIVE IO\n" ); writeProp = H5Pcreate( H5P_DATASET_XFER ); H5Pset_dxpl_mpio( writeProp, H5FD_MPIO_COLLECTIVE ); } /* Test if we can use H5S_APPEND when selecting hyperslabs */ if( MB_SUCCESS != opts.get_null_option( "HYPERSLAB_OR" ) && ( MB_SUCCESS == opts.get_null_option( "HYPERSLAB_APPEND" ) || HDF5_can_append_hyperslabs() ) ) { dbgOut.print( 1, "HDF5 library supports H5Sselect_hyperlsab with H5S_SELECT_APPEND\n" ); hslabOp = H5S_SELECT_APPEND; } dbgOut.tprint( 1, "Exiting parallel_create_file\n" ); return MB_SUCCESS; }
void moab::WriteHDF5Parallel::print_set_sharing_data | ( | const Range & | range, |
const char * | label, | ||
Tag | idt | ||
) | [protected] |
Definition at line 1453 of file WriteHDF5Parallel.cpp.
References moab::Range::begin(), moab::WriteHDF5::dbgOut, moab::Range::end(), moab::RangeMap< KeyType, ValType, NullVal >::find(), moab::ParallelComm::get_entityset_owner(), moab::ParallelComm::get_entityset_procs(), moab::WriteHDF5::idMap, moab::WriteHDF5::iFace, myPcomm, moab::DebugOutput::print(), moab::DebugOutput::printf(), moab::SSVB, and moab::Interface::tag_get_data().
Referenced by print_shared_sets().
{ dbgOut.printf( SSVB, "set\tid\towner\t%-*s\tfid\tshared\n", (int)( sizeof( EntityHandle ) * 2 ), "handle" ); for( Range::iterator it = range.begin(); it != range.end(); ++it ) { int id; iFace->tag_get_data( idt, &*it, 1, &id ); EntityHandle handle = 0; unsigned owner = 0; wid_t file_id = 0; myPcomm->get_entityset_owner( *it, owner, &handle ); if( !idMap.find( *it, file_id ) ) file_id = 0; dbgOut.printf( SSVB, "%s\t%d\t%u\t%lx\t%lu\t", label, id, owner, (unsigned long)handle, (unsigned long)file_id ); std::vector< unsigned > procs; myPcomm->get_entityset_procs( *it, procs ); if( procs.empty() ) dbgOut.print( SSVB, "<none>\n" ); else { for( unsigned i = 0; i < procs.size() - 1; ++i ) dbgOut.printf( SSVB, "%u,", procs[i] ); dbgOut.printf( SSVB, "%u\n", procs.back() ); } } }
void moab::WriteHDF5Parallel::print_shared_sets | ( | ) | [protected] |
Definition at line 1480 of file WriteHDF5Parallel.cpp.
References DIRICHLET_SET_TAG_NAME, geom, GEOM_DIMENSION_TAG_NAME, moab::Interface::get_entities_by_type_and_tag(), moab::Interface::globalId_tag(), moab::WriteHDF5::iFace, MATERIAL_SET_TAG_NAME, MB_SUCCESS, MB_TYPE_INTEGER, MBENTITYSET, NEUMANN_SET_TAG_NAME, print_set_sharing_data(), and moab::Interface::tag_get_handle().
Referenced by communicate_shared_set_ids().
{ const char* tag_names[][2] = { { MATERIAL_SET_TAG_NAME, "block" }, { DIRICHLET_SET_TAG_NAME, "nodeset" }, { NEUMANN_SET_TAG_NAME, "sideset" }, { 0, 0 } }; for( int i = 0; tag_names[i][0]; ++i ) { Tag tag; if( MB_SUCCESS != iFace->tag_get_handle( tag_names[i][0], 1, MB_TYPE_INTEGER, tag ) ) continue; Range tagged; iFace->get_entities_by_type_and_tag( 0, MBENTITYSET, &tag, 0, 1, tagged ); print_set_sharing_data( tagged, tag_names[i][1], tag ); } Tag geom, id; if( MB_SUCCESS != iFace->tag_get_handle( GEOM_DIMENSION_TAG_NAME, 1, MB_TYPE_INTEGER, geom ) ) return; id = iFace->globalId_tag(); const char* geom_names[] = { "vertex", "curve", "surface", "volume" }; for( int d = 0; d <= 3; ++d ) { Range tagged; const void* vals[] = { &d }; iFace->get_entities_by_type_and_tag( 0, MBENTITYSET, &geom, vals, 1, tagged ); print_set_sharing_data( tagged, geom_names[d], id ); } }
void moab::WriteHDF5Parallel::print_times | ( | const double * | times | ) | const [protected, virtual] |
Definition at line 2389 of file WriteHDF5Parallel.cpp.
References myPcomm, moab::WriteHDF5::NUM_TIMES, moab::WriteHDF5::print_times(), moab::ProcConfig::proc_comm(), moab::ParallelComm::proc_config(), and moab::ProcConfig::proc_rank().
{ if( !myPcomm ) { WriteHDF5::print_times( times ); } else { double recv[NUM_TIMES]; MPI_Reduce( (void*)times, recv, NUM_TIMES, MPI_DOUBLE, MPI_MAX, 0, myPcomm->proc_config().proc_comm() ); if( 0 == myPcomm->proc_config().proc_rank() ) WriteHDF5::print_times( recv ); } }
void moab::WriteHDF5Parallel::remove_remote_entities | ( | EntityHandle | relative, |
Range & | range | ||
) | [protected] |
Remove any remote mesh entities from the passed range.
Definition at line 2194 of file WriteHDF5Parallel.cpp.
References moab::Range::begin(), moab::CREATE_HANDLE(), moab::Range::end(), moab::WriteHDF5::exportList, moab::intersect(), moab::Range::lower_bound(), MBENTITYSET, moab::Range::merge(), moab::WriteHDF5::nodeSet, moab::WriteHDF5::ExportSet::range, remove_remote_sets(), moab::WriteHDF5::setSet, and moab::Range::swap().
Referenced by remove_remote_entities().
{ Range result; result.merge( intersect( range, nodeSet.range ) ); result.merge( intersect( range, setSet.range ) ); for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) result.merge( intersect( range, eiter->range ) ); // result.merge(intersect(range, myParallelSets)); Range sets; int junk; sets.merge( Range::lower_bound( range.begin(), range.end(), CREATE_HANDLE( MBENTITYSET, 0, junk ) ), range.end() ); remove_remote_sets( relative, sets ); result.merge( sets ); range.swap( result ); }
void moab::WriteHDF5Parallel::remove_remote_entities | ( | EntityHandle | relative, |
std::vector< EntityHandle > & | vect | ||
) | [protected] |
Definition at line 2219 of file WriteHDF5Parallel.cpp.
References moab::Range::end(), moab::Range::find(), moab::Range::insert(), and remove_remote_entities().
{ Range intrsct; for( std::vector< EntityHandle >::const_iterator iter = vect.begin(); iter != vect.end(); ++iter ) intrsct.insert( *iter ); remove_remote_entities( relative, intrsct ); unsigned int read, write; for( read = write = 0; read < vect.size(); ++read ) { if( intrsct.find( vect[read] ) != intrsct.end() ) { if( read != write ) vect[write] = vect[read]; ++write; } } if( write != vect.size() ) vect.resize( write ); }
void moab::WriteHDF5Parallel::remove_remote_sets | ( | EntityHandle | relative, |
Range & | range | ||
) | [protected] |
Definition at line 2211 of file WriteHDF5Parallel.cpp.
References moab::intersect(), moab::WriteHDF5::ExportSet::range, moab::WriteHDF5::setSet, and moab::Range::swap().
Referenced by remove_remote_entities(), and remove_remote_sets().
void moab::WriteHDF5Parallel::remove_remote_sets | ( | EntityHandle | relative, |
std::vector< EntityHandle > & | vect | ||
) | [protected] |
Definition at line 2238 of file WriteHDF5Parallel.cpp.
References moab::Range::end(), moab::Range::find(), moab::Range::insert(), and remove_remote_sets().
{ Range intrsct; for( std::vector< EntityHandle >::const_iterator iter = vect.begin(); iter != vect.end(); ++iter ) intrsct.insert( *iter ); remove_remote_sets( relative, intrsct ); unsigned int read, write; for( read = write = 0; read < vect.size(); ++read ) { if( intrsct.find( vect[read] ) != intrsct.end() ) { if( read != write ) vect[write] = vect[read]; ++write; } } if( write != vect.size() ) vect.resize( write ); }
ErrorCode moab::WriteHDF5Parallel::unpack_set | ( | EntityHandle | set, |
const unsigned long * | set_data, | ||
size_t | set_data_length | ||
) | [protected] |
Unpack set data from communication.
Definition at line 1792 of file WriteHDF5Parallel.cpp.
References moab::WriteHDF5::SpecialSetData::childIds, children, moab::WriteHDF5::SpecialSetData::contentIds, moab::convert_to_ranged_ids(), moab::WriteHDF5::find_set_data(), MB_SUCCESS, moab::merge_ranged_ids(), moab::merge_vector_ids(), mhdf_SET_RANGE_BIT, moab::WriteHDF5::SpecialSetData::parentIds, and moab::WriteHDF5::SpecialSetData::setFlags.
Referenced by communicate_shared_set_data().
{ // Use local variables for readability assert( buffer_size >= 4 ); assert( buffer[1] + buffer[2] + buffer[3] <= buffer_size ); const unsigned long flags = buffer[0]; unsigned long num_content = buffer[1]; const unsigned long num_child = buffer[2]; const unsigned long num_parent = buffer[3]; const unsigned long* contents = buffer + 4; const unsigned long* children = contents + num_content; const unsigned long* parents = children + num_child; SpecialSetData* data = find_set_data( set ); assert( NULL != data ); if( NULL == data ) return MB_FAILURE; // Tag mattag; // iFace->tag_get_handle(MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag); // int block; // if (MB_SUCCESS != iFace->tag_get_data(mattag, &set, 1, &block)) // block = 0; // If either the current data or the new data is in ranged format, // then change the other to ranged format if it isn't already // in both cases when they differ, the data will end up "compressed range" std::vector< wid_t > tmp; if( ( flags & mhdf_SET_RANGE_BIT ) != ( data->setFlags & mhdf_SET_RANGE_BIT ) ) { if( flags & mhdf_SET_RANGE_BIT ) { tmp = data->contentIds; convert_to_ranged_ids( &tmp[0], tmp.size(), data->contentIds ); data->setFlags |= mhdf_SET_RANGE_BIT; } else { tmp.clear(); convert_to_ranged_ids( contents, num_content, tmp ); num_content = tmp.size(); if( sizeof( wid_t ) < sizeof( long ) ) { size_t old_size = tmp.size(); tmp.resize( sizeof( long ) * old_size / sizeof( wid_t ) ); unsigned long* array = reinterpret_cast< unsigned long* >( &tmp[0] ); for( long i = ( (long)old_size ) - 1; i >= 0; --i ) array[i] = tmp[i]; contents = array; } else if( sizeof( wid_t ) > sizeof( long ) ) { unsigned long* array = reinterpret_cast< unsigned long* >( &tmp[0] ); std::copy( tmp.begin(), tmp.end(), array ); } contents = reinterpret_cast< unsigned long* >( &tmp[0] ); } } if( data->setFlags & mhdf_SET_RANGE_BIT ) merge_ranged_ids( contents, num_content, data->contentIds ); else merge_vector_ids( contents, num_content, data->contentIds ); merge_vector_ids( children, num_child, data->childIds ); merge_vector_ids( parents, num_parent, data->parentIds ); return MB_SUCCESS; }
H5S_seloper_t moab::WriteHDF5Parallel::hslabOp [private] |
Operation to use to append hyperslab selections.
Definition at line 186 of file WriteHDF5Parallel.hpp.
Referenced by parallel_create_file().
ParallelComm* moab::WriteHDF5Parallel::myPcomm [private] |
pcomm controlling parallel nature of mesh
Definition at line 180 of file WriteHDF5Parallel.hpp.
Referenced by communicate_shared_set_data(), communicate_shared_set_ids(), create_dataset(), create_meshset_tables(), create_tag_tables(), debug_barrier_line(), exchange_file_ids(), gather_interface_meshes(), negotiate_type_list(), parallel_create_file(), print_set_sharing_data(), print_times(), and ~WriteHDF5Parallel().
bool moab::WriteHDF5Parallel::pcommAllocated [private] |
whether this instance allocated (and dtor should delete) the pcomm
Definition at line 183 of file WriteHDF5Parallel.hpp.
Referenced by parallel_create_file(), and ~WriteHDF5Parallel().