// Author: Ugo Varetto
// Implementation of task-based concurrency (similar to Java's Executor)
// and wrapper for concurrent access
// gcc >= 4.8 or clang llvm >= 3.2 with libc++ required
//
// do specify -pthread when compiling, otherwise you'll get a run-time error
// g++ task-based-executor-concurrent-generic.cpp -std=c++11 -pthread

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdlib>  // EXIT_*
#include <deque>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

//------------------------------------------------------------------------------
// synchronized queue (could be an inner class inside Executor):
// - acquire lock on insertion and notify after insertion
// - on extraction: acquire lock then if queue empty wait for notify, extract
//   element
// Elements are pushed at the front and popped from the back, so extraction
// order is FIFO.
template <typename T>
class SyncQueue {
public:
    // Copy e into the queue and wake one thread blocked in Pop().
    void Push(const T& e) {
        // simple scoped lock: acquire mutex in constructor,
        // release in destructor
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.push_front(e);
        cond_.notify_one();  // notify
    }
    // Move e into the queue and wake one thread blocked in Pop().
    void Push(T&& e) {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.push_front(std::move(e));
        cond_.notify_one();  // notify
    }
    // Remove and return the oldest element; blocks while the queue is empty.
    T Pop() {
        // cannot use simple scoped lock here because lock passed to
        // wait must be able to acquire and release the mutex
        std::unique_lock<std::mutex> lock(mutex_);
        // stop and wait for notification if condition is false;
        // continue otherwise
        cond_.wait(lock, [this] { return !queue_.empty(); });
        T e = std::move(queue_.back());
        queue_.pop_back();
        return e;
    }
    friend class Executor;  // to allow calls to Clear

private:
    // Discard every queued element. The deque is swapped out under the lock
    // (producers may still be pushing concurrently) and the discarded
    // elements are destroyed after the lock is released.
    void Clear() {
        std::deque<T> discarded;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            discarded.swap(queue_);
        }
    }

private:
    std::deque<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

//------------------------------------------------------------------------------
// interface and base class for callable objects
struct ICaller {
    // true for the "stop" marker created with the default constructor
    virtual bool Empty() const = 0;
    // run the bound call and publish its result/exception to the future
    virtual void Invoke() = 0;
    virtual ~ICaller() {}
};

// callable object stored in queue shared among threads: parameters are
// bound at object construction time; the result (or the exception thrown)
// is delivered through the std::future returned by GetFuture()
template <typename ResultType>
class Caller : public ICaller {
public:
    template <typename F, typename... Args>
    Caller(F&& f, Args... args)
        : f_(std::bind(std::forward<F>(f), std::forward<Args>(args)...)),
          empty_(false) {}
    // default-constructed Caller is the "stop" marker for worker threads
    Caller() : empty_(true) {}
    std::future<ResultType> GetFuture() { return p_.get_future(); }
    void Invoke() override {
        try {
            p_.set_value(f_());
        } catch (...) {
            p_.set_exception(std::current_exception());
        }
    }
    bool Empty() const override { return empty_; }

private:
    std::promise<ResultType> p_;
    std::function<ResultType()> f_;
    bool empty_;
};

// specialization for void return type
template <>
class Caller<void> : public ICaller {
public:
    // forwards its arguments exactly like the primary template
    template <typename F, typename... Args>
    Caller(F&& f, Args... args)
        : f_(std::bind(std::forward<F>(f), std::forward<Args>(args)...)),
          empty_(false) {}
    Caller() : empty_(true) {}
    std::future<void> GetFuture() { return p_.get_future(); }
    void Invoke() override {
        try {
            f_();
            p_.set_value();
        } catch (...) {
            p_.set_exception(std::current_exception());
        }
    }
    bool Empty() const override { return empty_; }

private:
    std::promise<void> p_;
    std::function<void()> f_;
    bool empty_;
};

//------------------------------------------------------------------------------
// task executor: asynchronously execute callable objects. Specify the max
// number of threads to use at Executor construction time; threads are started
// in the constructor and joined in the destructor
class Executor {
    typedef SyncQueue<std::unique_ptr<ICaller> > Queue;
    typedef std::vector<std::thread> Threads;

public:
    // numthreads < 1 starts no threads; operator() then throws logic_error
    Executor(int numthreads = std::thread::hardware_concurrency()) {
        StartThreads(numthreads);
    }
#ifdef USE_RESULT_OF
    // deferred call to f with args parameters
    // 1. all the arguments are bound to a function object taking zero
    //    parameters which is put into the shared queue
    // 2. std::future is returned
    // throws std::logic_error if no worker thread is running
    template <typename F, typename... Args>
    auto operator()(F&& f, Args... args)
        -> std::future<typename std::result_of<F(Args...)>::type> {
        if (threads_.empty()) throw std::logic_error("No active threads");
        typedef typename std::result_of<F(Args...)>::type ResultType;
        // own the Caller from the start so nothing leaks if a step throws
        std::unique_ptr<Caller<ResultType> > c(new Caller<ResultType>(
            std::forward<F>(f), std::forward<Args>(args)...));
        std::future<ResultType> ft = c->GetFuture();
        queue_.Push(std::unique_ptr<ICaller>(std::move(c)));
        return ft;
    }
#else
    // deferred call to f with args parameters; see the USE_RESULT_OF variant
    // throws std::logic_error if no worker thread is running
    template <typename F, typename... Args>
    auto operator()(F&& f, Args... args) -> std::future<decltype(f(args...))> {
        if (threads_.empty()) throw std::logic_error("No active threads");
        typedef decltype(f(args...)) ResultType;
        // own the Caller from the start so nothing leaks if a step throws
        std::unique_ptr<Caller<ResultType> > c(new Caller<ResultType>(
            std::forward<F>(f), std::forward<Args>(args)...));
        std::future<ResultType> ft = c->GetFuture();
        queue_.Push(std::unique_ptr<ICaller>(std::move(c)));
        return ft;
    }
#endif
    // stop and join all threads; queue is cleared by default, call with
    // false to avoid clearing queue
    // to "stop" threads an empty Caller instance per-thread is put into the
    // queue; threads interpret an empty Caller as a stop signal and exit from
    // the execution loop as soon as one is popped from the queue
    // Note: this is the only safe way to terminate threads, other options like
    // invoking explicit terminate functions where available are similar
    // to killing a process with Ctrl-C, since however threads are not
    // processes the resources allocated/acquired during the thread lifetime
    // are not automatically released
    void Stop(bool clearQueue = true) {  // blocking
        for (Threads::size_type t = 0; t != threads_.size(); ++t)
            queue_.Push(std::unique_ptr<ICaller>(new Caller<void>));
        std::for_each(threads_.begin(), threads_.end(),
                      [](std::thread& t) { t.join(); });
        threads_.clear();
        // tasks still queued (pushed after the stop markers) are kept when
        // clearQueue is false and will run once threads are restarted
        if (clearQueue) queue_.Clear();
    }
    // start or re-start with numthreads threads, queue is cleared by default
    // call with ..., false to avoid clearing queue
    // throws std::range_error if numthreads < 1
    void Start(int numthreads, bool clearQueue = true) {  // non-blocking
        if (numthreads < 1) {
            throw std::range_error("Number of threads < 1");
        }
        Stop(clearQueue);
        StartThreads(numthreads);
    }
    // same as Start; in case the Executor is created with zero threads
    // it makes sense to call start; if it's created with a number of threads
    // greater than zero call Restart in client code
    void Restart(int numthreads, bool clearQueue = true) {
        Start(numthreads, clearQueue);
    }
    // join all threads
    ~Executor() { Stop(); }

private:
    // start threads and put them into thread vector; a non-positive count
    // starts no threads
    void StartThreads(int nthreads) {
        for (int t = 0; t < nthreads; ++t) {
            threads_.push_back(std::thread([this] {
                while (true) {
                    std::unique_ptr<ICaller> c = queue_.Pop();
                    if (c->Empty()) {
                        // interpret an empty Caller as a
                        //'terminate' message
                        break;
                    }
                    c->Invoke();
                }
            }));
        }
    }

private:
    Queue queue_;      // command queue
    Threads threads_;  // one std::thread per running worker
};

//------------------------------------------------------------------------------
// tag types used to dispatch ConcurrentAccess::Invoke on void/non-void results
struct VoidType {};
struct NonVoidType {};
template <typename T>
struct Void {
    typedef NonVoidType type;
};
template <>
struct Void<void> {
    typedef VoidType type;
};

// Safe concurrent access to data by wrapping variable with object which
// synchronizes access to resource.
// Access can only happen by passing a functor which receives a reference to
// the wrapped resource; functors are queued and run one at a time, in FIFO
// order, by a loop that occupies one thread of the Executor passed to the
// constructor for the whole lifetime of this object.
template <typename T>
class ConcurrentAccess {
    T data_;  // warning 'mutable' cannot be applied to references
    // only read and written by the access loop thread (the task setting it
    // runs inside that loop), so it needs no synchronization
    bool done_ = false;
    SyncQueue<std::function<void()> > queue_;
    std::future<void> f_;

public:
    ConcurrentAccess() = delete;
    ConcurrentAccess(const ConcurrentAccess&) = delete;
    ConcurrentAccess(ConcurrentAccess&&) = delete;
    ConcurrentAccess(T data, Executor& e)
        : data_(std::move(data)),
          f_(e([this] {
              while (!done_) queue_.Pop()();
          })) {}
    // queue f(data) and return a future for its result
    template <typename F>
    auto operator()(F&& f)
        -> std::future<typename std::result_of<F(T&)>::type> {
        using R = typename std::result_of<F(T&)>::type;
        return Invoke(std::forward<F>(f), typename Void<R>::type());
    }
    template <typename F>
    auto Invoke(F&& f, const NonVoidType&)
        -> std::future<typename std::result_of<F(T&)>::type> {
        using R = typename std::result_of<F(T&)>::type;
        // std::function needs a copyable target and std::promise is move-only,
        // hence the shared_ptr
        auto p = std::make_shared<std::promise<R> >();
        auto ft = p->get_future();
        queue_.Push([this, p, f]() {
            try {
                p->set_value(f(data_));
            } catch (...) {
                p->set_exception(std::current_exception());
            }
        });
        return ft;
    }
    template <typename F>
    std::future<void> Invoke(F&& f, const VoidType&) {
        auto p = std::make_shared<std::promise<void> >();
        auto ft = p->get_future();
        queue_.Push([this, p, f]() {
            try {
                f(data_);
                p->set_value();
            } catch (...) {
                p->set_exception(std::current_exception());
            }
        });
        return ft;
    }
    // enqueue the stop request after all pending accesses, then wait for the
    // access loop to finish
    ~ConcurrentAccess() {
        queue_.Push([this] { done_ = true; });
        f_.wait();
    }
};

//------------------------------------------------------------------------------
int main(int argc, char** argv) {
    try {
        if (argc > 1 && std::string(argv[1]) == "-h") {
            std::cout << argv[0] << " [task sleep time (ms)] "
                      << "[number of tasks] "
                      << "[number of threads]\n"
                      << "default is (0,4,2)\n";
            return 0;
        }
        const int sleeptime_ms = argc > 1 ? std::atoi(argv[1]) : 0;
        const int numtasks = argc > 2 ? std::atoi(argv[2]) : 4;
        const int numthreads = argc > 3 ? std::atoi(argv[3]) : 2;
        std::cout << "Running tasks...\n";
        std::cout << "Run-time configuration:\n"
                  << " " << numtasks << " tasks\n"
                  << " " << numthreads << " threads\n"
                  << " " << sleeptime_ms << " ms task sleep time\n"
                  << std::endl;
        using namespace std;
        Executor exec(numthreads);
        Executor exec_aux(1);
        string msg = "start\n";
        // the ConcurrentAccess loop permanently occupies one executor thread:
        // if single threaded use a different executor for concurrent access
        // if not it deadlocks
        ConcurrentAccess<string> text(msg, numthreads > 1 ? exec : exec_aux);
        vector<future<void> > v;
        cout << this_thread::get_id() << endl;
        for (int i = 0; i < numtasks; ++i)
            v.push_back(exec([&, i] {
                std::this_thread::sleep_for(
                    std::chrono::milliseconds(sleeptime_ms));
                text([=](string& s) {
                    s += to_string(i) + " " + to_string(i);
                    s += "\n";
                });
                text([](const string& s) { cout << s; });
            }));
        for (auto& f : v) f.wait();
        std::cout << "Done\n";
        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
        return EXIT_FAILURE;
    }
}