cgma
|
00001 #include "CGMParallelComm.hpp" 00002 #include "TopologyEntity.hpp" 00003 #include "GeometryQueryEngine.hpp" 00004 #include "RefEntity.hpp" 00005 #include "GeometryQueryTool.hpp" 00006 #include "TDParallel.hpp" 00007 00008 #include <algorithm> 00009 00010 #define INITIAL_BUFF_SIZE 1024 00011 #define RRA(a) if (CUBIT_SUCCESS != result) { \ 00012 std::string tmp_str; \ 00013 tmp_str.append("\n"); tmp_str.append(a); \ 00014 PRINT_ERROR("%s", tmp_str.c_str()); \ 00015 return result;} 00016 00017 //std::vector<CGMParallelComm*> CGMParallelComm::instanceList; 00018 00019 CGMParallelComm::CGMParallelComm(MPI_Comm comm) 00020 : gqt(NULL), procConfig(comm), m_pBuffer(NULL), m_nBufferSize(0), m_currentPosition(0) 00021 { 00022 myBuffer.resize(INITIAL_BUFF_SIZE); 00023 00024 int flag = 1; 00025 int retval = MPI_Initialized(&flag); 00026 if (MPI_SUCCESS != retval || !flag) { 00027 int argc = 0; 00028 char **argv = NULL; 00029 00030 // mpi not initialized yet - initialize here 00031 retval = MPI_Init(&argc, &argv); 00032 } 00033 } 00034 00035 CGMParallelComm::CGMParallelComm(std::vector<unsigned char> &tmp_buff, 00036 MPI_Comm comm) 00037 : gqt(NULL), procConfig(comm), m_pBuffer(NULL), m_nBufferSize(0), m_currentPosition(0) 00038 { 00039 myBuffer.swap(tmp_buff); 00040 int flag = 1; 00041 int retval = MPI_Initialized(&flag); 00042 if (MPI_SUCCESS != retval || !flag) { 00043 int argc = 0; 00044 char **argv = NULL; 00045 00046 // mpi not initialized yet - initialize here 00047 retval = MPI_Init(&argc, &argv); 00048 } 00049 } 00050 00051 CGMParallelComm::~CGMParallelComm() 00052 { 00053 delete m_pBuffer; 00054 } 00055 00056 CubitStatus CGMParallelComm::bcast_buffer(const unsigned int from_proc) 00057 { 00058 //- broadcasts the buffer contained in this object 00059 if (procConfig.proc_rank() == from_proc) { 00060 printf("Broadcasting buffer size from %d.\n", from_proc); 00061 MPI_Bcast(&m_nBufferSize, 1, MPI_INT, from_proc, MPI_COMM_WORLD); 00062 printf("Broadcasting buffer from %d, %d bytes.\n", from_proc, 00063 m_nBufferSize); 00064 MPI_Bcast(m_pBuffer, m_nBufferSize, MPI_BYTE, from_proc, 00065 MPI_COMM_WORLD); 00066 } 00067 else { 00068 int this_size; 00069 printf("Broadcasting buffer size from proc %d.\n", 00070 procConfig.proc_rank()); 00071 MPI_Bcast(&this_size, 1, MPI_INT, from_proc, 00072 MPI_COMM_WORLD); 00073 printf("Processor %d: received size of %d.\n", procConfig.proc_rank(), this_size); 00074 check_size(this_size); 00075 printf("Broadcasting buffer from proc %d, %d bytes.\n", 00076 procConfig.proc_rank(), this_size); 00077 MPI_Bcast(m_pBuffer, this_size, MPI_BYTE, from_proc, 00078 MPI_COMM_WORLD); 00079 } 00080 00081 return CUBIT_SUCCESS; 00082 } 00083 00084 CubitStatus CGMParallelComm::broadcast_entities(const unsigned int from_proc, 00085 DLIList<RefEntity*> &ref_entity_list) 00086 { 00087 #ifndef USE_MPI 00088 return CUBIT_FAILURE; 00089 #else 00090 CubitStatus result = CUBIT_SUCCESS; 00091 00092 if (procConfig.proc_rank() == from_proc) { 00093 int nBufferSize = 0; 00094 result = write_buffer(ref_entity_list, m_pBuffer, nBufferSize, false); 00095 RRA("Failed to write ref entity list to buffer."); 00096 00097 result = check_size(nBufferSize); 00098 RRA("Failed to write ref entity list to buffer."); 00099 00100 result = write_buffer(ref_entity_list, m_pBuffer, nBufferSize, true); 00101 RRA("Failed to write ref entity list to buffer."); 00102 } 00103 00104 result = bcast_buffer(from_proc); 00105 RRA("Failed to broadcast buffer to processors."); 00106 00107 if (procConfig.proc_rank() != from_proc) { 00108 result = read_buffer(ref_entity_list, m_pBuffer, m_nBufferSize); 00109 RRA("Failed to read ref entity list from buffer."); 00110 } 00111 00112 return CUBIT_SUCCESS; 00113 #endif 00114 } 00115 00116 // scatter exact amount of geometry information to each processors 00117 CubitStatus CGMParallelComm::scatter_entities(const unsigned int from_proc, 00118 DLIList<RefEntity*> &ref_entity_list) 00119 { 00120 #ifndef USE_MPI 00121 return CUBIT_FAILURE; 00122 #else 00123 CubitStatus result = CUBIT_SUCCESS; 00124 int i, mySendCount, nEntity; 00125 int nProcs = procConfig.proc_size(); 00126 int *sendCounts = new int[nProcs]; 00127 int *displacements = new int[nProcs]; 00128 displacements[0] = 0; 00129 00130 if (procConfig.proc_rank() == from_proc) { 00131 // make a balanced entity lists 00132 int sum = 0; 00133 DLIList<RefEntity*> **balancedLists = new DLIList<RefEntity*>*[nProcs]; 00134 00135 for (i = 0; i < nProcs; i++) { 00136 balancedLists[i] = new DLIList<RefEntity*>; 00137 } 00138 00139 nEntity = ref_entity_list.size(); 00140 ref_entity_list.reset(); 00141 for (i = 0; i < nEntity; i++) { 00142 RefEntity* entity = ref_entity_list.get_and_step(); 00143 TDParallel *td_par = (TDParallel *) entity->get_TD(&TDParallel::is_parallel); 00144 00145 if (td_par == NULL) { 00146 PRINT_ERROR("Partitioned entities should have TDParallel data."); 00147 return CUBIT_FAILURE; 00148 } 00149 unsigned int charge_p = td_par->get_charge_proc(); 00150 if (charge_p != from_proc) { // only to compute processors 00151 balancedLists[charge_p]->append(entity); // add charge processor 00152 } 00153 00154 DLIList<int>* ghost_procs = td_par->get_ghost_proc_list(); 00155 int n_ghost = ghost_procs->size(); 00156 ghost_procs->reset(); 00157 for (int j = 0; j < n_ghost; j++) { // add ghost processors 00158 unsigned int ghost_p = ghost_procs->get_and_step(); 00159 if (ghost_p != from_proc) balancedLists[ghost_p]->append(entity); 00160 } 00161 } 00162 00163 // add buffer size for each processors 00164 for (i = 0; i < nProcs; i++) { 00165 result = write_buffer(*(balancedLists[i]), m_pBuffer, sendCounts[i], false); 00166 RRA("Failed to write ref entity list to buffer."); 00167 sum += sendCounts[i]; 00168 } 00169 00170 // check the size of the buffer and resize if necessary 00171 check_size(sum); 00172 00173 // now append the real information 00174 ref_entity_list.reset(); 00175 for (i = 0; i < nProcs; i++) { 00176 append_to_buffer(*(balancedLists[i]), sendCounts[i]); 00177 } 00178 00179 delete [] balancedLists; 00180 } 00181 00182 // broadcast buffer size array 00183 printf("Broadcasting buffer size array from master.\n"); 00184 MPI_Bcast(sendCounts, nProcs, MPI_INT, from_proc, MPI_COMM_WORLD); 00185 00186 for ( i = 1; i < nProcs; i++) { 00187 displacements[i] = displacements[i-1] + sendCounts[i-1]; 00188 } 00189 00190 mySendCount = sendCounts[procConfig.proc_rank()]; 00191 00192 if (procConfig.proc_rank() != from_proc) check_size(mySendCount); 00193 00194 printf("Scattering buffer from master.\n"); 00195 00196 // scatter geometry 00197 MPI_Scatterv(m_pBuffer, sendCounts, displacements, MPI_BYTE, m_pBuffer, 00198 mySendCount, MPI_BYTE, from_proc, MPI_COMM_WORLD); 00199 00200 if (procConfig.proc_rank() != from_proc) { 00201 result = read_buffer(ref_entity_list, m_pBuffer, mySendCount); 00202 RRA("Failed to read ref entity list from buffer."); 00203 } 00204 00205 return CUBIT_SUCCESS; 00206 #endif 00207 } 00208 00209 CubitStatus CGMParallelComm::write_buffer(DLIList<RefEntity*> &ref_entity_list, 00210 char* pBuffer, 00211 int& n_buffer_size, 00212 bool b_write_buffer) 00213 { 00214 #ifndef USE_MPI 00215 return CUBIT_FAILURE; 00216 #else 00217 if (ref_entity_list.size() == 0) { 00218 n_buffer_size = 0; 00219 return CUBIT_SUCCESS; 00220 } 00221 00222 #ifdef HAVE_OCC 00223 CubitStatus result = GeometryQueryTool::instance()->export_solid_model(ref_entity_list, pBuffer, 00224 n_buffer_size, b_write_buffer); 00225 RRA("Failed to write ref entities to buffer."); 00226 #endif 00227 00228 if (b_write_buffer) m_currentPosition += n_buffer_size; 00229 return CUBIT_SUCCESS; 00230 #endif 00231 } 00232 00233 CubitStatus CGMParallelComm::read_buffer(DLIList<RefEntity*> &ref_entity_list, 00234 const char* pBuffer, 00235 const int n_buffer_size) 00236 { 00237 #ifndef USE_MPI 00238 return CUBIT_FAILURE; 00239 #else 00240 if (n_buffer_size == 0) return CUBIT_SUCCESS; 00241 00242 #ifdef HAVE_OCC 00243 CubitStatus result = GeometryQueryTool::instance()->import_solid_model(&ref_entity_list, pBuffer, 00244 n_buffer_size); 00245 RRA("Failed to read ref entities from buffer."); 00246 #endif 00247 00248 return CUBIT_SUCCESS; 00249 #endif 00250 } 00251 00252 CubitStatus CGMParallelComm::check_size(int& target_size, const CubitBoolean keep) 00253 { 00254 printf("Checking buffer size on proc %d, target size %d.\n", 00255 procConfig.proc_rank(), target_size); 00256 00257 if (m_nBufferSize < target_size) { 00258 printf("Increasing buffer size on proc %d.\n", procConfig.proc_rank()); 00259 void *temp_buffer = malloc(target_size); 00260 if (keep && 0 != m_currentPosition) memcpy(temp_buffer, m_pBuffer, m_currentPosition); 00261 delete m_pBuffer; 00262 m_pBuffer = (char *) temp_buffer; 00263 m_nBufferSize = target_size; 00264 } 00265 00266 return CUBIT_SUCCESS; 00267 } 00268 00269 CubitStatus CGMParallelComm::append_to_buffer(DLIList<RefEntity*> &ref_entity_list, 00270 int add_size) 00271 { 00272 if (m_currentPosition + add_size > m_nBufferSize) return CUBIT_FAILURE; 00273 CubitStatus result = write_buffer(ref_entity_list, m_pBuffer + m_currentPosition, add_size, true); 00274 RRA("Failed to append ref entity list to buffer."); 00275 00276 return CUBIT_SUCCESS; 00277 }