MOAB: Mesh Oriented datABase
(version 5.2.1)
00001 #undef DEBUG 00002 #undef TIME_DEBUG 00003 00004 #include <stdarg.h> 00005 #include <time.h> 00006 #include <stdlib.h> 00007 00008 #include <string.h> 00009 #include <assert.h> 00010 00011 #include <vector> 00012 #include <set> 00013 #include <map> 00014 #include <utility> 00015 #include <iostream> 00016 #include <sstream> 00017 #include <string> 00018 00019 #include "moab/Interface.hpp" 00020 #include "Internals.hpp" 00021 #include "MBTagConventions.hpp" 00022 #include "MBParallelConventions.h" 00023 #include "moab/ParallelComm.hpp" 00024 #include "moab/CN.hpp" 00025 #include "moab/Range.hpp" 00026 #include "moab/CpuTimer.hpp" 00027 00028 #include "WriteHDF5Parallel.hpp" 00029 00030 #ifndef MOAB_HAVE_HDF5 00031 #error Attempt to compile WriteHDF5Parallel with HDF5 support disabled 00032 #endif 00033 00034 #include <H5Tpublic.h> 00035 #include <H5Ppublic.h> 00036 #include <H5FDmpi.h> 00037 #include <H5FDmpio.h> 00038 00039 #include "mhdf.h" 00040 00041 #include "IODebugTrack.hpp" 00042 #include "moab/FileOptions.hpp" 00043 00044 namespace 00045 { 00046 template < bool Condition > 00047 struct STATIC_ASSERTION; 00048 template <> 00049 struct STATIC_ASSERTION< true > 00050 { 00051 }; 00052 } // namespace 00053 00054 #define PP_CAT_( a, b ) a##b 00055 #define PP_CAT( a, b ) PP_CAT_( a, b ) 00056 #define STATIC_ASSERT( Condition ) \ 00057 enum \ 00058 { \ 00059 PP_CAT( dummy, __LINE__ ) = sizeof( ::STATIC_ASSERTION< (bool)( Condition ) > ) \ 00060 } 00061 00062 namespace moab 00063 { 00064 00065 #ifndef _WIN32 // problematic for windows 00066 // Need an MPI type that we can put handles in 00067 STATIC_ASSERT( sizeof( unsigned long ) >= sizeof( EntityHandle ) ); 00068 00069 // Need an MPI type that we can put file IDs in 00070 STATIC_ASSERT( sizeof( unsigned long ) >= sizeof( WriteHDF5::wid_t ) ); 00071 #endif 00072 00073 // This function doesn't do anything useful. It's just a nice 00074 // place to set a break point to determine why the reader fails. 
00075 static inline ErrorCode error( ErrorCode rval ) 00076 { 00077 return rval; 00078 } 00079 00080 const char* mpi_err_str( int errorcode ) 00081 { 00082 static char buffer[2048]; 00083 int len = sizeof( buffer ); 00084 MPI_Error_string( errorcode, buffer, &len ); 00085 buffer[std::min( (size_t)len, sizeof( buffer ) - 1 )] = '\0'; 00086 return buffer; 00087 } 00088 00089 #define MPI_FAILURE_MSG( A ) \ 00090 "MPI Failure at " __FILE__ ":%d : (Code %d) %s\n", __LINE__, (int)( A ), mpi_err_str( ( A ) ) 00091 00092 #define CHECK_MPI( A ) \ 00093 do \ 00094 { \ 00095 if( MPI_SUCCESS != ( A ) ) \ 00096 { \ 00097 MB_SET_ERR_CONT( "MPI Failure : (Code " << (int)( A ) << ") " << mpi_err_str( ( A ) ) ); \ 00098 dbgOut.printf( 1, MPI_FAILURE_MSG( ( A ) ) ); \ 00099 return error( MB_FAILURE ); \ 00100 } \ 00101 } while( false ) 00102 00103 #define MB_FAILURE_MSG( A ) "MOAB_Failure at " __FILE__ ":%d : %s (%d)\n", __LINE__, ErrorCodeStr[( A )], (int)( A ) 00104 00105 #define CHECK_MB( A ) \ 00106 do \ 00107 { \ 00108 if( MB_SUCCESS != ( A ) ) \ 00109 { \ 00110 MB_SET_ERR_CONT( "MOAB Failure : " << ErrorCodeStr[( A )] ); \ 00111 dbgOut.printf( 1, MB_FAILURE_MSG( ( A ) ) ); \ 00112 return error( A ); \ 00113 } \ 00114 } while( false ) 00115 00116 #define HDF_FAILURE_MSG( A ) "MHDF Failure at " __FILE__ ":%d : %s\n", __LINE__, mhdf_message( &( A ) ) 00117 00118 #define CHECK_HDF( A ) \ 00119 do \ 00120 { \ 00121 if( mhdf_isError( &( A ) ) ) \ 00122 { \ 00123 MB_SET_ERR_CONT( "MHDF Failure : " << mhdf_message( &( A ) ) ); \ 00124 dbgOut.printf( 1, HDF_FAILURE_MSG( ( A ) ) ); \ 00125 return error( MB_FAILURE ); \ 00126 } \ 00127 } while( false ) 00128 00129 #define CHECK_HDFN( A ) \ 00130 do \ 00131 { \ 00132 if( mhdf_isError( &( A ) ) ) \ 00133 { \ 00134 MB_SET_ERR_CONT( "MHDF Failure : " << mhdf_message( &( A ) ) ); \ 00135 return error( MB_FAILURE ); \ 00136 } \ 00137 } while( false ) 00138 00139 #ifdef VALGRIND 00140 #include <valgrind/memcheck.h> 00141 00142 template < typename T > 00143 inline void VALGRIND_MAKE_VEC_UNDEFINED( std::vector< T >& v ) 00144 { 00145 if( v.size() ) {} 00146 (void)VALGRIND_MAKE_MEM_UNDEFINED( &v[0], v.size() * sizeof( T ) ); 00147 } 00148 00149 #else 00150 #ifndef VALGRIND_CHECK_MEM_IS_DEFINED 00151 #define VALGRIND_CHECK_MEM_IS_DEFINED( a, b ) ( (void)0 ) 00152 #endif 00153 #ifndef VALGRIND_CHECK_MEM_IS_ADDRESSABLE 00154 #define VALGRIND_CHECK_MEM_IS_ADDRESSABLE( a, b ) ( (void)0 ) 00155 #endif 00156 #ifndef VALGRIND_MAKE_MEM_UNDEFINED 00157 #define VALGRIND_MAKE_MEM_UNDEFINED( a, b ) ( (void)0 ) 00158 #endif 00159 00160 template < typename T > 00161 inline void VALGRIND_MAKE_VEC_UNDEFINED( std::vector< T >& ) 00162 { 00163 /* Nothing to do */ 00164 } 00165 00166 #endif 00167 00168 #ifndef NDEBUG 00169 #define START_SERIAL \ 00170 for( unsigned _x = 0; _x < myPcomm->proc_config().proc_size(); ++_x ) \ 00171 { \ 00172 MPI_Barrier( myPcomm->proc_config().proc_comm() ); \ 00173 if( _x != myPcomm->proc_config().proc_rank() ) continue 00174 #define END_SERIAL \ 00175 } \ 00176 MPI_Barrier( myPcomm->proc_config().proc_comm() ) 00177 #else 00178 #define START_SERIAL 00179 #define END_SERIAL 00180 #endif 00181 00182 static int my_Gatherv( void* sendbuf, int sendcount, MPI_Datatype sendtype, std::vector< unsigned char >& recvbuf, 00183 std::vector< int >& recvcounts, int root, MPI_Comm comm ) 00184 { 00185 int nproc, rank, bytes, err; 00186 MPI_Comm_size( comm, &nproc ); 00187 MPI_Comm_rank( comm, &rank ); 00188 MPI_Type_size( sendtype, &bytes ); 00189 00190 recvcounts.resize( 
rank == root ? nproc : 0 ); 00191 err = MPI_Gather( &sendcount, 1, MPI_INT, &recvcounts[0], 1, MPI_INT, root, comm ); 00192 if( MPI_SUCCESS != err ) return err; 00193 00194 std::vector< int > disp( recvcounts.size() ); 00195 if( root == rank ) 00196 { 00197 disp[0] = 0; 00198 for( int i = 1; i < nproc; ++i ) 00199 disp[i] = disp[i - 1] + recvcounts[i - 1]; 00200 recvbuf.resize( bytes * ( disp.back() + recvcounts.back() ) ); 00201 } 00202 00203 return MPI_Gatherv( sendbuf, sendcount, sendtype, &recvbuf[0], &recvcounts[0], &disp[0], sendtype, root, comm ); 00204 } 00205 00206 static void print_type_sets( Interface* iFace, DebugOutput* str, Range& sets ) 00207 { 00208 const unsigned VB = 2; 00209 if( str->get_verbosity() < VB ) return; 00210 00211 Tag gid, did, bid, sid, nid; 00212 gid = iFace->globalId_tag(); 00213 iFace->tag_get_handle( GEOM_DIMENSION_TAG_NAME, 1, MB_TYPE_INTEGER, did ); 00214 iFace->tag_get_handle( MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, bid ); 00215 iFace->tag_get_handle( DIRICHLET_SET_TAG_NAME, 1, MB_TYPE_INTEGER, nid ); 00216 iFace->tag_get_handle( NEUMANN_SET_TAG_NAME, 1, MB_TYPE_INTEGER, sid ); 00217 Range typesets[10]; 00218 const char* typenames[] = { 00219 "Block ", "Sideset ", "NodeSet", "Vertex", "Curve", "Surface", "Volume", "Body", "Other" 00220 }; 00221 for( Range::iterator riter = sets.begin(); riter != sets.end(); ++riter ) 00222 { 00223 unsigned dim, id; //, oldsize; 00224 if( MB_SUCCESS == iFace->tag_get_data( bid, &*riter, 1, &id ) ) 00225 dim = 0; 00226 else if( MB_SUCCESS == iFace->tag_get_data( sid, &*riter, 1, &id ) ) 00227 dim = 1; 00228 else if( MB_SUCCESS == iFace->tag_get_data( nid, &*riter, 1, &id ) ) 00229 dim = 2; 00230 else if( MB_SUCCESS == iFace->tag_get_data( did, &*riter, 1, &dim ) ) 00231 { 00232 id = 0; 00233 iFace->tag_get_data( gid, &*riter, 1, &id ); 00234 dim += 3; 00235 } 00236 else 00237 { 00238 id = *riter; 00239 dim = 9; 00240 } 00241 00242 // oldsize = typesets[dim].size(); 00243 typesets[dim].insert( id ); 00244 // assert(typesets[dim].size() - oldsize == 1); 00245 } 00246 for( int ii = 0; ii < 9; ++ii ) 00247 { 00248 char tmp[64]; 00249 sprintf( tmp, "%s (%lu) ", typenames[ii], (unsigned long)typesets[ii].size() ); 00250 str->print( VB, tmp, typesets[ii] ); 00251 } 00252 str->printf( VB, "Total: %lu\n", (unsigned long)sets.size() ); 00253 } 00254 00255 #define debug_barrier() debug_barrier_line( __LINE__ ) 00256 00257 void WriteHDF5Parallel::debug_barrier_line( int lineno ) 00258 { 00259 const unsigned threshold = 2; 00260 static unsigned long count = 0; 00261 if( dbgOut.get_verbosity() >= threshold && myPcomm ) 00262 { 00263 dbgOut.printf( threshold, "*********** Debug Barrier %lu (@%d)***********\n", ++count, lineno ); 00264 MPI_Barrier( myPcomm->proc_config().proc_comm() ); 00265 } 00266 } 00267 00268 WriterIface* WriteHDF5Parallel::factory( Interface* iface ) 00269 { 00270 return new WriteHDF5Parallel( iface ); 00271 } 00272 00273 WriteHDF5Parallel::WriteHDF5Parallel( Interface* iface ) 00274 : WriteHDF5( iface ), myPcomm( NULL ), pcommAllocated( false ), hslabOp( H5S_SELECT_OR ) 00275 { 00276 } 00277 00278 WriteHDF5Parallel::~WriteHDF5Parallel() 00279 { 00280 if( pcommAllocated && myPcomm ) delete myPcomm; 00281 } 00282 00283 // The parent WriteHDF5 class has ExportSet structs that are 00284 // populated with the entities to be written, grouped by type 00285 // (and for elements, connectivity length). 
This function: 00286 // o determines which entities are to be written by a remote processor 00287 // o removes those entities from the ExportSet structs in WriteMesh 00288 // o passes them back in a Range 00289 ErrorCode WriteHDF5Parallel::gather_interface_meshes( Range& nonowned ) 00290 { 00291 ErrorCode result; 00292 00293 // START_SERIAL; 00294 dbgOut.print( 3, "Pre-interface mesh:\n" ); 00295 dbgOut.print( 3, nodeSet.range ); 00296 for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) 00297 dbgOut.print( 3, eiter->range ); 00298 dbgOut.print( 3, setSet.range ); 00299 00300 // Move handles of non-owned entities from lists of entities 00301 // that this processor will write to the 'nonowned' list. 00302 00303 nonowned.clear(); 00304 result = myPcomm->filter_pstatus( nodeSet.range, PSTATUS_NOT_OWNED, PSTATUS_AND, -1, &nonowned ); 00305 if( MB_SUCCESS != result ) return error( result ); 00306 nodeSet.range = subtract( nodeSet.range, nonowned ); 00307 00308 for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) 00309 { 00310 Range tmpset; 00311 result = myPcomm->filter_pstatus( eiter->range, PSTATUS_NOT_OWNED, PSTATUS_AND, -1, &tmpset ); 00312 if( MB_SUCCESS != result ) return error( result ); 00313 eiter->range = subtract( eiter->range, tmpset ); 00314 nonowned.merge( tmpset ); 00315 } 00316 00317 dbgOut.print( 3, "Post-interface mesh:\n" ); 00318 dbgOut.print( 3, nodeSet.range ); 00319 for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) 00320 dbgOut.print( 3, eiter->range ); 00321 dbgOut.print( 3, setSet.range ); 00322 00323 // END_SERIAL; 00324 00325 return MB_SUCCESS; 00326 } 00327 00328 ErrorCode WriteHDF5Parallel::parallel_create_file( const char* filename, bool overwrite, 00329 const std::vector< std::string >& qa_records, 00330 const FileOptions& opts, const Tag* user_tag_list, 00331 int user_tag_count, int dimension, double* times ) 00332 { 00333 ErrorCode rval; 00334 mhdf_Status status; 00335 00336 int pcomm_no = 0; 00337 opts.get_int_option( "PARALLEL_COMM", pcomm_no ); 00338 00339 myPcomm = ParallelComm::get_pcomm( iFace, pcomm_no ); 00340 if( 0 == myPcomm ) 00341 { 00342 myPcomm = new ParallelComm( iFace, MPI_COMM_WORLD ); 00343 pcommAllocated = true; 00344 } 00345 00346 MPI_Info info = MPI_INFO_NULL; 00347 std::string cb_size; 00348 rval = opts.get_str_option( "CB_BUFFER_SIZE", cb_size ); 00349 if( MB_SUCCESS == rval ) 00350 { 00351 MPI_Info_create( &info ); 00352 MPI_Info_set( info, const_cast< char* >( "cb_buffer_size" ), const_cast< char* >( cb_size.c_str() ) ); 00353 } 00354 00355 dbgOut.set_rank( myPcomm->proc_config().proc_rank() ); 00356 dbgOut.limit_output_to_first_N_procs( 32 ); 00357 Range nonlocal; 00358 debug_barrier(); 00359 dbgOut.tprint( 1, "Gathering interface meshes\n" ); 00360 rval = gather_interface_meshes( nonlocal ); 00361 if( MB_SUCCESS != rval ) return error( rval ); 00362 00363 /**************** Get tag names for sets likely to be shared ***********/ 00364 // debug_barrier(); 00365 // dbgOut.tprint(1, "Getting shared entity sets\n"); 00366 // rval = get_sharedset_tags(); 00367 // if (MB_SUCCESS != rval) return error(rval); 00368 00369 /**************** Create actual file and write meta info ***************/ 00370 00371 debug_barrier(); 00372 if( myPcomm->proc_config().proc_rank() == 0 ) 00373 { 00374 dbgOut.tprintf( 1, "Creating file: %s\n", filename ); 00375 00376 // Create the file 00377 const char* 
type_names[MBMAXTYPE]; 00378 memset( type_names, 0, MBMAXTYPE * sizeof( char* ) ); 00379 for( EntityType i = MBEDGE; i < MBENTITYSET; ++i ) 00380 type_names[i] = CN::EntityTypeName( i ); 00381 00382 dbgOut.tprint( 1, "call mhdf_createFile\n" ); 00383 filePtr = mhdf_createFile( filename, overwrite, type_names, MBMAXTYPE, id_type, &status ); 00384 if( !filePtr ) { MB_SET_ERR( MB_FAILURE, mhdf_message( &status ) ); } 00385 00386 dbgOut.tprint( 1, "call write_qa\n" ); 00387 rval = write_qa( qa_records ); 00388 if( MB_SUCCESS != rval ) return error( rval ); 00389 } 00390 00391 /**************** Create node coordinate table ***************/ 00392 CpuTimer timer; 00393 debug_barrier(); 00394 dbgOut.tprint( 1, "creating node table\n" ); 00395 topState.start( "creating node table" ); 00396 rval = create_node_table( dimension ); 00397 topState.end( rval ); 00398 if( MB_SUCCESS != rval ) return error( rval ); 00399 if( times ) times[CREATE_NODE_TIME] = timer.time_elapsed(); 00400 00401 /**************** Create element tables ***************/ 00402 00403 debug_barrier(); 00404 dbgOut.tprint( 1, "negotiating element types\n" ); 00405 topState.start( "negotiating element types" ); 00406 rval = negotiate_type_list(); 00407 topState.end( rval ); 00408 if( MB_SUCCESS != rval ) return error( rval ); 00409 if( times ) times[NEGOTIATE_TYPES_TIME] = timer.time_elapsed(); 00410 dbgOut.tprint( 1, "creating element table\n" ); 00411 topState.start( "creating element tables" ); 00412 rval = create_element_tables(); 00413 topState.end( rval ); 00414 if( MB_SUCCESS != rval ) return error( rval ); 00415 if( times ) times[CREATE_ELEM_TIME] = timer.time_elapsed(); 00416 00417 /*************** Exchange file IDs *****************/ 00418 00419 debug_barrier(); 00420 dbgOut.tprint( 1, "communicating file ids\n" ); 00421 topState.start( "communicating file ids" ); 00422 rval = exchange_file_ids( nonlocal ); 00423 topState.end( rval ); 00424 if( MB_SUCCESS != rval ) return error( rval ); 00425 if( times ) times[FILEID_EXCHANGE_TIME] = timer.time_elapsed(); 00426 00427 /**************** Create meshset tables *********************/ 00428 00429 debug_barrier(); 00430 dbgOut.tprint( 1, "creating meshset table\n" ); 00431 topState.start( "creating meshset tables" ); 00432 rval = create_meshset_tables( times ); 00433 topState.end( rval ); 00434 if( MB_SUCCESS != rval ) return error( rval ); 00435 if( times ) times[CREATE_SET_TIME] = timer.time_elapsed(); 00436 00437 /**************** Create adjacency tables *********************/ 00438 00439 debug_barrier(); 00440 dbgOut.tprint( 1, "creating adjacency table\n" ); 00441 topState.start( "creating adjacency tables" ); 00442 rval = create_adjacency_tables(); 00443 topState.end( rval ); 00444 if( MB_SUCCESS != rval ) return error( rval ); 00445 if( times ) times[CREATE_ADJ_TIME] = timer.time_elapsed(); 00446 00447 /**************** Create tag data *********************/ 00448 00449 debug_barrier(); 00450 dbgOut.tprint( 1, "creating tag tables\n" ); 00451 topState.start( "creating tag tables" ); 00452 rval = gather_tags( user_tag_list, user_tag_count ); 00453 if( MB_SUCCESS != rval ) return error( rval ); 00454 rval = create_tag_tables(); 00455 topState.end( rval ); 00456 if( MB_SUCCESS != rval ) return error( rval ); 00457 if( times ) times[CREATE_TAG_TIME] = timer.time_elapsed(); 00458 00459 /************** Close serial file and reopen parallel *****************/ 00460 00461 if( 0 == myPcomm->proc_config().proc_rank() ) mhdf_closeFile( filePtr, &status ); 00462 00463 MPI_Barrier( 
myPcomm->proc_config().proc_comm() ); 00464 dbgOut.tprint( 1, "(re)opening file in parallel mode\n" ); 00465 unsigned long junk; 00466 hid_t hdf_opt = H5Pcreate( H5P_FILE_ACCESS ); 00467 H5Pset_fapl_mpio( hdf_opt, myPcomm->proc_config().proc_comm(), info ); 00468 filePtr = mhdf_openFileWithOpt( filename, 1, &junk, id_type, hdf_opt, &status ); 00469 H5Pclose( hdf_opt ); 00470 if( !filePtr ) { MB_SET_ERR( MB_FAILURE, mhdf_message( &status ) ); } 00471 00472 if( collectiveIO ) 00473 { 00474 dbgOut.print( 1, "USING COLLECTIVE IO\n" ); 00475 writeProp = H5Pcreate( H5P_DATASET_XFER ); 00476 H5Pset_dxpl_mpio( writeProp, H5FD_MPIO_COLLECTIVE ); 00477 } 00478 00479 /* Test if we can use H5S_APPEND when selecting hyperslabs */ 00480 if( MB_SUCCESS != opts.get_null_option( "HYPERSLAB_OR" ) && 00481 ( MB_SUCCESS == opts.get_null_option( "HYPERSLAB_APPEND" ) || HDF5_can_append_hyperslabs() ) ) 00482 { 00483 dbgOut.print( 1, "HDF5 library supports H5Sselect_hyperlsab with H5S_SELECT_APPEND\n" ); 00484 hslabOp = H5S_SELECT_APPEND; 00485 } 00486 00487 dbgOut.tprint( 1, "Exiting parallel_create_file\n" ); 00488 return MB_SUCCESS; 00489 } 00490 00491 class TagNameCompare 00492 { 00493 Interface* iFace; 00494 std::string name1, name2; 00495 00496 public: 00497 TagNameCompare( Interface* iface ) : iFace( iface ) {} 00498 bool operator()( const WriteHDF5::TagDesc& t1, const WriteHDF5::TagDesc& t2 ); 00499 }; 00500 00501 bool TagNameCompare::operator()( const WriteHDF5::TagDesc& t1, const WriteHDF5::TagDesc& t2 ) 00502 { 00503 iFace->tag_get_name( t1.tag_id, name1 ); 00504 iFace->tag_get_name( t2.tag_id, name2 ); 00505 return name1 < name2; 00506 } 00507 00508 struct serial_tag_data 00509 { 00510 TagType storage; 00511 DataType type; 00512 int size; 00513 int name_len; 00514 int def_val_len; 00515 char name[sizeof( unsigned long )]; 00516 00517 static size_t pad( size_t len ) 00518 { 00519 if( len % sizeof( unsigned long ) ) 00520 return len + sizeof( unsigned long ) - len % sizeof( unsigned long ); 00521 else 00522 return len; 00523 } 00524 00525 static size_t def_val_bytes( int def_val_len, DataType type ) 00526 { 00527 switch( type ) 00528 { 00529 case MB_TYPE_BIT: 00530 return def_val_len ? 1 : 0; 00531 case MB_TYPE_OPAQUE: 00532 return def_val_len; 00533 case MB_TYPE_INTEGER: 00534 return def_val_len * sizeof( int ); 00535 case MB_TYPE_DOUBLE: 00536 return def_val_len * sizeof( double ); 00537 case MB_TYPE_HANDLE: 00538 return def_val_len * sizeof( EntityHandle ); 00539 } 00540 return 0; 00541 } 00542 00543 static size_t len( int name_len, int def_val_len, DataType type ) 00544 { 00545 return sizeof( serial_tag_data ) + pad( name_len + def_val_bytes( def_val_len, type ) ) - 00546 sizeof( unsigned long ); 00547 } 00548 size_t len() const 00549 { 00550 return len( name_len, def_val_len, type ); 00551 } 00552 void* default_value() 00553 { 00554 return def_val_len ? 
name + name_len : 0; 00555 } 00556 const void* default_value() const 00557 { 00558 return const_cast< serial_tag_data* >( this )->default_value(); 00559 } 00560 void set_default_value( const void* val ) 00561 { 00562 memcpy( default_value(), val, def_val_bytes( def_val_len, type ) ); 00563 } 00564 }; 00565 00566 ErrorCode WriteHDF5Parallel::append_serial_tag_data( std::vector< unsigned char >& buffer, 00567 const WriteHDF5::TagDesc& tag ) 00568 { 00569 ErrorCode rval; 00570 00571 std::string name; 00572 rval = iFace->tag_get_name( tag.tag_id, name ); 00573 if( MB_SUCCESS != rval ) return error( rval ); 00574 00575 // Get name length, including space for null char 00576 size_t name_len = name.size() + 1; 00577 if( name_len == 1 ) return MB_SUCCESS; // Skip tags with no name 00578 00579 DataType data_type; 00580 rval = iFace->tag_get_data_type( tag.tag_id, data_type ); 00581 if( MB_SUCCESS != rval ) return error( rval ); 00582 00583 // Get default value 00584 int def_val_len; 00585 const void* def_val; 00586 if( MB_SUCCESS != iFace->tag_get_default_value( tag.tag_id, def_val, def_val_len ) ) 00587 { 00588 def_val_len = 0; 00589 def_val = 0; 00590 } 00591 00592 // Allocate struct within buffer 00593 size_t init_size = buffer.size(); 00594 buffer.resize( init_size + serial_tag_data::len( name_len, def_val_len, data_type ) ); 00595 serial_tag_data* ptr = reinterpret_cast< serial_tag_data* >( &buffer[init_size] ); 00596 00597 // Populate struct 00598 rval = iFace->tag_get_type( tag.tag_id, ptr->storage ); 00599 if( MB_SUCCESS != rval ) return error( rval ); 00600 ptr->type = data_type; 00601 rval = iFace->tag_get_length( tag.tag_id, ptr->size ); 00602 if( MB_VARIABLE_DATA_LENGTH == rval ) 00603 ptr->size = MB_VARIABLE_LENGTH; 00604 else if( MB_SUCCESS != rval ) 00605 return error( rval ); 00606 ptr->name_len = name_len; 00607 Range range; 00608 memset( ptr->name, 0, ptr->name_len ); 00609 memcpy( ptr->name, name.data(), name.size() ); 00610 ptr->def_val_len = def_val_len; 00611 ptr->set_default_value( def_val ); 00612 00613 return MB_SUCCESS; 00614 } 00615 00616 ErrorCode WriteHDF5Parallel::check_serial_tag_data( const std::vector< unsigned char >& buffer, 00617 std::vector< TagDesc* >* missing, std::vector< TagDesc* >* newlist ) 00618 { 00619 ErrorCode rval; 00620 00621 // Use 'write_sparse' field as a 'visited' mark 00622 std::list< TagDesc >::iterator tag_iter; 00623 if( missing ) 00624 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) 00625 tag_iter->write_sparse = true; 00626 00627 // Use a set as a temporary for what will ultimately go in 00628 // newlist because we need to pass back newlist in the order 00629 // of the tagList member. 00630 std::set< TagDesc* > newset; 00631 00632 // Iterate over data from, updating the local list of tags. 00633 // Be careful to keep tagList sorted such that in the end all 00634 // procs have the same list in the same order. 
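// Each record in 'buffer' is a variable-length serial_tag_data entry; the loop
// below advances through the buffer with ptr->len(). Tags named in the buffer
// but unknown locally are created here (MB_TAG_CREAT), so after the loop the
// local tagList contains every tag described in the buffer plus the local
// ones, still sorted by name.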
00635 std::vector< unsigned char >::const_iterator diter = buffer.begin(); 00636 tag_iter = tagList.begin(); 00637 while( diter < buffer.end() ) 00638 { 00639 // Get struct from buffer 00640 const serial_tag_data* ptr = reinterpret_cast< const serial_tag_data* >( &*diter ); 00641 00642 // Find local struct for tag 00643 std::string name( ptr->name ); 00644 std::string n; 00645 iFace->tag_get_name( tag_iter->tag_id, n ); // Second time we've called, so shouldn't fail 00646 if( n > name ) 00647 { 00648 tag_iter = tagList.begin(); // New proc, start search from beginning 00649 } 00650 iFace->tag_get_name( tag_iter->tag_id, n ); 00651 while( n < name ) 00652 { 00653 ++tag_iter; 00654 if( tag_iter == tagList.end() ) break; 00655 iFace->tag_get_name( tag_iter->tag_id, n ); 00656 } 00657 if( tag_iter == tagList.end() || n != name ) 00658 { // New tag 00659 TagDesc newtag; 00660 00661 if( ptr->size == MB_VARIABLE_LENGTH ) 00662 rval = iFace->tag_get_handle( name.c_str(), ptr->def_val_len, ptr->type, newtag.tag_id, 00663 MB_TAG_VARLEN | MB_TAG_CREAT | ptr->storage, ptr->default_value() ); 00664 else 00665 rval = iFace->tag_get_handle( name.c_str(), ptr->size, ptr->type, newtag.tag_id, 00666 MB_TAG_CREAT | ptr->storage, ptr->default_value() ); 00667 if( MB_SUCCESS != rval ) return error( rval ); 00668 00669 newtag.sparse_offset = 0; 00670 newtag.var_data_offset = 0; 00671 newtag.write_sparse = false; 00672 newtag.max_num_ents = 0; 00673 newtag.max_num_vals = 0; 00674 00675 tag_iter = tagList.insert( tag_iter, newtag ); 00676 if( newlist ) newset.insert( &*tag_iter ); 00677 } 00678 else 00679 { // Check that tag is as expected 00680 DataType type; 00681 iFace->tag_get_data_type( tag_iter->tag_id, type ); 00682 if( type != ptr->type ) 00683 { MB_SET_ERR( MB_FAILURE, "Processes have inconsistent data type for tag \"" << name << "\"" ); } 00684 int size; 00685 iFace->tag_get_length( tag_iter->tag_id, size ); 00686 if( size != ptr->size ) 00687 { MB_SET_ERR( MB_FAILURE, "Processes have inconsistent size for tag \"" << name << "\"" ); } 00688 tag_iter->write_sparse = false; 00689 } 00690 00691 // Step to next variable-length struct. 
00692 diter += ptr->len(); 00693 } 00694 00695 // Now pass back any local tags that weren't in the buffer 00696 if( missing ) 00697 { 00698 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) 00699 { 00700 if( tag_iter->write_sparse ) 00701 { 00702 tag_iter->write_sparse = false; 00703 missing->push_back( &*tag_iter ); 00704 } 00705 } 00706 } 00707 00708 // Be careful to populate newlist in the same, sorted, order as tagList 00709 if( newlist ) 00710 { 00711 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) 00712 if( newset.find( &*tag_iter ) != newset.end() ) newlist->push_back( &*tag_iter ); 00713 } 00714 00715 return MB_SUCCESS; 00716 } 00717 00718 static void set_bit( int position, unsigned char* bytes ) 00719 { 00720 int byte = position / 8; 00721 int bit = position % 8; 00722 bytes[byte] |= ( ( (unsigned char)1 ) << bit ); 00723 } 00724 00725 static bool get_bit( int position, const unsigned char* bytes ) 00726 { 00727 int byte = position / 8; 00728 int bit = position % 8; 00729 return 0 != ( bytes[byte] & ( ( (unsigned char)1 ) << bit ) ); 00730 } 00731 00732 ErrorCode WriteHDF5Parallel::create_tag_tables() 00733 { 00734 std::list< TagDesc >::iterator tag_iter; 00735 ErrorCode rval; 00736 int err; 00737 const int rank = myPcomm->proc_config().proc_rank(); 00738 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 00739 00740 subState.start( "negotiating tag list" ); 00741 00742 dbgOut.tprint( 1, "communicating tag metadata\n" ); 00743 00744 dbgOut.printf( 2, "Exchanging tag data for %d tags.\n", (int)tagList.size() ); 00745 00746 // Sort tagList contents in alphabetical order by tag name 00747 tagList.sort( TagNameCompare( iFace ) ); 00748 00749 // Negotiate total list of tags to write 00750 00751 // Build concatenated list of all tag data 00752 std::vector< unsigned char > tag_buffer; 00753 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) 00754 { 00755 rval = append_serial_tag_data( tag_buffer, *tag_iter ); 00756 CHECK_MB( rval ); 00757 } 00758 00759 // Broadcast list from root to all other procs 00760 unsigned long size = tag_buffer.size(); 00761 err = MPI_Bcast( &size, 1, MPI_UNSIGNED_LONG, 0, comm ); 00762 CHECK_MPI( err ); 00763 tag_buffer.resize( size ); 00764 err = MPI_Bcast( &tag_buffer[0], size, MPI_UNSIGNED_CHAR, 0, comm ); 00765 CHECK_MPI( err ); 00766 00767 // Update local tag list 00768 std::vector< TagDesc* > missing; 00769 rval = check_serial_tag_data( tag_buffer, &missing, 0 ); 00770 CHECK_MB( rval ); 00771 00772 // Check if we're done (0->done, 1->more, 2+->error) 00773 int code, lcode = ( MB_SUCCESS != rval ) ? rval + 2 : missing.empty() ? 0 : 1; 00774 err = MPI_Allreduce( &lcode, &code, 1, MPI_INT, MPI_MAX, comm ); 00775 CHECK_MPI( err ); 00776 if( code > 1 ) 00777 { 00778 MB_SET_ERR_CONT( "Inconsistent tag definitions between procs" ); 00779 return error( ( ErrorCode )( code - 2 ) ); 00780 } 00781 00782 // If not done... 
00783 if( code ) 00784 { 00785 dbgOut.print( 1, "Not all procs had same tag definitions, negotiating...\n" ); 00786 00787 // Get tags defined on this proc but not on root proc 00788 tag_buffer.clear(); 00789 for( size_t i = 0; i < missing.size(); ++i ) 00790 { 00791 rval = append_serial_tag_data( tag_buffer, *missing[i] ); 00792 CHECK_MB( rval ); 00793 } 00794 00795 // Gather extra tag definitions on root processor 00796 std::vector< int > junk; // don't care how many from each proc 00797 assert( rank || tag_buffer.empty() ); // must be empty on root 00798 err = my_Gatherv( &tag_buffer[0], tag_buffer.size(), MPI_UNSIGNED_CHAR, tag_buffer, junk, 0, comm ); 00799 CHECK_MPI( err ); 00800 00801 // Process serialized tag descriptions on root, and 00802 rval = MB_SUCCESS; 00803 if( 0 == rank ) 00804 { 00805 // Process serialized tag descriptions on root, and 00806 std::vector< TagDesc* > newlist; 00807 rval = check_serial_tag_data( tag_buffer, 0, &newlist ); 00808 tag_buffer.clear(); 00809 // re-serialize a unique list of new tag definitions 00810 for( size_t i = 0; MB_SUCCESS == rval && i != newlist.size(); ++i ) 00811 { 00812 rval = append_serial_tag_data( tag_buffer, *newlist[i] ); 00813 CHECK_MB( rval ); 00814 } 00815 } 00816 00817 // Broadcast any new tag definitions from root to other procs 00818 long this_size = tag_buffer.size(); 00819 if( MB_SUCCESS != rval ) this_size = -rval; 00820 err = MPI_Bcast( &this_size, 1, MPI_LONG, 0, comm ); 00821 CHECK_MPI( err ); 00822 if( this_size < 0 ) 00823 { 00824 MB_SET_ERR_CONT( "Inconsistent tag definitions between procs" ); 00825 return error( (ErrorCode)-this_size ); 00826 } 00827 tag_buffer.resize( this_size ); 00828 err = MPI_Bcast( &tag_buffer[0], this_size, MPI_UNSIGNED_CHAR, 0, comm ); 00829 CHECK_MPI( err ); 00830 00831 // Process new tag definitions 00832 rval = check_serial_tag_data( tag_buffer, 0, 0 ); 00833 CHECK_MB( rval ); 00834 } 00835 00836 subState.end(); 00837 subState.start( "negotiate which element/tag combinations are dense" ); 00838 00839 // Figure out for which tag/element combinations we can 00840 // write dense tag data. 00841 00842 // Construct a table of bits, 00843 // where each row of the table corresponds to a tag 00844 // and each column to an element group. 00845 00846 // Two extra, because first is nodes and last is sets. 00847 // (n+7)/8 is ceil(n/8) 00848 const int bytes_per_tag = ( exportList.size() + 9 ) / 8; 00849 std::vector< unsigned char > data( bytes_per_tag * tagList.size(), 0 ); 00850 std::vector< unsigned char > recv( data.size(), 0 ); 00851 unsigned char* iter = &data[0]; 00852 if( writeTagDense && !data.empty() ) 00853 { 00854 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, iter += bytes_per_tag ) 00855 { 00856 00857 Range tagged; 00858 rval = get_sparse_tagged_entities( *tag_iter, tagged ); 00859 CHECK_MB( rval ); 00860 00861 int s; 00862 if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) continue; 00863 00864 std::string n; 00865 iFace->tag_get_name( tag_iter->tag_id, 00866 n ); // Second time we've called, so shouldn't fail 00867 00868 // Check if we want to write this tag in dense format even if not 00869 // all of the entities have a tag value. The criterion of this 00870 // is that the tag be dense, have a default value, and have at 00871 // least 2/3 of the entities tagged. 
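// Bit layout of each bytes_per_tag row in 'data' (one row per tag, filled in
// by the set_bit() calls below):
//   bit 0       -> nodeSet
//   bits 1..N   -> the N entries of exportList, in list order
//   bit N+1     -> setSet
// A set bit means this process can (or, with prefer_dense, is willing to)
// write that tag/group combination in dense format; the rows are AND-reduced
// across processes below, so dense format is used only when all procs agree.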
00872 bool prefer_dense = false; 00873 TagType type; 00874 rval = iFace->tag_get_type( tag_iter->tag_id, type ); 00875 CHECK_MB( rval ); 00876 if( MB_TAG_DENSE == type ) 00877 { 00878 const void* defval = 0; 00879 rval = iFace->tag_get_default_value( tag_iter->tag_id, defval, s ); 00880 if( MB_SUCCESS == rval ) prefer_dense = true; 00881 } 00882 00883 int i = 0; 00884 if( check_dense_format_tag( nodeSet, tagged, prefer_dense ) ) 00885 { 00886 set_bit( i, iter ); 00887 dbgOut.printf( 2, "Can write dense data for \"%s\"/Nodes\n", n.c_str() ); 00888 } 00889 std::list< ExportSet >::const_iterator ex_iter = exportList.begin(); 00890 for( ++i; ex_iter != exportList.end(); ++i, ++ex_iter ) 00891 { 00892 // when writing in parallel, on some partitions, some of these element ranges might 00893 // be empty so do not turn this tag as sparse, just because of that, leave it dense, 00894 // if we prefer dense 00895 if( ( prefer_dense && ex_iter->range.empty() ) || 00896 check_dense_format_tag( *ex_iter, tagged, prefer_dense ) ) 00897 { 00898 set_bit( i, iter ); 00899 dbgOut.printf( 2, "Can write dense data for \"%s\"/%s\n", n.c_str(), ex_iter->name() ); 00900 } 00901 } 00902 if( check_dense_format_tag( setSet, tagged, prefer_dense ) ) 00903 { 00904 set_bit( i, iter ); 00905 dbgOut.printf( 2, "Can write dense data for \"%s\"/Sets\n", n.c_str() ); 00906 } 00907 } 00908 00909 // Do bit-wise AND of list over all processors (only write dense format 00910 // if all proccesses want dense format for this group of entities). 00911 err = MPI_Allreduce( &data[0], &recv[0], data.size(), MPI_UNSIGNED_CHAR, MPI_BAND, 00912 myPcomm->proc_config().proc_comm() ); 00913 CHECK_MPI( err ); 00914 } // if (writeTagDense) 00915 00916 // Store initial counts for sparse-formatted tag data. 00917 // The total number of values to send and receive will be the number of 00918 // tags plus the number of var-len tags because we need to negotiate 00919 // offsets into two different tables for the var-len tags. 
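// 'counts' holds, for each tag, the number of sparse-formatted values this
// process will write, followed by an extra entry (total variable-length data
// size) for var-len tags. create_dataset() later converts these per-process
// counts into per-process offsets by gathering them on the root, forming an
// exclusive prefix sum, and scattering the results back.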
00920 std::vector< long > counts; 00921 00922 // Record dense tag/element combinations 00923 iter = &recv[0]; 00924 const unsigned char* iter2 = &data[0]; 00925 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); 00926 ++tag_iter, iter += bytes_per_tag, iter2 += bytes_per_tag ) 00927 { 00928 00929 Range tagged; 00930 rval = get_sparse_tagged_entities( *tag_iter, tagged ); 00931 CHECK_MB( rval ); 00932 00933 std::string n; 00934 iFace->tag_get_name( tag_iter->tag_id, n ); // Second time we've called, so shouldn't fail 00935 00936 int i = 0; 00937 if( get_bit( i, iter ) ) 00938 { 00939 assert( get_bit( i, iter2 ) ); 00940 tag_iter->dense_list.push_back( nodeSet ); 00941 tagged -= nodeSet.range; 00942 dbgOut.printf( 2, "Will write dense data for \"%s\"/Nodes\n", n.c_str() ); 00943 } 00944 std::list< ExportSet >::const_iterator ex_iter = exportList.begin(); 00945 for( ++i; ex_iter != exportList.end(); ++i, ++ex_iter ) 00946 { 00947 if( get_bit( i, iter ) ) 00948 { 00949 assert( get_bit( i, iter2 ) ); 00950 tag_iter->dense_list.push_back( *ex_iter ); 00951 dbgOut.printf( 2, "WIll write dense data for \"%s\"/%s\n", n.c_str(), ex_iter->name() ); 00952 tagged -= ex_iter->range; 00953 } 00954 } 00955 if( get_bit( i, iter ) ) 00956 { 00957 assert( get_bit( i, iter2 ) ); 00958 tag_iter->dense_list.push_back( setSet ); 00959 dbgOut.printf( 2, "Will write dense data for \"%s\"/Sets\n", n.c_str() ); 00960 tagged -= setSet.range; 00961 } 00962 00963 counts.push_back( tagged.size() ); 00964 00965 int s; 00966 if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) 00967 { 00968 unsigned long data_len; 00969 rval = get_tag_data_length( *tag_iter, tagged, data_len ); 00970 CHECK_MB( rval ); 00971 counts.push_back( data_len ); 00972 } 00973 } 00974 00975 subState.end(); 00976 subState.start( "Negotiate offsets for sparse tag info" ); 00977 00978 std::vector< long > offsets( counts.size() ), maxima( counts.size() ), totals( counts.size() ); 00979 rval = create_dataset( counts.size(), &counts[0], &offsets[0], &maxima[0], &totals[0] ); 00980 CHECK_MB( rval ); 00981 00982 // Copy values into local structs and if root then create tables 00983 size_t idx = 0; 00984 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, ++idx ) 00985 { 00986 assert( idx < counts.size() ); 00987 tag_iter->sparse_offset = offsets[idx]; 00988 tag_iter->max_num_ents = maxima[idx]; 00989 tag_iter->write_sparse = ( 0 != totals[idx] ); 00990 int s; 00991 if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) 00992 { 00993 ++idx; 00994 assert( idx < counts.size() ); 00995 tag_iter->var_data_offset = offsets[idx]; 00996 tag_iter->max_num_vals = maxima[idx]; 00997 } 00998 else 00999 { 01000 tag_iter->var_data_offset = 0; 01001 tag_iter->max_num_vals = 0; 01002 } 01003 } 01004 01005 subState.end(); 01006 01007 // Create tag tables on root process 01008 if( 0 == myPcomm->proc_config().proc_rank() ) 01009 { 01010 size_t iidx = 0; 01011 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, ++iidx ) 01012 { 01013 assert( iidx < totals.size() ); 01014 unsigned long num_ents = totals[iidx]; 01015 unsigned long num_val = 0; 01016 int s; 01017 if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) 01018 { 01019 ++iidx; 01020 assert( iidx < totals.size() ); 01021 num_val = totals[iidx]; 01022 } 01023 dbgOut.printf( 2, "Writing tag description for tag 0x%lx with %lu values\n", 01024 (unsigned long)tag_iter->tag_id, num_val ? 
num_val : num_ents ); 01025 01026 rval = create_tag( *tag_iter, num_ents, num_val ); 01027 if( MB_SUCCESS != rval ) return error( rval ); 01028 } 01029 } 01030 01031 if( dbgOut.get_verbosity() > 1 ) 01032 { 01033 dbgOut.printf( 2, "Tags: %12s %8s %8s %8s %8s %8s\n", "Name", "Count", "Offset", "Var Off", "Max Ent", 01034 "Handle" ); 01035 01036 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) 01037 { 01038 std::string name; 01039 iFace->tag_get_name( tag_iter->tag_id, name ); 01040 size_t this_size; 01041 get_num_sparse_tagged_entities( *tag_iter, this_size ); 01042 dbgOut.printf( 2, "%18s %8lu %8lu %8lu %8lu 0x%7lx\n", name.c_str(), (unsigned long)this_size, 01043 (unsigned long)tag_iter->sparse_offset, (unsigned long)tag_iter->var_data_offset, 01044 (unsigned long)tag_iter->max_num_ents, (unsigned long)tag_iter->tag_id ); 01045 } 01046 } 01047 01048 return MB_SUCCESS; 01049 } 01050 01051 struct DatasetVals 01052 { 01053 long start_id; 01054 long max_count; 01055 long total; 01056 }; 01057 STATIC_ASSERT( ( sizeof( DatasetVals ) == 3 * sizeof( long ) ) ); 01058 01059 ErrorCode WriteHDF5Parallel::create_dataset( int num_datasets, const long* num_owned, long* offsets_out, 01060 long* max_proc_entities, long* total_entities, 01061 const DataSetCreator& creator, ExportSet* groups[], wid_t* first_ids_out ) 01062 { 01063 int result; 01064 ErrorCode rval; 01065 const unsigned rank = myPcomm->proc_config().proc_rank(); 01066 const unsigned nproc = myPcomm->proc_config().proc_size(); 01067 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 01068 01069 // Gather entity counts for each processor on root 01070 std::vector< long > counts( rank ? 0 : nproc * num_datasets ); 01071 (void)VALGRIND_CHECK_MEM_IS_DEFINED( &num_owned, sizeof( long ) ); 01072 result = MPI_Gather( const_cast< long* >( num_owned ), num_datasets, MPI_LONG, &counts[0], num_datasets, MPI_LONG, 01073 0, comm ); 01074 CHECK_MPI( result ); 01075 01076 // Create node data in file 01077 DatasetVals zero_val = { 0, 0, 0 }; 01078 std::vector< DatasetVals > cumulative( num_datasets, zero_val ); 01079 if( rank == 0 ) 01080 { 01081 for( unsigned i = 0; i < nproc; i++ ) 01082 { 01083 const long* proc_data = &counts[i * num_datasets]; 01084 for( int index = 0; index < num_datasets; ++index ) 01085 { 01086 cumulative[index].total += proc_data[index]; 01087 if( proc_data[index] > cumulative[index].max_count ) cumulative[index].max_count = proc_data[index]; 01088 } 01089 } 01090 01091 for( int index = 0; index < num_datasets; ++index ) 01092 { 01093 if( cumulative[index].total ) 01094 { 01095 rval = creator( this, cumulative[index].total, groups ? 
groups[index] : 0, cumulative[index].start_id ); 01096 CHECK_MB( rval ); 01097 } 01098 else 01099 { 01100 cumulative[index].start_id = -1; 01101 } 01102 } 01103 } 01104 01105 // Send id offset to every proc 01106 result = MPI_Bcast( (void*)&cumulative[0], 3 * num_datasets, MPI_LONG, 0, comm ); 01107 CHECK_MPI( result ); 01108 for( int index = 0; index < num_datasets; ++index ) 01109 { 01110 if( first_ids_out ) first_ids_out[index] = (wid_t)cumulative[index].start_id; 01111 max_proc_entities[index] = cumulative[index].max_count; 01112 total_entities[index] = cumulative[index].total; 01113 } 01114 01115 // Convert array of per-process counts to per-process offsets 01116 if( rank == 0 ) 01117 { 01118 // Initialize prev_size with data sizes for root process 01119 std::vector< long > prev_size( counts.begin(), counts.begin() + num_datasets ); 01120 // Root process gets offset zero 01121 std::fill( counts.begin(), counts.begin() + num_datasets, 0L ); 01122 // For each proc other than this one (root) 01123 for( unsigned i = 1; i < nproc; ++i ) 01124 { 01125 // Get pointer to offsets for previous process in list 01126 long* prev_data = &counts[( i - 1 ) * num_datasets]; 01127 // Get pointer to offsets for this process in list 01128 long* proc_data = &counts[i * num_datasets]; 01129 // For each data set 01130 for( int j = 0; j < num_datasets; ++j ) 01131 { 01132 // Get size of data in dataset from process i 01133 long mysize = proc_data[j]; 01134 // Offset for process i is offset of previous process plus 01135 // number of values previous process will write 01136 proc_data[j] = prev_data[j] + prev_size[j]; 01137 // Store my size, as it is no longer available in 'counts' 01138 prev_size[j] = mysize; 01139 } 01140 } 01141 } 01142 01143 // Send each proc it's offset in the table 01144 if( rank == 0 ) { (void)VALGRIND_CHECK_MEM_IS_DEFINED( &counts[0], num_datasets * nproc * sizeof( long ) ); } 01145 result = MPI_Scatter( &counts[0], num_datasets, MPI_LONG, offsets_out, num_datasets, MPI_LONG, 0, comm ); 01146 CHECK_MPI( result ); 01147 01148 return MB_SUCCESS; 01149 } 01150 01151 ErrorCode WriteHDF5Parallel::create_node_table( int dimension ) 01152 { 01153 nodeSet.num_nodes = dimension; // Put it here so NodeSetCreator can access it 01154 struct NodeSetCreator : public DataSetCreator 01155 { 01156 ErrorCode operator()( WriteHDF5* file, long count, const ExportSet* group, long& start_id ) const 01157 { 01158 mhdf_Status status; 01159 hid_t handle = mhdf_createNodeCoords( file->file_ptr(), group->num_nodes, count, &start_id, &status ); 01160 CHECK_HDFN( status ); 01161 mhdf_closeData( file->file_ptr(), handle, &status ); 01162 CHECK_HDFN( status ); 01163 return MB_SUCCESS; 01164 } 01165 }; 01166 01167 const long count = nodeSet.range.size(); 01168 ExportSet* array[] = { &nodeSet }; 01169 ErrorCode rval = create_dataset( 1, &count, &nodeSet.offset, &nodeSet.max_num_ents, &nodeSet.total_num_ents, 01170 NodeSetCreator(), array, &nodeSet.first_id ); 01171 CHECK_MB( rval ); 01172 return assign_ids( nodeSet.range, nodeSet.first_id + nodeSet.offset ); 01173 } 01174 01175 struct elemtype 01176 { 01177 int mbtype; 01178 int numnode; 01179 01180 elemtype( int vals[2] ) : mbtype( vals[0] ), numnode( vals[1] ) {} 01181 elemtype( int t, int n ) : mbtype( t ), numnode( n ) {} 01182 01183 bool operator==( const elemtype& other ) const 01184 { 01185 return mbtype == other.mbtype && ( mbtype == MBENTITYSET || numnode == other.numnode ); 01186 } 01187 bool operator<( const elemtype& other ) const 01188 { 01189 if( mbtype 
> other.mbtype ) return false; 01190 01191 return mbtype < other.mbtype || ( mbtype != MBENTITYSET && numnode < other.numnode ); 01192 } 01193 bool operator!=( const elemtype& other ) const 01194 { 01195 return !this->operator==( other ); 01196 } 01197 }; 01198 01199 ErrorCode WriteHDF5Parallel::negotiate_type_list() 01200 { 01201 int result; 01202 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 01203 01204 exportList.sort(); 01205 int num_types = exportList.size(); 01206 01207 // Get list of types on this processor 01208 typedef std::vector< std::pair< int, int > > typelist; 01209 typelist my_types( num_types ); 01210 (void)VALGRIND_MAKE_VEC_UNDEFINED( my_types ); 01211 typelist::iterator viter = my_types.begin(); 01212 for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) 01213 { 01214 viter->first = eiter->type; 01215 viter->second = eiter->num_nodes; 01216 ++viter; 01217 } 01218 01219 dbgOut.print( 2, "Local Element Types:\n" ); 01220 for( viter = my_types.begin(); viter != my_types.end(); ++viter ) 01221 { 01222 int type = viter->first; 01223 int count = viter->second; 01224 dbgOut.printf( 2, " %s : %d\n", CN::EntityTypeName( (EntityType)type ), count ); 01225 } 01226 01227 // Broadcast number of types from root to all nodes 01228 int num_types0 = num_types; 01229 result = MPI_Bcast( &num_types0, 1, MPI_INT, 0, comm ); 01230 CHECK_MPI( result ); 01231 // Broadcast type list from root to all nodes 01232 typelist root_types( num_types0 ); 01233 if( 0 == myPcomm->proc_config().proc_rank() ) root_types = my_types; 01234 result = MPI_Bcast( (void*)&root_types[0], 2 * num_types0, MPI_INT, 0, comm ); 01235 CHECK_MPI( result ); 01236 01237 // Build local list of any types that root did not know about 01238 typelist non_root_types; 01239 viter = root_types.begin(); 01240 for( typelist::iterator iter = my_types.begin(); iter != my_types.end(); ++iter ) 01241 { 01242 if( viter == root_types.end() || *viter != *iter ) 01243 non_root_types.push_back( *iter ); 01244 else 01245 ++viter; 01246 } 01247 01248 // Determine if any process had types not defined on the root 01249 int non_root_count = non_root_types.size(); 01250 int not_done; 01251 result = MPI_Allreduce( &non_root_count, ¬_done, 1, MPI_INT, MPI_LOR, comm ); 01252 CHECK_MPI( result ); 01253 if( not_done ) 01254 { 01255 // Get number of types each processor has that root does not 01256 std::vector< int > counts( myPcomm->proc_config().proc_size() ); 01257 int two_count = 2 * non_root_count; 01258 result = MPI_Gather( &two_count, 1, MPI_INT, &counts[0], 1, MPI_INT, 0, comm ); 01259 CHECK_MPI( result ); 01260 01261 // Get list of types from each processor 01262 std::vector< int > displs( myPcomm->proc_config().proc_size() + 1 ); 01263 (void)VALGRIND_MAKE_VEC_UNDEFINED( displs ); 01264 displs[0] = 0; 01265 for( unsigned long i = 1; i <= myPcomm->proc_config().proc_size(); ++i ) 01266 displs[i] = displs[i - 1] + counts[i - 1]; 01267 int total = displs[myPcomm->proc_config().proc_size()]; 01268 typelist alltypes( total / 2 ); 01269 (void)VALGRIND_MAKE_VEC_UNDEFINED( alltypes ); 01270 (void)VALGRIND_CHECK_MEM_IS_DEFINED( &non_root_types[0], non_root_types.size() * sizeof( int ) ); 01271 result = MPI_Gatherv( (void*)&non_root_types[0], 2 * non_root_count, MPI_INT, (int*)&alltypes[0], &counts[0], 01272 &displs[0], MPI_INT, 0, comm ); 01273 CHECK_MPI( result ); 01274 01275 // Merge type lists. 
01276 // Prefer O(n) insertions with O(ln n) search time because 01277 // we expect data from a potentially large number of processes, 01278 // but with a small total number of element types. 01279 if( 0 == myPcomm->proc_config().proc_rank() ) 01280 { 01281 for( viter = alltypes.begin(); viter != alltypes.end(); ++viter ) 01282 { 01283 typelist::iterator titer = std::lower_bound( my_types.begin(), my_types.end(), *viter ); 01284 if( titer == my_types.end() || *titer != *viter ) my_types.insert( titer, *viter ); 01285 } 01286 01287 dbgOut.print( 2, "Global Element Types:\n" ); 01288 for( viter = my_types.begin(); viter != my_types.end(); ++viter ) 01289 dbgOut.printf( 2, " %s : %d\n", CN::EntityTypeName( (EntityType)viter->first ), viter->second ); 01290 } 01291 01292 // Send total number of types to each processor 01293 total = my_types.size(); 01294 result = MPI_Bcast( &total, 1, MPI_INT, 0, comm ); 01295 CHECK_MPI( result ); 01296 01297 // Send list of types to each processor 01298 my_types.resize( total ); 01299 result = MPI_Bcast( (void*)&my_types[0], 2 * total, MPI_INT, 0, comm ); 01300 CHECK_MPI( result ); 01301 } 01302 else 01303 { 01304 // Special case: if root had types but some subset of procs did not 01305 // have those types, but there are no types that the root doesn't 01306 // know about then we still need to update processes that are missing 01307 // types. 01308 my_types.swap( root_types ); 01309 } 01310 01311 // Insert missing types into exportList, with an empty 01312 // range of entities to export. 01313 std::list< ExportSet >::iterator ex_iter = exportList.begin(); 01314 for( viter = my_types.begin(); viter != my_types.end(); ++viter ) 01315 { 01316 while( ex_iter != exportList.end() && *ex_iter < *viter ) 01317 ++ex_iter; 01318 01319 if( ex_iter == exportList.end() || !( *ex_iter == *viter ) ) 01320 { 01321 ExportSet insert; 01322 insert.type = (EntityType)viter->first; 01323 insert.num_nodes = viter->second; 01324 insert.first_id = 0; 01325 insert.offset = 0; 01326 insert.adj_offset = 0; 01327 ex_iter = exportList.insert( ex_iter, insert ); 01328 } 01329 } 01330 01331 return MB_SUCCESS; 01332 } 01333 01334 ErrorCode WriteHDF5Parallel::create_element_tables() 01335 { 01336 struct ElemSetCreator : public DataSetCreator 01337 { 01338 ErrorCode operator()( WriteHDF5* file, long size, const ExportSet* ex, long& start_id ) const 01339 { 01340 return file->create_elem_table( *ex, size, start_id ); 01341 } 01342 }; 01343 01344 const int numtypes = exportList.size(); 01345 std::vector< ExportSet* > groups( numtypes ); 01346 std::vector< long > counts( numtypes ), offsets( numtypes ), max_ents( numtypes ), total_ents( numtypes ); 01347 std::vector< wid_t > start_ids( numtypes ); 01348 01349 size_t idx = 0; 01350 std::list< ExportSet >::iterator ex_iter; 01351 for( ex_iter = exportList.begin(); ex_iter != exportList.end(); ++ex_iter, ++idx ) 01352 { 01353 groups[idx] = &*ex_iter; 01354 counts[idx] = ex_iter->range.size(); 01355 } 01356 ErrorCode rval = create_dataset( numtypes, &counts[0], &offsets[0], &max_ents[0], &total_ents[0], ElemSetCreator(), 01357 &groups[0], &start_ids[0] ); 01358 CHECK_MB( rval ); 01359 01360 for( idx = 0, ex_iter = exportList.begin(); ex_iter != exportList.end(); ++ex_iter, ++idx ) 01361 { 01362 ex_iter->first_id = start_ids[idx]; 01363 ex_iter->offset = offsets[idx]; 01364 ex_iter->max_num_ents = max_ents[idx]; 01365 ex_iter->total_num_ents = total_ents[idx]; 01366 rval = assign_ids( ex_iter->range, ex_iter->first_id + ex_iter->offset ); 01367 
CHECK_MB( rval ); 01368 } 01369 01370 return MB_SUCCESS; 01371 } 01372 01373 ErrorCode WriteHDF5Parallel::create_adjacency_tables() 01374 { 01375 struct AdjSetCreator : public DataSetCreator 01376 { 01377 ErrorCode operator()( WriteHDF5* file, long size, const ExportSet* ex, long& start_id ) const 01378 { 01379 mhdf_Status status; 01380 hid_t handle = mhdf_createAdjacency( file->file_ptr(), ex->name(), size, &status ); 01381 CHECK_HDFN( status ); 01382 mhdf_closeData( file->file_ptr(), handle, &status ); 01383 CHECK_HDFN( status ); 01384 start_id = -1; 01385 return MB_SUCCESS; 01386 } 01387 }; 01388 01389 std::vector< ExportSet* > groups; 01390 #ifdef WRITE_NODE_ADJACENCIES 01391 groups.push_back( &nodeSet ); 01392 #endif 01393 for( std::list< ExportSet >::iterator ex_iter = exportList.begin(); ex_iter != exportList.end(); ++ex_iter ) 01394 groups.push_back( &*ex_iter ); 01395 01396 ErrorCode rval; 01397 const int numtypes = groups.size(); 01398 std::vector< long > counts( numtypes ); 01399 std::vector< long > offsets( numtypes ); 01400 std::vector< long > max_ents( numtypes ); 01401 std::vector< long > totals( numtypes ); 01402 for( int i = 0; i < numtypes; ++i ) 01403 { 01404 wid_t count; 01405 rval = count_adjacencies( groups[i]->range, count ); 01406 CHECK_MB( rval ); 01407 counts[i] = count; 01408 } 01409 01410 rval = create_dataset( numtypes, &counts[0], &offsets[0], &max_ents[0], &totals[0], AdjSetCreator(), &groups[0] ); 01411 CHECK_MB( rval ); 01412 01413 // Cppcheck warning (false positive): variable groups is assigned a value that is never used 01414 for( int i = 0; i < numtypes; ++i ) 01415 { 01416 groups[i]->max_num_adjs = max_ents[i]; 01417 groups[i]->adj_offset = offsets[i]; 01418 } 01419 return MB_SUCCESS; 01420 } 01421 01422 const unsigned SSVB = 3; 01423 01424 void WriteHDF5Parallel::print_set_sharing_data( const Range& range, const char* label, Tag idt ) 01425 { 01426 dbgOut.printf( SSVB, "set\tid\towner\t%-*s\tfid\tshared\n", (int)( sizeof( EntityHandle ) * 2 ), "handle" ); 01427 for( Range::iterator it = range.begin(); it != range.end(); ++it ) 01428 { 01429 int id; 01430 iFace->tag_get_data( idt, &*it, 1, &id ); 01431 EntityHandle handle = 0; 01432 unsigned owner = 0; 01433 wid_t file_id = 0; 01434 myPcomm->get_entityset_owner( *it, owner, &handle ); 01435 if( !idMap.find( *it, file_id ) ) file_id = 0; 01436 dbgOut.printf( SSVB, "%s\t%d\t%u\t%lx\t%lu\t", label, id, owner, (unsigned long)handle, 01437 (unsigned long)file_id ); 01438 std::vector< unsigned > procs; 01439 myPcomm->get_entityset_procs( *it, procs ); 01440 if( procs.empty() ) 01441 dbgOut.print( SSVB, "<none>\n" ); 01442 else 01443 { 01444 for( unsigned i = 0; i < procs.size() - 1; ++i ) 01445 dbgOut.printf( SSVB, "%u,", procs[i] ); 01446 dbgOut.printf( SSVB, "%u\n", procs.back() ); 01447 } 01448 } 01449 } 01450 01451 void WriteHDF5Parallel::print_shared_sets() 01452 { 01453 const char* tag_names[][2] = { { MATERIAL_SET_TAG_NAME, "block" }, 01454 { DIRICHLET_SET_TAG_NAME, "nodeset" }, 01455 { NEUMANN_SET_TAG_NAME, "sideset" }, 01456 { 0, 0 } }; 01457 01458 for( int i = 0; tag_names[i][0]; ++i ) 01459 { 01460 Tag tag; 01461 if( MB_SUCCESS != iFace->tag_get_handle( tag_names[i][0], 1, MB_TYPE_INTEGER, tag ) ) continue; 01462 01463 Range tagged; 01464 iFace->get_entities_by_type_and_tag( 0, MBENTITYSET, &tag, 0, 1, tagged ); 01465 print_set_sharing_data( tagged, tag_names[i][1], tag ); 01466 } 01467 01468 Tag geom, id; 01469 if( MB_SUCCESS != iFace->tag_get_handle( GEOM_DIMENSION_TAG_NAME, 1, MB_TYPE_INTEGER, 
geom ) ) return; 01470 id = iFace->globalId_tag(); 01471 01472 const char* geom_names[] = { "vertex", "curve", "surface", "volume" }; 01473 for( int d = 0; d <= 3; ++d ) 01474 { 01475 Range tagged; 01476 const void* vals[] = { &d }; 01477 iFace->get_entities_by_type_and_tag( 0, MBENTITYSET, &geom, vals, 1, tagged ); 01478 print_set_sharing_data( tagged, geom_names[d], id ); 01479 } 01480 } 01481 01482 ErrorCode WriteHDF5Parallel::communicate_shared_set_ids( const Range& owned, const Range& remote ) 01483 { 01484 ErrorCode rval; 01485 int mperr; 01486 const int TAG = 0xD0E; 01487 // const unsigned rank = myPcomm->proc_config().proc_rank(); 01488 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 01489 01490 dbgOut.tprint( 1, "COMMUNICATING SHARED SET IDS\n" ); 01491 dbgOut.print( 6, "Owned, shared sets: ", owned ); 01492 01493 // Post receive buffers for all procs for which we share sets 01494 01495 std::vector< unsigned > procs; 01496 rval = myPcomm->get_entityset_owners( procs ); 01497 CHECK_MB( rval ); 01498 std::vector< unsigned >::iterator it = std::find( procs.begin(), procs.end(), myPcomm->proc_config().proc_rank() ); 01499 if( it != procs.end() ) procs.erase( it ); 01500 01501 std::vector< MPI_Request > recv_req( procs.size(), MPI_REQUEST_NULL ); 01502 std::vector< std::vector< unsigned long > > recv_buf( procs.size() ); 01503 01504 size_t recv_count = 0; 01505 for( size_t i = 0; i < procs.size(); ++i ) 01506 { 01507 Range tmp; 01508 rval = myPcomm->get_owned_sets( procs[i], tmp ); 01509 CHECK_MB( rval ); 01510 size_t count = 01511 intersect( tmp, remote ).size(); // Necessary because we might not be writing all of the database 01512 if( count ) 01513 { 01514 dbgOut.printf( 6, "Sets owned by proc %u (remote handles): ", procs[i] ); 01515 if( dbgOut.get_verbosity() >= 6 ) 01516 { 01517 Range remote_handles; 01518 tmp = intersect( tmp, remote ); 01519 for( Range::iterator j = tmp.begin(); j != tmp.end(); ++j ) 01520 { 01521 unsigned r; 01522 EntityHandle h; 01523 myPcomm->get_entityset_owner( *j, r, &h ); 01524 assert( r == procs[i] ); 01525 remote_handles.insert( h ); 01526 } 01527 dbgOut.print( 6, remote_handles ); 01528 } 01529 recv_count++; 01530 recv_buf[i].resize( 2 * count + 1 ); 01531 dbgOut.printf( 5, "Posting receive buffer of size %lu for proc %u (%lu of %lu owned sets)\n", 01532 (unsigned long)recv_buf[i].size(), procs[i], count, tmp.size() ); 01533 mperr = 01534 MPI_Irecv( &recv_buf[i][0], recv_buf[i].size(), MPI_UNSIGNED_LONG, procs[i], TAG, comm, &recv_req[i] ); 01535 CHECK_MPI( mperr ); 01536 } 01537 } 01538 01539 // Send set ids to all procs with which we share them 01540 01541 // First build per-process lists of sets for which we need to send data 01542 std::map< unsigned, Range > send_sets; 01543 std::vector< unsigned > set_procs; 01544 for( Range::reverse_iterator i = owned.rbegin(); i != owned.rend(); ++i ) 01545 { 01546 set_procs.clear(); 01547 rval = myPcomm->get_entityset_procs( *i, set_procs ); 01548 CHECK_MB( rval ); 01549 for( size_t j = 0; j < set_procs.size(); ++j ) 01550 if( set_procs[j] != myPcomm->proc_config().proc_rank() ) send_sets[set_procs[j]].insert( *i ); 01551 } 01552 assert( send_sets.find( myPcomm->proc_config().proc_rank() ) == send_sets.end() ); 01553 01554 // Now send the data 01555 std::vector< std::vector< unsigned long > > send_buf( send_sets.size() ); 01556 std::vector< MPI_Request > send_req( send_sets.size() ); 01557 std::map< unsigned, Range >::iterator si = send_sets.begin(); 01558 for( size_t i = 0; si != send_sets.end(); 
++si, ++i ) 01559 { 01560 dbgOut.printf( 6, "Sending data for shared sets to proc %u: ", si->first ); 01561 dbgOut.print( 6, si->second ); 01562 01563 send_buf[i].reserve( 2 * si->second.size() + 1 ); 01564 send_buf[i].push_back( si->second.size() ); 01565 for( Range::iterator j = si->second.begin(); j != si->second.end(); ++j ) 01566 { 01567 send_buf[i].push_back( *j ); 01568 send_buf[i].push_back( idMap.find( *j ) ); 01569 } 01570 dbgOut.printf( 5, "Sending buffer of size %lu to proc %u (%lu of %lu owned sets)\n", 01571 (unsigned long)send_buf[i].size(), si->first, si->second.size(), owned.size() ); 01572 mperr = MPI_Isend( &send_buf[i][0], send_buf[i].size(), MPI_UNSIGNED_LONG, si->first, TAG, comm, &send_req[i] ); 01573 } 01574 01575 // Process received data 01576 MPI_Status status; 01577 int idx; 01578 while( recv_count-- ) 01579 { 01580 mperr = MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status ); 01581 CHECK_MPI( mperr ); 01582 01583 assert( (unsigned)status.MPI_SOURCE == procs[idx] ); 01584 assert( 2 * recv_buf[idx].front() + 1 == recv_buf[idx].size() ); 01585 const size_t n = std::min< size_t >( recv_buf[idx].front(), ( recv_buf[idx].size() - 1 ) / 2 ); 01586 dbgOut.printf( 5, "Received buffer of size %lu from proc %d\n", (unsigned long)( 2 * n + 1 ), 01587 (int)status.MPI_SOURCE ); 01588 01589 for( size_t i = 0; i < n; ++i ) 01590 { 01591 EntityHandle handle = 0; 01592 rval = myPcomm->get_entityset_local_handle( procs[idx], recv_buf[idx][2 * i + 1], handle ); 01593 CHECK_MB( rval ); 01594 assert( handle != 0 ); 01595 if( !idMap.insert( handle, recv_buf[idx][2 * i + 2], 1 ).second ) 01596 error( MB_FAILURE ); // Conflicting IDs?????? 01597 } 01598 01599 recv_req[idx] = MPI_REQUEST_NULL; 01600 } 01601 assert( MPI_SUCCESS == MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status ) && 01602 MPI_UNDEFINED == idx ); // Check that we got them all 01603 01604 // Wait for all sends to complete before we release send 01605 // buffers (implicitly releases when we return from this function) 01606 01607 std::vector< MPI_Status > stats( send_req.size() ); 01608 mperr = MPI_Waitall( send_req.size(), &send_req[0], &stats[0] ); 01609 CHECK_MPI( mperr ); 01610 01611 if( dbgOut.get_verbosity() >= SSVB ) print_shared_sets(); 01612 01613 return MB_SUCCESS; 01614 } 01615 01616 // void get_global_ids(Interface* iFace, const unsigned long* ptr, 01617 // size_t len, unsigned flags, 01618 // std::vector<int>& ids) 01619 //{ 01620 // Tag idtag; 01621 // iFace->tag_get_handle(GLOBAL_ID_TAG_NAME, 1, MB_TYPE_INTEGER, idtag); 01622 // for (size_t i = 0; i < len; ++i) { 01623 // if (flags & MESHSET_ORDERED) { 01624 // int tmp; 01625 // iFace->tag_get_data(idtag, ptr + i, 1, &tmp); 01626 // ids.push_back(tmp); 01627 // continue; 01628 // } 01629 // 01630 // EntityHandle s = ptr[i]; 01631 // EntityHandle e = ptr[++i]; 01632 // for (; s <= e; ++s) { 01633 // int tmp; 01634 // iFace->tag_get_data(idtag, &s, 1, &tmp); 01635 // ids.push_back(tmp); 01636 // } 01637 // } 01638 //} 01639 01640 ErrorCode WriteHDF5Parallel::pack_set( Range::const_iterator it, unsigned long* buffer, size_t buffer_size ) 01641 { 01642 ErrorCode rval; 01643 const EntityHandle* ptr; 01644 int len; 01645 unsigned char flags; 01646 std::vector< wid_t > tmp; 01647 size_t newlen; 01648 01649 // Buffer must always contain at least flags and desired sizes 01650 assert( buffer_size >= 4 ); 01651 buffer_size -= 4; 01652 01653 Range::const_iterator nd = it; 01654 ++nd; 01655 rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, 

ErrorCode WriteHDF5Parallel::pack_set( Range::const_iterator it, unsigned long* buffer, size_t buffer_size )
{
    ErrorCode rval;
    const EntityHandle* ptr;
    int len;
    unsigned char flags;
    std::vector< wid_t > tmp;
    size_t newlen;

    // Buffer must always contain at least flags and desired sizes
    assert( buffer_size >= 4 );
    buffer_size -= 4;

    Range::const_iterator nd = it;
    ++nd;
    rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, WriteUtilIface::CONTENTS, &len, &flags );
    CHECK_MB( rval );

    // Tag mattag;
    // iFace->tag_get_handle(MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag);
    // int block;
    // if (MB_SUCCESS != iFace->tag_get_data(mattag, &*it, 1, &block))
    //   block = 0;
    //
    // if (block) {
    //   std::vector<int> ids;
    //   get_global_ids(iFace, ptr, len, flags, ids);
    //}

    if( len && !( flags & MESHSET_ORDERED ) )
    {
        tmp.clear();
        bool blocked = false;
        assert( ( 0 == len % 2 ) );
        rval = range_to_blocked_list( ptr, len / 2, tmp, blocked );
        CHECK_MB( rval );
        if( blocked ) flags |= mhdf_SET_RANGE_BIT;
    }
    else
    {
        tmp.resize( len );
        rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true );
        CHECK_MB( rval );
        tmp.resize( newlen );
    }

    buffer[0] = flags;
    buffer[1] = tmp.size();
    if( tmp.size() <= buffer_size ) std::copy( tmp.begin(), tmp.end(), buffer + 4 );

    rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, WriteUtilIface::CHILDREN, &len );
    CHECK_MB( rval );
    tmp.resize( len );
    rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true );
    tmp.resize( newlen );
    buffer[2] = tmp.size();
    if( tmp.size() <= buffer_size - buffer[1] ) std::copy( tmp.begin(), tmp.end(), buffer + 4 + buffer[1] );

    rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, WriteUtilIface::PARENTS, &len );
    CHECK_MB( rval );
    tmp.resize( len );
    rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true );
    tmp.resize( newlen );
    buffer[3] = tmp.size();
    if( tmp.size() <= buffer_size - buffer[1] - buffer[2] )
        std::copy( tmp.begin(), tmp.end(), buffer + 4 + buffer[1] + buffer[2] );

    return MB_SUCCESS;
}
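
// Worked example (added commentary): the "compressed range" (blocked) format
// used below stores each run of consecutive IDs as a (start, count) pair, so
// the ID list {3, 4, 5, 9, 10} becomes {3, 3, 9, 2}.  A hypothetical call:
//
//   const unsigned long ids[] = { 3, 4, 5, 9, 10 };
//   std::vector< WriteHDF5::wid_t > ranged;
//   convert_to_ranged_ids( ids, 5, ranged );  // ranged == { 3, 3, 9, 2 }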

template < typename TYPE >
static void convert_to_ranged_ids( const TYPE* buffer, size_t len, std::vector< WriteHDF5::wid_t >& result )
{
    if( !len )
    {
        result.clear();
        return;
    }

    result.resize( len * 2 );
    Range tmp;
    for( size_t i = 0; i < len; i++ )
        tmp.insert( (EntityHandle)buffer[i] );
    result.resize( tmp.psize() * 2 );
    int j = 0;
    for( Range::const_pair_iterator pit = tmp.const_pair_begin(); pit != tmp.const_pair_end(); ++pit, j++ )
    {
        result[2 * j] = pit->first;
        result[2 * j + 1] = pit->second - pit->first + 1;
    }
}

static void merge_ranged_ids( const unsigned long* range_list, size_t len, std::vector< WriteHDF5::wid_t >& result )
{
    typedef WriteHDF5::wid_t wid_t;
    assert( 0 == len % 2 );
    assert( 0 == result.size() % 2 );
    STATIC_ASSERT( sizeof( std::pair< wid_t, wid_t > ) == 2 * sizeof( wid_t ) );

    result.insert( result.end(), range_list, range_list + len );
    size_t plen = result.size() / 2;
    Range tmp;
    for( size_t i = 0; i < plen; i++ )
    {
        EntityHandle starth = (EntityHandle)result[2 * i];
        EntityHandle endh = (EntityHandle)result[2 * i] + (wid_t)result[2 * i + 1] - 1;  // id + count - 1
        tmp.insert( starth, endh );
    }
    // Now convert back to std::vector<WriteHDF5::wid_t>, compressed range format
    result.resize( tmp.psize() * 2 );
    int j = 0;
    for( Range::const_pair_iterator pit = tmp.const_pair_begin(); pit != tmp.const_pair_end(); ++pit, j++ )
    {
        result[2 * j] = pit->first;
        result[2 * j + 1] = pit->second - pit->first + 1;
    }
}

static void merge_vector_ids( const unsigned long* list, size_t len, std::vector< WriteHDF5::wid_t >& result )
{
    result.insert( result.end(), list, list + len );
}

ErrorCode WriteHDF5Parallel::unpack_set( EntityHandle set, const unsigned long* buffer, size_t buffer_size )
{
    // Use local variables for readability
    assert( buffer_size >= 4 );
    assert( buffer[1] + buffer[2] + buffer[3] <= buffer_size );
    const unsigned long flags = buffer[0];
    unsigned long num_content = buffer[1];
    const unsigned long num_child = buffer[2];
    const unsigned long num_parent = buffer[3];
    const unsigned long* contents = buffer + 4;
    const unsigned long* children = contents + num_content;
    const unsigned long* parents = children + num_child;

    SpecialSetData* data = find_set_data( set );
    assert( NULL != data );
    if( NULL == data ) return MB_FAILURE;

    // Tag mattag;
    // iFace->tag_get_handle(MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag);
    // int block;
    // if (MB_SUCCESS != iFace->tag_get_data(mattag, &set, 1, &block))
    //   block = 0;

    // If either the current data or the new data is in ranged format,
    // convert the other to ranged format as well; whenever the two formats
    // differ, the merged data ends up in "compressed range" format.
    std::vector< wid_t > tmp;
    if( ( flags & mhdf_SET_RANGE_BIT ) != ( data->setFlags & mhdf_SET_RANGE_BIT ) )
    {
        if( flags & mhdf_SET_RANGE_BIT )
        {
            tmp = data->contentIds;
            convert_to_ranged_ids( &tmp[0], tmp.size(), data->contentIds );
            data->setFlags |= mhdf_SET_RANGE_BIT;
        }
        else
        {
            tmp.clear();
            convert_to_ranged_ids( contents, num_content, tmp );
            num_content = tmp.size();
            if( sizeof( wid_t ) < sizeof( long ) )
            {
                size_t old_size = tmp.size();
                tmp.resize( sizeof( long ) * old_size / sizeof( wid_t ) );
                unsigned long* array = reinterpret_cast< unsigned long* >( &tmp[0] );
                for( long i = ( (long)old_size ) - 1; i >= 0; --i )
                    array[i] = tmp[i];
                contents = array;
            }
            else if( sizeof( wid_t ) > sizeof( long ) )
            {
                unsigned long* array = reinterpret_cast< unsigned long* >( &tmp[0] );
                std::copy( tmp.begin(), tmp.end(), array );
            }
            contents = reinterpret_cast< unsigned long* >( &tmp[0] );
        }
    }

    if( data->setFlags & mhdf_SET_RANGE_BIT )
        merge_ranged_ids( contents, num_content, data->contentIds );
    else
        merge_vector_ids( contents, num_content, data->contentIds );

    merge_vector_ids( children, num_child, data->childIds );
    merge_vector_ids( parents, num_parent, data->parentIds );
    return MB_SUCCESS;
}
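
// Worked example (added commentary): the merge performed by unpack_set() above
// when both sides are in compressed-range format.  Merging {3, 3} (IDs 3..5)
// with an incoming {5, 4} (IDs 5..8) collapses to the single run {3, 6}:
//
//   std::vector< WriteHDF5::wid_t > existing;   // hypothetical current contents
//   existing.push_back( 3 );
//   existing.push_back( 3 );
//   const unsigned long incoming[] = { 5, 4 };
//   merge_ranged_ids( incoming, 2, existing );  // existing == { 3, 6 }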

ErrorCode WriteHDF5Parallel::communicate_shared_set_data( const Range& owned, const Range& remote )
{
    ErrorCode rval;
    int mperr;
    const unsigned rank = myPcomm->proc_config().proc_rank();
    const MPI_Comm comm = myPcomm->proc_config().proc_comm();

    dbgOut.tprintf( 1, "COMMUNICATING SHARED SET DATA (%lu owned & %lu remote)\n", (unsigned long)owned.size(),
                    (unsigned long)remote.size() );

    // Calculate the total number of messages to be in transit (send and receive)
    size_t nummess = 0;
    std::vector< unsigned > procs;
    Range shared( owned );
    shared.merge( remote );
    for( Range::iterator i = shared.begin(); i != shared.end(); ++i )
    {
        procs.clear();
        rval = myPcomm->get_entityset_procs( *i, procs );
        CHECK_MB( rval );
        nummess += procs.size();
    }

    // Choose a receive buffer size. We need 4*sizeof(long) minimum,
    // but that is almost useless, so use 16*sizeof(long) as the minimum
    // instead. Choose an upper limit such that we don't exceed 32 MB
    // of allocated memory (unless we absolutely must to meet the minimum.)
    // Also, don't initially choose buffers larger than 1024*sizeof(long).
    const size_t MAX_BUFFER_MEM = 32 * 1024 * 1024 / sizeof( long );
    // const size_t INIT_BUFFER_SIZE = 128;
    const size_t INIT_BUFFER_SIZE = 1024;
    const size_t MIN_BUFFER_SIZE = 16;
    size_t init_buff_size = INIT_BUFFER_SIZE;
    if( init_buff_size * nummess > MAX_BUFFER_MEM ) init_buff_size = MAX_BUFFER_MEM / nummess;
    if( init_buff_size < MIN_BUFFER_SIZE ) init_buff_size = MIN_BUFFER_SIZE;

    dbgOut.printf( 2, "Using buffer size of %lu for an expected message count of %lu\n", (unsigned long)init_buff_size,
                   (unsigned long)nummess );

    // Count number of recvs
    size_t numrecv = 0;
    for( Range::iterator i = owned.begin(); i != owned.end(); ++i )
    {
        procs.clear();
        rval = myPcomm->get_entityset_procs( *i, procs );
        CHECK_MB( rval );
        numrecv += procs.size();
        if( std::find( procs.begin(), procs.end(), rank ) != procs.end() ) --numrecv;
    }

    // Post receive buffers for all owned sets for all sharing procs
    std::vector< MPI_Request > recv_req( numrecv, MPI_REQUEST_NULL );
    std::vector< MPI_Request > lrecv_req( numrecv, MPI_REQUEST_NULL );

    std::vector< std::vector< unsigned long > > recv_buf( numrecv, std::vector< unsigned long >( init_buff_size ) );
    int idx = 0;
    for( Range::iterator i = owned.begin(); i != owned.end(); ++i )
    {
        procs.clear();
        rval = myPcomm->get_entityset_procs( *i, procs );
        CHECK_MB( rval );
        for( size_t j = 0; j < procs.size(); ++j )
        {
            if( procs[j] == rank ) continue;
            int tag = ID_FROM_HANDLE( *i );
            if( *i != CREATE_HANDLE( MBENTITYSET, tag ) )
            {
#ifndef NDEBUG
                abort();
#endif
                CHECK_MB( MB_FAILURE );
            }
            dbgOut.printf( 5, "Posting buffer to receive set %d from proc %u\n", tag, procs[j] );
            mperr =
                MPI_Irecv( &recv_buf[idx][0], init_buff_size, MPI_UNSIGNED_LONG, procs[j], tag, comm, &recv_req[idx] );
            CHECK_MPI( mperr );
            ++idx;
        }
    }
    assert( (size_t)idx == numrecv );
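
    // Illustrative note (added commentary): the MPI tag doubles as the set
    // identifier.  Every entity-set handle is CREATE_HANDLE( MBENTITYSET, id ),
    // so the receiver can rebuild the handle a message refers to directly from
    // status.MPI_TAG, e.g. for a hypothetical set_handle:
    //
    //   int tag = ID_FROM_HANDLE( set_handle );                 // sender side
    //   EntityHandle back = CREATE_HANDLE( MBENTITYSET, tag );  // receiver side
    //   assert( back == set_handle );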

    // Now send set data for all remote sets that I know about
    std::vector< MPI_Request > send_req( remote.size() );
    std::vector< std::vector< unsigned long > > send_buf( remote.size() );
    idx = 0;
    for( Range::iterator i = remote.begin(); i != remote.end(); ++i, ++idx )
    {
        send_buf[idx].resize( init_buff_size );
        rval = pack_set( i, &send_buf[idx][0], init_buff_size );
        CHECK_MB( rval );
        EntityHandle remote_handle;
        unsigned owner;
        rval = myPcomm->get_entityset_owner( *i, owner, &remote_handle );
        CHECK_MB( rval );

        int tag = ID_FROM_HANDLE( remote_handle );
        assert( remote_handle == CREATE_HANDLE( MBENTITYSET, tag ) );
        dbgOut.printf( 5, "Sending %lu values for set %d to proc %u\n",
                       send_buf[idx][1] + send_buf[idx][2] + send_buf[idx][3] + 4, tag, owner );
        mperr = MPI_Isend( &send_buf[idx][0], init_buff_size, MPI_UNSIGNED_LONG, owner, tag, comm, &send_req[idx] );
        CHECK_MPI( mperr );
    }

    // Tag mattag;
    // iFace->tag_get_handle(MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag);

    // Now initialize local data for managing contents of owned, shared sets
    assert( specialSets.empty() );
    specialSets.clear();
    specialSets.reserve( owned.size() );
    for( Range::iterator i = owned.begin(); i != owned.end(); ++i )
    {
        // int block;
        // if (MB_SUCCESS != iFace->tag_get_data(mattag, &*i, 1, &block))
        //   block = 0;
        // std::vector<int> ids;

        SpecialSetData data;
        data.setHandle = *i;
        rval = iFace->get_meshset_options( *i, data.setFlags );
        CHECK_MB( rval );
        specialSets.push_back( data );
        std::vector< EntityHandle > list;
        if( data.setFlags & MESHSET_ORDERED )
        {
            list.clear();
            rval = iFace->get_entities_by_handle( *i, list );
            CHECK_MB( rval );
            rval = vector_to_id_list( list, specialSets.back().contentIds, true );
            CHECK_MB( rval );
            // if (block)
            //   get_global_ids(iFace, &list[0], list.size(), MESHSET_ORDERED, ids);
        }
        else
        {
            Range range;
            rval = iFace->get_entities_by_handle( *i, range );
            CHECK_MB( rval );
            bool ranged;
            rval = range_to_blocked_list( range, specialSets.back().contentIds, ranged );
            if( ranged ) specialSets.back().setFlags |= mhdf_SET_RANGE_BIT;
            // if (block) {
            //   std::vector<EntityHandle> tmp;
            //   for (Range::const_pair_iterator pi = range.const_pair_begin(); pi !=
            //        range.const_pair_end(); ++pi) {
            //     tmp.push_back(pi->first);
            //     tmp.push_back(pi->second);
            //   }
            //   get_global_ids(iFace, &tmp[0], tmp.size(), ranged ? 0 : MESHSET_ORDERED, ids);
            //}
        }

        list.clear();
        rval = iFace->get_parent_meshsets( *i, list );
        CHECK_MB( rval );
        rval = vector_to_id_list( list, specialSets.back().parentIds, true );
        CHECK_MB( rval );
        rval = iFace->get_child_meshsets( *i, list );
        CHECK_MB( rval );
        rval = vector_to_id_list( list, specialSets.back().childIds, true );
        CHECK_MB( rval );
    }
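
    // Worked example (added commentary): message lengths follow the pack_set()
    // header, i.e. 4 + buff[1] + buff[2] + buff[3].  With the default initial
    // buffer of 1024 unsigned longs, a hypothetical set with 2000 content IDs,
    // 3 children and 0 parents needs
    //
    //   size_t required = 4 + 2000 + 3 + 0;            // == 2007 words
    //   bool second_pass = required > init_buff_size;  // true for this example
    //
    // so it is handled by the repost/re-send pass that follows.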

    // Process received buffers, repost larger buffers where necessary
    size_t remaining = numrecv;
    numrecv = 0;
    while( remaining-- )
    {
        std::vector< unsigned long > dead;
        MPI_Status status;
        mperr = MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status );
        CHECK_MPI( mperr );
        EntityHandle handle = CREATE_HANDLE( MBENTITYSET, status.MPI_TAG );
        std::vector< unsigned long >& buff = recv_buf[idx];
        size_t size = buff[1] + buff[2] + buff[3] + 4;
        dbgOut.printf( 5, "Received %lu values for set %d from proc %d\n", (unsigned long)size, status.MPI_TAG,
                       status.MPI_SOURCE );
        if( size <= init_buff_size )
        {
            rval = unpack_set( handle, &buff[0], init_buff_size );
            CHECK_MB( rval );
            dead.swap( buff );  // Release memory
        }
        else
        {
            // Data was too big for init_buff_size;
            // repost with a larger buffer
            buff.resize( size );
            dbgOut.printf( 5, "Re-Posting buffer to receive set %d from proc %d with size %lu\n", status.MPI_TAG,
                           status.MPI_SOURCE, (unsigned long)size );
            mperr = MPI_Irecv( &buff[0], size, MPI_UNSIGNED_LONG, status.MPI_SOURCE, status.MPI_TAG, comm,
                               &lrecv_req[idx] );
            CHECK_MPI( mperr );
            ++numrecv;
        }
        recv_req[idx] = MPI_REQUEST_NULL;
    }

    // Wait for sends to complete
    MPI_Waitall( send_req.size(), &send_req[0], MPI_STATUSES_IGNORE );

    // Re-send sets that didn't fit the initial buffer size
    idx = 0;
    for( Range::iterator i = remote.begin(); i != remote.end(); ++i, ++idx )
    {
        std::vector< unsigned long >& buff = send_buf[idx];
        size_t size = buff[1] + buff[2] + buff[3] + 4;
        if( size <= init_buff_size ) continue;

        buff.resize( size );
        rval = pack_set( i, &buff[0], size );
        CHECK_MB( rval );
        EntityHandle remote_handle;
        unsigned owner;
        rval = myPcomm->get_entityset_owner( *i, owner, &remote_handle );
        CHECK_MB( rval );

        int tag = ID_FROM_HANDLE( remote_handle );
        assert( remote_handle == CREATE_HANDLE( MBENTITYSET, tag ) );
        dbgOut.printf( 5, "Sending %lu values for set %d to proc %u\n", (unsigned long)size, tag, owner );
        mperr = MPI_Isend( &buff[0], size, MPI_UNSIGNED_LONG, owner, tag, comm, &send_req[idx] );
        CHECK_MPI( mperr );
    }

    // Process received buffers
    remaining = numrecv;
    while( remaining-- )
    {
        std::vector< unsigned long > dead;
        MPI_Status status;
        mperr = MPI_Waitany( lrecv_req.size(), &lrecv_req[0], &idx, &status );
        CHECK_MPI( mperr );
        EntityHandle handle = CREATE_HANDLE( MBENTITYSET, status.MPI_TAG );
        std::vector< unsigned long >& buff = recv_buf[idx];
        dbgOut.printf( 5, "Received %lu values for set %d from proc %d\n", 4 + buff[1] + buff[2] + buff[3],
                       status.MPI_TAG, status.MPI_SOURCE );
        rval = unpack_set( handle, &buff[0], buff.size() );
        CHECK_MB( rval );
        dead.swap( buff );  // Release memory

        lrecv_req[idx] = MPI_REQUEST_NULL;
    }
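
    // Hypothetical refactoring sketch (added commentary, not used here): the
    // recurring expression for the true length of a packed set message could be
    // named explicitly, e.g.
    //
    //   static size_t packed_set_size( const std::vector< unsigned long >& b )
    //   {
    //       return 4 + b[1] + b[2] + b[3];  // header + contents + children + parents
    //   }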

    // Wait for sends to complete
    MPI_Waitall( send_req.size(), &send_req[0], MPI_STATUSES_IGNORE );

    return MB_SUCCESS;
}
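
// Illustrative sketch (added commentary): create_dataset(), implemented in the
// serial WriteHDF5 class, gives each rank its starting offset into the global
// table plus the global total and per-rank maximum.  It is not reproduced
// here, but a result of that shape can be obtained with an exclusive scan and
// reductions; the snippet below is only an analogue under that assumption:
//
//   MPI_Comm comm = myPcomm->proc_config().proc_comm();
//   long local_count = (long)setSet.range.size();
//   long offset = 0, total = 0, max_count = 0;
//   MPI_Exscan( &local_count, &offset, 1, MPI_LONG, MPI_SUM, comm );
//   if( 0 == myPcomm->proc_config().proc_rank() ) offset = 0;  // rank 0's Exscan output is undefined
//   MPI_Allreduce( &local_count, &total, 1, MPI_LONG, MPI_SUM, comm );
//   MPI_Allreduce( &local_count, &max_count, 1, MPI_LONG, MPI_MAX, comm );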

ErrorCode WriteHDF5Parallel::create_meshset_tables( double* times )
{
    Range::const_iterator riter;
    const unsigned rank = myPcomm->proc_config().proc_rank();

    START_SERIAL;
    print_type_sets( iFace, &dbgOut, setSet.range );
    END_SERIAL;
    CpuTimer timer;

    // Remove remote sets from setSet.range
    Range shared, owned, remote;
    ErrorCode rval = myPcomm->get_shared_sets( shared );
    CHECK_MB( rval );
    shared = intersect( shared, setSet.range );
    rval = myPcomm->get_owned_sets( rank, owned );
    CHECK_MB( rval );
    owned = intersect( owned, setSet.range );
    remote = subtract( shared, owned );
    setSet.range = subtract( setSet.range, remote );

    // Create set meta table
    struct SetDescCreator : public DataSetCreator
    {
        ErrorCode operator()( WriteHDF5* writer, long size, const ExportSet*, long& start_id ) const
        {
            return writer->create_set_meta( size, start_id );
        }
    };
    long count = setSet.range.size();
    rval = create_dataset( 1, &count, &setSet.offset, &setSet.max_num_ents, &setSet.total_num_ents, SetDescCreator(),
                           NULL, &setSet.first_id );
    CHECK_MB( rval );
    writeSets = setSet.max_num_ents > 0;

    rval = assign_ids( setSet.range, setSet.first_id + setSet.offset );
    CHECK_MB( rval );
    if( times ) times[SET_OFFSET_TIME] = timer.time_elapsed();

    // Exchange file IDs for sets between all procs
    rval = communicate_shared_set_ids( owned, remote );
    CHECK_MB( rval );
    if( times ) times[SHARED_SET_IDS] = timer.time_elapsed();

    // Communicate remote set contents, children, etc.
    rval = communicate_shared_set_data( owned, remote );
    CHECK_MB( rval );
    if( times ) times[SHARED_SET_CONTENTS] = timer.time_elapsed();

    // Communicate counts for owned sets
    long data_counts[3];  // { #contents, #children, #parents }
    rval = count_set_size( setSet.range, data_counts[0], data_counts[1], data_counts[2] );
    CHECK_MB( rval );
    if( times ) times[SET_OFFSET_TIME] += timer.time_elapsed();

    long offsets[3], max_counts[3], totals[3];
    rval = create_dataset( 3, data_counts, offsets, max_counts, totals );
    CHECK_MB( rval );

    // Create the datasets
    if( 0 == myPcomm->proc_config().proc_rank() )
    {
        rval = create_set_tables( totals[0], totals[1], totals[2] );
        CHECK_MB( rval );
    }

    // Store communicated global data
    setContentsOffset = offsets[0];
    setChildrenOffset = offsets[1];
    setParentsOffset = offsets[2];
    maxNumSetContents = max_counts[0];
    maxNumSetChildren = max_counts[1];
    maxNumSetParents = max_counts[2];
    writeSetContents = totals[0] > 0;
    writeSetChildren = totals[1] > 0;
    writeSetParents = totals[2] > 0;

    dbgOut.printf( 2, "set contents: %ld local, %ld global, offset = %ld\n", data_counts[0], totals[0], offsets[0] );
    dbgOut.printf( 2, "set children: %ld local, %ld global, offset = %ld\n", data_counts[1], totals[1], offsets[1] );
    dbgOut.printf( 2, "set parents: %ld local, %ld global, offset = %ld\n", data_counts[2], totals[2], offsets[2] );

    return MB_SUCCESS;
}

void WriteHDF5Parallel::remove_remote_entities( EntityHandle relative, Range& range )
{
    Range result;
    result.merge( intersect( range, nodeSet.range ) );
    result.merge( intersect( range, setSet.range ) );
    for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter )
        result.merge( intersect( range, eiter->range ) );

    // result.merge(intersect(range, myParallelSets));
    Range sets;
    int junk;
    sets.merge( Range::lower_bound( range.begin(), range.end(), CREATE_HANDLE( MBENTITYSET, 0, junk ) ), range.end() );
    remove_remote_sets( relative, sets );
    result.merge( sets );
    range.swap( result );
}

void WriteHDF5Parallel::remove_remote_sets( EntityHandle /* relative */, Range& range )
{
    Range result( intersect( range, setSet.range ) );
    // Store the non-intersecting entities separately if needed
    // Range remaining(subtract(range, result));
    range.swap( result );
}

void WriteHDF5Parallel::remove_remote_entities( EntityHandle relative, std::vector< EntityHandle >& vect )
{
    Range intrsct;
    for( std::vector< EntityHandle >::const_iterator iter = vect.begin(); iter != vect.end(); ++iter )
        intrsct.insert( *iter );
    remove_remote_entities( relative, intrsct );

    unsigned int read, write;
    for( read = write = 0; read < vect.size(); ++read )
    {
        if( intrsct.find( vect[read] ) != intrsct.end() )
        {
            if( read != write ) vect[write] = vect[read];
            ++write;
        }
    }
    if( write != vect.size() ) vect.resize( write );
}
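
// Note (added commentary): the read/write compaction loops above and below keep
// exactly the elements present in intrsct while preserving their order.  An
// equivalent formulation with the erase-remove idiom would be
//
//   vect.erase( std::remove_if( vect.begin(), vect.end(),
//                               not_in_range( intrsct ) ),  // hypothetical predicate
//               vect.end() );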

void WriteHDF5Parallel::remove_remote_sets( EntityHandle relative, std::vector< EntityHandle >& vect )
{
    Range intrsct;
    for( std::vector< EntityHandle >::const_iterator iter = vect.begin(); iter != vect.end(); ++iter )
        intrsct.insert( *iter );
    remove_remote_sets( relative, intrsct );

    unsigned int read, write;
    for( read = write = 0; read < vect.size(); ++read )
    {
        if( intrsct.find( vect[read] ) != intrsct.end() )
        {
            if( read != write ) vect[write] = vect[read];
            ++write;
        }
    }
    if( write != vect.size() ) vect.resize( write );
}

ErrorCode WriteHDF5Parallel::exchange_file_ids( const Range& nonlocal )
{
    ErrorCode rval;

    // For each entity owned on the interface, write its file id to
    // a tag. The sets of entities to be written should already contain
    // only owned entities, so by intersecting with them we not only
    // filter to the entities that will be written, but also restrict
    // ourselves to entities owned by this proc.

    // Get list of interface entities
    Range imesh, tmp;
    for( std::list< ExportSet >::reverse_iterator i = exportList.rbegin(); i != exportList.rend(); ++i )
    {
        tmp.clear();
        rval = myPcomm->filter_pstatus( i->range, PSTATUS_SHARED, PSTATUS_AND, -1, &tmp );
        if( MB_SUCCESS != rval ) return error( rval );
        imesh.merge( tmp );
    }
    tmp.clear();
    rval = myPcomm->filter_pstatus( nodeSet.range, PSTATUS_SHARED, PSTATUS_AND, -1, &tmp );
    if( MB_SUCCESS != rval ) return error( rval );
    imesh.merge( tmp );

    // Create tag to store file IDs
    EntityHandle default_val = 0;
    Tag file_id_tag = 0;
    rval = iFace->tag_get_handle( "__hdf5_ll_fileid", 1, MB_TYPE_HANDLE, file_id_tag, MB_TAG_DENSE | MB_TAG_CREAT,
                                  &default_val );
    if( MB_SUCCESS != rval ) return error( rval );

    // Copy file IDs into tag
    std::vector< EntityHandle > file_id_vect( imesh.size() );
    Range::const_iterator i;
    std::vector< EntityHandle >::iterator j = file_id_vect.begin();
    for( i = imesh.begin(); i != imesh.end(); ++i, ++j )
    {
        *j = idMap.find( *i );
        if( !*j )
        {
            iFace->tag_delete( file_id_tag );
            return error( MB_FAILURE );
        }
    }
    rval = iFace->tag_set_data( file_id_tag, imesh, &file_id_vect[0] );
    if( MB_SUCCESS != rval )
    {
        iFace->tag_delete( file_id_tag );
        return error( rval );
    }

    // Do communication
    rval = myPcomm->exchange_tags( file_id_tag, imesh );
    if( MB_SUCCESS != rval )
    {
        iFace->tag_delete( file_id_tag );
        return error( rval );
    }

    // Copy file IDs from tag into idMap for remote entities
    file_id_vect.resize( nonlocal.size() );
    rval = iFace->tag_get_data( file_id_tag, nonlocal, &file_id_vect[0] );
    if( MB_SUCCESS != rval )
    {
        iFace->tag_delete( file_id_tag );
        return error( rval );
    }
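
    // Illustrative note (added commentary): idMap maps entity handles to file
    // IDs.  idMap.find( h ) returns 0 when h has no assigned ID (how the loop
    // over imesh above catches unassigned owned entities), and
    // idMap.insert( h, id, 1 ) records a single mapping whose returned .second
    // is false if h was already mapped (how the loop below catches conflicting
    // IDs).  For example, after a hypothetical
    //
    //   idMap.insert( some_handle, 42, 1 );
    //
    // a later idMap.find( some_handle ) yields 42.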

    j = file_id_vect.begin();
    for( i = nonlocal.begin(); i != nonlocal.end(); ++i, ++j )
    {
        if( *j == 0 )
        {
            int owner = -1;
            myPcomm->get_owner( *i, owner );
            const char* name = CN::EntityTypeName( TYPE_FROM_HANDLE( *i ) );
            int id = ID_FROM_HANDLE( *i );
            MB_SET_ERR_CONT( "Process " << myPcomm->proc_config().proc_rank()
                                        << " did not receive valid id handle for shared " << name << " " << id
                                        << " owned by process " << owner );
            dbgOut.printf( 1,
                           "Did not receive valid remote id for "
                           "shared %s %d owned by process %d",
                           name, id, owner );
            iFace->tag_delete( file_id_tag );
            return error( MB_FAILURE );
        }
        else
        {
            if( !idMap.insert( *i, *j, 1 ).second )
            {
                iFace->tag_delete( file_id_tag );
                return error( MB_FAILURE );
            }
        }
    }

#ifndef NDEBUG
    // Check that the writer is correct with regard to which entities
    // it owns, by verifying that the file ids we thought we were
    // sending were not received instead
    file_id_vect.resize( imesh.size() );
    rval = iFace->tag_get_data( file_id_tag, imesh, &file_id_vect[0] );
    if( MB_SUCCESS != rval )
    {
        iFace->tag_delete( file_id_tag );
        return error( rval );
    }
    int invalid_count = 0;
    j = file_id_vect.begin();
    for( i = imesh.begin(); i != imesh.end(); ++i, ++j )
    {
        EntityHandle h = idMap.find( *i );
        if( *j != h )
        {
            ++invalid_count;
            dbgOut.printf( 1, "Conflicting ownership for %s %ld\n", CN::EntityTypeName( TYPE_FROM_HANDLE( *i ) ),
                           (long)ID_FROM_HANDLE( *i ) );
        }
    }
    if( invalid_count )
    {
        iFace->tag_delete( file_id_tag );
        MB_SET_ERR( MB_FAILURE, invalid_count << " entities with conflicting ownership found by process "
                                              << myPcomm->proc_config().proc_rank()
                                              << ". This will result in duplicate entities written to file" );
    }
#endif

    return iFace->tag_delete( file_id_tag );
}

void WriteHDF5Parallel::print_times( const double* times ) const
{
    if( !myPcomm )
    {
        WriteHDF5::print_times( times );
    }
    else
    {
        double recv[NUM_TIMES];
        MPI_Reduce( (void*)times, recv, NUM_TIMES, MPI_DOUBLE, MPI_MAX, 0, myPcomm->proc_config().proc_comm() );
        if( 0 == myPcomm->proc_config().proc_rank() ) WriteHDF5::print_times( recv );
    }
}

}  // namespace moab