cgma
CGMParallelComm.cpp — parallel (MPI) communication support for CGM geometry entities: broadcast and scatter of serialized RefEntity lists.
#include "CGMParallelComm.hpp"
#include "TopologyEntity.hpp"
#include "GeometryQueryEngine.hpp"
#include "RefEntity.hpp"
#include "GeometryQueryTool.hpp"
#include "TDParallel.hpp"

#include <algorithm>
#include <cstdlib>
#include <cstring>
00009 
00010 #define INITIAL_BUFF_SIZE 1024
00011 #define RRA(a) if (CUBIT_SUCCESS != result) {   \
00012     std::string tmp_str;            \
00013     tmp_str.append("\n"); tmp_str.append(a);    \
00014     PRINT_ERROR("%s", tmp_str.c_str());     \
00015     return result;}
00016 
00017 //std::vector<CGMParallelComm*> CGMParallelComm::instanceList;
00018 
00019 CGMParallelComm::CGMParallelComm(MPI_Comm comm)
00020   : gqt(NULL), procConfig(comm), m_pBuffer(NULL), m_nBufferSize(0), m_currentPosition(0)
00021 {
00022   myBuffer.resize(INITIAL_BUFF_SIZE);
00023   
00024   int flag = 1;
00025   int retval = MPI_Initialized(&flag);
00026   if (MPI_SUCCESS != retval || !flag) {
00027     int argc = 0;
00028     char **argv = NULL;
00029     
00030     // mpi not initialized yet - initialize here
00031     retval = MPI_Init(&argc, &argv);
00032   }
00033 }
00034 
00035 CGMParallelComm::CGMParallelComm(std::vector<unsigned char> &tmp_buff, 
00036                  MPI_Comm comm)
00037   : gqt(NULL), procConfig(comm), m_pBuffer(NULL), m_nBufferSize(0), m_currentPosition(0)
00038 {
00039   myBuffer.swap(tmp_buff);
00040   int flag = 1;
00041   int retval = MPI_Initialized(&flag);
00042   if (MPI_SUCCESS != retval || !flag) {
00043     int argc = 0;
00044     char **argv = NULL;
00045     
00046     // mpi not initialized yet - initialize here
00047     retval = MPI_Init(&argc, &argv);
00048   }
00049 }
00050 
00051 CGMParallelComm::~CGMParallelComm() 
00052 {
00053   delete m_pBuffer;
00054 }
00055 
// Broadcast this object's serialized buffer from `from_proc` to all ranks.
// Collective over MPI_COMM_WORLD: every rank must call this.  Two collective
// steps are performed in order — first the byte count, then the payload.
// NOTE(review): the communicator is hard-coded to MPI_COMM_WORLD rather than
// taken from procConfig — confirm this is intentional.
CubitStatus CGMParallelComm::bcast_buffer(const unsigned int from_proc) 
{
  //- broadcasts the buffer contained in this object
  if (procConfig.proc_rank() == from_proc) {
    // Root side: announce the size, then ship the bytes.
    printf("Broadcasting buffer size from %d.\n", from_proc);
    MPI_Bcast(&m_nBufferSize, 1, MPI_INT, from_proc, MPI_COMM_WORLD);
    printf("Broadcasting buffer from %d, %d bytes.\n", from_proc,
       m_nBufferSize);
    MPI_Bcast(m_pBuffer, m_nBufferSize, MPI_BYTE, from_proc, 
              MPI_COMM_WORLD);
  }
  else {
    // Receiver side: learn the incoming size, make room, then receive.
    int this_size;
    printf("Broadcasting buffer size from proc %d.\n",
       procConfig.proc_rank());
    MPI_Bcast(&this_size, 1, MPI_INT, from_proc, 
              MPI_COMM_WORLD);
    printf("Processor %d: received size of %d.\n", procConfig.proc_rank(), this_size);
    // Grow the local buffer if needed before receiving the payload.
    // NOTE(review): check_size() only updates m_nBufferSize when it grows the
    // buffer, so after this call m_nBufferSize may exceed the actual payload
    // size (this_size); callers that later read m_nBufferSize (e.g.
    // broadcast_entities) should confirm that is intended.
    check_size(this_size);
    printf("Broadcasting buffer from proc %d, %d bytes.\n", 
       procConfig.proc_rank(), this_size);
    MPI_Bcast(m_pBuffer, this_size, MPI_BYTE, from_proc, 
              MPI_COMM_WORLD);
  }
 
  return CUBIT_SUCCESS;
}
00083 
00084 CubitStatus CGMParallelComm::broadcast_entities(const unsigned int from_proc,
00085                         DLIList<RefEntity*> &ref_entity_list)
00086 {
00087 #ifndef USE_MPI
00088   return CUBIT_FAILURE;
00089 #else
00090   CubitStatus result = CUBIT_SUCCESS;
00091 
00092   if (procConfig.proc_rank() == from_proc) {
00093     int nBufferSize = 0;
00094     result = write_buffer(ref_entity_list, m_pBuffer, nBufferSize, false);
00095     RRA("Failed to write ref entity list to buffer.");
00096 
00097     result = check_size(nBufferSize);
00098     RRA("Failed to write ref entity list to buffer.");
00099 
00100     result = write_buffer(ref_entity_list, m_pBuffer, nBufferSize, true);
00101     RRA("Failed to write ref entity list to buffer.");
00102   }
00103   
00104   result = bcast_buffer(from_proc);
00105   RRA("Failed to broadcast buffer to processors.");
00106   
00107   if (procConfig.proc_rank() != from_proc) {
00108     result = read_buffer(ref_entity_list, m_pBuffer, m_nBufferSize);
00109     RRA("Failed to read ref entity list from buffer.");
00110   }
00111   
00112   return CUBIT_SUCCESS;
00113 #endif
00114 }
00115 
00116 // scatter exact amount of geometry information to each processors
00117 CubitStatus CGMParallelComm::scatter_entities(const unsigned int from_proc,
00118                           DLIList<RefEntity*> &ref_entity_list)
00119 {
00120 #ifndef USE_MPI
00121   return CUBIT_FAILURE;
00122 #else
00123   CubitStatus result = CUBIT_SUCCESS;
00124   int i, mySendCount, nEntity;
00125   int nProcs = procConfig.proc_size();
00126   int *sendCounts = new int[nProcs];
00127   int *displacements = new int[nProcs];
00128   displacements[0] = 0;
00129 
00130   if (procConfig.proc_rank() == from_proc) {
00131     // make a balanced entity lists
00132     int sum = 0;
00133     DLIList<RefEntity*> **balancedLists = new DLIList<RefEntity*>*[nProcs];
00134     
00135     for (i = 0; i < nProcs; i++) {
00136       balancedLists[i] = new DLIList<RefEntity*>;
00137     }
00138     
00139     nEntity = ref_entity_list.size();
00140     ref_entity_list.reset();
00141     for (i = 0; i < nEntity; i++) {
00142       RefEntity* entity = ref_entity_list.get_and_step();
00143       TDParallel *td_par = (TDParallel *) entity->get_TD(&TDParallel::is_parallel);
00144       
00145       if (td_par == NULL) {
00146     PRINT_ERROR("Partitioned entities should have TDParallel data.");
00147     return CUBIT_FAILURE;
00148       }
00149       unsigned int charge_p = td_par->get_charge_proc();
00150       if (charge_p != from_proc) { // only to compute processors
00151         balancedLists[charge_p]->append(entity); // add charge processor
00152       }
00153       
00154       DLIList<int>* ghost_procs = td_par->get_ghost_proc_list();
00155       int n_ghost = ghost_procs->size();
00156       ghost_procs->reset();
00157       for (int j = 0; j < n_ghost; j++) { // add ghost processors
00158         unsigned int ghost_p = ghost_procs->get_and_step();
00159         if (ghost_p != from_proc) balancedLists[ghost_p]->append(entity);
00160       }
00161     }
00162     
00163     // add buffer size for each processors
00164     for (i = 0; i < nProcs; i++) {
00165       result = write_buffer(*(balancedLists[i]), m_pBuffer, sendCounts[i], false);
00166       RRA("Failed to write ref entity list to buffer.");
00167       sum += sendCounts[i];
00168     }
00169   
00170     // check the size of the buffer and resize if necessary
00171     check_size(sum);
00172     
00173     // now append the real information
00174     ref_entity_list.reset();
00175     for (i = 0; i < nProcs; i++) {
00176       append_to_buffer(*(balancedLists[i]), sendCounts[i]);
00177     }
00178 
00179     delete [] balancedLists;
00180   }
00181 
00182   // broadcast buffer size array
00183   printf("Broadcasting buffer size array from master.\n");
00184   MPI_Bcast(sendCounts, nProcs, MPI_INT, from_proc, MPI_COMM_WORLD);
00185   
00186   for ( i = 1; i < nProcs; i++) {
00187     displacements[i] = displacements[i-1] + sendCounts[i-1];
00188   }
00189   
00190   mySendCount = sendCounts[procConfig.proc_rank()];
00191 
00192   if (procConfig.proc_rank() != from_proc) check_size(mySendCount);
00193 
00194   printf("Scattering buffer from master.\n");
00195 
00196   // scatter geometry
00197   MPI_Scatterv(m_pBuffer, sendCounts, displacements, MPI_BYTE, m_pBuffer, 
00198            mySendCount, MPI_BYTE, from_proc, MPI_COMM_WORLD);
00199 
00200   if (procConfig.proc_rank() != from_proc) {
00201     result = read_buffer(ref_entity_list, m_pBuffer, mySendCount);
00202     RRA("Failed to read ref entity list from buffer.");
00203   }
00204 
00205   return CUBIT_SUCCESS;
00206 #endif
00207 }
00208 
00209 CubitStatus CGMParallelComm::write_buffer(DLIList<RefEntity*> &ref_entity_list,
00210                       char* pBuffer,
00211                       int& n_buffer_size,
00212                       bool b_write_buffer)
00213 {
00214 #ifndef USE_MPI
00215   return CUBIT_FAILURE;
00216 #else
00217   if (ref_entity_list.size() == 0) {
00218     n_buffer_size = 0;
00219     return CUBIT_SUCCESS;
00220   }
00221 
00222 #ifdef HAVE_OCC
00223   CubitStatus result = GeometryQueryTool::instance()->export_solid_model(ref_entity_list, pBuffer,
00224                                      n_buffer_size, b_write_buffer);
00225   RRA("Failed to write ref entities to buffer.");
00226 #endif
00227 
00228   if (b_write_buffer) m_currentPosition += n_buffer_size;
00229   return CUBIT_SUCCESS;
00230 #endif
00231 }
00232 
00233 CubitStatus CGMParallelComm::read_buffer(DLIList<RefEntity*> &ref_entity_list,
00234                      const char* pBuffer,
00235                      const int n_buffer_size)
00236 {
00237 #ifndef USE_MPI
00238   return CUBIT_FAILURE;
00239 #else
00240   if (n_buffer_size == 0) return CUBIT_SUCCESS;
00241 
00242 #ifdef HAVE_OCC
00243   CubitStatus result = GeometryQueryTool::instance()->import_solid_model(&ref_entity_list, pBuffer,
00244                                      n_buffer_size);
00245   RRA("Failed to read ref entities from buffer.");
00246 #endif
00247   
00248   return CUBIT_SUCCESS;
00249 #endif
00250 }
00251 
00252 CubitStatus CGMParallelComm::check_size(int& target_size, const CubitBoolean keep) 
00253 {
00254   printf("Checking buffer size on proc %d, target size %d.\n", 
00255                   procConfig.proc_rank(), target_size);
00256 
00257   if (m_nBufferSize < target_size) {
00258     printf("Increasing buffer size on proc %d.\n", procConfig.proc_rank());
00259     void *temp_buffer = malloc(target_size);
00260     if (keep && 0 != m_currentPosition) memcpy(temp_buffer, m_pBuffer, m_currentPosition);
00261     delete m_pBuffer;
00262     m_pBuffer = (char *) temp_buffer;
00263     m_nBufferSize = target_size;
00264   }
00265 
00266   return CUBIT_SUCCESS;
00267 }
00268 
00269 CubitStatus CGMParallelComm::append_to_buffer(DLIList<RefEntity*> &ref_entity_list,
00270                           int add_size) 
00271 {
00272   if (m_currentPosition + add_size > m_nBufferSize) return CUBIT_FAILURE;
00273   CubitStatus result = write_buffer(ref_entity_list, m_pBuffer + m_currentPosition, add_size, true);
00274   RRA("Failed to append ref entity list to buffer.");
00275   
00276   return CUBIT_SUCCESS;
00277 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines