cgma
CubitQtConcurrentApi.cpp
Go to the documentation of this file.
00001 
00002 
00003 #include "CubitQtConcurrentApi.h"
00004 #include <QtConcurrentRun>
00005 #include <QtConcurrentMap>
00006 #include <QFutureSynchronizer>
00007 #include <QFutureWatcher>
00008 #include <QEventLoop>
00009 #include <QThreadStorage>
00010 
00011 #include <QCoreApplication>
00012 
00013 namespace {
00014 
00015   class QtMutex : public QMutex, public CubitConcurrent::Mutex
00016   {
00017   public:
00018     //QtMutex() : QMutex(QMutex::Recursive) {}
00019     QtMutex() : QMutex() {}
00020     ~QtMutex() {}
00021     void lock()
00022     {
00023       QMutex::lock();
00024     }
00025     void unlock()
00026     {
00027       QMutex::unlock();
00028     }
00029   };
00030 
00031 
00032   struct TLSWrapper
00033   {
00034     TLSWrapper(void* p, void (*cleanup)(void*)) : mP(p) {}
00035     ~TLSWrapper()
00036     {
00037       (*mCleanup)(mP);
00038     }
00039 
00040     void* mP;
00041     void (*mCleanup)(void*);
00042   };
00043 
00044 
00045   struct QtTLS : public QThreadStorage<TLSWrapper*>,  public CubitConcurrent::ThreadLocalStorageInterface
00046   {
00047     QtTLS(void (*cleanup)(void*)) : mCleanup(cleanup)
00048     {
00049     }
00050 
00051     void* local_data()
00052     {
00053       return localData()->mP;
00054     }
00055 
00056     void set_local_data(void* p)
00057     {
00058       TLSWrapper* w = new TLSWrapper(p, mCleanup);
00059       setLocalData(w);
00060     }
00061 
00062     void (*mCleanup)(void*);
00063 
00064   };
00065 }
00066 CubitQtConcurrent::CubitQtConcurrent()
00067 {
00068     _name = "CubitQtConcurrent";
00069 
00070     // If there is no global instance, set this object as the instance.
00071   if(!CubitConcurrent::mInstance)
00072     CubitConcurrent::mInstance = this;
00073 }
00074 
00075 CubitQtConcurrent::~CubitQtConcurrent()
00076 {
00077   // If this is the global instance, clear the pointer.
00078   if(this == CubitConcurrent::mInstance)
00079     CubitConcurrent::mInstance = 0;
00080 
00081 
00082 }
00083 
00084 const std::string& CubitQtConcurrent::get_name() const
00085 {
00086     return _name;
00087 }
00088 
00089 const char* CubitQtConcurrent::get_type() const
00090 {
00091     return _name.c_str();
00092 }
00093 
00094 /*
00095 CubitConcurrent::Mutex* CubitQtConcurrent::create_mutex()
00096 {
00097   return new QtMutex;
00098 }
00099 
00100 void CubitQtConcurrent::destroy_mutex(CubitConcurrent::Mutex* m)
00101 {
00102   delete static_cast<QtMutex*>(m);
00103 }
00104 */
00105 
00106 CubitConcurrent::ThreadLocalStorageInterface* CubitQtConcurrent::create_local_storage(void (*cleanup_function)(void*))
00107 {
00108   return new QtTLS(cleanup_function);
00109 }
00110 
00111 void CubitQtConcurrent::destroy_local_storage(ThreadLocalStorageInterface* i)
00112 {
00113   delete static_cast<QtTLS*>(i);
00114 }
00115 
00116 void CubitQtConcurrent::schedule(CubitConcurrent::Task* task)
00117 {
00118     QFuture<void> f = ::QtConcurrent::run(task, &Task::execute);    
00119     m.lock();
00120     taskmap[task] = f;
00121     m.unlock();
00122 }
00123 
00124 void CubitQtConcurrent::wait(CubitConcurrent::Task* task)
00125 {
00126     m.lock();
00127     QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(task);
00128     m.unlock();
00129     iter->waitForFinished();
00130     m.lock();
00131     taskmap.erase(iter);
00132     m.unlock();
00133 }
00134 
00135 void CubitQtConcurrent::idle_wait(CubitConcurrent::Task* task)
00136 {
00137   m.lock();
00138   QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(task);
00139   m.unlock();
00140   if(!iter->isFinished())
00141   {
00142     QEventLoop loop;
00143     QFutureWatcher<void> watcher;
00144     watcher.setFuture(*iter);
00145     QObject::connect(&watcher, SIGNAL(finished()), &loop, SLOT(quit()));
00146     loop.exec();
00147   }
00148   m.lock();
00149   taskmap.erase(iter);
00150   m.unlock();
00151 }
00152 void CubitQtConcurrent::wait_for_any(const std::vector<CubitConcurrent::Task*>& tasks,std::vector<CubitConcurrent::Task*>& finished_tasks)
00153 {
00154     m.lock();
00155     for(size_t i=0; i<tasks.size(); i++)
00156     {
00157       QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(tasks[i]);
00158       if(iter->isFinished())
00159       {
00160         finished_tasks.push_back(tasks[i]);
00161         taskmap.erase(iter);
00162       }
00163     }
00164     m.unlock();
00165 
00166     if(!finished_tasks.empty())
00167       return;
00168 
00169 
00170     if(!QCoreApplication::instance())
00171     {
00172       int arg=0;
00173       new QCoreApplication(arg,NULL);
00174     }
00175 
00176     QEventLoop evLoop;
00177 
00178     m.lock();
00179     for(size_t i=0; i<tasks.size(); i++)
00180     {
00181       QFutureWatcher<void> *f= new QFutureWatcher<void>(&evLoop);
00182       QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(tasks[i]);
00183       f->setFuture(*iter);
00184       QObject::connect(f,SIGNAL(finished()),&evLoop,SLOT(quit()));
00185     }
00186     m.unlock();
00187 
00188     evLoop.exec();
00189 
00190     m.lock();
00191     for(size_t i=0; i<tasks.size(); i++)
00192     {
00193       QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(tasks[i]);
00194       if(iter->isFinished())
00195       {
00196         finished_tasks.push_back(tasks[i]);
00197         taskmap.erase(iter);
00198       }
00199     }
00200     m.unlock();
00201 
00202 }
00203 void CubitQtConcurrent::wait(const std::vector<CubitConcurrent::Task*>& task)
00204 {
00205     m.lock();
00206     std::vector<QMap<CubitConcurrent::Task*, QFuture<void> >::iterator> iters;
00207     QFutureSynchronizer<void> f;
00208     for(size_t i=0; i<task.size(); i++)
00209     {
00210         QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(task[i]);
00211         iters.push_back(iter);
00212         f.addFuture(*iter);
00213     }
00214     m.unlock();
00215 
00216     f.waitForFinished();
00217 
00218     m.lock();
00219     for(size_t i=0; i<iters.size(); i++)
00220     {
00221         taskmap.erase(iters[i]);
00222     }
00223     m.unlock();
00224 }
00225 
00226 bool CubitQtConcurrent::is_completed(CubitConcurrent::Task* task)
00227 {
00228     m.lock();
00229     QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(task);
00230     m.unlock();
00231 
00232     return iter->isFinished();
00233 }
00234 
00235 bool CubitQtConcurrent::is_running(CubitConcurrent::Task* task)
00236 {
00237     m.lock();
00238     QMap<CubitConcurrent::Task*, QFuture<void> >::iterator iter = taskmap.find(task);
00239     m.unlock();
00240 
00241     return iter->isRunning();
00242 }
00243 
00244 // internal helper to start tasks within a task group
00245 void CubitQtConcurrent::execute(CubitConcurrent::Task* t)
00246 {
00247     t->execute();
00248 }
00249 
00250 void CubitQtConcurrent::schedule(CubitConcurrent::TaskGroup* tg)
00251 {
00252     QFuture<void> f = ::QtConcurrent::map(tg->tasks, CubitQtConcurrent::execute);
00253     m2.lock();
00254     taskgroupmap[tg] = f;
00255     m2.unlock();
00256 }
00257 
00258 void CubitQtConcurrent::wait(CubitConcurrent::TaskGroup* tg)
00259 {
00260     m2.lock();
00261     QMap<CubitConcurrent::TaskGroup*, QFuture<void> >::iterator iter = taskgroupmap.find(tg);
00262     m2.unlock();
00263     iter->waitForFinished();
00264     m2.lock();
00265     taskgroupmap.erase(iter);
00266     m2.unlock();
00267 }
00268 
00269 bool CubitQtConcurrent::is_completed(CubitConcurrent::TaskGroup* tg)
00270 {
00271     m2.lock();
00272     QMap<CubitConcurrent::TaskGroup*, QFuture<void> >::iterator iter = taskgroupmap.find(tg);
00273     m2.unlock();
00274 
00275     return iter->isFinished();
00276 }
00277 
00278 bool CubitQtConcurrent::is_running(CubitConcurrent::TaskGroup* tg)
00279 {
00280     m2.lock();
00281     QMap<CubitConcurrent::TaskGroup*, QFuture<void> >::iterator iter = taskgroupmap.find(tg);
00282     m2.unlock();
00283 
00284     return iter->isRunning();
00285 }
00286 
00287 void CubitQtConcurrent::cancel(CubitConcurrent::TaskGroup* tg)
00288 {
00289     m2.lock();
00290     QMap<CubitConcurrent::TaskGroup*, QFuture<void> >::iterator iter = taskgroupmap.find(tg);
00291     m2.unlock();
00292 
00293     iter->cancel();
00294 }
00295 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines