MOAB: Mesh Oriented datABase
(version 5.4.1)
moab::ReadParallel Class Reference
#include <ReadParallel.hpp>
Public Types

    enum ParallelActions { PA_READ = 0, PA_READ_PART = 1, PA_BROADCAST = 2, PA_DELETE_NONLOCAL = 3, PA_CHECK_GIDS_SERIAL = 4, PA_GET_FILESET_ENTS = 5, PA_RESOLVE_SHARED_ENTS = 6, PA_EXCHANGE_GHOSTS = 7, PA_RESOLVE_SHARED_SETS = 8, PA_AUGMENT_SETS_WITH_GHOSTS = 9, PA_PRINT_PARALLEL = 10, PA_CREATE_TRIVIAL_PARTITION = 11, PA_CORRECT_THIN_GHOSTS = 12 }
    enum ParallelOpts { POPT_NONE = 0, POPT_BCAST, POPT_BCAST_DELETE, POPT_READ_DELETE, POPT_READ_PART, POPT_DEFAULT }

Public Member Functions

    ErrorCode load_file (const char *file_name, const EntityHandle *file_set, const FileOptions &opts, const ReaderIface::SubsetList *subset_list=0, const Tag *file_id_tag=0)
        load a file
    ErrorCode load_file (const char **file_names, const int num_files, const EntityHandle *file_set, const FileOptions &opts, const ReaderIface::SubsetList *subset_list=0, const Tag *file_id_tag=0)
        load multiple files
    ErrorCode load_file (const char **file_names, const int num_files, const EntityHandle *file_set, int parallel_mode, std::string &partition_tag_name, std::vector< int > &partition_tag_vals, bool distrib, bool partition_by_rank, std::vector< int > &pa_vec, const FileOptions &opts, const ReaderIface::SubsetList *subset_list, const Tag *file_id_tag, const int reader_rank, const bool cputime, const int resolve_dim, const int shared_dim, const int ghost_dim, const int bridge_dim, const int num_layers, const int addl_ents)
    ReadParallel (Interface *impl=NULL, ParallelComm *pc=NULL)
        Constructor.
    virtual ~ReadParallel ()
        Destructor.
    ErrorCode delete_nonlocal_entities (std::string &ptag_name, std::vector< int > &ptag_vals, bool distribute, EntityHandle file_set)
        PUBLIC TO ALLOW TESTING.
    ErrorCode delete_nonlocal_entities (EntityHandle file_set)

Static Public Member Functions

    static ReaderIface * factory (Interface *)

Static Public Attributes

    static const char * parallelOptsNames [] = { "NONE", "BCAST", "BCAST_DELETE", "READ_DELETE", "READ_PART", "", 0 }
    static const char * ParallelActionsNames []

Protected Member Functions

    ErrorCode create_partition_sets (std::string &ptag_name, EntityHandle file_set)

Private Attributes

    Interface * mbImpl
    ParallelComm * myPcomm
    DebugOutput myDebug
    Error * mError
Definition at line 17 of file ReadParallel.hpp.
enum moab::ReadParallel::ParallelActions

Definition at line 66 of file ReadParallel.hpp.

    {
        PA_READ                     = 0,
        PA_READ_PART                = 1,
        PA_BROADCAST                = 2,
        PA_DELETE_NONLOCAL          = 3,
        PA_CHECK_GIDS_SERIAL        = 4,
        PA_GET_FILESET_ENTS         = 5,
        PA_RESOLVE_SHARED_ENTS      = 6,
        PA_EXCHANGE_GHOSTS          = 7,
        PA_RESOLVE_SHARED_SETS      = 8,
        PA_AUGMENT_SETS_WITH_GHOSTS = 9,
        PA_PRINT_PARALLEL           = 10,
        PA_CREATE_TRIVIAL_PARTITION = 11,
        PA_CORRECT_THIN_GHOSTS      = 12
    };

enum moab::ReadParallel::ParallelOpts

Definition at line 85 of file ReadParallel.hpp.
moab::ReadParallel::ReadParallel ( Interface * impl = NULL, ParallelComm * pc = NULL )

Constructor.
Definition at line 39 of file ReadParallel.cpp.
References moab::debug, moab::ParallelComm::get_pcomm(), mbImpl, mError, MPI_COMM_WORLD, myDebug, myPcomm, moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::Interface::query_interface(), moab::DebugOutput::set_rank(), and moab::DebugOutput::set_verbosity().
    : mbImpl( impl ), myPcomm( pc ), myDebug( "ReadPara", std::cerr )
{
    if( !myPcomm )
    {
        myPcomm = ParallelComm::get_pcomm( mbImpl, 0 );
        if( NULL == myPcomm ) myPcomm = new ParallelComm( mbImpl, MPI_COMM_WORLD );
    }
    myDebug.set_rank( myPcomm->proc_config().proc_rank() );
    if( debug )  // For backwards compatibility, enable all debug output if constant is true
        myDebug.set_verbosity( 10 );

    impl->query_interface( mError );
}
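A minimal construction sketch, assuming MOAB was built with MPI support and MPI_Init has already been called (the variable names are illustrative, not part of the API):

    #include "moab/Core.hpp"
    #include "moab/ParallelComm.hpp"
    #include "ReadParallel.hpp"

    using namespace moab;

    Core mb;

    // Explicit communicator: hand ReadParallel a ParallelComm of your choosing.
    ParallelComm pcomm( &mb, MPI_COMM_WORLD );
    ReadParallel rp( &mb, &pcomm );

    // Alternatively, ReadParallel( &mb ) with pc == NULL falls back to
    // ParallelComm::get_pcomm( &mb, 0 ) and, if none exists, creates a new
    // ParallelComm on MPI_COMM_WORLD, as the constructor body above shows.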
virtual moab::ReadParallel::~ReadParallel ( ) [inline, virtual]
ErrorCode moab::ReadParallel::create_partition_sets ( std::string & ptag_name, EntityHandle file_set ) [protected]
Definition at line 720 of file ReadParallel.cpp.
References moab::Range::empty(), ErrorCode, moab::Interface::get_entities_by_type_and_tag(), MB_CHK_SET_ERR, MB_SUCCESS, MB_TAG_CREAT, MB_TAG_SPARSE, MB_TYPE_INTEGER, MBENTITYSET, mbImpl, myPcomm, PARALLEL_PARTITION_TAG_NAME, moab::ParallelComm::partition_sets(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::Range::size(), moab::Interface::tag_delete_data(), moab::Interface::tag_get_handle(), and moab::Interface::tag_set_data().
Referenced by load_file().
{
    int proc_rk      = myPcomm->proc_config().proc_rank();
    ErrorCode result = MB_SUCCESS;
    Tag ptag;

    // Tag the partition sets with a standard tag name
    if( ptag_name.empty() ) ptag_name = PARALLEL_PARTITION_TAG_NAME;
    bool tag_created = false;
    result = mbImpl->tag_get_handle( ptag_name.c_str(), 1, MB_TYPE_INTEGER, ptag, MB_TAG_SPARSE | MB_TAG_CREAT, 0,
                                     &tag_created );MB_CHK_SET_ERR( result, "Trouble getting PARALLEL_PARTITION tag" );

    if( !tag_created )
    {
        // This tag already exists; better check to see that tagged sets
        // agree with this partition
        Range tagged_sets;
        int* proc_rk_ptr = &proc_rk;
        result = mbImpl->get_entities_by_type_and_tag( file_set, MBENTITYSET, &ptag, (const void* const*)&proc_rk_ptr,
                                                       1, tagged_sets );MB_CHK_SET_ERR( result, "Trouble getting tagged sets" );
        if( !tagged_sets.empty() && tagged_sets != myPcomm->partition_sets() )
        {
            result = mbImpl->tag_delete_data( ptag, tagged_sets );MB_CHK_SET_ERR( result, "Trouble deleting data of PARALLEL_PARTITION tag" );
        }
        else if( tagged_sets == myPcomm->partition_sets() )
            return MB_SUCCESS;
    }

    // If we get here, we need to assign the tag
    std::vector< int > values( myPcomm->partition_sets().size() );
    for( unsigned int i = 0; i < myPcomm->partition_sets().size(); i++ )
        values[i] = proc_rk;
    result = mbImpl->tag_set_data( ptag, myPcomm->partition_sets(), &values[0] );MB_CHK_SET_ERR( result, "Trouble setting data to PARALLEL_PARTITION tag" );

    return MB_SUCCESS;
}
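After a parallel read, the sets produced here can be recovered by querying for entity sets whose PARALLEL_PARTITION tag equals the local rank. A hedged sketch, reusing mb and pcomm from the constructor example above (the header names follow the usual MOAB layout):

    #include "moab/Core.hpp"
    #include "moab/ParallelComm.hpp"
    #include "MBParallelConventions.h"  // defines PARALLEL_PARTITION_TAG_NAME

    using namespace moab;

    // Find the partition sets owned by this rank.
    Tag ptag;
    ErrorCode rval = mb.tag_get_handle( PARALLEL_PARTITION_TAG_NAME, 1, MB_TYPE_INTEGER, ptag );
    int rank_val        = (int)pcomm.proc_config().proc_rank();
    const void* vals[1] = { &rank_val };
    Range my_parts;
    rval = mb.get_entities_by_type_and_tag( 0, MBENTITYSET, &ptag, vals, 1, my_parts );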
ErrorCode moab::ReadParallel::delete_nonlocal_entities ( std::string & ptag_name, std::vector< int > & ptag_vals, bool distribute, EntityHandle file_set )
PUBLIC TO ALLOW TESTING.
Definition at line 654 of file ReadParallel.cpp.
References ErrorCode, moab::Interface::get_entities_by_type_and_tag(), moab::Range::insert(), MB_CHK_ERR, MB_CHK_SET_ERR, MB_SET_ERR, MB_SUCCESS, MB_TYPE_INTEGER, MBENTITYSET, mbImpl, myDebug, myPcomm, moab::ParallelComm::partition_sets(), moab::DebugOutput::print(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::ProcConfig::proc_size(), moab::Range::size(), moab::Range::swap(), moab::Interface::tag_get_data(), and moab::Interface::tag_get_handle().
Referenced by load_file().
{
    Range partition_sets;

    Tag ptag;
    ErrorCode result = mbImpl->tag_get_handle( ptag_name.c_str(), 1, MB_TYPE_INTEGER, ptag );MB_CHK_SET_ERR( result, "Failed getting tag handle in delete_nonlocal_entities" );

    result = mbImpl->get_entities_by_type_and_tag( file_set, MBENTITYSET, &ptag, NULL, 1,
                                                   myPcomm->partition_sets() );MB_CHK_SET_ERR( result, "Failed to get sets with partition-type tag" );

    int proc_sz = myPcomm->proc_config().proc_size();
    int proc_rk = myPcomm->proc_config().proc_rank();

    if( !ptag_vals.empty() )
    {
        // Values input, get sets with those values
        Range tmp_sets;
        std::vector< int > tag_vals( myPcomm->partition_sets().size() );
        result = mbImpl->tag_get_data( ptag, myPcomm->partition_sets(), &tag_vals[0] );MB_CHK_SET_ERR( result, "Failed to get tag data for partition vals tag" );
        for( std::vector< int >::iterator pit = tag_vals.begin(); pit != tag_vals.end(); ++pit )
        {
            std::vector< int >::iterator pit2 = std::find( ptag_vals.begin(), ptag_vals.end(), *pit );
            if( pit2 != ptag_vals.end() ) tmp_sets.insert( myPcomm->partition_sets()[pit - tag_vals.begin()] );
        }

        myPcomm->partition_sets().swap( tmp_sets );
    }

    if( distribute )
    {
        // For now, require that number of partition sets be greater
        // than number of procs
        if( myPcomm->partition_sets().size() < (unsigned int)proc_sz )
        {
            MB_SET_ERR( MB_FAILURE, "Too few parts; P = " << proc_rk << ", tag = " << ptag
                                                          << ", # sets = " << myPcomm->partition_sets().size() );
        }

        Range tmp_sets;
        // Distribute the partition sets
        unsigned int num_sets     = myPcomm->partition_sets().size() / proc_sz;
        unsigned int num_leftover = myPcomm->partition_sets().size() % proc_sz;
        int begin_set             = 0;
        if( proc_rk < (int)num_leftover )
        {
            num_sets++;
            begin_set = num_sets * proc_rk;
        }
        else
            begin_set = proc_rk * num_sets + num_leftover;

        for( unsigned int i = 0; i < num_sets; i++ )
            tmp_sets.insert( myPcomm->partition_sets()[begin_set + i] );

        myPcomm->partition_sets().swap( tmp_sets );
    }

    myDebug.print( 1, "My partition sets: ", myPcomm->partition_sets() );

    result = delete_nonlocal_entities( file_set );MB_CHK_ERR( result );

    return MB_SUCCESS;
}
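The distribute branch splits the partition sets into contiguous blocks: each rank gets floor(#sets / P) of them, and the first (#sets mod P) ranks get one extra. A standalone sketch of just that index arithmetic (not MOAB API; the function name is illustrative):

    // Which contiguous block of partition sets rank proc_rk keeps,
    // mirroring the arithmetic in delete_nonlocal_entities above.
    void my_block( int num_total_sets, int proc_sz, int proc_rk, int& begin_set, int& num_sets )
    {
        num_sets         = num_total_sets / proc_sz;
        int num_leftover = num_total_sets % proc_sz;
        if( proc_rk < num_leftover )
        {
            num_sets++;                       // this rank absorbs one leftover set
            begin_set = num_sets * proc_rk;
        }
        else
            begin_set = proc_rk * num_sets + num_leftover;
    }

    // Example: 10 sets on 4 ranks -> ranks 0 and 1 keep 3 sets (starting at 0 and 3),
    // ranks 2 and 3 keep 2 sets (starting at 6 and 8).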
ErrorCode moab::ReadParallel::delete_nonlocal_entities ( EntityHandle file_set )

Definition at line 758 of file ReadParallel.cpp.
References moab::Range::begin(), moab::Interface::delete_entities(), moab::Range::empty(), moab::Range::end(), ErrorCode, moab::ReadUtilIface::gather_related_ents(), moab::Interface::get_entities_by_handle(), MB_CHK_SET_ERR, MB_SUCCESS, MBENTITYSET, mbImpl, myDebug, myPcomm, moab::ParallelComm::partition_sets(), moab::DebugOutput::print(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::Interface::query_interface(), moab::Interface::remove_entities(), moab::Range::subset_by_type(), moab::subtract(), and moab::DebugOutput::tprint().
{
    // Get partition entities and ents related to/used by those
    // get ents in the partition
    ReadUtilIface* read_iface;
    mbImpl->query_interface( read_iface );
    Range partition_ents, all_sets;

    myDebug.tprint( 2, "Gathering related entities.\n" );

    ErrorCode result = read_iface->gather_related_ents( myPcomm->partition_sets(), partition_ents, &file_set );MB_CHK_SET_ERR( result, "Failure gathering related entities" );

    // Get pre-existing entities
    Range file_ents;
    result = mbImpl->get_entities_by_handle( file_set, file_ents );MB_CHK_SET_ERR( result, "Couldn't get pre-existing entities" );

    if( 0 == myPcomm->proc_config().proc_rank() )
    {
        myDebug.print( 2, "File entities: ", file_ents );
    }

    // Get deletable entities by subtracting partition ents from file ents
    Range deletable_ents = subtract( file_ents, partition_ents );

    // Cache deletable vs. keepable sets
    Range deletable_sets = deletable_ents.subset_by_type( MBENTITYSET );
    Range keepable_sets  = subtract( file_ents.subset_by_type( MBENTITYSET ), deletable_sets );

    myDebug.tprint( 2, "Removing deletable entities from keepable sets.\n" );

    // Remove deletable ents from all keepable sets
    for( Range::iterator rit = keepable_sets.begin(); rit != keepable_sets.end(); ++rit )
    {
        result = mbImpl->remove_entities( *rit, deletable_ents );MB_CHK_SET_ERR( result, "Failure removing deletable entities" );
    }
    result = mbImpl->remove_entities( file_set, deletable_ents );MB_CHK_SET_ERR( result, "Failure removing deletable entities" );

    myDebug.tprint( 2, "Deleting deletable entities.\n" );

    if( 0 == myPcomm->proc_config().proc_rank() )
    {
        myDebug.print( 2, "Deletable sets: ", deletable_sets );
    }

    // Delete sets, then ents
    if( !deletable_sets.empty() )
    {
        result = mbImpl->delete_entities( deletable_sets );MB_CHK_SET_ERR( result, "Failure deleting sets in delete_nonlocal_entities" );
    }

    deletable_ents -= deletable_sets;

    if( 0 == myPcomm->proc_config().proc_rank() )
    {
        myDebug.print( 2, "Deletable entities: ", deletable_ents );
    }

    if( !deletable_ents.empty() )
    {
        result = mbImpl->delete_entities( deletable_ents );MB_CHK_SET_ERR( result, "Failure deleting entities in delete_nonlocal_entities" );
    }

    return MB_SUCCESS;
}
static ReaderIface * moab::ReadParallel::factory ( Interface * ) [static]
ErrorCode moab::ReadParallel::load_file ( const char * file_name, const EntityHandle * file_set, const FileOptions & opts, const ReaderIface::SubsetList * subset_list = 0, const Tag * file_id_tag = 0 ) [inline]
load a file
Definition at line 117 of file ReadParallel.hpp.
Referenced by load_file(), moab::Core::load_file(), and main().
{ return load_file( &file_name, 1, file_set, opts, subset_list, file_id_tag ); }
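In practice this overload is usually reached through moab::Core::load_file with a parallel option string; the option names shown are the ones parsed by the multi-file overload below, while the file name is purely illustrative. A hedged sketch (requires a MOAB build with MPI and an initialized MPI environment):

    #include "moab/Core.hpp"

    using namespace moab;

    Core mb;
    // READ_PART: each rank reads only its own parts; shared entities are then resolved
    // and one layer of 3D ghost elements bridged through vertices (3.0.1) is exchanged.
    ErrorCode rval = mb.load_file( "mesh.h5m", 0,
                                   "PARALLEL=READ_PART;"
                                   "PARTITION=PARALLEL_PARTITION;"
                                   "PARALLEL_RESOLVE_SHARED_ENTS;"
                                   "PARALLEL_GHOSTS=3.0.1" );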
ErrorCode moab::ReadParallel::load_file ( const char ** file_names, const int num_files, const EntityHandle * file_set, const FileOptions & opts, const ReaderIface::SubsetList * subset_list = 0, const Tag * file_id_tag = 0 )
load multiple files
Definition at line 54 of file ReadParallel.cpp.
References ErrorCode, moab::FileOptions::get_int_option(), moab::FileOptions::get_ints_option(), moab::FileOptions::get_null_option(), moab::FileOptions::get_option(), moab::FileOptions::get_real_option(), moab::FileOptions::get_str_option(), load_file(), moab::FileOptions::mark_all_seen(), moab::FileOptions::match_option(), MB_CHK_ERR, MB_ENTITY_NOT_FOUND, MB_SET_ERR, MB_SUCCESS, MB_TAG_NOT_FOUND, MB_TYPE_OUT_OF_RANGE, mbImpl, myDebug, myPcomm, PA_AUGMENT_SETS_WITH_GHOSTS, PA_BROADCAST, PA_CHECK_GIDS_SERIAL, PA_CORRECT_THIN_GHOSTS, PA_CREATE_TRIVIAL_PARTITION, PA_DELETE_NONLOCAL, PA_EXCHANGE_GHOSTS, PA_GET_FILESET_ENTS, PA_PRINT_PARALLEL, PA_READ, PA_READ_PART, PA_RESOLVE_SHARED_ENTS, PA_RESOLVE_SHARED_SETS, PARALLEL_PARTITION_TAG_NAME, parallelOptsNames, POPT_BCAST, POPT_BCAST_DELETE, POPT_DEFAULT, POPT_READ_DELETE, POPT_READ_PART, moab::DebugOutput::print(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::ParallelComm::set_debug_verbosity(), moab::Interface::set_sequence_multiplier(), moab::DebugOutput::set_verbosity(), moab::Interface::tag_get_handle(), and moab::DebugOutput::tprint().
{ int tmpval; if( MB_SUCCESS == opts.get_int_option( "DEBUG_PIO", 1, tmpval ) ) { myDebug.set_verbosity( tmpval ); myPcomm->set_debug_verbosity( tmpval ); } myDebug.tprint( 1, "Setting up...\n" ); // Get parallel settings int parallel_mode; ErrorCode result = opts.match_option( "PARALLEL", parallelOptsNames, parallel_mode ); if( MB_FAILURE == result ) { MB_SET_ERR( MB_FAILURE, "Unexpected value for 'PARALLEL' option" ); } else if( MB_ENTITY_NOT_FOUND == result ) { parallel_mode = 0; } // Get partition setting bool distrib; std::string partition_tag_name; result = opts.get_option( "PARTITION", partition_tag_name ); if( MB_ENTITY_NOT_FOUND == result ) { distrib = false; partition_tag_name = ""; } else { distrib = true; if( partition_tag_name.empty() ) partition_tag_name = PARALLEL_PARTITION_TAG_NAME; // Also get deprecated PARTITION_DISTRIBUTE option // so that higher-level code doesn't return an error // due to an unrecognized option opts.get_null_option( "PARTITION_DISTRIBUTE" ); } // Get partition tag value(s), if any, and whether they're to be // distributed or assigned std::vector< int > partition_tag_vals; opts.get_ints_option( "PARTITION_VAL", partition_tag_vals ); // see if partition tag name is "TRIVIAL", if the tag exists bool create_trivial_partition = false; if( partition_tag_name == std::string( "TRIVIAL" ) ) { Tag ttag; // see if the trivial tag exists result = mbImpl->tag_get_handle( partition_tag_name.c_str(), ttag ); if( MB_TAG_NOT_FOUND == result ) create_trivial_partition = true; } // See if we need to report times bool cputime = false; result = opts.get_null_option( "CPUTIME" ); if( MB_SUCCESS == result ) cputime = true; // See if we need to report times bool print_parallel = false; result = opts.get_null_option( "PRINT_PARALLEL" ); if( MB_SUCCESS == result ) print_parallel = true; // Get ghosting options std::string ghost_str; int bridge_dim, ghost_dim = -1, num_layers, addl_ents = 0; result = opts.get_str_option( "PARALLEL_GHOSTS", ghost_str ); if( MB_TYPE_OUT_OF_RANGE == result ) { ghost_dim = 3; bridge_dim = 0; num_layers = 1; } else if( MB_SUCCESS == result ) { int num_fields = sscanf( ghost_str.c_str(), "%d.%d.%d.%d", &ghost_dim, &bridge_dim, &num_layers, &addl_ents ); if( 3 > num_fields ) { MB_SET_ERR( MB_FAILURE, "Didn't read 3 fields from PARALLEL_GHOSTS string" ); } } // Get resolve_shared_ents option std::string shared_str; int resolve_dim = -2, shared_dim = -1; result = opts.get_str_option( "PARALLEL_RESOLVE_SHARED_ENTS", shared_str ); if( MB_TYPE_OUT_OF_RANGE == result ) { resolve_dim = -1; shared_dim = -1; } else if( MB_SUCCESS == result ) { int num_fields = sscanf( shared_str.c_str(), "%d.%d", &resolve_dim, &shared_dim ); if( 2 != num_fields ) { MB_SET_ERR( MB_FAILURE, "Didn't read 2 fields from PARALLEL_RESOLVE_SHARED_ENTS string" ); } } // Get skip augmenting with ghosts option bool skip_augment = false; result = opts.get_null_option( "SKIP_AUGMENT_WITH_GHOSTS" ); if( MB_SUCCESS == result ) skip_augment = true; bool correct_thin_ghosts = false; result = opts.get_null_option( "PARALLEL_THIN_GHOST_LAYER" ); if( MB_SUCCESS == result ) correct_thin_ghosts = true; // Get MPI IO processor rank int reader_rank; result = opts.get_int_option( "MPI_IO_RANK", reader_rank ); if( MB_ENTITY_NOT_FOUND == result ) reader_rank = 0; else if( MB_SUCCESS != result ) { MB_SET_ERR( MB_FAILURE, "Unexpected value for 'MPI_IO_RANK' option" ); } // Now that we've parsed all the parallel options, make an instruction queue std::vector< int > pa_vec; bool is_reader = ( reader_rank == 
(int)myPcomm->proc_config().proc_rank() ); bool partition_by_rank = false; if( MB_SUCCESS == opts.get_null_option( "PARTITION_BY_RANK" ) ) { partition_by_rank = true; if( !partition_tag_vals.empty() ) { MB_SET_ERR( MB_FAILURE, "Cannot specify both PARTITION_VALS and PARTITION_BY_RANK" ); } } double factor_seq; if( MB_SUCCESS == opts.get_real_option( "PARALLEL_SEQUENCE_FACTOR", factor_seq ) ) { if( factor_seq < 1. ) MB_SET_ERR( MB_FAILURE, "cannot have sequence factor less than 1." ); mbImpl->set_sequence_multiplier( factor_seq ); } switch( parallel_mode ) { case POPT_BCAST: myDebug.print( 1, "Read mode is BCAST\n" ); if( is_reader ) { pa_vec.push_back( PA_READ ); pa_vec.push_back( PA_CHECK_GIDS_SERIAL ); pa_vec.push_back( PA_GET_FILESET_ENTS ); } pa_vec.push_back( PA_BROADCAST ); if( !is_reader ) pa_vec.push_back( PA_GET_FILESET_ENTS ); break; case POPT_BCAST_DELETE: myDebug.print( 1, "Read mode is BCAST_DELETE\n" ); if( is_reader ) { pa_vec.push_back( PA_READ ); pa_vec.push_back( PA_CHECK_GIDS_SERIAL ); pa_vec.push_back( PA_GET_FILESET_ENTS ); if( create_trivial_partition ) pa_vec.push_back( PA_CREATE_TRIVIAL_PARTITION ); } pa_vec.push_back( PA_BROADCAST ); if( !is_reader ) pa_vec.push_back( PA_GET_FILESET_ENTS ); pa_vec.push_back( PA_DELETE_NONLOCAL ); break; case POPT_DEFAULT: case POPT_READ_DELETE: myDebug.print( 1, "Read mode is READ_DELETE\n" ); pa_vec.push_back( PA_READ ); pa_vec.push_back( PA_CHECK_GIDS_SERIAL ); pa_vec.push_back( PA_GET_FILESET_ENTS ); pa_vec.push_back( PA_DELETE_NONLOCAL ); break; case POPT_READ_PART: myDebug.print( 1, "Read mode is READ_PART\n" ); pa_vec.push_back( PA_READ_PART ); break; default: MB_SET_ERR( MB_FAILURE, "Unexpected parallel read mode" ); } if( -2 != resolve_dim ) pa_vec.push_back( PA_RESOLVE_SHARED_ENTS ); if( -1 != ghost_dim ) pa_vec.push_back( PA_EXCHANGE_GHOSTS ); if( -2 != resolve_dim ) { pa_vec.push_back( PA_RESOLVE_SHARED_SETS ); if( -1 != ghost_dim && !skip_augment ) pa_vec.push_back( PA_AUGMENT_SETS_WITH_GHOSTS ); if( -1 != ghost_dim && correct_thin_ghosts ) pa_vec.push_back( PA_CORRECT_THIN_GHOSTS ); } if( print_parallel ) pa_vec.push_back( PA_PRINT_PARALLEL ); result = load_file( file_names, num_files, file_set, parallel_mode, partition_tag_name, partition_tag_vals, distrib, partition_by_rank, pa_vec, opts, subset_list, file_id_tag, reader_rank, cputime, resolve_dim, shared_dim, ghost_dim, bridge_dim, num_layers, addl_ents );MB_CHK_ERR( result ); if( parallel_mode == POPT_BCAST_DELETE && !is_reader ) opts.mark_all_seen(); return MB_SUCCESS; }
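The "PARALLEL=" value is matched against parallelOptsNames to select a ParallelOpts mode, and each mode is expanded into a queue of ParallelActions (pa_vec) that the long overload below executes in order. A sketch of that matching step, built from an illustrative option string:

    #include "moab/FileOptions.hpp"
    #include "ReadParallel.hpp"

    using namespace moab;

    FileOptions opts( "PARALLEL=READ_DELETE;PARTITION=PARALLEL_PARTITION" );
    int parallel_mode = 0;
    // Index into parallelOptsNames: NONE=0, BCAST=1, BCAST_DELETE=2, READ_DELETE=3, READ_PART=4, DEFAULT=5
    ErrorCode rval = opts.match_option( "PARALLEL", ReadParallel::parallelOptsNames, parallel_mode );
    // For READ_DELETE the queue becomes: PA_READ, PA_CHECK_GIDS_SERIAL, PA_GET_FILESET_ENTS,
    // PA_DELETE_NONLOCAL, followed by shared-entity resolution and ghost-exchange actions
    // when the corresponding options are present.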
ErrorCode moab::ReadParallel::load_file ( const char ** file_names,
                                          const int num_files,
                                          const EntityHandle * file_set,
                                          int parallel_mode,
                                          std::string & partition_tag_name,
                                          std::vector< int > & partition_tag_vals,
                                          bool distrib,
                                          bool partition_by_rank,
                                          std::vector< int > & pa_vec,
                                          const FileOptions & opts,
                                          const ReaderIface::SubsetList * subset_list,
                                          const Tag * file_id_tag,
                                          const int reader_rank,
                                          const bool cputime,
                                          const int resolve_dim,
                                          const int shared_dim,
                                          const int ghost_dim,
                                          const int bridge_dim,
                                          const int num_layers,
                                          const int addl_ents )
Definition at line 268 of file ReadParallel.cpp.
References moab::Interface::add_entities(), moab::ParallelComm::augment_default_sets_with_ghosts(), moab::Range::begin(), moab::ParallelComm::broadcast_entities(), moab::ParallelComm::check_global_ids(), moab::Range::clear(), moab::ParallelComm::correct_thin_ghost_layers(), moab::Interface::create_meshset(), create_partition_sets(), moab::debug, delete_nonlocal_entities(), moab::Range::empty(), entities, ErrorCode, moab::ParallelComm::exchange_ghost_cells(), moab::ParallelComm::filter_pstatus(), moab::Interface::get_entities_by_handle(), moab::Interface::get_entities_by_type(), moab::Core::get_entities_by_type_and_tag(), id_tag, moab::Range::insert(), moab::ParallelComm::list_entities(), moab::Interface::list_entities(), MB_CHK_SET_ERR, MB_NOT_IMPLEMENTED, MB_SET_ERR, MB_SUCCESS, MB_TAG_CREAT, MB_TAG_DENSE, MB_TAG_SPARSE, MB_TYPE_INTEGER, MB_TYPE_OPAQUE, MBENTITYSET, mbImpl, MBVERTEX, MESHSET_SET, MPI_COMM_WORLD, myDebug, myPcomm, moab::ReaderIface::SubsetList::num_parts, moab::ReaderIface::IDTag::num_tag_values, PA_AUGMENT_SETS_WITH_GHOSTS, PA_BROADCAST, PA_CHECK_GIDS_SERIAL, PA_CORRECT_THIN_GHOSTS, PA_CREATE_TRIVIAL_PARTITION, PA_DELETE_NONLOCAL, PA_EXCHANGE_GHOSTS, PA_GET_FILESET_ENTS, PA_PRINT_PARALLEL, PA_READ, PA_READ_PART, PA_RESOLVE_SHARED_ENTS, PA_RESOLVE_SHARED_SETS, ParallelActionsNames, moab::ReaderIface::SubsetList::part_number, moab::ParallelComm::partition_sets(), moab::ProcConfig::proc_comm(), moab::ParallelComm::proc_config(), moab::ProcConfig::proc_rank(), moab::ProcConfig::proc_size(), PSTATUS_NOT, PSTATUS_NOT_OWNED, rank, moab::ParallelComm::rank(), moab::ParallelComm::resolve_shared_ents(), moab::ParallelComm::resolve_shared_sets(), moab::Core::serial_load_file(), moab::Range::size(), moab::ParallelComm::size(), moab::Range::subset_by_dimension(), moab::Interface::tag_delete(), moab::Core::tag_get_handle(), moab::Interface::tag_get_handle(), moab::ReaderIface::SubsetList::tag_list, moab::ReaderIface::SubsetList::tag_list_length, moab::Interface::tag_set_data(), moab::ReaderIface::IDTag::tag_values, moab::DebugOutput::tprint(), and moab::DebugOutput::tprintf().
{ ErrorCode result = MB_SUCCESS; if( myPcomm == NULL ) myPcomm = new ParallelComm( mbImpl, MPI_COMM_WORLD ); Range entities; Tag file_set_tag = 0; int other_sets = 0; ReaderWriterSet::iterator iter; Range other_file_sets, file_sets; Core* impl = dynamic_cast< Core* >( mbImpl ); std::vector< double > act_times( pa_vec.size() + 1 ); std::vector< int >::iterator vit; int i, j; act_times[0] = MPI_Wtime(); // Make a new set for the parallel read EntityHandle file_set; if( !file_set_ptr || !( *file_set_ptr ) ) { result = mbImpl->create_meshset( MESHSET_SET, file_set );MB_CHK_SET_ERR( result, "Trouble creating file set" ); } else file_set = *file_set_ptr; bool i_read = false; Tag id_tag = 0; bool use_id_tag = false; Range ents; for( i = 1, vit = pa_vec.begin(); vit != pa_vec.end(); ++vit, i++ ) { ErrorCode tmp_result = MB_SUCCESS; switch( *vit ) { //================== case PA_READ: i_read = true; for( j = 0; j < num_files; j++ ) { myDebug.tprintf( 1, "Reading file: \"%s\"\n", file_names[j] ); EntityHandle new_file_set; result = mbImpl->create_meshset( MESHSET_SET, new_file_set );MB_CHK_SET_ERR( result, "Trouble creating file set" ); tmp_result = impl->serial_load_file( file_names[j], &new_file_set, opts, subset_list, file_id_tag ); if( MB_SUCCESS != tmp_result ) break; // Put the contents of each file set for the reader into the // file set for the parallel read assert( 0 != new_file_set ); Range all_ents; tmp_result = mbImpl->get_entities_by_handle( new_file_set, all_ents ); if( MB_SUCCESS != tmp_result ) break; all_ents.insert( new_file_set ); tmp_result = mbImpl->add_entities( file_set, all_ents ); if( MB_SUCCESS != tmp_result ) break; } if( MB_SUCCESS != tmp_result ) break; // Mark the file set for this parallel reader tmp_result = mbImpl->tag_get_handle( "__file_set", 1, MB_TYPE_INTEGER, file_set_tag, MB_TAG_SPARSE | MB_TAG_CREAT ); if( MB_SUCCESS != tmp_result ) break; tmp_result = mbImpl->tag_set_data( file_set_tag, &file_set, 1, &other_sets ); break; //================== case PA_READ_PART: { myDebug.tprintf( 1, "Reading file: \"%s\"\n", file_names[0] ); i_read = true; if( num_files != 1 ) { MB_SET_ERR( MB_NOT_IMPLEMENTED, "Multiple file read not supported for READ_PART" ); } // If we're going to resolve shared entities, then we need // to ask the file reader to populate a tag with unique ids // (typically file ids/indices/whatever.) 
if( std::find( pa_vec.begin(), pa_vec.end(), PA_RESOLVE_SHARED_ENTS ) != pa_vec.end() ) { use_id_tag = true; if( !file_id_tag ) { // This tag is really used for resolving shared entities with crystal router // In the end, this is an identifier that gets converted to long // In hdf5 file reader, we also convert from hdf5 file id type to long tmp_result = mbImpl->tag_get_handle( "", sizeof( long ), MB_TYPE_OPAQUE, id_tag, MB_TAG_DENSE | MB_TAG_CREAT ); if( MB_SUCCESS != tmp_result ) break; file_id_tag = &id_tag; } } ReaderIface::IDTag parts = { partition_tag_name.c_str(), 0, 0 }; ReaderIface::SubsetList sl; sl.num_parts = 0; int rank = myPcomm->rank(); if( partition_by_rank ) { assert( partition_tag_vals.empty() ); parts.tag_values = &rank; parts.num_tag_values = 1; } else { sl.num_parts = myPcomm->size(); sl.part_number = myPcomm->rank(); if( !partition_tag_vals.empty() ) { parts.tag_values = &partition_tag_vals[0]; parts.num_tag_values = partition_tag_vals.size(); } } std::vector< ReaderIface::IDTag > subset; if( subset_list ) { std::vector< ReaderIface::IDTag > tmplist( subset_list->tag_list, subset_list->tag_list + subset_list->tag_list_length ); tmplist.push_back( parts ); subset.swap( tmplist ); sl.tag_list = &subset[0]; sl.tag_list_length = subset.size(); } else { sl.tag_list = &parts; sl.tag_list_length = 1; } tmp_result = impl->serial_load_file( *file_names, &file_set, opts, &sl, file_id_tag ); if( MB_SUCCESS != tmp_result ) break; if( !partition_tag_name.empty() ) { Tag part_tag; tmp_result = impl->tag_get_handle( partition_tag_name.c_str(), 1, MB_TYPE_INTEGER, part_tag ); if( MB_SUCCESS != tmp_result ) break; tmp_result = impl->get_entities_by_type_and_tag( file_set, MBENTITYSET, &part_tag, 0, 1, myPcomm->partition_sets() ); } } break; //================== case PA_GET_FILESET_ENTS: myDebug.tprint( 1, "Getting fileset entities.\n" ); // Get entities in the file set, and add actual file set to it; // mark the file set to make sure any receiving procs know which it is tmp_result = mbImpl->get_entities_by_handle( file_set, entities ); if( MB_SUCCESS != tmp_result ) { entities.clear(); break; } // Add actual file set to entities too entities.insert( file_set ); break; //================== case PA_CREATE_TRIVIAL_PARTITION: { myDebug.tprint( 1, "create trivial partition, for higher dim entities.\n" ); // get high dim entities (2 or 3) Range hi_dim_ents = entities.subset_by_dimension( 3 ); if( hi_dim_ents.empty() ) hi_dim_ents = entities.subset_by_dimension( 2 ); if( hi_dim_ents.empty() ) hi_dim_ents = entities.subset_by_dimension( 1 ); if( hi_dim_ents.empty() ) MB_SET_ERR( MB_FAILURE, "there are no elements of dim 1-3" ); size_t num_hi_ents = hi_dim_ents.size(); unsigned int num_parts = myPcomm->size(); // create first the trivial partition tag int dum_id = -1; Tag ttag; // trivial tag tmp_result = mbImpl->tag_get_handle( partition_tag_name.c_str(), 1, MB_TYPE_INTEGER, ttag, MB_TAG_CREAT | MB_TAG_SPARSE, &dum_id );MB_CHK_SET_ERR( tmp_result, "Can't create trivial partition tag" ); // Compute the number of high dim entities on each part size_t nPartEnts = num_hi_ents / num_parts; // Number of extra entities after equal split over parts int iextra = num_hi_ents % num_parts; Range::iterator itr = hi_dim_ents.begin(); for( int k = 0; k < (int)num_parts; k++ ) { // create a mesh set, insert a subrange of entities EntityHandle part_set; tmp_result = mbImpl->create_meshset( MESHSET_SET, part_set );MB_CHK_SET_ERR( tmp_result, "Can't create part set" ); entities.insert( part_set ); tmp_result = 
mbImpl->tag_set_data( ttag, &part_set, 1, &k );MB_CHK_SET_ERR( tmp_result, "Can't set trivial partition tag" ); Range subrange; size_t num_ents_in_part = nPartEnts; if( i < iextra ) num_ents_in_part++; for( size_t i1 = 0; i1 < num_ents_in_part; i1++, itr++ ) subrange.insert( *itr ); tmp_result = mbImpl->add_entities( part_set, subrange );MB_CHK_SET_ERR( tmp_result, "Can't add entities to trivial part " << k ); myDebug.tprintf( 1, "create trivial part %d with %lu entities \n", k, num_ents_in_part ); tmp_result = mbImpl->add_entities( file_set, &part_set, 1 );MB_CHK_SET_ERR( tmp_result, "Can't add trivial part to file set " << k ); } } break; //================== case PA_BROADCAST: // Do the actual broadcast; if single-processor, ignore error myDebug.tprint( 1, "Broadcasting mesh.\n" ); if( myPcomm->proc_config().proc_size() > 1 ) { tmp_result = myPcomm->broadcast_entities( reader_rank, entities ); if( MB_SUCCESS != tmp_result ) break; } if( debug ) { std::cerr << "Bcast done; entities:" << std::endl; mbImpl->list_entities( 0, 0 ); } // Add the received entities to this fileset if I wasn't the reader if( !i_read && MB_SUCCESS == tmp_result ) tmp_result = mbImpl->add_entities( file_set, entities ); break; //================== case PA_DELETE_NONLOCAL: myDebug.tprint( 1, "Deleting nonlocal entities.\n" ); tmp_result = delete_nonlocal_entities( partition_tag_name, partition_tag_vals, distrib, file_set ); if( debug ) { std::cerr << "Delete nonlocal done; entities:" << std::endl; mbImpl->list_entities( 0, 0 ); } if( MB_SUCCESS == tmp_result ) tmp_result = create_partition_sets( partition_tag_name, file_set ); break; //================== case PA_CHECK_GIDS_SERIAL: myDebug.tprint( 1, "Checking global IDs.\n" ); tmp_result = myPcomm->check_global_ids( file_set, 0, 1, true, false ); break; //================== case PA_RESOLVE_SHARED_ENTS: myDebug.tprint( 1, "Resolving shared entities.\n" ); if( 1 == myPcomm->size() ) tmp_result = MB_SUCCESS; else tmp_result = myPcomm->resolve_shared_ents( file_set, resolve_dim, shared_dim, use_id_tag ? file_id_tag : 0 ); if( MB_SUCCESS != tmp_result ) break; #ifndef NDEBUG // check number of owned vertices through pcomm's public interface tmp_result = mbImpl->get_entities_by_type( 0, MBVERTEX, ents ); if( MB_SUCCESS == tmp_result ) tmp_result = myPcomm->filter_pstatus( ents, PSTATUS_NOT_OWNED, PSTATUS_NOT ); if( MB_SUCCESS == tmp_result ) myDebug.tprintf( 1, "Proc %u reports %lu owned vertices.\n", myPcomm->proc_config().proc_rank(), ents.size() ); #endif break; //================== case PA_EXCHANGE_GHOSTS: myDebug.tprint( 1, "Exchanging ghost entities.\n" ); tmp_result = myPcomm->exchange_ghost_cells( ghost_dim, bridge_dim, num_layers, addl_ents, true, true, &file_set ); break; //================== case PA_RESOLVE_SHARED_SETS: myDebug.tprint( 1, "Resolving shared sets.\n" ); if( 1 == myPcomm->size() ) tmp_result = MB_SUCCESS; else tmp_result = myPcomm->resolve_shared_sets( file_set, use_id_tag ? 
file_id_tag : 0 ); break; //================== case PA_AUGMENT_SETS_WITH_GHOSTS: myDebug.tprint( 1, "Augmenting sets with ghost entities.\n" ); if( 1 == myPcomm->size() ) tmp_result = MB_SUCCESS; else tmp_result = myPcomm->augment_default_sets_with_ghosts( file_set ); break; //================== case PA_CORRECT_THIN_GHOSTS: myDebug.tprint( 1, "correcting thin ghost layers.\n" ); if( 2 >= myPcomm->size() ) // it is a problem only for multi-shared entities tmp_result = MB_SUCCESS; else tmp_result = myPcomm->correct_thin_ghost_layers(); break; case PA_PRINT_PARALLEL: myDebug.tprint( 1, "Printing parallel information.\n" ); tmp_result = myPcomm->list_entities( 0, -1 ); break; //================== default: MB_SET_ERR( MB_FAILURE, "Unexpected parallel action" ); } // switch (*vit) if( MB_SUCCESS != tmp_result ) { MB_SET_ERR( MB_FAILURE, "Failed in step " << ParallelActionsNames[*vit] ); } if( cputime ) act_times[i] = MPI_Wtime(); } // for (i = 1, vit = pa_vec.begin(); vit != pa_vec.end(); ++vit, i++) if( use_id_tag ) { result = mbImpl->tag_delete( id_tag );MB_CHK_SET_ERR( result, "Trouble deleting id tag" ); } if( cputime ) { for( i = pa_vec.size(); i > 0; i-- ) act_times[i] -= act_times[i - 1]; // Replace initial time with overall time act_times[0] = MPI_Wtime() - act_times[0]; // Get the maximum over all procs if( 0 != myPcomm->proc_config().proc_rank() ) { MPI_Reduce( &act_times[0], 0, pa_vec.size() + 1, MPI_DOUBLE, MPI_MAX, 0, myPcomm->proc_config().proc_comm() ); } else { #if( MPI_VERSION >= 2 ) MPI_Reduce( MPI_IN_PLACE, &act_times[0], pa_vec.size() + 1, MPI_DOUBLE, MPI_MAX, 0, myPcomm->proc_config().proc_comm() ); #else // Note, extra comm-size allocation is required std::vector< double > act_times_tmp( pa_vec.size() + 1 ); MPI_Reduce( &act_times[0], &act_times_tmp[0], pa_vec.size() + 1, MPI_DOUBLE, MPI_MAX, 0, myPcomm->proc_config().proc_comm() ); act_times = act_times_tmp; // extra copy here too #endif std::cout << "Parallel Read times: " << std::endl; for( i = 1, vit = pa_vec.begin(); vit != pa_vec.end(); ++vit, i++ ) std::cout << " " << act_times[i] << " " << ParallelActionsNames[*vit] << std::endl; std::cout << " " << act_times[0] << " PARALLEL TOTAL" << std::endl; } } return MB_SUCCESS; }
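The CPUTIME reporting at the end of this function uses the common max-reduce pattern: non-root ranks send their per-action times, while the root reduces in place and prints. A self-contained sketch of that pattern in plain MPI, independent of MOAB (the function name is illustrative):

    #include <mpi.h>
    #include <cstdio>
    #include <vector>

    // Reduce per-step wall-clock times to their maximum over all ranks and print on rank 0,
    // mirroring the CPUTIME block in ReadParallel::load_file.
    void report_max_times( std::vector< double >& times, MPI_Comm comm )
    {
        int rank;
        MPI_Comm_rank( comm, &rank );
        if( rank != 0 )
            MPI_Reduce( &times[0], 0, (int)times.size(), MPI_DOUBLE, MPI_MAX, 0, comm );
        else
        {
            MPI_Reduce( MPI_IN_PLACE, &times[0], (int)times.size(), MPI_DOUBLE, MPI_MAX, 0, comm );
            for( size_t i = 0; i < times.size(); i++ )
                std::printf( "  step %zu: %g s\n", i, times[i] );
        }
    }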
Interface * moab::ReadParallel::mbImpl [private]
Definition at line 107 of file ReadParallel.hpp.
Referenced by create_partition_sets(), delete_nonlocal_entities(), load_file(), and ReadParallel().
Error * moab::ReadParallel::mError [private]
Definition at line 114 of file ReadParallel.hpp.
Referenced by ReadParallel().
DebugOutput moab::ReadParallel::myDebug [private]
Definition at line 112 of file ReadParallel.hpp.
Referenced by delete_nonlocal_entities(), load_file(), and ReadParallel().
ParallelComm * moab::ReadParallel::myPcomm [private]
Definition at line 110 of file ReadParallel.hpp.
Referenced by create_partition_sets(), delete_nonlocal_entities(), load_file(), and ReadParallel().
const char * moab::ReadParallel::ParallelActionsNames [static]
{ "PARALLEL READ", "PARALLEL READ PART", "PARALLEL BROADCAST", "PARALLEL DELETE NONLOCAL", "PARALLEL CHECK_GIDS_SERIAL", "PARALLEL GET_FILESET_ENTS", "PARALLEL RESOLVE_SHARED_ENTS", "PARALLEL EXCHANGE_GHOSTS", "PARALLEL RESOLVE_SHARED_SETS", "PARALLEL_AUGMENT_SETS_WITH_GHOSTS", "PARALLEL PRINT_PARALLEL", "PARALLEL_CREATE_TRIVIAL_PARTITION", "PARALLEL_CORRECT_THIN_GHOST_LAYERS" }
Definition at line 83 of file ReadParallel.hpp.
Referenced by load_file().
const char * moab::ReadParallel::parallelOptsNames = { "NONE", "BCAST", "BCAST_DELETE", "READ_DELETE", "READ_PART", "", 0 } [static]
Definition at line 64 of file ReadParallel.hpp.
Referenced by load_file().