cgma
|
00001 00002 00005 #ifndef _CUBIT_CONCURRENT_API_H_ 00006 #define _CUBIT_CONCURRENT_API_H_ 00007 00008 00009 #include "CGMUtilConfigure.h" 00010 #include <vector> 00011 #include <stddef.h> 00012 00013 // class to provide a way to run tasks concurrently 00014 class CUBIT_UTIL_EXPORT CubitConcurrent 00015 { 00016 public: 00017 CubitConcurrent(); 00018 virtual ~CubitConcurrent(); 00019 00023 static CubitConcurrent *instance() {return mInstance;} 00024 00025 00026 // struct for working with mutexes 00027 class Mutex 00028 { 00029 public: 00030 virtual ~Mutex() {} 00031 virtual void lock() = 0; 00032 virtual void unlock() = 0; 00033 }; 00034 00035 // convenience class to lock/unlock mutex within a scope 00036 // use this whenever possible 00037 // it also provides a safe lock()/unlock() 00038 struct MutexLocker 00039 { 00040 MutexLocker(Mutex* mutex) : mMutex(mutex), mLocked(false) 00041 { 00042 lock(); 00043 } 00044 ~MutexLocker() 00045 { 00046 unlock(); 00047 } 00048 void lock() 00049 { 00050 if(!mLocked) 00051 { 00052 mMutex->lock(); 00053 mLocked = true; 00054 } 00055 } 00056 void unlock() 00057 { 00058 if(mLocked) 00059 { 00060 mMutex->unlock(); 00061 mLocked = false; 00062 } 00063 } 00064 Mutex* mMutex; 00065 bool mLocked; 00066 }; 00067 00068 virtual Mutex* create_mutex(); 00069 virtual void destroy_mutex(Mutex* m); 00070 00071 // struct for working with thread local storage 00072 class ThreadLocalStorageInterface 00073 { 00074 public: 00075 virtual void* local_data() = 0; 00076 virtual void set_local_data(void*) = 0; 00077 protected: 00078 virtual ~ThreadLocalStorageInterface() {} 00079 }; 00080 00081 virtual ThreadLocalStorageInterface* create_local_storage(void (*cleanup_function)(void*)) = 0; 00082 virtual void destroy_local_storage(ThreadLocalStorageInterface* s) = 0; 00083 00084 template <class T> 00085 class ThreadLocalStorage 00086 { 00087 public: 00088 ThreadLocalStorage() 00089 { 00090 mTLS = CubitConcurrent::instance()->create_local_storage(ThreadLocalStorage::cleanup); 00091 } 00092 00093 ~ThreadLocalStorage() 00094 { 00095 CubitConcurrent::instance()->destroy_local_storage(mTLS); 00096 } 00097 00098 T* local_data() 00099 { 00100 return reinterpret_cast<T*>(mTLS->local_data()); 00101 } 00102 00103 void set_local_data(T* t) 00104 { 00105 mTLS->set_local_data(t); 00106 } 00107 00108 private: 00109 00110 static void cleanup(void* p) 00111 { 00112 delete reinterpret_cast<T*>(p); 00113 } 00114 00115 ThreadLocalStorageInterface* mTLS; 00116 }; 00117 00118 00119 // struct to encapsulate a task to execute 00120 struct Task 00121 { 00122 virtual ~Task() {} 00123 virtual void execute() = 0; 00124 }; 00125 00126 // struct to represent a group of tasks (several tasks started with one api call) 00127 // this sometimes gives a simpler setup of tasks and also provides cancellation ability 00128 struct TaskGroup 00129 { 00130 std::vector<Task*> tasks; 00131 }; 00132 00133 00134 // create a schedule a task for a member function with no arguments 00135 // for example: 00136 /* 00137 class Foo 00138 { 00139 void foo() 00140 { 00141 Task* t = c->create_and_schedule(*this, &Foo::thread_func); 00142 c->wait(t); 00143 delete t; 00144 } 00145 00146 void thread_func() 00147 { 00148 } 00149 }; 00150 */ 00151 template <typename X> 00152 Task* create_and_schedule(X& x, void (X::*fun)()) 00153 { 00154 Task* t = new ClassFunctionTask<X>(x, fun); 00155 this->schedule(t); 00156 return t; 00157 }; 00158 00159 // create and schedule a task for a member function with one argument 00160 // for example: 00161 /* 00162 class Foo 00163 { 00164 void foo() 00165 { 00166 Task* t = c->create_and_schedule(*this, &Foo::thread_func, 5); 00167 c->wait(t); 00168 delete t; 00169 } 00170 00171 void thread_func(int v) 00172 { 00173 } 00174 }; 00175 */ 00176 template <typename X, typename Param1, typename Arg1> 00177 Task* create_and_schedule(X& x, void (X::*fun)(Param1), const Arg1& arg1) 00178 { 00179 return create_task1<X,Param1,const Arg1&>(x,fun,arg1); 00180 }; 00181 00182 // same as above but to handle references passed through thread function 00183 template <typename X, typename Param1, typename Arg1> 00184 Task* create_and_schedule(X& x, void (X::*fun)(Param1), Arg1& arg1) 00185 { 00186 return create_task1<X,Param1,Arg1&>(x,fun,arg1); 00187 }; 00188 00189 // create a schedule a task for a member function with two arguments 00190 // for example: 00191 /* 00192 class Foo 00193 { 00194 void foo() 00195 { 00196 int result; 00197 Task* t = c->create_and_schedule(*this, &Foo::square, 5, result); 00198 c->wait(t); 00199 delete t; 00200 } 00201 00202 void square(int v, int& result) 00203 { 00204 result = v*v; 00205 } 00206 }; 00207 */ 00208 template <typename X, typename Param1, typename Param2, typename Arg1, typename Arg2> 00209 Task* create_and_schedule(X& x, void (X::*fun)(Param1, Param2), const Arg1& arg1, const Arg2& arg2) 00210 { 00211 return create_task2<X,Param1,Param2, const Arg1&,const Arg2&>(x,fun,arg1, arg2); 00212 }; 00213 00214 template <typename X, typename Param1, typename Param2, typename Arg1, typename Arg2> 00215 Task* create_and_schedule(X& x, void (X::*fun)(Param1, Param2), Arg1& arg1, const Arg2& arg2) 00216 { 00217 return create_task2<X,Param1,Param2,Arg1&, const Arg2&>(x,fun,arg1, arg2); 00218 }; 00219 00220 template <typename X, typename Param1, typename Param2, typename Arg1, typename Arg2> 00221 Task* create_and_schedule(X& x, void (X::*fun)(Param1, Param2), const Arg1& arg1, Arg2& arg2) 00222 { 00223 return create_task2<X,Param1,Param2,const Arg1&,Arg2&>(x,fun,arg1, arg2); 00224 }; 00225 00226 template <typename X, typename Param1, typename Param2, typename Arg1, typename Arg2> 00227 Task* create_and_schedule(X& x, void (X::*fun)(Param1, Param2), Arg1& arg1, Arg2& arg2) 00228 { 00229 return create_task2<X,Param1,Param2,Arg1&,Arg2&>(x,fun,arg1, arg2); 00230 }; 00231 00232 // create a schedule a task group for a member function with one argument from a sequence 00233 // for example: 00234 /* 00235 class Foo 00236 { 00237 void foo() 00238 { 00239 std::vector<int> input(5, 3); 00240 TaskGroup* t = c->create_and_schedule_group(*this, &Foo::thread_func, input); 00241 c->wait(t); 00242 c->delete_group(t); 00243 } 00244 00245 void thread_func(int v) 00246 { 00247 } 00248 }; 00249 */ 00250 template <typename X, typename Param, typename Sequence> 00251 TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param), Sequence& seq) 00252 { 00253 return create_taskgroup1<X, Param, Sequence, typename Sequence::iterator>(x, fun, seq); 00254 }; 00255 00256 template <typename X, typename Param, typename Sequence> 00257 TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param), const Sequence& seq) 00258 { 00259 return create_taskgroup1<X, Param, const Sequence, typename Sequence::const_iterator>(x, fun, seq); 00260 }; 00261 00262 // create a schedule a task group for a member function with two arguments from two sequences 00263 // for example: 00264 /* 00265 class Foo 00266 { 00267 void foo() 00268 { 00269 std::vector<int> input(5, 3); 00270 std::vector<int> output(5); 00271 TaskGroup* t = c->create_and_schedule_group(*this, &Foo::square, input, output); 00272 c->wait(t); 00273 c->delete_group(t); 00274 } 00275 00276 void square(int v, int& result) 00277 { 00278 result = v*v; 00279 } 00280 }; 00281 */ 00282 template <typename X, typename Param1, typename Param2, typename Sequence1, typename Sequence2> 00283 TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param1, Param2), Sequence1& seq1, Sequence2& seq2) 00284 { 00285 return create_taskgroup2<X, Param1, Param2, Sequence1, Sequence2, typename Sequence1::iterator, typename Sequence2::iterator>(x, fun, seq1, seq2); 00286 }; 00287 00288 template <typename X, typename Param1, typename Param2, typename Sequence1, typename Sequence2> 00289 TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param1, Param2), const Sequence1& seq1, Sequence2& seq2) 00290 { 00291 return create_taskgroup2<X, Param1, Param2, const Sequence1, Sequence2, typename Sequence1::const_iterator, typename Sequence2::iterator>(x, fun, seq1, seq2); 00292 }; 00293 00294 template <typename X, typename Param1, typename Param2, typename Sequence1, typename Sequence2> 00295 TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param1, Param2), Sequence1& seq1, const Sequence2& seq2) 00296 { 00297 return create_taskgroup2<X, Param1, Param2, Sequence1, const Sequence2, typename Sequence1::iterator, typename Sequence2::const_iterator>(x, fun, seq1, seq2); 00298 }; 00299 00300 template <typename X, typename Param1, typename Param2, typename Sequence1, typename Sequence2> 00301 TaskGroup* create_and_schedule_group(X& x, void (X::*fun)(Param1, Param2), const Sequence1& seq1, const Sequence2& seq2) 00302 { 00303 return create_taskgroup2<X, Param1, Param2, const Sequence1, const Sequence2, typename Sequence1::const_iterator, typename Sequence2::const_iterator>(x, fun, seq1, seq2); 00304 }; 00305 00306 // delete a task group created by create_and_schedule_group 00307 void delete_group(TaskGroup* tg) 00308 { 00309 for(size_t i=0; i<tg->tasks.size(); i++) 00310 { 00311 delete tg->tasks[i]; 00312 } 00313 delete tg; 00314 } 00315 00316 00317 00318 // wait for a task to finish 00319 virtual void wait(Task* task) = 0; 00320 00321 // wait for a task to finish 00322 // implementation may decide to wait by running a local event loop, instead of pausing this thread or taking on the unstarted task 00323 // warning: do not use this with recursive tasks 00324 virtual void idle_wait(Task* task) 00325 { 00326 wait(task); 00327 } 00328 00329 // wait for a set of tasks to finish 00330 virtual void wait(const std::vector<Task*>& task) = 0; 00331 00332 //wait for any of a set of tasks to finish 00333 virtual void wait_for_any(const std::vector<Task*>& tasks,std::vector<Task*>& finished_tasks) = 0; 00334 00335 // return whether a task is complete 00336 virtual bool is_completed(Task* task) = 0; 00337 00338 // return whether a task is currently running (as opposed to waiting in the queue) 00339 virtual bool is_running(Task* task) = 0; 00340 00341 // wait for a task group to complete 00342 virtual void wait(TaskGroup* task_group) = 0; 00343 00344 // return whether a task group is complete 00345 virtual bool is_completed(TaskGroup* task_group) = 0; 00346 00347 // return whether a task group is currently running (as opposed to waiting in the queue) 00348 virtual bool is_running(TaskGroup* task_group) = 0; 00349 00350 // cancel a task group's execution 00351 // this only un-queues tasks that haven't started, and running tasks will run to completion 00352 // after canceling a task group, one still needs to call wait() for completion. 00353 virtual void cancel(TaskGroup* task_group) = 0; 00354 00355 00356 protected: 00357 static CubitConcurrent *mInstance; 00358 00359 // schedule a task for execution 00360 virtual void schedule(Task* task) = 0; 00361 00362 // schedule a group of tasks for execution 00363 virtual void schedule(TaskGroup* task_group) = 0; 00364 00365 00366 00367 00368 // implementation helpers 00369 template <class X> 00370 struct ClassFunctionTask : public Task 00371 { 00372 public: 00373 ClassFunctionTask(X& _ptr, void (X::*_MemFun)()) : ptr(_ptr), MemFun(_MemFun) {} 00374 void execute() 00375 { 00376 (ptr.*MemFun)(); 00377 } 00378 protected: 00379 X& ptr; 00380 void (X::*MemFun)(); 00381 private: 00382 const ClassFunctionTask& operator=(const ClassFunctionTask&); 00383 ClassFunctionTask(const ClassFunctionTask&); 00384 }; 00385 00386 template <typename X, typename Param1> 00387 struct ClassFunctionTaskArg1 : public Task 00388 { 00389 public: 00390 ClassFunctionTaskArg1(X& _ptr, void (X::*_MemFun)(Param1), Param1 _arg1) : ptr(_ptr), MemFun(_MemFun), arg1(_arg1) {} 00391 void execute() 00392 { 00393 (ptr.*MemFun)(arg1); 00394 } 00395 protected: 00396 X& ptr; 00397 void (X::*MemFun)(Param1); 00398 Param1 arg1; 00399 private: 00400 const ClassFunctionTaskArg1& operator=(const ClassFunctionTaskArg1&); 00401 ClassFunctionTaskArg1(const ClassFunctionTaskArg1&); 00402 }; 00403 00404 template <typename X, typename Param1, typename Param2> 00405 struct ClassFunctionTaskArg2 : public Task 00406 { 00407 public: 00408 ClassFunctionTaskArg2(X& _ptr, void (X::*_MemFun)(Param1, Param2), Param1 _arg1, Param2 _arg2) 00409 : ptr(_ptr), MemFun(_MemFun), arg1(_arg1), arg2(_arg2) {} 00410 void execute() 00411 { 00412 (ptr.*MemFun)(arg1, arg2); 00413 } 00414 protected: 00415 X& ptr; 00416 void (X::*MemFun)(Param1, Param2); 00417 Param1 arg1; 00418 Param2 arg2; 00419 private: 00420 const ClassFunctionTaskArg2& operator=(const ClassFunctionTaskArg2&); 00421 ClassFunctionTaskArg2(const ClassFunctionTaskArg2&); 00422 }; 00423 00424 template <typename X, typename Param1, typename Arg1> 00425 inline Task* create_task1(X& x, void (X::*fun)(Param1), Arg1 arg1) 00426 { 00427 Task* t = new ClassFunctionTaskArg1<X, Param1>(x, fun, arg1); 00428 this->schedule(t); 00429 return t; 00430 } 00431 template <typename X, typename Param1, typename Param2, typename Arg1, typename Arg2> 00432 inline Task* create_task2(X& x, void (X::*fun)(Param1, Param2), Arg1 arg1, Arg2 arg2) 00433 { 00434 Task* t = new ClassFunctionTaskArg2<X, Param1, Param2>(x, fun, arg1, arg2); 00435 this->schedule(t); 00436 return t; 00437 } 00438 00439 template <typename X, typename Param, typename Sequence, typename Iterator> 00440 inline TaskGroup* create_taskgroup1(X& x, void (X::*fun)(Param), Sequence& seq) 00441 { 00442 TaskGroup* tg = new TaskGroup; 00443 Iterator iter; 00444 for(iter = seq.begin(); iter != seq.end(); ++iter) 00445 { 00446 Task* t = new ClassFunctionTaskArg1<X, Param>(x, fun, *iter); 00447 tg->tasks.push_back(t); 00448 } 00449 this->schedule(tg); 00450 return tg; 00451 } 00452 00453 template <typename X, typename Param1, typename Param2, typename Sequence1, typename Sequence2, typename Iterator1, typename Iterator2> 00454 inline TaskGroup* create_taskgroup2(X& x, void (X::*fun)(Param1, Param2), Sequence1& seq1, Sequence2& seq2) 00455 { 00456 if(seq1.size() != seq2.size()) 00457 return NULL; 00458 00459 TaskGroup* tg = new TaskGroup; 00460 Iterator1 iter1; 00461 Iterator2 iter2; 00462 for(iter1 = seq1.begin(), iter2 = seq2.begin(); iter1 != seq1.end(); ++iter1, ++iter2) 00463 { 00464 Task* t = new ClassFunctionTaskArg2<X, Param1, Param2>(x, fun, *iter1, *iter2); 00465 tg->tasks.push_back(t); 00466 } 00467 this->schedule(tg); 00468 return tg; 00469 } 00470 00471 00472 public: 00473 00474 const char* get_base_type() const; 00475 00476 }; 00477 00478 00479 #endif // CUBITCONCURRENT_API_H_ 00480