cgma
|
00001 00002 00003 #include "CubitQtConcurrentApi.h" 00004 #include <QtConcurrentRun> 00005 #include <QtConcurrentMap> 00006 #include <QFutureSynchronizer> 00007 #include <QFutureWatcher> 00008 #include <QEventLoop> 00009 #include <QThreadStorage> 00010 00011 #include <QCoreApplication> 00012 00013 namespace { 00014 00015 class QtMutex : public QMutex, public CubitConcurrent::Mutex 00016 { 00017 public: 00018 //QtMutex() : QMutex(QMutex::Recursive) {} 00019 QtMutex() : QMutex() {} 00020 ~QtMutex() {} 00021 void lock() 00022 { 00023 QMutex::lock(); 00024 } 00025 void unlock() 00026 { 00027 QMutex::unlock(); 00028 } 00029 }; 00030 00031 00032 struct TLSWrapper 00033 { 00034 TLSWrapper(void* p, void (*cleanup)(void*)) : mP(p) {} 00035 ~TLSWrapper() 00036 { 00037 (*mCleanup)(mP); 00038 } 00039 00040 void* mP; 00041 void (*mCleanup)(void*); 00042 }; 00043 00044 00045 struct QtTLS : public QThreadStorage<TLSWrapper*>, public CubitConcurrent::ThreadLocalStorageInterface 00046 { 00047 QtTLS(void (*cleanup)(void*)) : mCleanup(cleanup) 00048 { 00049 } 00050 00051 void* local_data() 00052 { 00053 return localData()->mP; 00054 } 00055 00056 void set_local_data(void* p) 00057 { 00058 TLSWrapper* w = new TLSWrapper(p, mCleanup); 00059 setLocalData(w); 00060 } 00061 00062 void (*mCleanup)(void*); 00063 00064 }; 00065 } 00066 CubitQtConcurrent::CubitQtConcurrent() 00067 { 00068 _name = "CubitQtConcurrent"; 00069 00070 // If there is no global instance, set this object as the instance. 00071 if(!CubitConcurrent::mInstance) 00072 CubitConcurrent::mInstance = this; 00073 } 00074 00075 CubitQtConcurrent::~CubitQtConcurrent() 00076 { 00077 // If this is the global instance, clear the pointer. 00078 if(this == CubitConcurrent::mInstance) 00079 CubitConcurrent::mInstance = 0; 00080 00081 00082 } 00083 00084 const std::string& CubitQtConcurrent::get_name() const 00085 { 00086 return _name; 00087 } 00088 00089 const char* CubitQtConcurrent::get_type() const 00090 { 00091 return _name.c_str(); 00092 } 00093 00094 /* 00095 CubitConcurrent::Mutex* CubitQtConcurrent::create_mutex() 00096 { 00097 return new QtMutex; 00098 } 00099 00100 void CubitQtConcurrent::destroy_mutex(CubitConcurrent::Mutex* m) 00101 { 00102 delete static_cast<QtMutex*>(m); 00103 } 00104 */ 00105 00106 CubitConcurrent::ThreadLocalStorageInterface* CubitQtConcurrent::create_local_storage(void (*cleanup_function)(void*)) 00107 { 00108 return new QtTLS(cleanup_function); 00109 } 00110 00111 void CubitQtConcurrent::destroy_local_storage(ThreadLocalStorageInterface* i) 00112 { 00113 delete static_cast<QtTLS*>(i); 00114 } 00115 00116 void CubitQtConcurrent::schedule(CubitConcurrent::Task* task) 00117 { 00118 QFuture<void> f = ::QtConcurrent::run(task, &Task::execute); 00119 m.lock(); 00120 taskmap[task] = f; 00121 m.unlock(); 00122 } 00123 00124 void CubitQtConcurrent::wait(CubitConcurrent::Task* task) 00125 { 00126 m.lock(); 00127 QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(task); 00128 m.unlock(); 00129 iter->waitForFinished(); 00130 m.lock(); 00131 taskmap.erase(iter); 00132 m.unlock(); 00133 } 00134 00135 void CubitQtConcurrent::idle_wait(CubitConcurrent::Task* task) 00136 { 00137 m.lock(); 00138 QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(task); 00139 m.unlock(); 00140 if(!iter->isFinished()) 00141 { 00142 QEventLoop loop; 00143 QFutureWatcher<void> watcher; 00144 watcher.setFuture(*iter); 00145 QObject::connect(&watcher, SIGNAL(finished()), &loop, SLOT(quit())); 00146 loop.exec(); 00147 } 00148 m.lock(); 00149 taskmap.erase(iter); 00150 m.unlock(); 00151 } 00152 void CubitQtConcurrent::wait_for_any(const std::vector<CubitConcurrent::Task*>& tasks,std::vector<CubitConcurrent::Task*>& finished_tasks) 00153 { 00154 m.lock(); 00155 for(size_t i=0; i<tasks.size(); i++) 00156 { 00157 QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(tasks[i]); 00158 if(iter->isFinished()) 00159 { 00160 finished_tasks.push_back(tasks[i]); 00161 taskmap.erase(iter); 00162 } 00163 } 00164 m.unlock(); 00165 00166 if(!finished_tasks.empty()) 00167 return; 00168 00169 00170 if(!QCoreApplication::instance()) 00171 { 00172 int arg=0; 00173 new QCoreApplication(arg,NULL); 00174 } 00175 00176 QEventLoop evLoop; 00177 00178 m.lock(); 00179 for(size_t i=0; i<tasks.size(); i++) 00180 { 00181 QFutureWatcher<void> *f= new QFutureWatcher<void>(&evLoop); 00182 QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(tasks[i]); 00183 f->setFuture(*iter); 00184 QObject::connect(f,SIGNAL(finished()),&evLoop,SLOT(quit())); 00185 } 00186 m.unlock(); 00187 00188 evLoop.exec(); 00189 00190 m.lock(); 00191 for(size_t i=0; i<tasks.size(); i++) 00192 { 00193 QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(tasks[i]); 00194 if(iter->isFinished()) 00195 { 00196 finished_tasks.push_back(tasks[i]); 00197 taskmap.erase(iter); 00198 } 00199 } 00200 m.unlock(); 00201 00202 } 00203 void CubitQtConcurrent::wait(const std::vector<CubitConcurrent::Task*>& task) 00204 { 00205 m.lock(); 00206 std::vector<QMap<CubitConcurrent::Task*, QFuture<void> >::iterator> iters; 00207 QFutureSynchronizer<void> f; 00208 for(size_t i=0; i<task.size(); i++) 00209 { 00210 QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(task[i]); 00211 iters.push_back(iter); 00212 f.addFuture(*iter); 00213 } 00214 m.unlock(); 00215 00216 f.waitForFinished(); 00217 00218 m.lock(); 00219 for(size_t i=0; i<iters.size(); i++) 00220 { 00221 taskmap.erase(iters[i]); 00222 } 00223 m.unlock(); 00224 } 00225 00226 bool CubitQtConcurrent::is_completed(CubitConcurrent::Task* task) 00227 { 00228 m.lock(); 00229 QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(task); 00230 m.unlock(); 00231 00232 return iter->isFinished(); 00233 } 00234 00235 bool CubitQtConcurrent::is_running(CubitConcurrent::Task* task) 00236 { 00237 m.lock(); 00238 QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(task); 00239 m.unlock(); 00240 00241 return iter->isRunning(); 00242 } 00243 00244 // internal helper to start tasks within a task group 00245 void CubitQtConcurrent::execute(CubitConcurrent::Task* t) 00246 { 00247 t->execute(); 00248 } 00249 00250 void CubitQtConcurrent::schedule(CubitConcurrent::TaskGroup* tg) 00251 { 00252 QFuture<void> f = ::QtConcurrent::map(tg->tasks, CubitQtConcurrent::execute); 00253 m2.lock(); 00254 taskgroupmap[tg] = f; 00255 m2.unlock(); 00256 } 00257 00258 void CubitQtConcurrent::wait(CubitConcurrent::TaskGroup* tg) 00259 { 00260 m2.lock(); 00261 QMap<CubitConcurrent::TaskGroup*, QFuture<void> >::iterator iter = taskgroupmap.find(tg); 00262 m2.unlock(); 00263 iter->waitForFinished(); 00264 m2.lock(); 00265 taskgroupmap.erase(iter); 00266 m2.unlock(); 00267 } 00268 00269 bool CubitQtConcurrent::is_completed(CubitConcurrent::TaskGroup* tg) 00270 { 00271 m2.lock(); 00272 QMap<CubitConcurrent::TaskGroup*, QFuture<void> >::iterator iter = taskgroupmap.find(tg); 00273 m2.unlock(); 00274 00275 return iter->isFinished(); 00276 } 00277 00278 bool CubitQtConcurrent::is_running(CubitConcurrent::TaskGroup* tg) 00279 { 00280 m2.lock(); 00281 QMap<CubitConcurrent::TaskGroup*, QFuture<void> >::iterator iter = taskgroupmap.find(tg); 00282 m2.unlock(); 00283 00284 return iter->isRunning(); 00285 } 00286 00287 void CubitQtConcurrent::cancel(CubitConcurrent::TaskGroup* tg) 00288 { 00289 m2.lock(); 00290 QMap<CubitConcurrent::TaskGroup*, QFuture<void> >::iterator iter = taskgroupmap.find(tg); 00291 m2.unlock(); 00292 00293 iter->cancel(); 00294 } 00295