Mesh Oriented datABase
(version 5.4.1)
Array-based unstructured mesh datastructure
WriteHDF5Parallel.cpp
00001 #undef DEBUG 00002 #undef TIME_DEBUG 00003 00004 #include <cstdarg> 00005 #include <ctime> 00006 #include <cstdlib> 00007 00008 #include <cstring> 00009 #include <cassert> 00010 00011 #include <vector> 00012 #include <set> 00013 #include <map> 00014 #include <utility> 00015 #include <iostream> 00016 #include <sstream> 00017 #include <string> 00018 00019 #include "moab/Interface.hpp" 00020 #include "Internals.hpp" 00021 #include "MBTagConventions.hpp" 00022 #include "MBParallelConventions.h" 00023 #include "moab/ParallelComm.hpp" 00024 #include "moab/CN.hpp" 00025 #include "moab/Range.hpp" 00026 #include "moab/CpuTimer.hpp" 00027 00028 #include "WriteHDF5Parallel.hpp" 00029 00030 #ifndef MOAB_HAVE_HDF5 00031 #error Attempt to compile WriteHDF5Parallel with HDF5 support disabled 00032 #endif 00033 00034 #include <H5Tpublic.h> 00035 #include <H5Ppublic.h> 00036 #include <H5FDmpi.h> 00037 #include <H5FDmpio.h> 00038 00039 #include "mhdf.h" 00040 00041 #include "IODebugTrack.hpp" 00042 #include "moab/FileOptions.hpp" 00043 00044 namespace 00045 { 00046 template < bool Condition > 00047 struct STATIC_ASSERTION; 00048 template <> 00049 struct STATIC_ASSERTION< true > 00050 { 00051 }; 00052 } // namespace 00053 00054 #define PP_CAT_( a, b ) a##b 00055 #define PP_CAT( a, b ) PP_CAT_( a, b ) 00056 #define STATIC_ASSERT( Condition ) \ 00057 enum \ 00058 { \ 00059 PP_CAT( dummy, __LINE__ ) = sizeof( ::STATIC_ASSERTION< (bool)( Condition ) > ) \ 00060 } 00061 00062 namespace moab 00063 { 00064 00065 #ifndef _WIN32 // problematic for windows 00066 // Need an MPI type that we can put handles in 00067 STATIC_ASSERT( sizeof( unsigned long ) >= sizeof( EntityHandle ) ); 00068 00069 // Need an MPI type that we can put file IDs in 00070 STATIC_ASSERT( sizeof( unsigned long ) >= sizeof( WriteHDF5::wid_t ) ); 00071 #endif 00072 00073 // This function doesn't do anything useful. It's just a nice 00074 // place to set a break point to determine why the reader fails. 
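// Note: the CHECK_MPI / CHECK_MB / CHECK_HDF / CHECK_HDFN macros defined below all
// route their failure returns through error(), so a breakpoint in it catches the
// failures they report. The usage pattern throughout this file looks like:
//   err = MPI_Bcast( &size, 1, MPI_UNSIGNED_LONG, 0, comm );
//   CHECK_MPI( err );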
00075 static inline ErrorCode error( ErrorCode rval ) 00076 { 00077 return rval; 00078 } 00079 00080 const char* mpi_err_str( int errorcode ) 00081 { 00082 static char buffer[2048]; 00083 int len = sizeof( buffer ); 00084 MPI_Error_string( errorcode, buffer, &len ); 00085 buffer[std::min( (size_t)len, sizeof( buffer ) - 1 )] = '\0'; 00086 return buffer; 00087 } 00088 00089 #define MPI_FAILURE_MSG( A ) \ 00090 "MPI Failure at " __FILE__ ":%d : (Code %d) %s\n", __LINE__, (int)( A ), mpi_err_str( ( A ) ) 00091 00092 #define CHECK_MPI( A ) \ 00093 do \ 00094 { \ 00095 if( MPI_SUCCESS != ( A ) ) \ 00096 { \ 00097 MB_SET_ERR_CONT( "MPI Failure : (Code " << (int)( A ) << ") " << mpi_err_str( ( A ) ) ); \ 00098 dbgOut.printf( 1, MPI_FAILURE_MSG( ( A ) ) ); \ 00099 return error( MB_FAILURE ); \ 00100 } \ 00101 } while( false ) 00102 00103 #define MB_FAILURE_MSG( A ) "MOAB_Failure at " __FILE__ ":%d : %s (%d)\n", __LINE__, ErrorCodeStr[( A )], (int)( A ) 00104 00105 #define CHECK_MB( A ) \ 00106 do \ 00107 { \ 00108 if( MB_SUCCESS != ( A ) ) \ 00109 { \ 00110 MB_SET_ERR_CONT( "MOAB Failure : " << ErrorCodeStr[( A )] ); \ 00111 dbgOut.printf( 1, MB_FAILURE_MSG( ( A ) ) ); \ 00112 return error( A ); \ 00113 } \ 00114 } while( false ) 00115 00116 #define HDF_FAILURE_MSG( A ) "MHDF Failure at " __FILE__ ":%d : %s\n", __LINE__, mhdf_message( &( A ) ) 00117 00118 #define CHECK_HDF( A ) \ 00119 do \ 00120 { \ 00121 if( mhdf_isError( &( A ) ) ) \ 00122 { \ 00123 MB_SET_ERR_CONT( "MHDF Failure : " << mhdf_message( &( A ) ) ); \ 00124 dbgOut.printf( 1, HDF_FAILURE_MSG( ( A ) ) ); \ 00125 return error( MB_FAILURE ); \ 00126 } \ 00127 } while( false ) 00128 00129 #define CHECK_HDFN( A ) \ 00130 do \ 00131 { \ 00132 if( mhdf_isError( &( A ) ) ) \ 00133 { \ 00134 MB_SET_ERR_CONT( "MHDF Failure : " << mhdf_message( &( A ) ) ); \ 00135 return error( MB_FAILURE ); \ 00136 } \ 00137 } while( false ) 00138 00139 #ifdef VALGRIND 00140 #include <valgrind/memcheck.h> 00141 00142 template < typename T > 00143 inline void VALGRIND_MAKE_VEC_UNDEFINED( std::vector< T >& v ) 00144 { 00145 if( v.size() ) 00146 { 00147 } 00148 (void)VALGRIND_MAKE_MEM_UNDEFINED( &v[0], v.size() * sizeof( T ) ); 00149 } 00150 00151 #else 00152 #ifndef VALGRIND_CHECK_MEM_IS_DEFINED 00153 #define VALGRIND_CHECK_MEM_IS_DEFINED( a, b ) ( (void)0 ) 00154 #endif 00155 #ifndef VALGRIND_CHECK_MEM_IS_ADDRESSABLE 00156 #define VALGRIND_CHECK_MEM_IS_ADDRESSABLE( a, b ) ( (void)0 ) 00157 #endif 00158 #ifndef VALGRIND_MAKE_MEM_UNDEFINED 00159 #define VALGRIND_MAKE_MEM_UNDEFINED( a, b ) ( (void)0 ) 00160 #endif 00161 00162 template < typename T > 00163 inline void VALGRIND_MAKE_VEC_UNDEFINED( std::vector< T >& ) 00164 { 00165 /* Nothing to do */ 00166 } 00167 00168 #endif 00169 00170 #ifndef NDEBUG 00171 #define START_SERIAL \ 00172 for( unsigned _x = 0; _x < myPcomm->proc_config().proc_size(); ++_x ) \ 00173 { \ 00174 MPI_Barrier( myPcomm->proc_config().proc_comm() ); \ 00175 if( _x != myPcomm->proc_config().proc_rank() ) continue 00176 #define END_SERIAL \ 00177 } \ 00178 MPI_Barrier( myPcomm->proc_config().proc_comm() ) 00179 #else 00180 #define START_SERIAL 00181 #define END_SERIAL 00182 #endif 00183 00184 static int my_Gatherv( void* sendbuf, 00185 int sendcount, 00186 MPI_Datatype sendtype, 00187 std::vector< unsigned char >& recvbuf, 00188 std::vector< int >& recvcounts, 00189 int root, 00190 MPI_Comm comm ) 00191 { 00192 int nproc, rank, bytes, err; 00193 MPI_Comm_size( comm, &nproc ); 00194 MPI_Comm_rank( comm, &rank ); 00195 MPI_Type_size( sendtype, 
&bytes ); 00196 00197 recvcounts.resize( rank == root ? nproc : 0 ); 00198 err = MPI_Gather( &sendcount, 1, MPI_INT, &recvcounts[0], 1, MPI_INT, root, comm ); 00199 if( MPI_SUCCESS != err ) return err; 00200 00201 std::vector< int > disp( recvcounts.size() ); 00202 if( root == rank ) 00203 { 00204 disp[0] = 0; 00205 for( int i = 1; i < nproc; ++i ) 00206 disp[i] = disp[i - 1] + recvcounts[i - 1]; 00207 recvbuf.resize( bytes * ( disp.back() + recvcounts.back() ) ); 00208 } 00209 00210 return MPI_Gatherv( sendbuf, sendcount, sendtype, &recvbuf[0], &recvcounts[0], &disp[0], sendtype, root, comm ); 00211 } 00212 00213 static void print_type_sets( Interface* iFace, DebugOutput* str, Range& sets ) 00214 { 00215 const unsigned VB = 2; 00216 if( str->get_verbosity() < VB ) return; 00217 00218 Tag gid, did, bid, sid, nid; 00219 gid = iFace->globalId_tag(); 00220 iFace->tag_get_handle( GEOM_DIMENSION_TAG_NAME, 1, MB_TYPE_INTEGER, did ); 00221 iFace->tag_get_handle( MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, bid ); 00222 iFace->tag_get_handle( DIRICHLET_SET_TAG_NAME, 1, MB_TYPE_INTEGER, nid ); 00223 iFace->tag_get_handle( NEUMANN_SET_TAG_NAME, 1, MB_TYPE_INTEGER, sid ); 00224 Range typesets[10]; 00225 const char* typenames[] = { "Block ", "Sideset ", "NodeSet", "Vertex", "Curve", 00226 "Surface", "Volume", "Body", "Other" }; 00227 for( Range::iterator riter = sets.begin(); riter != sets.end(); ++riter ) 00228 { 00229 unsigned dim, id; //, oldsize; 00230 if( MB_SUCCESS == iFace->tag_get_data( bid, &*riter, 1, &id ) ) 00231 dim = 0; 00232 else if( MB_SUCCESS == iFace->tag_get_data( sid, &*riter, 1, &id ) ) 00233 dim = 1; 00234 else if( MB_SUCCESS == iFace->tag_get_data( nid, &*riter, 1, &id ) ) 00235 dim = 2; 00236 else if( MB_SUCCESS == iFace->tag_get_data( did, &*riter, 1, &dim ) ) 00237 { 00238 id = 0; 00239 iFace->tag_get_data( gid, &*riter, 1, &id ); 00240 dim += 3; 00241 } 00242 else 00243 { 00244 id = *riter; 00245 dim = 9; 00246 } 00247 00248 // oldsize = typesets[dim].size(); 00249 typesets[dim].insert( id ); 00250 // assert(typesets[dim].size() - oldsize == 1); 00251 } 00252 for( int ii = 0; ii < 9; ++ii ) 00253 { 00254 char tmp[64]; 00255 sprintf( tmp, "%s (%lu) ", typenames[ii], (unsigned long)typesets[ii].size() ); 00256 str->print( VB, tmp, typesets[ii] ); 00257 } 00258 str->printf( VB, "Total: %lu\n", (unsigned long)sets.size() ); 00259 } 00260 00261 #define debug_barrier() debug_barrier_line( __LINE__ ) 00262 00263 void WriteHDF5Parallel::debug_barrier_line( int lineno ) 00264 { 00265 const unsigned threshold = 2; 00266 static unsigned long count = 0; 00267 if( dbgOut.get_verbosity() >= threshold && myPcomm ) 00268 { 00269 dbgOut.printf( threshold, "*********** Debug Barrier %lu (@%d)***********\n", ++count, lineno ); 00270 MPI_Barrier( myPcomm->proc_config().proc_comm() ); 00271 } 00272 } 00273 00274 WriterIface* WriteHDF5Parallel::factory( Interface* iface ) 00275 { 00276 return new WriteHDF5Parallel( iface ); 00277 } 00278 00279 WriteHDF5Parallel::WriteHDF5Parallel( Interface* iface ) 00280 : WriteHDF5( iface ), myPcomm( NULL ), pcommAllocated( false ), hslabOp( H5S_SELECT_OR ) 00281 { 00282 } 00283 00284 WriteHDF5Parallel::~WriteHDF5Parallel() 00285 { 00286 if( pcommAllocated && myPcomm ) delete myPcomm; 00287 } 00288 00289 // The parent WriteHDF5 class has ExportSet structs that are 00290 // populated with the entities to be written, grouped by type 00291 // (and for elements, connectivity length). 
This function: 00292 // o determines which entities are to be written by a remote processor 00293 // o removes those entities from the ExportSet structs in WriteMesh 00294 // o passes them back in a Range 00295 ErrorCode WriteHDF5Parallel::gather_interface_meshes( Range& nonowned ) 00296 { 00297 ErrorCode result; 00298 00299 // START_SERIAL; 00300 dbgOut.print( 3, "Pre-interface mesh:\n" ); 00301 dbgOut.print( 3, nodeSet.range ); 00302 for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) 00303 dbgOut.print( 3, eiter->range ); 00304 dbgOut.print( 3, setSet.range ); 00305 00306 // Move handles of non-owned entities from lists of entities 00307 // that this processor will write to the 'nonowned' list. 00308 00309 nonowned.clear(); 00310 result = myPcomm->filter_pstatus( nodeSet.range, PSTATUS_NOT_OWNED, PSTATUS_AND, -1, &nonowned ); 00311 if( MB_SUCCESS != result ) return error( result ); 00312 nodeSet.range = subtract( nodeSet.range, nonowned ); 00313 00314 for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) 00315 { 00316 Range tmpset; 00317 result = myPcomm->filter_pstatus( eiter->range, PSTATUS_NOT_OWNED, PSTATUS_AND, -1, &tmpset ); 00318 if( MB_SUCCESS != result ) return error( result ); 00319 eiter->range = subtract( eiter->range, tmpset ); 00320 nonowned.merge( tmpset ); 00321 } 00322 00323 dbgOut.print( 3, "Post-interface mesh:\n" ); 00324 dbgOut.print( 3, nodeSet.range ); 00325 for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) 00326 dbgOut.print( 3, eiter->range ); 00327 dbgOut.print( 3, setSet.range ); 00328 00329 // END_SERIAL; 00330 00331 return MB_SUCCESS; 00332 } 00333 00334 ErrorCode WriteHDF5Parallel::parallel_create_file( const char* filename, 00335 bool overwrite, 00336 const std::vector< std::string >& qa_records, 00337 const FileOptions& opts, 00338 const Tag* user_tag_list, 00339 int user_tag_count, 00340 int dimension, 00341 double* times ) 00342 { 00343 ErrorCode rval; 00344 mhdf_Status status; 00345 00346 int pcomm_no = 0; 00347 opts.get_int_option( "PARALLEL_COMM", pcomm_no ); 00348 00349 myPcomm = ParallelComm::get_pcomm( iFace, pcomm_no ); 00350 if( 0 == myPcomm ) 00351 { 00352 myPcomm = new ParallelComm( iFace, MPI_COMM_WORLD ); 00353 pcommAllocated = true; 00354 } 00355 00356 MPI_Info info = MPI_INFO_NULL; 00357 std::string cb_size; 00358 rval = opts.get_str_option( "CB_BUFFER_SIZE", cb_size ); 00359 if( MB_SUCCESS == rval ) 00360 { 00361 MPI_Info_create( &info ); 00362 MPI_Info_set( info, const_cast< char* >( "cb_buffer_size" ), const_cast< char* >( cb_size.c_str() ) ); 00363 } 00364 00365 dbgOut.set_rank( myPcomm->proc_config().proc_rank() ); 00366 dbgOut.limit_output_to_first_N_procs( 32 ); 00367 Range nonlocal; 00368 debug_barrier(); 00369 dbgOut.tprint( 1, "Gathering interface meshes\n" ); 00370 rval = gather_interface_meshes( nonlocal ); 00371 if( MB_SUCCESS != rval ) return error( rval ); 00372 00373 /**************** Get tag names for sets likely to be shared ***********/ 00374 // debug_barrier(); 00375 // dbgOut.tprint(1, "Getting shared entity sets\n"); 00376 // rval = get_sharedset_tags(); 00377 // if (MB_SUCCESS != rval) return error(rval); 00378 00379 /**************** Create actual file and write meta info ***************/ 00380 00381 debug_barrier(); 00382 if( myPcomm->proc_config().proc_rank() == 0 ) 00383 { 00384 dbgOut.tprintf( 1, "Creating file: %s\n", filename ); 00385 00386 // Create the file 
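// (Only the root process creates the file and writes this initial metadata; once
// the serial setup is complete, every rank re-opens the same file through the
// MPI-IO file-access property list configured below with H5Pset_fapl_mpio.)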
00387 const char* type_names[MBMAXTYPE]; 00388 memset( type_names, 0, MBMAXTYPE * sizeof( char* ) ); 00389 for( EntityType i = MBEDGE; i < MBENTITYSET; ++i ) 00390 type_names[i] = CN::EntityTypeName( i ); 00391 00392 dbgOut.tprint( 1, "call mhdf_createFile\n" ); 00393 filePtr = mhdf_createFile( filename, overwrite, type_names, MBMAXTYPE, id_type, &status ); 00394 if( !filePtr ) 00395 { 00396 MB_SET_ERR( MB_FAILURE, mhdf_message( &status ) ); 00397 } 00398 00399 dbgOut.tprint( 1, "call write_qa\n" ); 00400 rval = write_qa( qa_records ); 00401 if( MB_SUCCESS != rval ) return error( rval ); 00402 } 00403 00404 /**************** Create node coordinate table ***************/ 00405 CpuTimer timer; 00406 debug_barrier(); 00407 dbgOut.tprint( 1, "creating node table\n" ); 00408 topState.start( "creating node table" ); 00409 rval = create_node_table( dimension ); 00410 topState.end( rval ); 00411 if( MB_SUCCESS != rval ) return error( rval ); 00412 if( times ) times[CREATE_NODE_TIME] = timer.time_elapsed(); 00413 00414 /**************** Create element tables ***************/ 00415 00416 debug_barrier(); 00417 dbgOut.tprint( 1, "negotiating element types\n" ); 00418 topState.start( "negotiating element types" ); 00419 rval = negotiate_type_list(); 00420 topState.end( rval ); 00421 if( MB_SUCCESS != rval ) return error( rval ); 00422 if( times ) times[NEGOTIATE_TYPES_TIME] = timer.time_elapsed(); 00423 dbgOut.tprint( 1, "creating element table\n" ); 00424 topState.start( "creating element tables" ); 00425 rval = create_element_tables(); 00426 topState.end( rval ); 00427 if( MB_SUCCESS != rval ) return error( rval ); 00428 if( times ) times[CREATE_ELEM_TIME] = timer.time_elapsed(); 00429 00430 /*************** Exchange file IDs *****************/ 00431 00432 debug_barrier(); 00433 dbgOut.tprint( 1, "communicating file ids\n" ); 00434 topState.start( "communicating file ids" ); 00435 rval = exchange_file_ids( nonlocal ); 00436 topState.end( rval ); 00437 if( MB_SUCCESS != rval ) return error( rval ); 00438 if( times ) times[FILEID_EXCHANGE_TIME] = timer.time_elapsed(); 00439 00440 /**************** Create meshset tables *********************/ 00441 00442 debug_barrier(); 00443 dbgOut.tprint( 1, "creating meshset table\n" ); 00444 topState.start( "creating meshset tables" ); 00445 rval = create_meshset_tables( times ); 00446 topState.end( rval ); 00447 if( MB_SUCCESS != rval ) return error( rval ); 00448 if( times ) times[CREATE_SET_TIME] = timer.time_elapsed(); 00449 00450 /**************** Create adjacency tables *********************/ 00451 00452 debug_barrier(); 00453 dbgOut.tprint( 1, "creating adjacency table\n" ); 00454 topState.start( "creating adjacency tables" ); 00455 rval = create_adjacency_tables(); 00456 topState.end( rval ); 00457 if( MB_SUCCESS != rval ) return error( rval ); 00458 if( times ) times[CREATE_ADJ_TIME] = timer.time_elapsed(); 00459 00460 /**************** Create tag data *********************/ 00461 00462 debug_barrier(); 00463 dbgOut.tprint( 1, "creating tag tables\n" ); 00464 topState.start( "creating tag tables" ); 00465 rval = gather_tags( user_tag_list, user_tag_count ); 00466 if( MB_SUCCESS != rval ) return error( rval ); 00467 rval = create_tag_tables(); 00468 topState.end( rval ); 00469 if( MB_SUCCESS != rval ) return error( rval ); 00470 if( times ) times[CREATE_TAG_TIME] = timer.time_elapsed(); 00471 00472 /************** Close serial file and reopen parallel *****************/ 00473 00474 if( 0 == myPcomm->proc_config().proc_rank() ) mhdf_closeFile( filePtr, 
&status ); 00475 00476 MPI_Barrier( myPcomm->proc_config().proc_comm() ); 00477 dbgOut.tprint( 1, "(re)opening file in parallel mode\n" ); 00478 unsigned long junk; 00479 hid_t hdf_opt = H5Pcreate( H5P_FILE_ACCESS ); 00480 H5Pset_fapl_mpio( hdf_opt, myPcomm->proc_config().proc_comm(), info ); 00481 filePtr = mhdf_openFileWithOpt( filename, 1, &junk, id_type, hdf_opt, &status ); 00482 H5Pclose( hdf_opt ); 00483 if( !filePtr ) 00484 { 00485 MB_SET_ERR( MB_FAILURE, mhdf_message( &status ) ); 00486 } 00487 00488 if( collectiveIO ) 00489 { 00490 dbgOut.print( 1, "USING COLLECTIVE IO\n" ); 00491 writeProp = H5Pcreate( H5P_DATASET_XFER ); 00492 H5Pset_dxpl_mpio( writeProp, H5FD_MPIO_COLLECTIVE ); 00493 } 00494 00495 /* Test if we can use H5S_APPEND when selecting hyperslabs */ 00496 if( MB_SUCCESS != opts.get_null_option( "HYPERSLAB_OR" ) && 00497 ( MB_SUCCESS == opts.get_null_option( "HYPERSLAB_APPEND" ) || HDF5_can_append_hyperslabs() ) ) 00498 { 00499 dbgOut.print( 1, "HDF5 library supports H5Sselect_hyperlsab with H5S_SELECT_APPEND\n" ); 00500 hslabOp = H5S_SELECT_APPEND; 00501 } 00502 00503 dbgOut.tprint( 1, "Exiting parallel_create_file\n" ); 00504 return MB_SUCCESS; 00505 } 00506 00507 class TagNameCompare 00508 { 00509 Interface* iFace; 00510 std::string name1, name2; 00511 00512 public: 00513 TagNameCompare( Interface* iface ) : iFace( iface ) {} 00514 bool operator()( const WriteHDF5::TagDesc& t1, const WriteHDF5::TagDesc& t2 ); 00515 }; 00516 00517 bool TagNameCompare::operator()( const WriteHDF5::TagDesc& t1, const WriteHDF5::TagDesc& t2 ) 00518 { 00519 iFace->tag_get_name( t1.tag_id, name1 ); 00520 iFace->tag_get_name( t2.tag_id, name2 ); 00521 return name1 < name2; 00522 } 00523 00524 struct serial_tag_data 00525 { 00526 TagType storage; 00527 DataType type; 00528 int size; 00529 int name_len; 00530 int def_val_len; 00531 char name[sizeof( unsigned long )]; 00532 00533 static size_t pad( size_t len ) 00534 { 00535 if( len % sizeof( unsigned long ) ) 00536 return len + sizeof( unsigned long ) - len % sizeof( unsigned long ); 00537 else 00538 return len; 00539 } 00540 00541 static size_t def_val_bytes( int def_val_len, DataType type ) 00542 { 00543 switch( type ) 00544 { 00545 case MB_TYPE_BIT: 00546 return def_val_len ? 1 : 0; 00547 case MB_TYPE_OPAQUE: 00548 return def_val_len; 00549 case MB_TYPE_INTEGER: 00550 return def_val_len * sizeof( int ); 00551 case MB_TYPE_DOUBLE: 00552 return def_val_len * sizeof( double ); 00553 case MB_TYPE_HANDLE: 00554 return def_val_len * sizeof( EntityHandle ); 00555 } 00556 return 0; 00557 } 00558 00559 static size_t len( int name_len, int def_val_len, DataType type ) 00560 { 00561 return sizeof( serial_tag_data ) + pad( name_len + def_val_bytes( def_val_len, type ) ) - 00562 sizeof( unsigned long ); 00563 } 00564 size_t len() const 00565 { 00566 return len( name_len, def_val_len, type ); 00567 } 00568 void* default_value() 00569 { 00570 return def_val_len ? 
name + name_len : 0; 00571 } 00572 const void* default_value() const 00573 { 00574 return const_cast< serial_tag_data* >( this )->default_value(); 00575 } 00576 void set_default_value( const void* val ) 00577 { 00578 memcpy( default_value(), val, def_val_bytes( def_val_len, type ) ); 00579 } 00580 }; 00581 00582 ErrorCode WriteHDF5Parallel::append_serial_tag_data( std::vector< unsigned char >& buffer, 00583 const WriteHDF5::TagDesc& tag ) 00584 { 00585 ErrorCode rval; 00586 00587 std::string name; 00588 rval = iFace->tag_get_name( tag.tag_id, name ); 00589 if( MB_SUCCESS != rval ) return error( rval ); 00590 00591 // Get name length, including space for null char 00592 size_t name_len = name.size() + 1; 00593 if( name_len == 1 ) return MB_SUCCESS; // Skip tags with no name 00594 00595 DataType data_type; 00596 rval = iFace->tag_get_data_type( tag.tag_id, data_type ); 00597 if( MB_SUCCESS != rval ) return error( rval ); 00598 00599 // Get default value 00600 int def_val_len; 00601 const void* def_val; 00602 if( MB_SUCCESS != iFace->tag_get_default_value( tag.tag_id, def_val, def_val_len ) ) 00603 { 00604 def_val_len = 0; 00605 def_val = 0; 00606 } 00607 00608 // Allocate struct within buffer 00609 size_t init_size = buffer.size(); 00610 buffer.resize( init_size + serial_tag_data::len( name_len, def_val_len, data_type ) ); 00611 serial_tag_data* ptr = reinterpret_cast< serial_tag_data* >( &buffer[init_size] ); 00612 00613 // Populate struct 00614 rval = iFace->tag_get_type( tag.tag_id, ptr->storage ); 00615 if( MB_SUCCESS != rval ) return error( rval ); 00616 ptr->type = data_type; 00617 rval = iFace->tag_get_length( tag.tag_id, ptr->size ); 00618 if( MB_VARIABLE_DATA_LENGTH == rval ) 00619 ptr->size = MB_VARIABLE_LENGTH; 00620 else if( MB_SUCCESS != rval ) 00621 return error( rval ); 00622 ptr->name_len = name_len; 00623 Range range; 00624 memset( ptr->name, 0, ptr->name_len ); 00625 memcpy( ptr->name, name.data(), name.size() ); 00626 ptr->def_val_len = def_val_len; 00627 ptr->set_default_value( def_val ); 00628 00629 return MB_SUCCESS; 00630 } 00631 00632 ErrorCode WriteHDF5Parallel::check_serial_tag_data( const std::vector< unsigned char >& buffer, 00633 std::vector< TagDesc* >* missing, 00634 std::vector< TagDesc* >* newlist ) 00635 { 00636 ErrorCode rval; 00637 00638 // Use 'write_sparse' field as a 'visited' mark 00639 std::list< TagDesc >::iterator tag_iter; 00640 if( missing ) 00641 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) 00642 tag_iter->write_sparse = true; 00643 00644 // Use a set as a temporary for what will ultimately go in 00645 // newlist because we need to pass back newlist in the order 00646 // of the tagList member. 00647 std::set< TagDesc* > newset; 00648 00649 // Iterate over data from, updating the local list of tags. 00650 // Be careful to keep tagList sorted such that in the end all 00651 // procs have the same list in the same order. 
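// Illustrative example: if the local tagList holds tags {A, C} (sorted by name)
// and the incoming buffer describes {A, B, C}, then B is created below with
// MB_TAG_CREAT and inserted in place, so that afterwards every process holds
// {A, B, C} in the same order.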
00652 std::vector< unsigned char >::const_iterator diter = buffer.begin(); 00653 tag_iter = tagList.begin(); 00654 while( diter < buffer.end() ) 00655 { 00656 // Get struct from buffer 00657 const serial_tag_data* ptr = reinterpret_cast< const serial_tag_data* >( &*diter ); 00658 00659 // Find local struct for tag 00660 std::string name( ptr->name ); 00661 std::string n; 00662 iFace->tag_get_name( tag_iter->tag_id, n ); // Second time we've called, so shouldn't fail 00663 if( n > name ) 00664 { 00665 tag_iter = tagList.begin(); // New proc, start search from beginning 00666 } 00667 iFace->tag_get_name( tag_iter->tag_id, n ); 00668 while( n < name ) 00669 { 00670 ++tag_iter; 00671 if( tag_iter == tagList.end() ) break; 00672 iFace->tag_get_name( tag_iter->tag_id, n ); 00673 } 00674 if( tag_iter == tagList.end() || n != name ) 00675 { // New tag 00676 TagDesc newtag; 00677 00678 if( ptr->size == MB_VARIABLE_LENGTH ) 00679 rval = iFace->tag_get_handle( name.c_str(), ptr->def_val_len, ptr->type, newtag.tag_id, 00680 MB_TAG_VARLEN | MB_TAG_CREAT | ptr->storage, ptr->default_value() ); 00681 else 00682 rval = iFace->tag_get_handle( name.c_str(), ptr->size, ptr->type, newtag.tag_id, 00683 MB_TAG_CREAT | ptr->storage, ptr->default_value() ); 00684 if( MB_SUCCESS != rval ) return error( rval ); 00685 00686 newtag.sparse_offset = 0; 00687 newtag.var_data_offset = 0; 00688 newtag.write_sparse = false; 00689 newtag.max_num_ents = 0; 00690 newtag.max_num_vals = 0; 00691 00692 tag_iter = tagList.insert( tag_iter, newtag ); 00693 if( newlist ) newset.insert( &*tag_iter ); 00694 } 00695 else 00696 { // Check that tag is as expected 00697 DataType type; 00698 iFace->tag_get_data_type( tag_iter->tag_id, type ); 00699 if( type != ptr->type ) 00700 { 00701 MB_SET_ERR( MB_FAILURE, "Processes have inconsistent data type for tag \"" << name << "\"" ); 00702 } 00703 int size; 00704 iFace->tag_get_length( tag_iter->tag_id, size ); 00705 if( size != ptr->size ) 00706 { 00707 MB_SET_ERR( MB_FAILURE, "Processes have inconsistent size for tag \"" << name << "\"" ); 00708 } 00709 tag_iter->write_sparse = false; 00710 } 00711 00712 // Step to next variable-length struct. 
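// (ptr->len() accounts for the padded name and default-value storage, so this
// advances diter exactly to the next serial_tag_data header in the buffer.)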
00713 diter += ptr->len(); 00714 } 00715 00716 // Now pass back any local tags that weren't in the buffer 00717 if( missing ) 00718 { 00719 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) 00720 { 00721 if( tag_iter->write_sparse ) 00722 { 00723 tag_iter->write_sparse = false; 00724 missing->push_back( &*tag_iter ); 00725 } 00726 } 00727 } 00728 00729 // Be careful to populate newlist in the same, sorted, order as tagList 00730 if( newlist ) 00731 { 00732 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) 00733 if( newset.find( &*tag_iter ) != newset.end() ) newlist->push_back( &*tag_iter ); 00734 } 00735 00736 return MB_SUCCESS; 00737 } 00738 00739 static void set_bit( int position, unsigned char* bytes ) 00740 { 00741 int byte = position / 8; 00742 int bit = position % 8; 00743 bytes[byte] |= ( ( (unsigned char)1 ) << bit ); 00744 } 00745 00746 static bool get_bit( int position, const unsigned char* bytes ) 00747 { 00748 int byte = position / 8; 00749 int bit = position % 8; 00750 return 0 != ( bytes[byte] & ( ( (unsigned char)1 ) << bit ) ); 00751 } 00752 00753 ErrorCode WriteHDF5Parallel::create_tag_tables() 00754 { 00755 std::list< TagDesc >::iterator tag_iter; 00756 ErrorCode rval; 00757 int err; 00758 const int rank = myPcomm->proc_config().proc_rank(); 00759 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 00760 00761 subState.start( "negotiating tag list" ); 00762 00763 dbgOut.tprint( 1, "communicating tag metadata\n" ); 00764 00765 dbgOut.printf( 2, "Exchanging tag data for %d tags.\n", (int)tagList.size() ); 00766 00767 // Sort tagList contents in alphabetical order by tag name 00768 tagList.sort( TagNameCompare( iFace ) ); 00769 00770 // Negotiate total list of tags to write 00771 00772 // Build concatenated list of all tag data 00773 std::vector< unsigned char > tag_buffer; 00774 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) 00775 { 00776 rval = append_serial_tag_data( tag_buffer, *tag_iter ); 00777 CHECK_MB( rval ); 00778 } 00779 00780 // Broadcast list from root to all other procs 00781 unsigned long size = tag_buffer.size(); 00782 err = MPI_Bcast( &size, 1, MPI_UNSIGNED_LONG, 0, comm ); 00783 CHECK_MPI( err ); 00784 tag_buffer.resize( size ); 00785 err = MPI_Bcast( &tag_buffer[0], size, MPI_UNSIGNED_CHAR, 0, comm ); 00786 CHECK_MPI( err ); 00787 00788 // Update local tag list 00789 std::vector< TagDesc* > missing; 00790 rval = check_serial_tag_data( tag_buffer, &missing, 0 ); 00791 CHECK_MB( rval ); 00792 00793 // Check if we're done (0->done, 1->more, 2+->error) 00794 int code, lcode = ( MB_SUCCESS != rval ) ? rval + 2 : missing.empty() ? 0 : 1; 00795 err = MPI_Allreduce( &lcode, &code, 1, MPI_INT, MPI_MAX, comm ); 00796 CHECK_MPI( err ); 00797 if( code > 1 ) 00798 { 00799 MB_SET_ERR_CONT( "Inconsistent tag definitions between procs" ); 00800 return error( (ErrorCode)( code - 2 ) ); 00801 } 00802 00803 // If not done... 
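// (code == 1 means at least one process had tag definitions missing from the
// root's broadcast list; values >= 2 carry an ErrorCode offset by 2 from a
// process that failed while checking the list.)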
00804 if( code ) 00805 { 00806 dbgOut.print( 1, "Not all procs had same tag definitions, negotiating...\n" ); 00807 00808 // Get tags defined on this proc but not on root proc 00809 tag_buffer.clear(); 00810 for( size_t i = 0; i < missing.size(); ++i ) 00811 { 00812 rval = append_serial_tag_data( tag_buffer, *missing[i] ); 00813 CHECK_MB( rval ); 00814 } 00815 00816 // Gather extra tag definitions on root processor 00817 std::vector< int > junk; // don't care how many from each proc 00818 assert( rank || tag_buffer.empty() ); // must be empty on root 00819 err = my_Gatherv( &tag_buffer[0], tag_buffer.size(), MPI_UNSIGNED_CHAR, tag_buffer, junk, 0, comm ); 00820 CHECK_MPI( err ); 00821 00822 // Process serialized tag descriptions on root, and 00823 rval = MB_SUCCESS; 00824 if( 0 == rank ) 00825 { 00826 // Process serialized tag descriptions on root, and 00827 std::vector< TagDesc* > newlist; 00828 rval = check_serial_tag_data( tag_buffer, 0, &newlist ); 00829 tag_buffer.clear(); 00830 // re-serialize a unique list of new tag definitions 00831 for( size_t i = 0; MB_SUCCESS == rval && i != newlist.size(); ++i ) 00832 { 00833 rval = append_serial_tag_data( tag_buffer, *newlist[i] ); 00834 CHECK_MB( rval ); 00835 } 00836 } 00837 00838 // Broadcast any new tag definitions from root to other procs 00839 long this_size = tag_buffer.size(); 00840 if( MB_SUCCESS != rval ) this_size = -rval; 00841 err = MPI_Bcast( &this_size, 1, MPI_LONG, 0, comm ); 00842 CHECK_MPI( err ); 00843 if( this_size < 0 ) 00844 { 00845 MB_SET_ERR_CONT( "Inconsistent tag definitions between procs" ); 00846 return error( (ErrorCode)-this_size ); 00847 } 00848 tag_buffer.resize( this_size ); 00849 err = MPI_Bcast( &tag_buffer[0], this_size, MPI_UNSIGNED_CHAR, 0, comm ); 00850 CHECK_MPI( err ); 00851 00852 // Process new tag definitions 00853 rval = check_serial_tag_data( tag_buffer, 0, 0 ); 00854 CHECK_MB( rval ); 00855 } 00856 00857 subState.end(); 00858 subState.start( "negotiate which element/tag combinations are dense" ); 00859 00860 // Figure out for which tag/element combinations we can 00861 // write dense tag data. 00862 00863 // Construct a table of bits, 00864 // where each row of the table corresponds to a tag 00865 // and each column to an element group. 00866 00867 // Two extra, because first is nodes and last is sets. 00868 // (n+7)/8 is ceil(n/8) 00869 const int bytes_per_tag = ( exportList.size() + 9 ) / 8; 00870 std::vector< unsigned char > data( bytes_per_tag * tagList.size(), 0 ); 00871 std::vector< unsigned char > recv( data.size(), 0 ); 00872 unsigned char* iter = &data[0]; 00873 if( writeTagDense && !data.empty() ) 00874 { 00875 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, iter += bytes_per_tag ) 00876 { 00877 00878 Range tagged; 00879 rval = get_sparse_tagged_entities( *tag_iter, tagged ); 00880 CHECK_MB( rval ); 00881 00882 int s; 00883 if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) continue; 00884 00885 std::string n; 00886 iFace->tag_get_name( tag_iter->tag_id, 00887 n ); // Second time we've called, so shouldn't fail 00888 00889 // Check if we want to write this tag in dense format even if not 00890 // all of the entities have a tag value. The criterion of this 00891 // is that the tag be dense, have a default value, and have at 00892 // least 2/3 of the entities tagged. 
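// The dense-storage and default-value parts of that test are evaluated here; the
// "2/3 of the entities tagged" fraction is presumably enforced inside
// check_dense_format_tag(), which receives both the sparse-tagged range and the
// prefer_dense flag. In each per-tag row of bits, bit 0 marks the node group,
// bits 1..n follow exportList order, and the final bit marks the set group.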
00893 bool prefer_dense = false; 00894 TagType type; 00895 rval = iFace->tag_get_type( tag_iter->tag_id, type ); 00896 CHECK_MB( rval ); 00897 if( MB_TAG_DENSE == type ) 00898 { 00899 const void* defval = 0; 00900 rval = iFace->tag_get_default_value( tag_iter->tag_id, defval, s ); 00901 if( MB_SUCCESS == rval ) prefer_dense = true; 00902 } 00903 00904 int i = 0; 00905 if( check_dense_format_tag( nodeSet, tagged, prefer_dense ) ) 00906 { 00907 set_bit( i, iter ); 00908 dbgOut.printf( 2, "Can write dense data for \"%s\"/Nodes\n", n.c_str() ); 00909 } 00910 std::list< ExportSet >::const_iterator ex_iter = exportList.begin(); 00911 for( ++i; ex_iter != exportList.end(); ++i, ++ex_iter ) 00912 { 00913 // when writing in parallel, on some partitions, some of these element ranges might 00914 // be empty so do not turn this tag as sparse, just because of that, leave it dense, 00915 // if we prefer dense 00916 if( ( prefer_dense && ex_iter->range.empty() ) || 00917 check_dense_format_tag( *ex_iter, tagged, prefer_dense ) ) 00918 { 00919 set_bit( i, iter ); 00920 dbgOut.printf( 2, "Can write dense data for \"%s\"/%s\n", n.c_str(), ex_iter->name() ); 00921 } 00922 } 00923 if( check_dense_format_tag( setSet, tagged, prefer_dense ) ) 00924 { 00925 set_bit( i, iter ); 00926 dbgOut.printf( 2, "Can write dense data for \"%s\"/Sets\n", n.c_str() ); 00927 } 00928 } 00929 00930 // Do bit-wise AND of list over all processors (only write dense format 00931 // if all proccesses want dense format for this group of entities). 00932 err = MPI_Allreduce( &data[0], &recv[0], data.size(), MPI_UNSIGNED_CHAR, MPI_BAND, 00933 myPcomm->proc_config().proc_comm() ); 00934 CHECK_MPI( err ); 00935 } // if (writeTagDense) 00936 00937 // Store initial counts for sparse-formatted tag data. 00938 // The total number of values to send and receive will be the number of 00939 // tags plus the number of var-len tags because we need to negotiate 00940 // offsets into two different tables for the var-len tags. 
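// Illustrative example: with three tags A (fixed size), B (variable length) and
// C (fixed size), 'counts' ends up as { count(A), count(B), value_length(B),
// count(C) }, and create_dataset() below negotiates one offset per entry.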
00941 std::vector< long > counts; 00942 00943 // Record dense tag/element combinations 00944 iter = &recv[0]; 00945 const unsigned char* iter2 = &data[0]; 00946 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); 00947 ++tag_iter, iter += bytes_per_tag, iter2 += bytes_per_tag ) 00948 { 00949 00950 Range tagged; 00951 rval = get_sparse_tagged_entities( *tag_iter, tagged ); 00952 CHECK_MB( rval ); 00953 00954 std::string n; 00955 iFace->tag_get_name( tag_iter->tag_id, n ); // Second time we've called, so shouldn't fail 00956 00957 int i = 0; 00958 if( get_bit( i, iter ) ) 00959 { 00960 assert( get_bit( i, iter2 ) ); 00961 tag_iter->dense_list.push_back( nodeSet ); 00962 tagged -= nodeSet.range; 00963 dbgOut.printf( 2, "Will write dense data for \"%s\"/Nodes\n", n.c_str() ); 00964 } 00965 std::list< ExportSet >::const_iterator ex_iter = exportList.begin(); 00966 for( ++i; ex_iter != exportList.end(); ++i, ++ex_iter ) 00967 { 00968 if( get_bit( i, iter ) ) 00969 { 00970 assert( get_bit( i, iter2 ) ); 00971 tag_iter->dense_list.push_back( *ex_iter ); 00972 dbgOut.printf( 2, "WIll write dense data for \"%s\"/%s\n", n.c_str(), ex_iter->name() ); 00973 tagged -= ex_iter->range; 00974 } 00975 } 00976 if( get_bit( i, iter ) ) 00977 { 00978 assert( get_bit( i, iter2 ) ); 00979 tag_iter->dense_list.push_back( setSet ); 00980 dbgOut.printf( 2, "Will write dense data for \"%s\"/Sets\n", n.c_str() ); 00981 tagged -= setSet.range; 00982 } 00983 00984 counts.push_back( tagged.size() ); 00985 00986 int s; 00987 if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) 00988 { 00989 unsigned long data_len; 00990 rval = get_tag_data_length( *tag_iter, tagged, data_len ); 00991 CHECK_MB( rval ); 00992 counts.push_back( data_len ); 00993 } 00994 } 00995 00996 subState.end(); 00997 subState.start( "Negotiate offsets for sparse tag info" ); 00998 00999 std::vector< long > offsets( counts.size() ), maxima( counts.size() ), totals( counts.size() ); 01000 rval = create_dataset( counts.size(), &counts[0], &offsets[0], &maxima[0], &totals[0] ); 01001 CHECK_MB( rval ); 01002 01003 // Copy values into local structs and if root then create tables 01004 size_t idx = 0; 01005 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, ++idx ) 01006 { 01007 assert( idx < counts.size() ); 01008 tag_iter->sparse_offset = offsets[idx]; 01009 tag_iter->max_num_ents = maxima[idx]; 01010 tag_iter->write_sparse = ( 0 != totals[idx] ); 01011 int s; 01012 if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) 01013 { 01014 ++idx; 01015 assert( idx < counts.size() ); 01016 tag_iter->var_data_offset = offsets[idx]; 01017 tag_iter->max_num_vals = maxima[idx]; 01018 } 01019 else 01020 { 01021 tag_iter->var_data_offset = 0; 01022 tag_iter->max_num_vals = 0; 01023 } 01024 } 01025 01026 subState.end(); 01027 01028 // Create tag tables on root process 01029 if( 0 == myPcomm->proc_config().proc_rank() ) 01030 { 01031 size_t iidx = 0; 01032 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, ++iidx ) 01033 { 01034 assert( iidx < totals.size() ); 01035 unsigned long num_ents = totals[iidx]; 01036 unsigned long num_val = 0; 01037 int s; 01038 if( MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s ) ) 01039 { 01040 ++iidx; 01041 assert( iidx < totals.size() ); 01042 num_val = totals[iidx]; 01043 } 01044 dbgOut.printf( 2, "Writing tag description for tag 0x%lx with %lu values\n", 01045 (unsigned long)tag_iter->tag_id, num_val ? 
num_val : num_ents ); 01046 01047 rval = create_tag( *tag_iter, num_ents, num_val ); 01048 if( MB_SUCCESS != rval ) return error( rval ); 01049 } 01050 } 01051 01052 if( dbgOut.get_verbosity() > 1 ) 01053 { 01054 dbgOut.printf( 2, "Tags: %12s %8s %8s %8s %8s %8s\n", "Name", "Count", "Offset", "Var Off", "Max Ent", 01055 "Handle" ); 01056 01057 for( tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter ) 01058 { 01059 std::string name; 01060 iFace->tag_get_name( tag_iter->tag_id, name ); 01061 size_t this_size; 01062 get_num_sparse_tagged_entities( *tag_iter, this_size ); 01063 dbgOut.printf( 2, "%18s %8lu %8lu %8lu %8lu 0x%7lx\n", name.c_str(), (unsigned long)this_size, 01064 (unsigned long)tag_iter->sparse_offset, (unsigned long)tag_iter->var_data_offset, 01065 (unsigned long)tag_iter->max_num_ents, (unsigned long)tag_iter->tag_id ); 01066 } 01067 } 01068 01069 return MB_SUCCESS; 01070 } 01071 01072 struct DatasetVals 01073 { 01074 long start_id; 01075 long max_count; 01076 long total; 01077 }; 01078 STATIC_ASSERT( ( sizeof( DatasetVals ) == 3 * sizeof( long ) ) ); 01079 01080 ErrorCode WriteHDF5Parallel::create_dataset( int num_datasets, 01081 const long* num_owned, 01082 long* offsets_out, 01083 long* max_proc_entities, 01084 long* total_entities, 01085 const DataSetCreator& creator, 01086 ExportSet* groups[], 01087 wid_t* first_ids_out ) 01088 { 01089 int result; 01090 ErrorCode rval; 01091 const unsigned rank = myPcomm->proc_config().proc_rank(); 01092 const unsigned nproc = myPcomm->proc_config().proc_size(); 01093 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 01094 01095 // Gather entity counts for each processor on root 01096 std::vector< long > counts( rank ? 0 : nproc * num_datasets ); 01097 (void)VALGRIND_CHECK_MEM_IS_DEFINED( &num_owned, sizeof( long ) ); 01098 result = MPI_Gather( const_cast< long* >( num_owned ), num_datasets, MPI_LONG, &counts[0], num_datasets, MPI_LONG, 01099 0, comm ); 01100 CHECK_MPI( result ); 01101 01102 // Create node data in file 01103 DatasetVals zero_val = { 0, 0, 0 }; 01104 std::vector< DatasetVals > cumulative( num_datasets, zero_val ); 01105 if( rank == 0 ) 01106 { 01107 for( unsigned i = 0; i < nproc; i++ ) 01108 { 01109 const long* proc_data = &counts[i * num_datasets]; 01110 for( int index = 0; index < num_datasets; ++index ) 01111 { 01112 cumulative[index].total += proc_data[index]; 01113 if( proc_data[index] > cumulative[index].max_count ) cumulative[index].max_count = proc_data[index]; 01114 } 01115 } 01116 01117 for( int index = 0; index < num_datasets; ++index ) 01118 { 01119 if( cumulative[index].total ) 01120 { 01121 rval = creator( this, cumulative[index].total, groups ? 
groups[index] : 0, cumulative[index].start_id ); 01122 CHECK_MB( rval ); 01123 } 01124 else 01125 { 01126 cumulative[index].start_id = -1; 01127 } 01128 } 01129 } 01130 01131 // Send id offset to every proc 01132 result = MPI_Bcast( (void*)&cumulative[0], 3 * num_datasets, MPI_LONG, 0, comm ); 01133 CHECK_MPI( result ); 01134 for( int index = 0; index < num_datasets; ++index ) 01135 { 01136 if( first_ids_out ) first_ids_out[index] = (wid_t)cumulative[index].start_id; 01137 max_proc_entities[index] = cumulative[index].max_count; 01138 total_entities[index] = cumulative[index].total; 01139 } 01140 01141 // Convert array of per-process counts to per-process offsets 01142 if( rank == 0 ) 01143 { 01144 // Initialize prev_size with data sizes for root process 01145 std::vector< long > prev_size( counts.begin(), counts.begin() + num_datasets ); 01146 // Root process gets offset zero 01147 std::fill( counts.begin(), counts.begin() + num_datasets, 0L ); 01148 // For each proc other than this one (root) 01149 for( unsigned i = 1; i < nproc; ++i ) 01150 { 01151 // Get pointer to offsets for previous process in list 01152 long* prev_data = &counts[( i - 1 ) * num_datasets]; 01153 // Get pointer to offsets for this process in list 01154 long* proc_data = &counts[i * num_datasets]; 01155 // For each data set 01156 for( int j = 0; j < num_datasets; ++j ) 01157 { 01158 // Get size of data in dataset from process i 01159 long mysize = proc_data[j]; 01160 // Offset for process i is offset of previous process plus 01161 // number of values previous process will write 01162 proc_data[j] = prev_data[j] + prev_size[j]; 01163 // Store my size, as it is no longer available in 'counts' 01164 prev_size[j] = mysize; 01165 } 01166 } 01167 } 01168 01169 // Send each proc it's offset in the table 01170 if( rank == 0 ) 01171 { 01172 (void)VALGRIND_CHECK_MEM_IS_DEFINED( &counts[0], num_datasets * nproc * sizeof( long ) ); 01173 } 01174 result = MPI_Scatter( &counts[0], num_datasets, MPI_LONG, offsets_out, num_datasets, MPI_LONG, 0, comm ); 01175 CHECK_MPI( result ); 01176 01177 return MB_SUCCESS; 01178 } 01179 01180 ErrorCode WriteHDF5Parallel::create_node_table( int dimension ) 01181 { 01182 nodeSet.num_nodes = dimension; // Put it here so NodeSetCreator can access it 01183 struct NodeSetCreator : public DataSetCreator 01184 { 01185 ErrorCode operator()( WriteHDF5* file, long count, const ExportSet* group, long& start_id ) const 01186 { 01187 mhdf_Status status; 01188 hid_t handle = mhdf_createNodeCoords( file->file_ptr(), group->num_nodes, count, &start_id, &status ); 01189 CHECK_HDFN( status ); 01190 mhdf_closeData( file->file_ptr(), handle, &status ); 01191 CHECK_HDFN( status ); 01192 return MB_SUCCESS; 01193 } 01194 }; 01195 01196 const long count = nodeSet.range.size(); 01197 ExportSet* array[] = { &nodeSet }; 01198 ErrorCode rval = create_dataset( 1, &count, &nodeSet.offset, &nodeSet.max_num_ents, &nodeSet.total_num_ents, 01199 NodeSetCreator(), array, &nodeSet.first_id ); 01200 CHECK_MB( rval ); 01201 return assign_ids( nodeSet.range, nodeSet.first_id + nodeSet.offset ); 01202 } 01203 01204 struct elemtype 01205 { 01206 int mbtype; 01207 int numnode; 01208 01209 elemtype( int vals[2] ) : mbtype( vals[0] ), numnode( vals[1] ) {} 01210 elemtype( int t, int n ) : mbtype( t ), numnode( n ) {} 01211 01212 bool operator==( const elemtype& other ) const 01213 { 01214 return mbtype == other.mbtype && ( mbtype == MBENTITYSET || numnode == other.numnode ); 01215 } 01216 bool operator<( const elemtype& other ) const 01217 { 
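// Order by type first; equal element types fall back to connectivity length,
// while entity sets (MBENTITYSET) compare on type alone.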
01218 if( mbtype > other.mbtype ) return false; 01219 01220 return mbtype < other.mbtype || ( mbtype != MBENTITYSET && numnode < other.numnode ); 01221 } 01222 bool operator!=( const elemtype& other ) const 01223 { 01224 return !this->operator==( other ); 01225 } 01226 }; 01227 01228 ErrorCode WriteHDF5Parallel::negotiate_type_list() 01229 { 01230 int result; 01231 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 01232 01233 exportList.sort(); 01234 int num_types = exportList.size(); 01235 01236 // Get list of types on this processor 01237 typedef std::vector< std::pair< int, int > > typelist; 01238 typelist my_types( num_types ); 01239 (void)VALGRIND_MAKE_VEC_UNDEFINED( my_types ); 01240 typelist::iterator viter = my_types.begin(); 01241 for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) 01242 { 01243 viter->first = eiter->type; 01244 viter->second = eiter->num_nodes; 01245 ++viter; 01246 } 01247 01248 dbgOut.print( 2, "Local Element Types:\n" ); 01249 for( viter = my_types.begin(); viter != my_types.end(); ++viter ) 01250 { 01251 int type = viter->first; 01252 int count = viter->second; 01253 dbgOut.printf( 2, " %s : %d\n", CN::EntityTypeName( (EntityType)type ), count ); 01254 } 01255 01256 // Broadcast number of types from root to all nodes 01257 int num_types0 = num_types; 01258 result = MPI_Bcast( &num_types0, 1, MPI_INT, 0, comm ); 01259 CHECK_MPI( result ); 01260 // Broadcast type list from root to all nodes 01261 typelist root_types( num_types0 ); 01262 if( 0 == myPcomm->proc_config().proc_rank() ) root_types = my_types; 01263 result = MPI_Bcast( (void*)&root_types[0], 2 * num_types0, MPI_INT, 0, comm ); 01264 CHECK_MPI( result ); 01265 01266 // Build local list of any types that root did not know about 01267 typelist non_root_types; 01268 viter = root_types.begin(); 01269 for( typelist::iterator iter = my_types.begin(); iter != my_types.end(); ++iter ) 01270 { 01271 if( viter == root_types.end() || *viter != *iter ) 01272 non_root_types.push_back( *iter ); 01273 else 01274 ++viter; 01275 } 01276 01277 // Determine if any process had types not defined on the root 01278 int non_root_count = non_root_types.size(); 01279 int not_done; 01280 result = MPI_Allreduce( &non_root_count, ¬_done, 1, MPI_INT, MPI_LOR, comm ); 01281 CHECK_MPI( result ); 01282 if( not_done ) 01283 { 01284 // Get number of types each processor has that root does not 01285 std::vector< int > counts( myPcomm->proc_config().proc_size() ); 01286 int two_count = 2 * non_root_count; 01287 result = MPI_Gather( &two_count, 1, MPI_INT, &counts[0], 1, MPI_INT, 0, comm ); 01288 CHECK_MPI( result ); 01289 01290 // Get list of types from each processor 01291 std::vector< int > displs( myPcomm->proc_config().proc_size() + 1 ); 01292 (void)VALGRIND_MAKE_VEC_UNDEFINED( displs ); 01293 displs[0] = 0; 01294 for( unsigned long i = 1; i <= myPcomm->proc_config().proc_size(); ++i ) 01295 displs[i] = displs[i - 1] + counts[i - 1]; 01296 int total = displs[myPcomm->proc_config().proc_size()]; 01297 typelist alltypes( total / 2 ); 01298 (void)VALGRIND_MAKE_VEC_UNDEFINED( alltypes ); 01299 (void)VALGRIND_CHECK_MEM_IS_DEFINED( &non_root_types[0], non_root_types.size() * sizeof( int ) ); 01300 result = MPI_Gatherv( (void*)&non_root_types[0], 2 * non_root_count, MPI_INT, (int*)&alltypes[0], &counts[0], 01301 &displs[0], MPI_INT, 0, comm ); 01302 CHECK_MPI( result ); 01303 01304 // Merge type lists. 
01305 // Prefer O(n) insertions with O(ln n) search time because 01306 // we expect data from a potentially large number of processes, 01307 // but with a small total number of element types. 01308 if( 0 == myPcomm->proc_config().proc_rank() ) 01309 { 01310 for( viter = alltypes.begin(); viter != alltypes.end(); ++viter ) 01311 { 01312 typelist::iterator titer = std::lower_bound( my_types.begin(), my_types.end(), *viter ); 01313 if( titer == my_types.end() || *titer != *viter ) my_types.insert( titer, *viter ); 01314 } 01315 01316 dbgOut.print( 2, "Global Element Types:\n" ); 01317 for( viter = my_types.begin(); viter != my_types.end(); ++viter ) 01318 dbgOut.printf( 2, " %s : %d\n", CN::EntityTypeName( (EntityType)viter->first ), viter->second ); 01319 } 01320 01321 // Send total number of types to each processor 01322 total = my_types.size(); 01323 result = MPI_Bcast( &total, 1, MPI_INT, 0, comm ); 01324 CHECK_MPI( result ); 01325 01326 // Send list of types to each processor 01327 my_types.resize( total ); 01328 result = MPI_Bcast( (void*)&my_types[0], 2 * total, MPI_INT, 0, comm ); 01329 CHECK_MPI( result ); 01330 } 01331 else 01332 { 01333 // Special case: if root had types but some subset of procs did not 01334 // have those types, but there are no types that the root doesn't 01335 // know about then we still need to update processes that are missing 01336 // types. 01337 my_types.swap( root_types ); 01338 } 01339 01340 // Insert missing types into exportList, with an empty 01341 // range of entities to export. 01342 std::list< ExportSet >::iterator ex_iter = exportList.begin(); 01343 for( viter = my_types.begin(); viter != my_types.end(); ++viter ) 01344 { 01345 while( ex_iter != exportList.end() && *ex_iter < *viter ) 01346 ++ex_iter; 01347 01348 if( ex_iter == exportList.end() || !( *ex_iter == *viter ) ) 01349 { 01350 ExportSet insert; 01351 insert.type = (EntityType)viter->first; 01352 insert.num_nodes = viter->second; 01353 insert.first_id = 0; 01354 insert.offset = 0; 01355 insert.adj_offset = 0; 01356 ex_iter = exportList.insert( ex_iter, insert ); 01357 } 01358 } 01359 01360 return MB_SUCCESS; 01361 } 01362 01363 ErrorCode WriteHDF5Parallel::create_element_tables() 01364 { 01365 struct ElemSetCreator : public DataSetCreator 01366 { 01367 ErrorCode operator()( WriteHDF5* file, long size, const ExportSet* ex, long& start_id ) const 01368 { 01369 return file->create_elem_table( *ex, size, start_id ); 01370 } 01371 }; 01372 01373 const int numtypes = exportList.size(); 01374 std::vector< ExportSet* > groups( numtypes ); 01375 std::vector< long > counts( numtypes ), offsets( numtypes ), max_ents( numtypes ), total_ents( numtypes ); 01376 std::vector< wid_t > start_ids( numtypes ); 01377 01378 size_t idx = 0; 01379 std::list< ExportSet >::iterator ex_iter; 01380 for( ex_iter = exportList.begin(); ex_iter != exportList.end(); ++ex_iter, ++idx ) 01381 { 01382 groups[idx] = &*ex_iter; 01383 counts[idx] = ex_iter->range.size(); 01384 } 01385 ErrorCode rval = create_dataset( numtypes, &counts[0], &offsets[0], &max_ents[0], &total_ents[0], ElemSetCreator(), 01386 &groups[0], &start_ids[0] ); 01387 CHECK_MB( rval ); 01388 01389 for( idx = 0, ex_iter = exportList.begin(); ex_iter != exportList.end(); ++ex_iter, ++idx ) 01390 { 01391 ex_iter->first_id = start_ids[idx]; 01392 ex_iter->offset = offsets[idx]; 01393 ex_iter->max_num_ents = max_ents[idx]; 01394 ex_iter->total_num_ents = total_ents[idx]; 01395 rval = assign_ids( ex_iter->range, ex_iter->first_id + ex_iter->offset ); 01396 
CHECK_MB( rval ); 01397 } 01398 01399 return MB_SUCCESS; 01400 } 01401 01402 ErrorCode WriteHDF5Parallel::create_adjacency_tables() 01403 { 01404 struct AdjSetCreator : public DataSetCreator 01405 { 01406 ErrorCode operator()( WriteHDF5* file, long size, const ExportSet* ex, long& start_id ) const 01407 { 01408 mhdf_Status status; 01409 hid_t handle = mhdf_createAdjacency( file->file_ptr(), ex->name(), size, &status ); 01410 CHECK_HDFN( status ); 01411 mhdf_closeData( file->file_ptr(), handle, &status ); 01412 CHECK_HDFN( status ); 01413 start_id = -1; 01414 return MB_SUCCESS; 01415 } 01416 }; 01417 01418 std::vector< ExportSet* > groups; 01419 #ifdef WRITE_NODE_ADJACENCIES 01420 groups.push_back( &nodeSet ); 01421 #endif 01422 for( std::list< ExportSet >::iterator ex_iter = exportList.begin(); ex_iter != exportList.end(); ++ex_iter ) 01423 groups.push_back( &*ex_iter ); 01424 01425 ErrorCode rval; 01426 const int numtypes = groups.size(); 01427 std::vector< long > counts( numtypes ); 01428 std::vector< long > offsets( numtypes ); 01429 std::vector< long > max_ents( numtypes ); 01430 std::vector< long > totals( numtypes ); 01431 for( int i = 0; i < numtypes; ++i ) 01432 { 01433 wid_t count; 01434 rval = count_adjacencies( groups[i]->range, count ); 01435 CHECK_MB( rval ); 01436 counts[i] = count; 01437 } 01438 01439 rval = create_dataset( numtypes, &counts[0], &offsets[0], &max_ents[0], &totals[0], AdjSetCreator(), &groups[0] ); 01440 CHECK_MB( rval ); 01441 01442 // Cppcheck warning (false positive): variable groups is assigned a value that is never used 01443 for( int i = 0; i < numtypes; ++i ) 01444 { 01445 groups[i]->max_num_adjs = max_ents[i]; 01446 groups[i]->adj_offset = offsets[i]; 01447 } 01448 return MB_SUCCESS; 01449 } 01450 01451 const unsigned SSVB = 3; 01452 01453 void WriteHDF5Parallel::print_set_sharing_data( const Range& range, const char* label, Tag idt ) 01454 { 01455 dbgOut.printf( SSVB, "set\tid\towner\t%-*s\tfid\tshared\n", (int)( sizeof( EntityHandle ) * 2 ), "handle" ); 01456 for( Range::iterator it = range.begin(); it != range.end(); ++it ) 01457 { 01458 int id; 01459 iFace->tag_get_data( idt, &*it, 1, &id ); 01460 EntityHandle handle = 0; 01461 unsigned owner = 0; 01462 wid_t file_id = 0; 01463 myPcomm->get_entityset_owner( *it, owner, &handle ); 01464 if( !idMap.find( *it, file_id ) ) file_id = 0; 01465 dbgOut.printf( SSVB, "%s\t%d\t%u\t%lx\t%lu\t", label, id, owner, (unsigned long)handle, 01466 (unsigned long)file_id ); 01467 std::vector< unsigned > procs; 01468 myPcomm->get_entityset_procs( *it, procs ); 01469 if( procs.empty() ) 01470 dbgOut.print( SSVB, "<none>\n" ); 01471 else 01472 { 01473 for( unsigned i = 0; i < procs.size() - 1; ++i ) 01474 dbgOut.printf( SSVB, "%u,", procs[i] ); 01475 dbgOut.printf( SSVB, "%u\n", procs.back() ); 01476 } 01477 } 01478 } 01479 01480 void WriteHDF5Parallel::print_shared_sets() 01481 { 01482 const char* tag_names[][2] = { { MATERIAL_SET_TAG_NAME, "block" }, 01483 { DIRICHLET_SET_TAG_NAME, "nodeset" }, 01484 { NEUMANN_SET_TAG_NAME, "sideset" }, 01485 { 0, 0 } }; 01486 01487 for( int i = 0; tag_names[i][0]; ++i ) 01488 { 01489 Tag tag; 01490 if( MB_SUCCESS != iFace->tag_get_handle( tag_names[i][0], 1, MB_TYPE_INTEGER, tag ) ) continue; 01491 01492 Range tagged; 01493 iFace->get_entities_by_type_and_tag( 0, MBENTITYSET, &tag, 0, 1, tagged ); 01494 print_set_sharing_data( tagged, tag_names[i][1], tag ); 01495 } 01496 01497 Tag geom, id; 01498 if( MB_SUCCESS != iFace->tag_get_handle( GEOM_DIMENSION_TAG_NAME, 1, MB_TYPE_INTEGER, 
geom ) ) return; 01499 id = iFace->globalId_tag(); 01500 01501 const char* geom_names[] = { "vertex", "curve", "surface", "volume" }; 01502 for( int d = 0; d <= 3; ++d ) 01503 { 01504 Range tagged; 01505 const void* vals[] = { &d }; 01506 iFace->get_entities_by_type_and_tag( 0, MBENTITYSET, &geom, vals, 1, tagged ); 01507 print_set_sharing_data( tagged, geom_names[d], id ); 01508 } 01509 } 01510 01511 ErrorCode WriteHDF5Parallel::communicate_shared_set_ids( const Range& owned, const Range& remote ) 01512 { 01513 ErrorCode rval; 01514 int mperr; 01515 const int TAG = 0xD0E; 01516 // const unsigned rank = myPcomm->proc_config().proc_rank(); 01517 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 01518 01519 dbgOut.tprint( 1, "COMMUNICATING SHARED SET IDS\n" ); 01520 dbgOut.print( 6, "Owned, shared sets: ", owned ); 01521 01522 // Post receive buffers for all procs for which we share sets 01523 01524 std::vector< unsigned > procs; 01525 rval = myPcomm->get_entityset_owners( procs ); 01526 CHECK_MB( rval ); 01527 std::vector< unsigned >::iterator it = std::find( procs.begin(), procs.end(), myPcomm->proc_config().proc_rank() ); 01528 if( it != procs.end() ) procs.erase( it ); 01529 01530 std::vector< MPI_Request > recv_req( procs.size(), MPI_REQUEST_NULL ); 01531 std::vector< std::vector< unsigned long > > recv_buf( procs.size() ); 01532 01533 size_t recv_count = 0; 01534 for( size_t i = 0; i < procs.size(); ++i ) 01535 { 01536 Range tmp; 01537 rval = myPcomm->get_owned_sets( procs[i], tmp ); 01538 CHECK_MB( rval ); 01539 size_t count = 01540 intersect( tmp, remote ).size(); // Necessary because we might not be writing all of the database 01541 if( count ) 01542 { 01543 dbgOut.printf( 6, "Sets owned by proc %u (remote handles): ", procs[i] ); 01544 if( dbgOut.get_verbosity() >= 6 ) 01545 { 01546 Range remote_handles; 01547 tmp = intersect( tmp, remote ); 01548 for( Range::iterator j = tmp.begin(); j != tmp.end(); ++j ) 01549 { 01550 unsigned r; 01551 EntityHandle h; 01552 myPcomm->get_entityset_owner( *j, r, &h ); 01553 assert( r == procs[i] ); 01554 remote_handles.insert( h ); 01555 } 01556 dbgOut.print( 6, remote_handles ); 01557 } 01558 recv_count++; 01559 recv_buf[i].resize( 2 * count + 1 ); 01560 dbgOut.printf( 5, "Posting receive buffer of size %lu for proc %u (%lu of %lu owned sets)\n", 01561 (unsigned long)recv_buf[i].size(), procs[i], count, tmp.size() ); 01562 mperr = 01563 MPI_Irecv( &recv_buf[i][0], recv_buf[i].size(), MPI_UNSIGNED_LONG, procs[i], TAG, comm, &recv_req[i] ); 01564 CHECK_MPI( mperr ); 01565 } 01566 } 01567 01568 // Send set ids to all procs with which we share them 01569 01570 // First build per-process lists of sets for which we need to send data 01571 std::map< unsigned, Range > send_sets; 01572 std::vector< unsigned > set_procs; 01573 for( Range::reverse_iterator i = owned.rbegin(); i != owned.rend(); ++i ) 01574 { 01575 set_procs.clear(); 01576 rval = myPcomm->get_entityset_procs( *i, set_procs ); 01577 CHECK_MB( rval ); 01578 for( size_t j = 0; j < set_procs.size(); ++j ) 01579 if( set_procs[j] != myPcomm->proc_config().proc_rank() ) send_sets[set_procs[j]].insert( *i ); 01580 } 01581 assert( send_sets.find( myPcomm->proc_config().proc_rank() ) == send_sets.end() ); 01582 01583 // Now send the data 01584 std::vector< std::vector< unsigned long > > send_buf( send_sets.size() ); 01585 std::vector< MPI_Request > send_req( send_sets.size() ); 01586 std::map< unsigned, Range >::iterator si = send_sets.begin(); 01587 for( size_t i = 0; si != send_sets.end(); 
++si, ++i ) 01588 { 01589 dbgOut.printf( 6, "Sending data for shared sets to proc %u: ", si->first ); 01590 dbgOut.print( 6, si->second ); 01591 01592 send_buf[i].reserve( 2 * si->second.size() + 1 ); 01593 send_buf[i].push_back( si->second.size() ); 01594 for( Range::iterator j = si->second.begin(); j != si->second.end(); ++j ) 01595 { 01596 send_buf[i].push_back( *j ); 01597 send_buf[i].push_back( idMap.find( *j ) ); 01598 } 01599 dbgOut.printf( 5, "Sending buffer of size %lu to proc %u (%lu of %lu owned sets)\n", 01600 (unsigned long)send_buf[i].size(), si->first, si->second.size(), owned.size() ); 01601 mperr = MPI_Isend( &send_buf[i][0], send_buf[i].size(), MPI_UNSIGNED_LONG, si->first, TAG, comm, &send_req[i] ); 01602 } 01603 01604 // Process received data 01605 MPI_Status status; 01606 int idx; 01607 while( recv_count-- ) 01608 { 01609 mperr = MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status ); 01610 CHECK_MPI( mperr ); 01611 01612 assert( (unsigned)status.MPI_SOURCE == procs[idx] ); 01613 assert( 2 * recv_buf[idx].front() + 1 == recv_buf[idx].size() ); 01614 const size_t n = std::min< size_t >( recv_buf[idx].front(), ( recv_buf[idx].size() - 1 ) / 2 ); 01615 dbgOut.printf( 5, "Received buffer of size %lu from proc %d\n", (unsigned long)( 2 * n + 1 ), 01616 (int)status.MPI_SOURCE ); 01617 01618 for( size_t i = 0; i < n; ++i ) 01619 { 01620 EntityHandle handle = 0; 01621 rval = myPcomm->get_entityset_local_handle( procs[idx], recv_buf[idx][2 * i + 1], handle ); 01622 CHECK_MB( rval ); 01623 assert( handle != 0 ); 01624 if( !idMap.insert( handle, recv_buf[idx][2 * i + 2], 1 ).second ) 01625 error( MB_FAILURE ); // Conflicting IDs?????? 01626 } 01627 01628 recv_req[idx] = MPI_REQUEST_NULL; 01629 } 01630 assert( MPI_SUCCESS == MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status ) && 01631 MPI_UNDEFINED == idx ); // Check that we got them all 01632 01633 // Wait for all sends to complete before we release send 01634 // buffers (implicitly releases when we return from this function) 01635 01636 std::vector< MPI_Status > stats( send_req.size() ); 01637 mperr = MPI_Waitall( send_req.size(), &send_req[0], &stats[0] ); 01638 CHECK_MPI( mperr ); 01639 01640 if( dbgOut.get_verbosity() >= SSVB ) print_shared_sets(); 01641 01642 return MB_SUCCESS; 01643 } 01644 01645 // void get_global_ids(Interface* iFace, const unsigned long* ptr, 01646 // size_t len, unsigned flags, 01647 // std::vector<int>& ids) 01648 //{ 01649 // Tag idtag; 01650 // iFace->tag_get_handle(GLOBAL_ID_TAG_NAME, 1, MB_TYPE_INTEGER, idtag); 01651 // for (size_t i = 0; i < len; ++i) { 01652 // if (flags & MESHSET_ORDERED) { 01653 // int tmp; 01654 // iFace->tag_get_data(idtag, ptr + i, 1, &tmp); 01655 // ids.push_back(tmp); 01656 // continue; 01657 // } 01658 // 01659 // EntityHandle s = ptr[i]; 01660 // EntityHandle e = ptr[++i]; 01661 // for (; s <= e; ++s) { 01662 // int tmp; 01663 // iFace->tag_get_data(idtag, &s, 1, &tmp); 01664 // ids.push_back(tmp); 01665 // } 01666 // } 01667 //} 01668 01669 ErrorCode WriteHDF5Parallel::pack_set( Range::const_iterator it, unsigned long* buffer, size_t buffer_size ) 01670 { 01671 ErrorCode rval; 01672 const EntityHandle* ptr; 01673 int len; 01674 unsigned char flags; 01675 std::vector< wid_t > tmp; 01676 size_t newlen; 01677 01678 // Buffer must always contain at least flags and desired sizes 01679 assert( buffer_size >= 4 ); 01680 buffer_size -= 4; 01681 01682 Range::const_iterator nd = it; 01683 ++nd; 01684 rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, 
WriteUtilIface::CONTENTS, &len, &flags ); 01685 CHECK_MB( rval ); 01686 01687 // Tag mattag; 01688 // iFace->tag_get_handle(MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag); 01689 // int block; 01690 // if (MB_SUCCESS != iFace->tag_get_data(mattag, &*it, 1, &block)) 01691 // block = 0; 01692 // 01693 // if (block) { 01694 // std::vector<int> ids; 01695 // get_global_ids(iFace, ptr, len, flags, ids); 01696 //} 01697 01698 if( len && !( flags & MESHSET_ORDERED ) ) 01699 { 01700 tmp.clear(); 01701 bool blocked = false; 01702 assert( ( 0 == len % 2 ) ); 01703 rval = range_to_blocked_list( ptr, len / 2, tmp, blocked ); 01704 CHECK_MB( rval ); 01705 if( blocked ) flags |= mhdf_SET_RANGE_BIT; 01706 } 01707 else 01708 { 01709 tmp.resize( len ); 01710 rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true ); 01711 CHECK_MB( rval ); 01712 tmp.resize( newlen ); 01713 } 01714 01715 buffer[0] = flags; 01716 buffer[1] = tmp.size(); 01717 if( tmp.size() <= buffer_size ) std::copy( tmp.begin(), tmp.end(), buffer + 4 ); 01718 01719 rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, WriteUtilIface::CHILDREN, &len ); 01720 CHECK_MB( rval ); 01721 tmp.resize( len ); 01722 rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true ); 01723 tmp.resize( newlen ); 01724 buffer[2] = tmp.size(); 01725 if( tmp.size() <= buffer_size - buffer[1] ) std::copy( tmp.begin(), tmp.end(), buffer + 4 + buffer[1] ); 01726 01727 rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, WriteUtilIface::PARENTS, &len ); 01728 CHECK_MB( rval ); 01729 tmp.resize( len ); 01730 rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true ); 01731 tmp.resize( newlen ); 01732 buffer[3] = tmp.size(); 01733 if( tmp.size() <= buffer_size - buffer[1] - buffer[2] ) 01734 std::copy( tmp.begin(), tmp.end(), buffer + 4 + buffer[1] + buffer[2] ); 01735 01736 return MB_SUCCESS; 01737 } 01738 01739 template < typename TYPE > 01740 static void convert_to_ranged_ids( const TYPE* buffer, size_t len, std::vector< WriteHDF5::wid_t >& result ) 01741 { 01742 if( !len ) 01743 { 01744 result.clear(); 01745 return; 01746 } 01747 01748 result.resize( len * 2 ); 01749 Range tmp; 01750 for( size_t i = 0; i < len; i++ ) 01751 tmp.insert( (EntityHandle)buffer[i] ); 01752 result.resize( tmp.psize() * 2 ); 01753 int j = 0; 01754 for( Range::const_pair_iterator pit = tmp.const_pair_begin(); pit != tmp.const_pair_end(); ++pit, j++ ) 01755 { 01756 result[2 * j] = pit->first; 01757 result[2 * j + 1] = pit->second - pit->first + 1; 01758 } 01759 } 01760 01761 static void merge_ranged_ids( const unsigned long* range_list, size_t len, std::vector< WriteHDF5::wid_t >& result ) 01762 { 01763 typedef WriteHDF5::wid_t wid_t; 01764 assert( 0 == len % 2 ); 01765 assert( 0 == result.size() % 2 ); 01766 STATIC_ASSERT( sizeof( std::pair< wid_t, wid_t > ) == 2 * sizeof( wid_t ) ); 01767 01768 result.insert( result.end(), range_list, range_list + len ); 01769 size_t plen = result.size() / 2; 01770 Range tmp; 01771 for( size_t i = 0; i < plen; i++ ) 01772 { 01773 EntityHandle starth = (EntityHandle)result[2 * i]; 01774 EntityHandle endh = (EntityHandle)result[2 * i] + (wid_t)result[2 * i + 1] - 1; // id + count - 1 01775 tmp.insert( starth, endh ); 01776 } 01777 // Now convert back to std::vector<WriteHDF5::wid_t>, compressed range format 01778 result.resize( tmp.psize() * 2 ); 01779 int j = 0; 01780 for( Range::const_pair_iterator pit = tmp.const_pair_begin(); pit != tmp.const_pair_end(); ++pit, j++ ) 01781 { 01782 result[2 * j] = pit->first; 01783 result[2 * j + 1] = 
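// The "compressed range" (blocked) encoding used here stores each contiguous run of IDs
// as a (start, count) pair instead of listing every ID.  For example, the ID set
// { 7, 8, 9, 15 } collapses to two pairs and is stored as the flat vector { 7, 3, 15, 1 }:
//
//   result[0] = 7;   result[1] = 3;   // IDs 7, 8, 9
//   result[2] = 15;  result[3] = 1;   // ID 15
//
// convert_to_ranged_ids() above produces this layout by inserting the IDs into a Range
// and walking its pair iterator, the same way the surrounding loop does.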
pit->second - pit->first + 1; 01784 } 01785 } 01786 01787 static void merge_vector_ids( const unsigned long* list, size_t len, std::vector< WriteHDF5::wid_t >& result ) 01788 { 01789 result.insert( result.end(), list, list + len ); 01790 } 01791 01792 ErrorCode WriteHDF5Parallel::unpack_set( EntityHandle set, const unsigned long* buffer, size_t buffer_size ) 01793 { 01794 // Use local variables for readability 01795 assert( buffer_size >= 4 ); 01796 assert( buffer[1] + buffer[2] + buffer[3] <= buffer_size ); 01797 const unsigned long flags = buffer[0]; 01798 unsigned long num_content = buffer[1]; 01799 const unsigned long num_child = buffer[2]; 01800 const unsigned long num_parent = buffer[3]; 01801 const unsigned long* contents = buffer + 4; 01802 const unsigned long* children = contents + num_content; 01803 const unsigned long* parents = children + num_child; 01804 01805 SpecialSetData* data = find_set_data( set ); 01806 assert( NULL != data ); 01807 if( NULL == data ) return MB_FAILURE; 01808 01809 // Tag mattag; 01810 // iFace->tag_get_handle(MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag); 01811 // int block; 01812 // if (MB_SUCCESS != iFace->tag_get_data(mattag, &set, 1, &block)) 01813 // block = 0; 01814 01815 // If either the current data or the new data is in ranged format, 01816 // then change the other to ranged format if it isn't already 01817 // in both cases when they differ, the data will end up "compressed range" 01818 std::vector< wid_t > tmp; 01819 if( ( flags & mhdf_SET_RANGE_BIT ) != ( data->setFlags & mhdf_SET_RANGE_BIT ) ) 01820 { 01821 if( flags & mhdf_SET_RANGE_BIT ) 01822 { 01823 tmp = data->contentIds; 01824 convert_to_ranged_ids( &tmp[0], tmp.size(), data->contentIds ); 01825 data->setFlags |= mhdf_SET_RANGE_BIT; 01826 } 01827 else 01828 { 01829 tmp.clear(); 01830 convert_to_ranged_ids( contents, num_content, tmp ); 01831 num_content = tmp.size(); 01832 if( sizeof( wid_t ) < sizeof( long ) ) 01833 { 01834 size_t old_size = tmp.size(); 01835 tmp.resize( sizeof( long ) * old_size / sizeof( wid_t ) ); 01836 unsigned long* array = reinterpret_cast< unsigned long* >( &tmp[0] ); 01837 for( long i = ( (long)old_size ) - 1; i >= 0; --i ) 01838 array[i] = tmp[i]; 01839 contents = array; 01840 } 01841 else if( sizeof( wid_t ) > sizeof( long ) ) 01842 { 01843 unsigned long* array = reinterpret_cast< unsigned long* >( &tmp[0] ); 01844 std::copy( tmp.begin(), tmp.end(), array ); 01845 } 01846 contents = reinterpret_cast< unsigned long* >( &tmp[0] ); 01847 } 01848 } 01849 01850 if( data->setFlags & mhdf_SET_RANGE_BIT ) 01851 merge_ranged_ids( contents, num_content, data->contentIds ); 01852 else 01853 merge_vector_ids( contents, num_content, data->contentIds ); 01854 01855 merge_vector_ids( children, num_child, data->childIds ); 01856 merge_vector_ids( parents, num_parent, data->parentIds ); 01857 return MB_SUCCESS; 01858 } 01859 01860 ErrorCode WriteHDF5Parallel::communicate_shared_set_data( const Range& owned, const Range& remote ) 01861 { 01862 ErrorCode rval; 01863 int mperr; 01864 const unsigned rank = myPcomm->proc_config().proc_rank(); 01865 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 01866 01867 dbgOut.tprintf( 1, "COMMUNICATING SHARED SET DATA (%lu owned & %lu remote)\n", (unsigned long)owned.size(), 01868 (unsigned long)remote.size() ); 01869 01870 // Calculate the total number of messages to be in transit (send and receive) 01871 size_t nummess = 0; 01872 std::vector< unsigned > procs; 01873 ; 01874 Range shared( owned ); 01875 shared.merge( remote 
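// The loop that follows counts every (set, sharing process) pair this rank participates
// in; that count (nummess) drives the per-message buffer size chosen just below.  Worked
// example of the sizing rule, assuming an 8-byte long: MAX_BUFFER_MEM is
// 32*1024*1024 / 8 = 4194304 words.  With nummess = 1000 the initial 1024-word buffers
// fit the budget and are kept; with nummess = 100000 the budget allows only
// 4194304 / 100000 = 41 words per buffer, which is still above the 16-word floor, so
// init_buff_size becomes 41.
//
//   size_t init = INIT_BUFFER_SIZE;                                 // 1024
//   if( init * nummess > MAX_BUFFER_MEM ) init = MAX_BUFFER_MEM / nummess;
//   if( init < MIN_BUFFER_SIZE ) init = MIN_BUFFER_SIZE;            // never below 16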
); 01876 for( Range::iterator i = shared.begin(); i != shared.end(); ++i ) 01877 { 01878 procs.clear(); 01879 rval = myPcomm->get_entityset_procs( *i, procs ); 01880 CHECK_MB( rval ); 01881 nummess += procs.size(); 01882 } 01883 01884 // Choose a receive buffer size. We need 4*sizeof(long) minimum, 01885 // but that is almost useless so use 16*sizeof(long) as the minimum 01886 // instead. Choose an upper limit such that we don't exceed 32 MB 01887 // of allocated memory (unless we absolutely must to meet the minimum.) 01888 // Also, don't initially choose buffers larger than 128*sizeof(long). 01889 const size_t MAX_BUFFER_MEM = 32 * 1024 * 1024 / sizeof( long ); 01890 // const size_t INIT_BUFFER_SIZE = 128; 01891 const size_t INIT_BUFFER_SIZE = 1024; 01892 const size_t MIN_BUFFER_SIZE = 16; 01893 size_t init_buff_size = INIT_BUFFER_SIZE; 01894 if( init_buff_size * nummess > MAX_BUFFER_MEM ) init_buff_size = MAX_BUFFER_MEM / nummess; 01895 if( init_buff_size < MIN_BUFFER_SIZE ) init_buff_size = MIN_BUFFER_SIZE; 01896 01897 dbgOut.printf( 2, "Using buffer size of %lu for an expected message count of %lu\n", (unsigned long)init_buff_size, 01898 (unsigned long)nummess ); 01899 01900 // Count number of recvs 01901 size_t numrecv = 0; 01902 for( Range::iterator i = owned.begin(); i != owned.end(); ++i ) 01903 { 01904 procs.clear(); 01905 rval = myPcomm->get_entityset_procs( *i, procs ); 01906 CHECK_MB( rval ); 01907 numrecv += procs.size(); 01908 if( std::find( procs.begin(), procs.end(), rank ) != procs.end() ) --numrecv; 01909 } 01910 01911 // Post receive buffers for all owned sets for all sharing procs 01912 std::vector< MPI_Request > recv_req( numrecv, MPI_REQUEST_NULL ); 01913 std::vector< MPI_Request > lrecv_req( numrecv, MPI_REQUEST_NULL ); 01914 01915 std::vector< std::vector< unsigned long > > recv_buf( numrecv, std::vector< unsigned long >( init_buff_size ) ); 01916 int idx = 0; 01917 for( Range::iterator i = owned.begin(); i != owned.end(); ++i ) 01918 { 01919 procs.clear(); 01920 rval = myPcomm->get_entityset_procs( *i, procs ); 01921 CHECK_MB( rval ); 01922 for( size_t j = 0; j < procs.size(); ++j ) 01923 { 01924 if( procs[j] == rank ) continue; 01925 int tag = ID_FROM_HANDLE( *i ); 01926 if( *i != CREATE_HANDLE( MBENTITYSET, tag ) ) 01927 { 01928 #ifndef NDEBUG 01929 abort(); 01930 #endif 01931 CHECK_MB( MB_FAILURE ); 01932 } 01933 dbgOut.printf( 5, "Posting buffer to receive set %d from proc %u\n", tag, procs[j] ); 01934 mperr = 01935 MPI_Irecv( &recv_buf[idx][0], init_buff_size, MPI_UNSIGNED_LONG, procs[j], tag, comm, &recv_req[idx] ); 01936 CHECK_MPI( mperr ); 01937 ++idx; 01938 } 01939 } 01940 assert( (size_t)idx == numrecv ); 01941 01942 // Now send set data for all remote sets that I know about 01943 std::vector< MPI_Request > send_req( remote.size() ); 01944 std::vector< std::vector< unsigned long > > send_buf( remote.size() ); 01945 idx = 0; 01946 for( Range::iterator i = remote.begin(); i != remote.end(); ++i, ++idx ) 01947 { 01948 send_buf[idx].resize( init_buff_size ); 01949 rval = pack_set( i, &send_buf[idx][0], init_buff_size ); 01950 CHECK_MB( rval ); 01951 EntityHandle remote_handle; 01952 unsigned owner; 01953 rval = myPcomm->get_entityset_owner( *i, owner, &remote_handle ); 01954 CHECK_MB( rval ); 01955 01956 int tag = ID_FROM_HANDLE( remote_handle ); 01957 assert( remote_handle == CREATE_HANDLE( MBENTITYSET, tag ) ); 01958 dbgOut.printf( 5, "Sending %lu values for set %d to proc %u\n", 01959 send_buf[idx][1] + send_buf[idx][2] + send_buf[idx][3] + 4, tag, owner 
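// Message matching in this exchange relies on the MPI tag: every message about a set is
// tagged with ID_FROM_HANDLE() of that set's handle on the owning rank, so the owner can
// recover which of its sets a message describes directly from the received tag, without
// any extra bookkeeping.  The earlier check that *i == CREATE_HANDLE( MBENTITYSET, tag )
// guards against a handle whose ID does not survive the handle -> int tag -> handle
// round trip.  Sketch of the two ends of that convention:
//
//   int tag = ID_FROM_HANDLE( owner_side_handle );                    // sender side
//   EntityHandle set = CREATE_HANDLE( MBENTITYSET, status.MPI_TAG );  // receiver side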
); 01960 mperr = MPI_Isend( &send_buf[idx][0], init_buff_size, MPI_UNSIGNED_LONG, owner, tag, comm, &send_req[idx] ); 01961 CHECK_MPI( mperr ); 01962 } 01963 01964 // Tag mattag; 01965 // iFace->tag_get_handle(MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag); 01966 01967 // Now initialize local data for managing contents of owned, shared sets 01968 assert( specialSets.empty() ); 01969 specialSets.clear(); 01970 specialSets.reserve( owned.size() ); 01971 for( Range::iterator i = owned.begin(); i != owned.end(); ++i ) 01972 { 01973 // int block; 01974 // if (MB_SUCCESS != iFace->tag_get_data(mattag, &*i, 1, &block)) 01975 // block = 0; 01976 // std::vector<int> ids; 01977 01978 SpecialSetData data; 01979 data.setHandle = *i; 01980 rval = iFace->get_meshset_options( *i, data.setFlags ); 01981 CHECK_MB( rval ); 01982 specialSets.push_back( data ); 01983 std::vector< EntityHandle > list; 01984 if( data.setFlags & MESHSET_ORDERED ) 01985 { 01986 list.clear(); 01987 rval = iFace->get_entities_by_handle( *i, list ); 01988 CHECK_MB( rval ); 01989 rval = vector_to_id_list( list, specialSets.back().contentIds, true ); 01990 CHECK_MB( rval ); 01991 // if (block) 01992 // get_global_ids(iFace, &list[0], list.size(), MESHSET_ORDERED, ids); 01993 } 01994 else 01995 { 01996 Range range; 01997 rval = iFace->get_entities_by_handle( *i, range ); 01998 CHECK_MB( rval ); 01999 bool ranged; 02000 rval = range_to_blocked_list( range, specialSets.back().contentIds, ranged ); CHECK_MB( rval ); 02001 if( ranged ) specialSets.back().setFlags |= mhdf_SET_RANGE_BIT; 02002 // if (block) { 02003 // std::vector<EntityHandle> tmp; 02004 // for (Range::const_pair_iterator pi = range.const_pair_begin(); pi != 02005 // range.const_pair_end(); ++pi) { 02006 // tmp.push_back(pi->first); 02007 // tmp.push_back(pi->second); 02008 // } 02009 // get_global_ids(iFace, &tmp[0], tmp.size(), ranged ?
0 : MESHSET_ORDERED, ids); 02010 //} 02011 } 02012 02013 list.clear(); 02014 rval = iFace->get_parent_meshsets( *i, list ); 02015 CHECK_MB( rval ); 02016 rval = vector_to_id_list( list, specialSets.back().parentIds, true ); 02017 CHECK_MB( rval ); 02018 rval = iFace->get_child_meshsets( *i, list ); 02019 CHECK_MB( rval ); 02020 rval = vector_to_id_list( list, specialSets.back().childIds, true ); 02021 CHECK_MB( rval ); 02022 } 02023 02024 // Process received buffers, repost larger buffers where necessary 02025 size_t remaining = numrecv; 02026 numrecv = 0; 02027 while( remaining-- ) 02028 { 02029 std::vector< unsigned long > dead; 02030 MPI_Status status; 02031 mperr = MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status ); 02032 CHECK_MPI( mperr ); 02033 EntityHandle handle = CREATE_HANDLE( MBENTITYSET, status.MPI_TAG ); 02034 std::vector< unsigned long >& buff = recv_buf[idx]; 02035 size_t size = buff[1] + buff[2] + buff[3] + 4; 02036 dbgOut.printf( 5, "Received %lu values for set %d from proc %d\n", (unsigned long)size, status.MPI_TAG, 02037 status.MPI_SOURCE ); 02038 if( size <= init_buff_size ) 02039 { 02040 rval = unpack_set( handle, &buff[0], init_buff_size ); 02041 CHECK_MB( rval ); 02042 dead.swap( buff ); // Release memory 02043 } 02044 else 02045 { 02046 // Data was too big for init_buff_size 02047 // repost with larger buffer 02048 buff.resize( size ); 02049 dbgOut.printf( 5, "Re-Posting buffer to receive set %d from proc %d with size %lu\n", status.MPI_TAG, 02050 status.MPI_SOURCE, (unsigned long)size ); 02051 mperr = MPI_Irecv( &buff[0], size, MPI_UNSIGNED_LONG, status.MPI_SOURCE, status.MPI_TAG, comm, 02052 &lrecv_req[idx] ); 02053 CHECK_MPI( mperr ); 02054 ++numrecv; 02055 } 02056 recv_req[idx] = MPI_REQUEST_NULL; 02057 } 02058 02059 // Wait for sends to complete 02060 MPI_Waitall( send_req.size(), &send_req[0], MPI_STATUSES_IGNORE ); 02061 02062 // Re-send sets that didn't fit initial buffer size 02063 idx = 0; 02064 for( Range::iterator i = remote.begin(); i != remote.end(); ++i, ++idx ) 02065 { 02066 std::vector< unsigned long >& buff = send_buf[idx]; 02067 size_t size = buff[1] + buff[2] + buff[3] + 4; 02068 if( size <= init_buff_size ) continue; 02069 02070 buff.resize( size ); 02071 rval = pack_set( i, &buff[0], size ); 02072 CHECK_MB( rval ); 02073 EntityHandle remote_handle; 02074 unsigned owner; 02075 rval = myPcomm->get_entityset_owner( *i, owner, &remote_handle ); 02076 CHECK_MB( rval ); 02077 02078 int tag = ID_FROM_HANDLE( remote_handle ); 02079 assert( remote_handle == CREATE_HANDLE( MBENTITYSET, tag ) ); 02080 dbgOut.printf( 5, "Sending %lu values for set %d to proc %u\n", (unsigned long)size, tag, owner ); 02081 mperr = MPI_Isend( &buff[0], size, MPI_UNSIGNED_LONG, owner, tag, comm, &send_req[idx] ); 02082 CHECK_MPI( mperr ); 02083 } 02084 02085 // Process received buffers 02086 remaining = numrecv; 02087 while( remaining-- ) 02088 { 02089 std::vector< unsigned long > dead; 02090 MPI_Status status; 02091 mperr = MPI_Waitany( lrecv_req.size(), &lrecv_req[0], &idx, &status ); 02092 CHECK_MPI( mperr ); 02093 EntityHandle handle = CREATE_HANDLE( MBENTITYSET, status.MPI_TAG ); 02094 std::vector< unsigned long >& buff = recv_buf[idx]; 02095 dbgOut.printf( 5, "Received %lu values for set %d from proc %d\n", 4 + buff[1] + buff[2] + buff[3], 02096 status.MPI_TAG, status.MPI_SOURCE ); 02097 rval = unpack_set( handle, &buff[0], buff.size() ); 02098 CHECK_MB( rval ); 02099 dead.swap( buff ); // Release memory 02100 02101 lrecv_req[idx] = MPI_REQUEST_NULL; 02102 } 
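// To summarize the exchange above: every set is first sent in a fixed init_buff_size
// message, and pack_set() guarantees that at least the 4-word header (flags plus the
// three list lengths) is valid.  Each receiver then computes the true size of every
// incoming set, reposts a larger MPI_Irecv (same source, same tag) for any set that did
// not fit, and the senders repack and resend exactly those sets once their first round
// of sends has completed.  Both sides make the decision with the same test:
//
//   size_t required = 4 + buff[1] + buff[2] + buff[3];
//   bool needs_second_round = required > init_buff_size;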
02103 02104 // Wait for sends to complete 02105 MPI_Waitall( send_req.size(), &send_req[0], MPI_STATUSES_IGNORE ); 02106 02107 return MB_SUCCESS; 02108 } 02109 02110 ErrorCode WriteHDF5Parallel::create_meshset_tables( double* times ) 02111 { 02112 Range::const_iterator riter; 02113 const unsigned rank = myPcomm->proc_config().proc_rank(); 02114 02115 START_SERIAL; 02116 print_type_sets( iFace, &dbgOut, setSet.range ); 02117 END_SERIAL; 02118 CpuTimer timer; 02119 02120 // Remove remote sets from setSet.range 02121 Range shared, owned, remote; 02122 ErrorCode rval = myPcomm->get_shared_sets( shared ); 02123 CHECK_MB( rval ); 02124 shared = intersect( shared, setSet.range ); 02125 rval = myPcomm->get_owned_sets( rank, owned ); 02126 CHECK_MB( rval ); 02127 owned = intersect( owned, setSet.range ); 02128 remote = subtract( shared, owned ); 02129 setSet.range = subtract( setSet.range, remote ); 02130 02131 // Create set meta table 02132 struct SetDescCreator : public DataSetCreator 02133 { 02134 ErrorCode operator()( WriteHDF5* writer, long size, const ExportSet*, long& start_id ) const 02135 { 02136 return writer->create_set_meta( size, start_id ); 02137 } 02138 }; 02139 long count = setSet.range.size(); 02140 rval = create_dataset( 1, &count, &setSet.offset, &setSet.max_num_ents, &setSet.total_num_ents, SetDescCreator(), 02141 NULL, &setSet.first_id ); 02142 CHECK_MB( rval ); 02143 writeSets = setSet.max_num_ents > 0; 02144 02145 rval = assign_ids( setSet.range, setSet.first_id + setSet.offset ); 02146 CHECK_MB( rval ); 02147 if( times ) times[SET_OFFSET_TIME] = timer.time_elapsed(); 02148 02149 // Exchange file IDs for sets between all procs 02150 rval = communicate_shared_set_ids( owned, remote ); 02151 CHECK_MB( rval ); 02152 if( times ) times[SHARED_SET_IDS] = timer.time_elapsed(); 02153 02154 // Communicate remote set contents, children, etc.
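// Both for the set meta table above and for the contents/children/parents tables created
// below, create_dataset() turns a per-rank count into this rank's starting row (offset),
// the global total, and the largest per-rank count; assign_ids() then numbers the local
// sets consecutively from first_id + offset.  Illustrative example with three ranks
// owning 3, 5 and 2 sets and a starting file ID of F (the numbers show what the
// bookkeeping would produce, not values from an actual run):
//
//   rank   count   offset   file IDs assigned
//    0       3        0     F+0 .. F+2
//    1       5        3     F+3 .. F+7
//    2       2        8     F+8 .. F+9
//
//   total = 10, largest per-rank count = 5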
02155 rval = communicate_shared_set_data( owned, remote ); 02156 CHECK_MB( rval ); 02157 if( times ) times[SHARED_SET_CONTENTS] = timer.time_elapsed(); 02158 02159 // Communicate counts for owned sets 02160 long data_counts[3]; // { #contents, #children, #parents } 02161 rval = count_set_size( setSet.range, data_counts[0], data_counts[1], data_counts[2] ); 02162 CHECK_MB( rval ); 02163 if( times ) times[SET_OFFSET_TIME] += timer.time_elapsed(); 02164 02165 long offsets[3], max_counts[3], totals[3]; 02166 rval = create_dataset( 3, data_counts, offsets, max_counts, totals ); 02167 CHECK_MB( rval ); 02168 02169 // Create the datasets 02170 if( 0 == myPcomm->proc_config().proc_rank() ) 02171 { 02172 rval = create_set_tables( totals[0], totals[1], totals[2] ); 02173 CHECK_MB( rval ); 02174 } 02175 02176 // Store communicated global data 02177 setContentsOffset = offsets[0]; 02178 setChildrenOffset = offsets[1]; 02179 setParentsOffset = offsets[2]; 02180 maxNumSetContents = max_counts[0]; 02181 maxNumSetChildren = max_counts[1]; 02182 maxNumSetParents = max_counts[2]; 02183 writeSetContents = totals[0] > 0; 02184 writeSetChildren = totals[1] > 0; 02185 writeSetParents = totals[2] > 0; 02186 02187 dbgOut.printf( 2, "set contents: %ld local, %ld global, offset = %ld\n", data_counts[0], totals[0], offsets[0] ); 02188 dbgOut.printf( 2, "set children: %ld local, %ld global, offset = %ld\n", data_counts[1], totals[1], offsets[1] ); 02189 dbgOut.printf( 2, "set parents: %ld local, %ld global, offset = %ld\n", data_counts[2], totals[2], offsets[2] ); 02190 02191 return MB_SUCCESS; 02192 } 02193 02194 void WriteHDF5Parallel::remove_remote_entities( EntityHandle relative, Range& range ) 02195 { 02196 Range result; 02197 result.merge( intersect( range, nodeSet.range ) ); 02198 result.merge( intersect( range, setSet.range ) ); 02199 for( std::list< ExportSet >::iterator eiter = exportList.begin(); eiter != exportList.end(); ++eiter ) 02200 result.merge( intersect( range, eiter->range ) ); 02201 02202 // result.merge(intersect(range, myParallelSets)); 02203 Range sets; 02204 int junk; 02205 sets.merge( Range::lower_bound( range.begin(), range.end(), CREATE_HANDLE( MBENTITYSET, 0, junk ) ), range.end() ); 02206 remove_remote_sets( relative, sets ); 02207 result.merge( sets ); 02208 range.swap( result ); 02209 } 02210 02211 void WriteHDF5Parallel::remove_remote_sets( EntityHandle /* relative */, Range& range ) 02212 { 02213 Range result( intersect( range, setSet.range ) ); 02214 // Store the non-intersecting entities separately if needed 02215 // Range remaining(subtract(range, result)); 02216 range.swap( result ); 02217 } 02218 02219 void WriteHDF5Parallel::remove_remote_entities( EntityHandle relative, std::vector< EntityHandle >& vect ) 02220 { 02221 Range intrsct; 02222 for( std::vector< EntityHandle >::const_iterator iter = vect.begin(); iter != vect.end(); ++iter ) 02223 intrsct.insert( *iter ); 02224 remove_remote_entities( relative, intrsct ); 02225 02226 unsigned int read, write; 02227 for( read = write = 0; read < vect.size(); ++read ) 02228 { 02229 if( intrsct.find( vect[read] ) != intrsct.end() ) 02230 { 02231 if( read != write ) vect[write] = vect[read]; 02232 ++write; 02233 } 02234 } 02235 if( write != vect.size() ) vect.resize( write ); 02236 } 02237 02238 void WriteHDF5Parallel::remove_remote_sets( EntityHandle relative, std::vector< EntityHandle >& vect ) 02239 { 02240 Range intrsct; 02241 for( std::vector< EntityHandle >::const_iterator iter = vect.begin(); iter != vect.end(); ++iter ) 02242 
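// The vector overloads above and below use the same in-place compaction idiom: build a
// Range of the handles that should be kept, then walk the vector with separate read and
// write positions so the surviving handles stay in their original order.  An equivalent
// formulation with the standard library (illustration only; requires <algorithm> and a
// C++11 lambda) would be:
//
//   vect.erase( std::remove_if( vect.begin(), vect.end(),
//                               [&intrsct]( EntityHandle h ) { return intrsct.find( h ) == intrsct.end(); } ),
//               vect.end() );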
intrsct.insert( *iter ); 02243 remove_remote_sets( relative, intrsct ); 02244 02245 unsigned int read, write; 02246 for( read = write = 0; read < vect.size(); ++read ) 02247 { 02248 if( intrsct.find( vect[read] ) != intrsct.end() ) 02249 { 02250 if( read != write ) vect[write] = vect[read]; 02251 ++write; 02252 } 02253 } 02254 if( write != vect.size() ) vect.resize( write ); 02255 } 02256 02257 ErrorCode WriteHDF5Parallel::exchange_file_ids( const Range& nonlocal ) 02258 { 02259 ErrorCode rval; 02260 02261 // For each entity owned on the interface, write its file id to 02262 // a tag. The sets of entities to be written should already contain 02263 // only owned entities so by intersecting with them we not only 02264 // filter by entities to be written, but also restrict to entities 02265 // owned by the proc 02266 02267 // Get list of interface entities 02268 Range imesh, tmp; 02269 for( std::list< ExportSet >::reverse_iterator i = exportList.rbegin(); i != exportList.rend(); ++i ) 02270 { 02271 tmp.clear(); 02272 rval = myPcomm->filter_pstatus( i->range, PSTATUS_SHARED, PSTATUS_AND, -1, &tmp ); 02273 if( MB_SUCCESS != rval ) return error( rval ); 02274 imesh.merge( tmp ); 02275 } 02276 tmp.clear(); 02277 rval = myPcomm->filter_pstatus( nodeSet.range, PSTATUS_SHARED, PSTATUS_AND, -1, &tmp ); 02278 if( MB_SUCCESS != rval ) return error( rval ); 02279 imesh.merge( tmp ); 02280 02281 // Create tag to store file IDs 02282 EntityHandle default_val = 0; 02283 Tag file_id_tag = 0; 02284 rval = iFace->tag_get_handle( "__hdf5_ll_fileid", 1, MB_TYPE_HANDLE, file_id_tag, MB_TAG_DENSE | MB_TAG_CREAT, 02285 &default_val ); 02286 if( MB_SUCCESS != rval ) return error( rval ); 02287 02288 // Copy file IDs into tag 02289 std::vector< EntityHandle > file_id_vect( imesh.size() ); 02290 Range::const_iterator i; 02291 std::vector< EntityHandle >::iterator j = file_id_vect.begin(); 02292 for( i = imesh.begin(); i != imesh.end(); ++i, ++j ) 02293 { 02294 *j = idMap.find( *i ); 02295 if( !*j ) 02296 { 02297 iFace->tag_delete( file_id_tag ); 02298 return error( MB_FAILURE ); 02299 } 02300 } 02301 rval = iFace->tag_set_data( file_id_tag, imesh, &file_id_vect[0] ); 02302 if( MB_SUCCESS != rval ) 02303 { 02304 iFace->tag_delete( file_id_tag ); 02305 return error( rval ); 02306 } 02307 02308 // Do communication 02309 rval = myPcomm->exchange_tags( file_id_tag, imesh ); 02310 if( MB_SUCCESS != rval ) 02311 { 02312 iFace->tag_delete( file_id_tag ); 02313 return error( rval ); 02314 } 02315 02316 // Copy file IDs from tag into idMap for remote entities 02317 file_id_vect.resize( nonlocal.size() ); 02318 rval = iFace->tag_get_data( file_id_tag, nonlocal, &file_id_vect[0] ); 02319 if( MB_SUCCESS != rval ) 02320 { 02321 iFace->tag_delete( file_id_tag ); 02322 return error( rval ); 02323 } 02324 02325 j = file_id_vect.begin(); 02326 for( i = nonlocal.begin(); i != nonlocal.end(); ++i, ++j ) 02327 { 02328 if( *j == 0 ) 02329 { 02330 int owner = -1; 02331 myPcomm->get_owner( *i, owner ); 02332 const char* name = CN::EntityTypeName( TYPE_FROM_HANDLE( *i ) ); 02333 int id = ID_FROM_HANDLE( *i ); 02334 MB_SET_ERR_CONT( "Process " << myPcomm->proc_config().proc_rank() 02335 << " did not receive valid id handle for shared " << name << " " << id 02336 << " owned by process " << owner ); 02337 dbgOut.printf( 1, 02338 "Did not receive valid remote id for " 02339 "shared %s %d owned by process %d", 02340 name, id, owner ); 02341 iFace->tag_delete( file_id_tag ); 02342 return error( MB_FAILURE ); 02343 } 02344 else 02345 { 02346 if( 
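// exchange_file_ids() uses a temporary dense tag as the transport for file IDs: owners
// write the IDs they assigned into the tag on shared interface entities, ParallelComm's
// exchange_tags() pushes the owners' values onto the other ranks' copies, and each rank
// then reads the tag back on its non-owned entities and records the result in idMap.
// A value still equal to the tag default of 0 means the owner never supplied an ID,
// which is exactly what the test just below reports as an error.  Generic sketch of the
// pattern (the tag name "__example_exchange" and the names owned_ents, shared_ents and
// values are placeholders, not variables from this file):
//
//   Tag t;
//   EntityHandle zero = 0;                                   // "no ID received" sentinel
//   iFace->tag_get_handle( "__example_exchange", 1, MB_TYPE_HANDLE, t,
//                          MB_TAG_DENSE | MB_TAG_CREAT, &zero );
//   iFace->tag_set_data( t, owned_ents, &values[0] );        // owners publish their values
//   myPcomm->exchange_tags( t, shared_ents );                // owner values overwrite copies
//   iFace->tag_get_data( t, shared_ents, &values[0] );       // other ranks read them back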
!idMap.insert( *i, *j, 1 ).second ) 02347 { 02348 iFace->tag_delete( file_id_tag ); 02349 return error( MB_FAILURE ); 02350 } 02351 } 02352 } 02353 02354 #ifndef NDEBUG 02355 // Check that the writer is correct with regard to which entities 02356 // it owns, by verifying that the file ids we thought we were 02357 // sending were not received instead 02358 file_id_vect.resize( imesh.size() ); 02359 rval = iFace->tag_get_data( file_id_tag, imesh, &file_id_vect[0] ); 02360 if( MB_SUCCESS != rval ) 02361 { 02362 iFace->tag_delete( file_id_tag ); 02363 return error( rval ); 02364 } 02365 int invalid_count = 0; 02366 j = file_id_vect.begin(); 02367 for( i = imesh.begin(); i != imesh.end(); ++i, ++j ) 02368 { 02369 EntityHandle h = idMap.find( *i ); 02370 if( *j != h ) 02371 { 02372 ++invalid_count; 02373 dbgOut.printf( 1, "Conflicting ownership for %s %ld\n", CN::EntityTypeName( TYPE_FROM_HANDLE( *i ) ), 02374 (long)ID_FROM_HANDLE( *i ) ); 02375 } 02376 } 02377 if( invalid_count ) 02378 { 02379 iFace->tag_delete( file_id_tag ); 02380 MB_SET_ERR( MB_FAILURE, invalid_count << " entities with conflicting ownership found by process " 02381 << myPcomm->proc_config().proc_rank() 02382 << ". This will result in duplicate entities written to file" ); 02383 } 02384 #endif 02385 02386 return iFace->tag_delete( file_id_tag ); 02387 } 02388 02389 void WriteHDF5Parallel::print_times( const double* times ) const 02390 { 02391 if( !myPcomm ) 02392 { 02393 WriteHDF5::print_times( times ); 02394 } 02395 else 02396 { 02397 double recv[NUM_TIMES]; 02398 MPI_Reduce( (void*)times, recv, NUM_TIMES, MPI_DOUBLE, MPI_MAX, 0, myPcomm->proc_config().proc_comm() ); 02399 if( 0 == myPcomm->proc_config().proc_rank() ) WriteHDF5::print_times( recv ); 02400 } 02401 } 02402 02403 } // namespace moab
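// Usage sketch, for illustration only: WriteHDF5Parallel is normally reached through
// Interface::write_file() with a "PARALLEL" file option rather than being instantiated
// directly.  A minimal driver might look like the comment below; the output file name
// and the option string are example values, and error handling is kept to a single check.
//
//   #include <mpi.h>
//   #include <iostream>
//   #include "moab/Core.hpp"
//   #include "moab/ParallelComm.hpp"
//
//   int main( int argc, char* argv[] )
//   {
//       MPI_Init( &argc, &argv );
//       moab::Core mb;
//       moab::ParallelComm pcomm( &mb, MPI_COMM_WORLD );
//       // ... load or create a partitioned mesh and resolve shared entities ...
//       moab::ErrorCode rval = mb.write_file( "out.h5m", 0, "PARALLEL=WRITE_PART" );
//       if( moab::MB_SUCCESS != rval ) std::cerr << "parallel write failed" << std::endl;
//       MPI_Finalize();
//       return 0;
//   }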