cgma
CubitConcurrentApi.h
Go to the documentation of this file.
00001 
00002 
00005 #ifndef _CUBIT_CONCURRENT_API_H_
00006 #define _CUBIT_CONCURRENT_API_H_
00007 
00008 
00009 #include "CGMUtilConfigure.h"
00010 #include <vector>
00011 #include <stddef.h>
00012 
00013 // class to provide a way to run tasks concurrently
00014 class  CUBIT_UTIL_EXPORT CubitConcurrent 
00015 {
00016 public:
00017   CubitConcurrent();
00018   virtual ~CubitConcurrent();
00019 
00023     static CubitConcurrent *instance() {return mInstance;}
00024 
00025 
00026   // struct for working with mutexes
00027   class Mutex
00028   {
00029   public:
00030     virtual ~Mutex() {}
00031     virtual void lock() = 0;
00032     virtual void unlock() = 0;
00033   };
00034 
00035   // convenience class to lock/unlock mutex within a scope
00036   // use this whenever possible
00037   // it also provides a safe lock()/unlock()
00038   struct MutexLocker
00039   {
00040     MutexLocker(Mutex* mutex) : mMutex(mutex), mLocked(false)
00041     {
00042       lock();
00043     }
00044     ~MutexLocker()
00045     {
00046       unlock();
00047     }
00048     void lock()
00049     {
00050       if(!mLocked)
00051       {
00052         mMutex->lock();
00053         mLocked = true;
00054       }
00055     }
00056     void unlock()
00057     {
00058       if(mLocked)
00059       {
00060         mMutex->unlock();
00061         mLocked = false;
00062       }
00063     }
00064     Mutex* mMutex;
00065     bool mLocked;
00066   };
00067 
00068   virtual Mutex* create_mutex();
00069   virtual void destroy_mutex(Mutex* m);
00070 
00071   // struct for working with thread local storage
00072   class ThreadLocalStorageInterface
00073   {
00074   public:
00075     virtual void* local_data() = 0;
00076     virtual void set_local_data(void*) = 0;
00077   protected:
00078     virtual ~ThreadLocalStorageInterface() {}
00079   };
00080 
00081   virtual ThreadLocalStorageInterface* create_local_storage(void (*cleanup_function)(void*)) = 0;
00082   virtual void destroy_local_storage(ThreadLocalStorageInterface* s) = 0;
00083 
00084   template <class T>
00085   class ThreadLocalStorage
00086   {
00087   public:
00088     ThreadLocalStorage()
00089     {
00090       mTLS = CubitConcurrent::instance()->create_local_storage(ThreadLocalStorage::cleanup);
00091     }
00092 
00093     ~ThreadLocalStorage()
00094     {
00095       CubitConcurrent::instance()->destroy_local_storage(mTLS);
00096     }
00097 
00098     T* local_data()
00099     {
00100       return reinterpret_cast<T*>(mTLS->local_data());
00101     }
00102 
00103     void set_local_data(T* t)
00104     {
00105       mTLS->set_local_data(t);
00106     }
00107 
00108   private:
00109 
00110     static void cleanup(void* p)
00111     {
00112       delete reinterpret_cast<T*>(p);
00113     }
00114 
00115     ThreadLocalStorageInterface* mTLS;
00116   };
00117 
00118 
00119   // struct to encapsulate a task to execute
00120   struct Task
00121   {
00122       virtual ~Task() {}
00123       virtual void execute() = 0;
00124   };
00125 
00126   // struct to represent a group of tasks (several tasks started with one api call)
00127   // this sometimes gives a simpler setup of tasks and also provides cancellation ability
00128   struct TaskGroup
00129   {
00130     std::vector<Task*> tasks;
00131   };
00132 
00133 
00134   // create a schedule a task for a member function with no arguments
00135   // for example:
00136   /*
00137     class Foo
00138     {
00139       void foo()
00140       {
00141         Task* t = c->create_and_schedule(*this, &Foo::thread_func);
00142         c->wait(t);
00143         delete t;
00144       }
00145       
00146       void thread_func()
00147       {
00148       }
00149     };
00150   */
00151   template <typename X>
00152   Task* create_and_schedule(X& x, void (X::*fun)())
00153   {
00154     Task* t = new ClassFunctionTask<X>(x, fun);
00155     this->schedule(t);
00156     return t;
00157   };
00158 
00159   // create and schedule a task for a member function with one argument
00160   // for example:
00161   /*
00162     class Foo
00163     {
00164       void foo()
00165       {
00166         Task* t = c->create_and_schedule(*this, &Foo::thread_func, 5);
00167         c->wait(t);
00168         delete t;
00169       }
00170       
00171       void thread_func(int v)
00172       {
00173       }
00174     };
00175   */
00176   template <typename X, typename Param1, typename Arg1>
00177   Task* create_and_schedule(X& x, void (X::*fun)(Param1), const Arg1& arg1)
00178   {
00179     return create_task1<X,Param1,const Arg1&>(x,fun,arg1);
00180   };
00181   
00182   // same as above but to handle references passed through thread function
00183   template <typename X, typename Param1, typename Arg1>
00184   Task* create_and_schedule(X& x, void (X::*fun)(Param1), Arg1& arg1)
00185   {
00186     return create_task1<X,Param1,Arg1&>(x,fun,arg1);
00187   };
00188   
00189   // create a schedule a task for a member function with two arguments
00190   // for example:
00191   /*
00192     class Foo
00193     {
00194       void foo()
00195       {
00196         int result;
00197         Task* t = c->create_and_schedule(*this, &Foo::square, 5, result);
00198         c->wait(t);
00199         delete t;
00200       }
00201       
00202       void square(int v, int& result)
00203       {
00204         result = v*v;
00205       }
00206     };
00207   */
00208   template <typename X, typename Param1, typename Param2, typename Arg1, typename Arg2>
00209   Task* create_and_schedule(X& x, void (X::*fun)(Param1, Param2), const Arg1& arg1, const Arg2& arg2)
00210   {
00211     return create_task2<X,Param1,Param2, const Arg1&,const Arg2&>(x,fun,arg1, arg2);
00212   };
00213   
00214   template <typename X, typename Param1, typename Param2, typename Arg1, typename Arg2>
00215   Task* create_and_schedule(X& x, void (X::*fun)(Param1, Param2), Arg1& arg1, const Arg2& arg2)
00216   {
00217     return create_task2<X,Param1,Param2,Arg1&, const Arg2&>(x,fun,arg1, arg2);
00218   };
00219   
00220   template <typename X, typename Param1, typename Param2, typename Arg1, typename Arg2>
00221   Task* create_and_schedule(X& x, void (X::*fun)(Param1, Param2), const Arg1& arg1, Arg2& arg2)
00222   {
00223     return create_task2<X,Param1,Param2,const Arg1&,Arg2&>(x,fun,arg1, arg2);
00224   };
00225   
00226   template <typename X, typename Param1, typename Param2, typename Arg1, typename Arg2>
00227   Task* create_and_schedule(X& x, void (X::*fun)(Param1, Param2), Arg1& arg1, Arg2& arg2)
00228   {
00229     return create_task2<X,Param1,Param2,Arg1&,Arg2&>(x,fun,arg1, arg2);
00230   };
00231 
00232   // create a schedule a task group for a member function with one argument from a sequence
00233   // for example:
00234   /*
00235     class Foo
00236     {
00237       void foo()
00238       {
00239         std::vector<int> input(5, 3);
00240         TaskGroup* t = c->create_and_schedule_group(*this, &Foo::thread_func, input);
00241         c->wait(t);
00242         c->delete_group(t);
00243       }
00244       
00245       void thread_func(int v)
00246       {
00247       }
00248     };
00249   */
00250   template <typename X, typename Param, typename Sequence>
00251   TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param), Sequence& seq)
00252   {
00253     return create_taskgroup1<X, Param, Sequence, typename Sequence::iterator>(x, fun, seq);
00254   };
00255   
00256   template <typename X, typename Param, typename Sequence>
00257   TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param), const Sequence& seq)
00258   {
00259     return create_taskgroup1<X, Param, const Sequence, typename Sequence::const_iterator>(x, fun, seq);
00260   };
00261 
00262   // create a schedule a task group for a member function with two arguments from two sequences
00263   // for example:
00264   /*
00265     class Foo
00266     {
00267       void foo()
00268       {
00269         std::vector<int> input(5, 3);
00270         std::vector<int> output(5);
00271         TaskGroup* t = c->create_and_schedule_group(*this, &Foo::square, input, output);
00272         c->wait(t);
00273         c->delete_group(t);
00274       }
00275       
00276       void square(int v, int& result)
00277       {
00278         result = v*v;
00279       }
00280     };
00281   */
00282   template <typename X, typename Param1, typename Param2, typename Sequence1, typename Sequence2>
00283   TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param1, Param2), Sequence1& seq1, Sequence2& seq2)
00284   {
00285     return create_taskgroup2<X, Param1, Param2, Sequence1, Sequence2, typename Sequence1::iterator, typename Sequence2::iterator>(x, fun, seq1, seq2);
00286   };
00287   
00288   template <typename X, typename Param1, typename Param2, typename Sequence1, typename Sequence2>
00289   TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param1, Param2), const Sequence1& seq1, Sequence2& seq2)
00290   {
00291     return create_taskgroup2<X, Param1, Param2, const Sequence1, Sequence2, typename Sequence1::const_iterator, typename Sequence2::iterator>(x, fun, seq1, seq2);
00292   };
00293   
00294   template <typename X, typename Param1, typename Param2, typename Sequence1, typename Sequence2>
00295   TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param1, Param2), Sequence1& seq1, const Sequence2& seq2)
00296   {
00297     return create_taskgroup2<X, Param1, Param2, Sequence1, const Sequence2, typename Sequence1::iterator, typename Sequence2::const_iterator>(x, fun, seq1, seq2);
00298   };
00299   
00300   template <typename X, typename Param1, typename Param2, typename Sequence1, typename Sequence2>
00301   TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param1, Param2), const Sequence1& seq1, const Sequence2& seq2)
00302   {
00303     return create_taskgroup2<X, Param1, Param2, const Sequence1, const Sequence2, typename Sequence1::const_iterator, typename Sequence2::const_iterator>(x, fun, seq1, seq2);
00304   };
00305 
00306   // delete a task group created by create_and_schedule_group
00307   void delete_group(TaskGroup* tg)
00308     {
00309     for(size_t i=0; i<tg->tasks.size(); i++)
00310       {
00311       delete tg->tasks[i];
00312       }
00313     delete tg;
00314     }
00315  
00316 
00317 
00318   // wait for a task to finish
00319   virtual void wait(Task* task) = 0;
00320 
00321   // wait for a task to finish
00322   // implementation may decide to wait by running a local event loop, instead of pausing this thread or taking on the unstarted task
00323   // warning: do not use this with recursive tasks
00324   virtual void idle_wait(Task* task)
00325   {
00326     wait(task);
00327   }
00328   
00329   // wait for a set of tasks to finish
00330   virtual void wait(const std::vector<Task*>& task) = 0;
00331 
00332   //wait for any of a set of tasks to finish
00333   virtual void wait_for_any(const std::vector<Task*>& tasks,std::vector<Task*>& finished_tasks) = 0;
00334 
00335   // return whether a task is complete
00336   virtual bool is_completed(Task* task) = 0;
00337   
00338   // return whether a task is currently running (as opposed to waiting in the queue)
00339   virtual bool is_running(Task* task) = 0;
00340   
00341   // wait for a task group to complete
00342   virtual void wait(TaskGroup* task_group) = 0;
00343   
00344   // return whether a task group is complete
00345   virtual bool is_completed(TaskGroup* task_group) = 0;
00346   
00347   // return whether a task group is currently running (as opposed to waiting in the queue)
00348   virtual bool is_running(TaskGroup* task_group) = 0;
00349  
00350   // cancel a task group's execution
00351   // this only un-queues tasks that haven't started, and running tasks will run to completion
00352   // after canceling a task group, one still needs to call wait() for completion.
00353   virtual void cancel(TaskGroup* task_group) = 0;
00354   
00355 
00356 protected: 
00357     static CubitConcurrent *mInstance; 
00358 
00359   // schedule a task for execution
00360   virtual void schedule(Task* task) = 0;
00361   
00362   // schedule a group of tasks for execution
00363   virtual void schedule(TaskGroup* task_group) = 0;
00364   
00365   
00366   
00367   
00368   // implementation helpers
00369   template <class X>
00370   struct ClassFunctionTask : public Task
00371   {
00372     public:
00373       ClassFunctionTask(X& _ptr, void (X::*_MemFun)()) : ptr(_ptr), MemFun(_MemFun) {}
00374       void execute()
00375       {
00376         (ptr.*MemFun)();
00377       }
00378     protected:
00379       X& ptr;
00380       void (X::*MemFun)();
00381         private:
00382           const ClassFunctionTask& operator=(const ClassFunctionTask&);
00383               ClassFunctionTask(const ClassFunctionTask&);
00384   };
00385 
00386   template <typename X, typename Param1>
00387   struct ClassFunctionTaskArg1 : public Task
00388   {
00389     public:
00390       ClassFunctionTaskArg1(X& _ptr, void (X::*_MemFun)(Param1), Param1 _arg1) : ptr(_ptr), MemFun(_MemFun), arg1(_arg1) {}
00391       void execute()
00392       {
00393         (ptr.*MemFun)(arg1);
00394       }
00395     protected:
00396       X& ptr;
00397       void (X::*MemFun)(Param1);
00398       Param1 arg1;
00399         private:
00400           const ClassFunctionTaskArg1& operator=(const ClassFunctionTaskArg1&);
00401               ClassFunctionTaskArg1(const ClassFunctionTaskArg1&);
00402   };
00403 
00404   template <typename X, typename Param1, typename Param2>
00405   struct ClassFunctionTaskArg2 : public Task
00406   {
00407     public:
00408       ClassFunctionTaskArg2(X& _ptr, void (X::*_MemFun)(Param1, Param2), Param1 _arg1, Param2 _arg2)
00409         : ptr(_ptr), MemFun(_MemFun), arg1(_arg1), arg2(_arg2) {}
00410       void execute()
00411       {
00412         (ptr.*MemFun)(arg1, arg2);
00413       }
00414     protected:
00415       X& ptr;
00416       void (X::*MemFun)(Param1, Param2);
00417       Param1 arg1;
00418       Param2 arg2;
00419         private:
00420           const ClassFunctionTaskArg2& operator=(const ClassFunctionTaskArg2&);
00421               ClassFunctionTaskArg2(const ClassFunctionTaskArg2&);
00422   };
00423 
00424   template <typename X, typename Param1, typename Arg1>
00425   inline Task* create_task1(X& x, void (X::*fun)(Param1), Arg1 arg1)
00426   {
00427     Task* t = new ClassFunctionTaskArg1<X, Param1>(x, fun, arg1);
00428     this->schedule(t);
00429     return t;
00430   }
00431   template <typename X, typename Param1, typename Param2, typename Arg1, typename Arg2>
00432   inline Task* create_task2(X& x, void (X::*fun)(Param1, Param2), Arg1 arg1, Arg2 arg2)
00433   {
00434     Task* t = new ClassFunctionTaskArg2<X, Param1, Param2>(x, fun, arg1, arg2);
00435     this->schedule(t);
00436     return t;
00437   }
00438 
00439   template <typename X, typename Param, typename Sequence, typename Iterator>
00440   inline TaskGroup* create_taskgroup1(X& x, void (X::*fun)(Param), Sequence& seq)
00441   {
00442     TaskGroup* tg = new TaskGroup;
00443     Iterator iter;
00444     for(iter = seq.begin(); iter != seq.end(); ++iter)
00445       {
00446       Task* t = new ClassFunctionTaskArg1<X, Param>(x, fun, *iter);
00447       tg->tasks.push_back(t);
00448       }
00449     this->schedule(tg);
00450     return tg;
00451   }
00452   
00453   template <typename X, typename Param1, typename Param2, typename Sequence1, typename Sequence2, typename Iterator1, typename Iterator2>
00454   inline TaskGroup* create_taskgroup2(X& x, void (X::*fun)(Param1, Param2), Sequence1& seq1, Sequence2& seq2)
00455   {
00456     if(seq1.size() != seq2.size())
00457       return NULL;
00458 
00459     TaskGroup* tg = new TaskGroup;
00460     Iterator1 iter1;
00461     Iterator2 iter2;
00462     for(iter1 = seq1.begin(), iter2 = seq2.begin(); iter1 != seq1.end(); ++iter1, ++iter2)
00463       {
00464       Task* t = new ClassFunctionTaskArg2<X, Param1, Param2>(x, fun, *iter1, *iter2);
00465       tg->tasks.push_back(t);
00466       }
00467     this->schedule(tg);
00468     return tg;
00469   }
00470 
00471 
00472 public:
00473 
00474   const char* get_base_type() const;
00475 
00476 };
00477 
00478 
00479 #endif // CUBITCONCURRENT_API_H_
00480 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines