diff --git a/unsupported/Eigen/CXX11/ThreadPool b/unsupported/Eigen/CXX11/ThreadPool
new file mode 100644
index 000000000..85028603c
--- /dev/null
+++ b/unsupported/Eigen/CXX11/ThreadPool
@@ -0,0 +1,69 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2016 Benoit Steiner
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_MODULE
+#define EIGEN_CXX11_THREADPOOL_MODULE
+
+#include "Core"
+
+#include <Eigen/src/Core/util/DisableStupidWarnings.h>
+
+/** \defgroup CXX11_ThreadPool_Module C++11 ThreadPool Module
+  *
+  * This module provides two thread pool implementations:
+  *   - a simple reference implementation
+  *   - a faster non-blocking implementation
+  *
+  * This module requires C++11.
+  *
+  * \code
+  * #include <Eigen/CXX11/ThreadPool>
+  * \endcode
+  */
+
+//#include "src/Core/util/EmulateArray.h"
+//#include "src/Core/util/MaxSizeVector.h"
+//#include "third_party/eigen3/Eigen/src/Core/util/Macros.h"
+
+// Emulate the C++11 functionality that we need if the compiler doesn't
+// support it. Visual Studio 2015 doesn't advertise itself as C++11
+// compliant, although it supports enough of the standard for our needs.
+#if __cplusplus > 199711L || EIGEN_COMP_MSVC >= 1900
+#include <cstddef>
+#include <cstring>
+#include <stdint.h>
+#include <time.h>
+
+#include <vector>
+#include <atomic>
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <thread>
+#include <functional>
+#include <memory>
+
+#include "src/ThreadPool/EventCount.h"
+#include "src/ThreadPool/RunQueue.h"
+#include "src/ThreadPool/ThreadPoolInterface.h"
+#include "src/ThreadPool/ThreadEnvironment.h"
+#include "src/ThreadPool/SimpleThreadPool.h"
+#include "src/ThreadPool/NonBlockingThreadPool.h"
+
+#endif
+
+#include <Eigen/src/Core/util/ReenableStupidWarnings.h>
+
+#endif // EIGEN_CXX11_THREADPOOL_MODULE
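(Not part of the diff.) A minimal usage sketch of the module added above, assuming `<Eigen/CXX11/ThreadPool>` is on the include path; the pool size, task count, and polling loop are illustrative choices, since the pools expose no Wait() API:

```cpp
#include <Eigen/CXX11/ThreadPool>
#include <atomic>
#include <cstdio>
#include <thread>

int main() {
  Eigen::NonBlockingThreadPool pool(4);  // 4 worker threads
  std::atomic<int> done(0);
  for (int i = 0; i < 8; i++) {
    pool.Schedule([&done]() { done++; });
  }
  // Scheduling is fire-and-forget, so completion is observed via the counter.
  while (done.load() != 8) std::this_thread::yield();
  std::printf("all tasks finished\n");
  return 0;  // ~NonBlockingThreadPool() joins the workers.
}
```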
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
new file mode 100644
index 000000000..16eee1a41
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
@@ -0,0 +1,234 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2016 Dmitry Vyukov
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
+#define EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
+
+namespace Eigen {
+
+// EventCount allows waiting for arbitrary predicates in non-blocking
+// algorithms. Think of it as a condition variable, except that the wait
+// predicate does not need to be protected by a mutex. Usage:
+// Waiting thread does:
+//
+//   if (predicate)
+//     return act();
+//   EventCount::Waiter& w = waiters[my_index];
+//   ec.Prewait(&w);
+//   if (predicate) {
+//     ec.CancelWait(&w);
+//     return act();
+//   }
+//   ec.CommitWait(&w);
+//
+// Notifying thread does:
+//
+//   predicate = true;
+//   ec.Notify(true);
+//
+// Notify is cheap if there are no waiting threads. Prewait/CommitWait are not
+// cheap, but they are executed only if the preceding predicate check has
+// failed.
+//
+// Algorithm outline:
+// There are two main variables: predicate (managed by user) and state_.
+// Operation closely resembles Dekker's mutual exclusion algorithm:
+// https://en.wikipedia.org/wiki/Dekker%27s_algorithm
+// The waiting thread sets state_ and then checks predicate; the notifying
+// thread sets predicate and then checks state_. Due to the seq_cst fences
+// between these operations it is guaranteed that either the waiter sees the
+// predicate change and doesn't block, or the notifying thread sees the state_
+// change and unblocks the waiter, or both. But it can't happen that both
+// threads miss each other's changes, which would lead to deadlock.
+class EventCount {
+ public:
+  class Waiter;
+
+  EventCount(std::vector<Waiter>& waiters) : waiters_(waiters) {
+    eigen_assert(waiters.size() < (1 << kWaiterBits) - 1);
+    // Initialize epoch to something close to overflow to test overflow.
+    state_ = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2);
+  }
+
+  ~EventCount() {
+    // Ensure there are no waiters.
+    eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask);
+  }
+
+  // Prewait prepares for waiting.
+  // After calling this function the thread must re-check the wait predicate
+  // and then call either CancelWait or CommitWait, passing the same Waiter
+  // object.
+  void Prewait(Waiter* w) {
+    w->epoch = state_.fetch_add(kWaiterInc, std::memory_order_relaxed);
+    std::atomic_thread_fence(std::memory_order_seq_cst);
+  }
+
+  // CommitWait commits waiting.
+  void CommitWait(Waiter* w) {
+    w->state = Waiter::kNotSignaled;
+    // Modification epoch of this waiter.
+    uint64_t epoch =
+        (w->epoch & kEpochMask) +
+        (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
+    uint64_t state = state_.load(std::memory_order_seq_cst);
+    for (;;) {
+      if (int64_t((state & kEpochMask) - epoch) < 0) {
+        // The preceding waiter has not decided on its fate. Wait until it
+        // calls either CancelWait or CommitWait, or is notified.
+        std::this_thread::yield();
+        state = state_.load(std::memory_order_seq_cst);
+        continue;
+      }
+      // We've already been notified.
+      if (int64_t((state & kEpochMask) - epoch) > 0) return;
+      // Remove this thread from the prewait counter and add it to the waiter
+      // list.
+      eigen_assert((state & kWaiterMask) != 0);
+      uint64_t newstate = state - kWaiterInc + kEpochInc;
+      newstate = (newstate & ~kStackMask) | (w - &waiters_[0]);
+      if ((state & kStackMask) == kStackMask)
+        w->next.store(nullptr, std::memory_order_relaxed);
+      else
+        w->next.store(&waiters_[state & kStackMask],
+                      std::memory_order_relaxed);
+      if (state_.compare_exchange_weak(state, newstate,
+                                       std::memory_order_release))
+        break;
+    }
+    Park(w);
+  }
+
+  // CancelWait cancels the effects of the previous Prewait call.
+  void CancelWait(Waiter* w) {
+    uint64_t epoch =
+        (w->epoch & kEpochMask) +
+        (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
+    uint64_t state = state_.load(std::memory_order_relaxed);
+    for (;;) {
+      if (int64_t((state & kEpochMask) - epoch) < 0) {
+        // The preceding waiter has not decided on its fate. Wait until it
+        // calls either CancelWait or CommitWait, or is notified.
+        std::this_thread::yield();
+        state = state_.load(std::memory_order_relaxed);
+        continue;
+      }
+      // We've already been notified.
+      if (int64_t((state & kEpochMask) - epoch) > 0) return;
+      // Remove this thread from the prewait counter.
+      eigen_assert((state & kWaiterMask) != 0);
+      if (state_.compare_exchange_weak(state, state - kWaiterInc + kEpochInc,
+                                       std::memory_order_relaxed))
+        return;
+    }
+  }
+
+  // Notify wakes one or all waiting threads.
+  // Must be called after changing the associated wait predicate.
+  void Notify(bool all) {
+    std::atomic_thread_fence(std::memory_order_seq_cst);
+    uint64_t state = state_.load(std::memory_order_acquire);
+    for (;;) {
+      // Easy case: no waiters.
+      if ((state & kStackMask) == kStackMask && (state & kWaiterMask) == 0)
+        return;
+      uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
+      uint64_t newstate;
+      if (all) {
+        // Reset prewait counter and empty wait list.
+        newstate = (state & kEpochMask) + (kEpochInc * waiters) + kStackMask;
+      } else if (waiters) {
+        // There is a thread in pre-wait state, unblock it.
+        newstate = state + kEpochInc - kWaiterInc;
+      } else {
+        // Pop a waiter from list and unpark it.
+        Waiter* w = &waiters_[state & kStackMask];
+        Waiter* wnext = w->next.load(std::memory_order_relaxed);
+        uint64_t next = kStackMask;
+        if (wnext != nullptr) next = wnext - &waiters_[0];
+        // Note: we don't add kEpochInc here. The ABA problem on the lock-free
+        // stack can't happen because a waiter is re-pushed onto the stack
+        // only after it was in the pre-wait state, which inevitably leads to
+        // an epoch increment.
+        newstate = (state & kEpochMask) + next;
+      }
+      if (state_.compare_exchange_weak(state, newstate,
+                                       std::memory_order_acquire)) {
+        if (!all && waiters) return;  // unblocked pre-wait thread
+        if ((state & kStackMask) == kStackMask) return;
+        Waiter* w = &waiters_[state & kStackMask];
+        if (!all) w->next.store(nullptr, std::memory_order_relaxed);
+        Unpark(w);
+        return;
+      }
+    }
+  }
+
+  class Waiter {
+    friend class EventCount;
+    std::atomic<Waiter*> next;
+    std::mutex mu;
+    std::condition_variable cv;
+    uint64_t epoch;
+    unsigned state;
+    enum {
+      kNotSignaled,
+      kWaiting,
+      kSignaled,
+    };
+    // Prevent false sharing with other Waiter objects in the same vector.
+    char pad_[128];
+  };
+
+ private:
+  // state_ layout:
+  // - the low kStackBits bits are a stack of waiters that committed waiting.
+  // - the next kWaiterBits bits count waiters in the prewait state.
+  // - the next kEpochBits bits are a modification counter.
+  static const uint64_t kStackBits = 16;
+  static const uint64_t kStackMask = (1ull << kStackBits) - 1;
+  static const uint64_t kWaiterBits = 16;
+  static const uint64_t kWaiterShift = 16;
+  static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1)
+                                      << kWaiterShift;
+  static const uint64_t kWaiterInc = 1ull << kWaiterBits;
+  static const uint64_t kEpochBits = 32;
+  static const uint64_t kEpochShift = 32;
+  static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
+  static const uint64_t kEpochInc = 1ull << kEpochShift;
+  std::atomic<uint64_t> state_;
+  std::vector<Waiter>& waiters_;
+
+  void Park(Waiter* w) {
+    std::unique_lock<std::mutex> lock(w->mu);
+    while (w->state != Waiter::kSignaled) {
+      w->state = Waiter::kWaiting;
+      w->cv.wait(lock);
+    }
+  }
+
+  void Unpark(Waiter* waiters) {
+    Waiter* next = nullptr;
+    for (Waiter* w = waiters; w; w = next) {
+      next = w->next.load(std::memory_order_relaxed);
+      unsigned state;
+      {
+        std::unique_lock<std::mutex> lock(w->mu);
+        state = w->state;
+        w->state = Waiter::kSignaled;
+      }
+      // Avoid notifying if it wasn't waiting.
+      if (state == Waiter::kWaiting) w->cv.notify_one();
+    }
+  }
+
+  EventCount(const EventCount&) = delete;
+  void operator=(const EventCount&) = delete;
+};
+
+}  // namespace Eigen
+
+#endif  // EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
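(Not part of the diff.) The waiting/notifying protocol from the header comment above, spelled out as a compilable sketch with a single waiter slot; the predicate flag and thread structure are illustrative assumptions:

```cpp
#include <Eigen/CXX11/ThreadPool>  // brings in EventCount
#include <atomic>
#include <thread>
#include <vector>

void event_count_example() {
  std::vector<Eigen::EventCount::Waiter> waiters(1);
  Eigen::EventCount ec(waiters);
  std::atomic<bool> predicate(false);

  std::thread waiter([&]() {
    for (;;) {
      if (predicate.load()) return;      // fast path, no blocking
      Eigen::EventCount::Waiter* w = &waiters[0];
      ec.Prewait(w);
      if (predicate.load()) {            // re-check after Prewait
        ec.CancelWait(w);
        return;
      }
      ec.CommitWait(w);                  // block until notified
    }
  });

  predicate.store(true);                 // set predicate *before* Notify
  ec.Notify(true);
  waiter.join();
}
```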
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
new file mode 100644
index 000000000..18dec5393
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
@@ -0,0 +1,232 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2016 Dmitry Vyukov
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
+#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
+
+namespace Eigen {
+
+template <typename Environment>
+class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
+ public:
+  typedef typename Environment::Task Task;
+  typedef RunQueue<Task, 1024> Queue;
+
+  NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment())
+      : env_(env),
+        threads_(num_threads),
+        queues_(num_threads),
+        waiters_(num_threads),
+        blocked_(),
+        spinning_(),
+        done_(),
+        ec_(waiters_) {
+    for (int i = 0; i < num_threads; i++) queues_.push_back(new Queue());
+    for (int i = 0; i < num_threads; i++)
+      threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
+  }
+
+  ~NonBlockingThreadPoolTempl() {
+    done_.store(true, std::memory_order_relaxed);
+    // Now if all threads block without work, they will start exiting.
+    // But note that threads can continue to work arbitrarily long,
+    // block, submit new work, unblock and otherwise live a full life.
+    ec_.Notify(true);
+
+    // Join threads explicitly to avoid destruction order issues.
+    for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
+    for (size_t i = 0; i < threads_.size(); i++) delete queues_[i];
+  }
+
+  void Schedule(std::function<void()> fn) {
+    Task t = env_.CreateTask(std::move(fn));
+    PerThread* pt = GetPerThread();
+    if (pt->pool == this) {
+      // Worker thread of this pool, push onto the thread's queue.
+      Queue* q = queues_[pt->index];
+      t = q->PushFront(std::move(t));
+    } else {
+      // A free-standing thread (or worker of another pool), push onto a
+      // random queue.
+      Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
+      t = q->PushBack(std::move(t));
+    }
+    // Note: below we touch this after making t available to worker threads.
+    // Strictly speaking, this can lead to a racy use-after-free. Consider
+    // that Schedule is called from a thread that is neither the main thread
+    // nor a worker thread of this pool. Then, execution of t directly or
+    // indirectly completes the overall computation, which in turn leads to
+    // destruction of this. We expect that such a scenario is prevented by the
+    // program, that is, this is kept alive while any thread can potentially
+    // be inside Schedule.
+    if (!t.f)
+      ec_.Notify(false);
+    else
+      env_.ExecuteTask(t);  // Push failed, execute directly.
+  }
+
+ private:
+  typedef typename Environment::EnvThread Thread;
+
+  struct PerThread {
+    bool inited;
+    NonBlockingThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
+    unsigned index;                    // Worker thread index in pool.
+    unsigned rand;                     // Random generator state.
+ }; + + Environment env_; + MaxSizeVector threads_; + MaxSizeVector queues_; + std::vector waiters_; + std::atomic blocked_; + std::atomic spinning_; + std::atomic done_; + EventCount ec_; + + // Main worker thread loop. + void WorkerLoop(unsigned index) { + PerThread* pt = GetPerThread(); + pt->pool = this; + pt->index = index; + Queue* q = queues_[index]; + EventCount::Waiter* waiter = &waiters_[index]; + std::vector stolen; + for (;;) { + Task t; + if (!stolen.empty()) { + t = std::move(stolen.back()); + stolen.pop_back(); + } + if (!t.f) t = q->PopFront(); + if (!t.f) { + if (Steal(&stolen)) { + t = std::move(stolen.back()); + stolen.pop_back(); + while (stolen.size()) { + Task t1 = q->PushFront(std::move(stolen.back())); + stolen.pop_back(); + if (t1.f) { + // There is not much we can do in this case. Just execute the + // remaining directly. + stolen.push_back(std::move(t1)); + break; + } + } + } + } + if (t.f) { + env_.ExecuteTask(t); + continue; + } + // Leave one thread spinning. This reduces latency. + if (!spinning_ && !spinning_.exchange(true)) { + bool nowork = true; + for (int i = 0; i < 1000; i++) { + if (!OutOfWork()) { + nowork = false; + break; + } + } + spinning_ = false; + if (!nowork) continue; + } + if (!WaitForWork(waiter)) return; + } + } + + // Steal tries to steal work from other worker threads in best-effort manner. + bool Steal(std::vector* stolen) { + if (queues_.size() == 1) return false; + PerThread* pt = GetPerThread(); + unsigned lastq = pt->index; + for (unsigned i = queues_.size(); i > 0; i--) { + unsigned victim = Rand(&pt->rand) % queues_.size(); + if (victim == lastq && queues_.size() > 2) { + i++; + continue; + } + // Steal half of elements from a victim queue. + // It is typical to steal just one element, but that assumes that work is + // recursively subdivided in halves so that the stolen element is exactly + // half of work. If work elements are equally-sized, then is makes sense + // to steal half of elements at once and then work locally for a while. + if (queues_[victim]->PopBackHalf(stolen)) return true; + lastq = victim; + } + // Just to make sure that we did not miss anything. + for (unsigned i = queues_.size(); i > 0; i--) + if (queues_[i - 1]->PopBackHalf(stolen)) return true; + return false; + } + + // WaitForWork blocks until new work is available, or if it is time to exit. + bool WaitForWork(EventCount::Waiter* waiter) { + // We already did best-effort emptiness check in Steal, so prepare blocking. + ec_.Prewait(waiter); + // Now do reliable emptiness check. + if (!OutOfWork()) { + ec_.CancelWait(waiter); + return true; + } + // Number of blocked threads is used as termination condition. + // If we are shutting down and all worker threads blocked without work, + // that's we are done. + blocked_++; + if (done_ && blocked_ == threads_.size()) { + ec_.CancelWait(waiter); + // Almost done, but need to re-check queues. + // Consider that all queues are empty and all worker threads are preempted + // right after incrementing blocked_ above. Now a free-standing thread + // submits work and calls destructor (which sets done_). If we don't + // re-check queues, we will exit leaving the work unexecuted. + if (!OutOfWork()) { + // Note: we must not pop from queues before we decrement blocked_, + // otherwise the following scenario is possible. Consider that instead + // of checking for emptiness we popped the only element from queues. + // Now other worker threads can start exiting, which is bad if the + // work item submits other work. 
+        // So we just check emptiness here, which ensures that all worker
+        // threads exit at the same time.
+        blocked_--;
+        return true;
+      }
+      // Reached stable termination state.
+      ec_.Notify(true);
+      return false;
+    }
+    ec_.CommitWait(waiter);
+    blocked_--;
+    return true;
+  }
+
+  bool OutOfWork() {
+    for (unsigned i = 0; i < queues_.size(); i++)
+      if (!queues_[i]->Empty()) return false;
+    return true;
+  }
+
+  PerThread* GetPerThread() {
+    static thread_local PerThread per_thread_;
+    PerThread* pt = &per_thread_;
+    if (pt->inited) return pt;
+    pt->inited = true;
+    pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
+    return pt;
+  }
+
+  static unsigned Rand(unsigned* state) {
+    return *state = *state * 1103515245 + 12345;
+  }
+};
+
+typedef NonBlockingThreadPoolTempl<StlThreadEnvironment> NonBlockingThreadPool;
+
+}  // namespace Eigen
+
+#endif  // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
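(Not part of the diff.) The Templ suffix exists so the pool can be instantiated over a user-supplied environment. A hypothetical environment that logs task execution, mirroring the StlThreadEnvironment interface defined later in this patch:

```cpp
#include <cstdio>
#include <functional>
#include <thread>

struct LoggingEnvironment {
  struct Task {
    std::function<void()> f;
  };
  class EnvThread {
   public:
    EnvThread(std::function<void()> f) : thr_(std::move(f)) {}
    ~EnvThread() { thr_.join(); }
   private:
    std::thread thr_;
  };
  EnvThread* CreateThread(std::function<void()> f) {
    return new EnvThread(std::move(f));
  }
  Task CreateTask(std::function<void()> f) { return Task{std::move(f)}; }
  void ExecuteTask(const Task& t) {
    std::printf("task start\n");
    t.f();  // run the wrapped closure
    std::printf("task done\n");
  }
};

// Usage: Eigen::NonBlockingThreadPoolTempl<LoggingEnvironment> pool(4);
```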
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
new file mode 100644
index 000000000..aaa1d92c7
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
@@ -0,0 +1,210 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2016 Dmitry Vyukov
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
+#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
+
+namespace Eigen {
+
+// RunQueue is a fixed-size, partially non-blocking deque of Work items.
+// Operations on the front of the queue must be done by a single thread
+// (owner); operations on the back of the queue can be done by multiple
+// threads concurrently.
+//
+// Algorithm outline:
+// All remote threads operating on the queue back are serialized by a mutex.
+// This ensures that at most two threads access state: owner and one remote
+// thread (Size aside). The algorithm ensures that the occupied region of the
+// underlying array is logically contiguous (it can wrap around, but there are
+// no stray occupied elements). The owner operates on one end of this region,
+// the remote thread operates on the other end. Synchronization between these
+// threads (potential consumption of the last element and take-up of the last
+// empty element) happens by means of a state variable in each element. The
+// states are: empty, busy (in the process of insertion or removal) and ready.
+// Threads claim elements (empty->busy and ready->busy transitions) by means
+// of a CAS operation. The finishing transitions (busy->empty and busy->ready)
+// are done with a plain store, as the element is exclusively owned by the
+// current thread.
+//
+// Note: we could permit only pointers as elements; then we would not need a
+// separate state variable, as a null/non-null pointer value would serve as
+// state, but that would require malloc/free per operation for large, complex
+// values (and this is designed to store std::function<void()>).
+template <typename Work, unsigned kSize>
+class RunQueue {
+ public:
+  RunQueue() : front_(), back_() {
+    // require power-of-two for fast masking
+    eigen_assert((kSize & (kSize - 1)) == 0);
+    eigen_assert(kSize > 2);            // why would you do this?
+    eigen_assert(kSize <= (64 << 10));  // leave enough space for counter
+    for (unsigned i = 0; i < kSize; i++)
+      array_[i].state.store(kEmpty, std::memory_order_relaxed);
+  }
+
+  ~RunQueue() { eigen_assert(Size() == 0); }
+
+  // PushFront inserts w at the beginning of the queue.
+  // If the queue is full, returns w, otherwise returns default-constructed
+  // Work.
+  Work PushFront(Work w) {
+    unsigned front = front_.load(std::memory_order_relaxed);
+    Elem* e = &array_[front & kMask];
+    uint8_t s = e->state.load(std::memory_order_relaxed);
+    if (s != kEmpty ||
+        !e->state.compare_exchange_strong(s, kBusy,
+                                          std::memory_order_acquire))
+      return w;
+    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
+    e->w = std::move(w);
+    e->state.store(kReady, std::memory_order_release);
+    return Work();
+  }
+
+  // PopFront removes and returns the first element in the queue.
+  // If the queue was empty, returns default-constructed Work.
+  Work PopFront() {
+    unsigned front = front_.load(std::memory_order_relaxed);
+    Elem* e = &array_[(front - 1) & kMask];
+    uint8_t s = e->state.load(std::memory_order_relaxed);
+    if (s != kReady ||
+        !e->state.compare_exchange_strong(s, kBusy,
+                                          std::memory_order_acquire))
+      return Work();
+    Work w = std::move(e->w);
+    e->state.store(kEmpty, std::memory_order_release);
+    front = ((front - 1) & kMask2) | (front & ~kMask2);
+    front_.store(front, std::memory_order_relaxed);
+    return w;
+  }
+
+  // PushBack adds w at the end of the queue.
+  // If the queue is full, returns w, otherwise returns default-constructed
+  // Work.
+  Work PushBack(Work w) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    unsigned back = back_.load(std::memory_order_relaxed);
+    Elem* e = &array_[(back - 1) & kMask];
+    uint8_t s = e->state.load(std::memory_order_relaxed);
+    if (s != kEmpty ||
+        !e->state.compare_exchange_strong(s, kBusy,
+                                          std::memory_order_acquire))
+      return w;
+    back = ((back - 1) & kMask2) | (back & ~kMask2);
+    back_.store(back, std::memory_order_relaxed);
+    e->w = std::move(w);
+    e->state.store(kReady, std::memory_order_release);
+    return Work();
+  }
+
+  // PopBack removes and returns the last element in the queue.
+  // Can fail spuriously.
+  Work PopBack() {
+    if (Empty()) return Work();
+    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
+    if (!lock) return Work();
+    unsigned back = back_.load(std::memory_order_relaxed);
+    Elem* e = &array_[back & kMask];
+    uint8_t s = e->state.load(std::memory_order_relaxed);
+    if (s != kReady ||
+        !e->state.compare_exchange_strong(s, kBusy,
+                                          std::memory_order_acquire))
+      return Work();
+    Work w = std::move(e->w);
+    e->state.store(kEmpty, std::memory_order_release);
+    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
+    return w;
+  }
+
+  // PopBackHalf removes and returns up to half of the last elements in the
+  // queue. Returns the number of elements removed. Can also fail spuriously.
+  unsigned PopBackHalf(std::vector<Work>* result) {
+    if (Empty()) return 0;
+    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
+    if (!lock) return 0;
+    unsigned back = back_.load(std::memory_order_relaxed);
+    unsigned size = Size();
+    unsigned mid = back;
+    if (size > 1) mid = back + (size - 1) / 2;
+    unsigned n = 0;
+    unsigned start = 0;
+    for (; static_cast<int>(mid - back) >= 0; mid--) {
+      Elem* e = &array_[mid & kMask];
+      uint8_t s = e->state.load(std::memory_order_relaxed);
+      if (n == 0) {
+        if (s != kReady ||
+            !e->state.compare_exchange_strong(s, kBusy,
+                                              std::memory_order_acquire))
+          continue;
+        start = mid;
+      } else {
+        // Note: no need to store the temporary kBusy state; we exclusively
+        // own these elements.
+        eigen_assert(s == kReady);
+      }
+      result->push_back(std::move(e->w));
+      e->state.store(kEmpty, std::memory_order_release);
+      n++;
+    }
+    if (n != 0)
+      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
+    return n;
+  }
+  // Size returns the current queue size.
+  // Can be called by any thread at any time.
+  unsigned Size() const {
+    // Emptiness plays a critical role in thread pool blocking. So we go to
+    // great effort to not produce false positives (claim a non-empty queue as
+    // empty).
+    for (;;) {
+      // Capture a consistent snapshot of front/back.
+      unsigned front = front_.load(std::memory_order_acquire);
+      unsigned back = back_.load(std::memory_order_acquire);
+      unsigned front1 = front_.load(std::memory_order_relaxed);
+      if (front != front1) continue;
+      int size = (front & kMask2) - (back & kMask2);
+      // Fix overflow.
+      if (size < 0) size += 2 * kSize;
+      // Order of modification in push/pop is crafted to make the queue look
+      // larger than it is during concurrent modifications. E.g. pop can
+      // decrement size before the corresponding push has incremented it.
+      // So the computed size can be up to kSize + 1, fix it.
+      if (size > static_cast<int>(kSize)) size = kSize;
+      return size;
+    }
+  }
+
+  // Empty tests whether the container is empty.
+  // Can be called by any thread at any time.
+  bool Empty() const { return Size() == 0; }
+
+ private:
+  static const unsigned kMask = kSize - 1;
+  static const unsigned kMask2 = (kSize << 1) - 1;
+  struct Elem {
+    std::atomic<uint8_t> state;
+    Work w;
+  };
+  enum {
+    kEmpty,
+    kBusy,
+    kReady,
+  };
+  std::mutex mutex_;
+  // The low log(kSize) + 1 bits in front_ and back_ contain the rolling index
+  // of front/back, respectively. The remaining bits contain modification
+  // counters that are incremented on Push operations. This allows us to
+  // (1) distinguish between empty and full conditions (if we used log(kSize)
+  // bits for position, these conditions would be indistinguishable); and
+  // (2) obtain a consistent snapshot of front_/back_ for the Size operation
+  // using the modification counters.
+  std::atomic<unsigned> front_;
+  std::atomic<unsigned> back_;
+  Elem array_[kSize];
+
+  RunQueue(const RunQueue&) = delete;
+  void operator=(const RunQueue&) = delete;
+};
+
+}  // namespace Eigen
+
+#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
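(Not part of the diff.) A single-threaded sketch of the queue discipline: the owner works LIFO on the front, thieves take batches from the back. The element type and capacity are arbitrary illustrative choices; note that with Work = int the "returns default-constructed Work" convention is ambiguous, which is one reason the pool stores Task structs with a testable `.f` member instead:

```cpp
#include <vector>
// RunQueue comes from the header above.

void run_queue_example() {
  Eigen::RunQueue<int, 64> q;       // capacity must be a power of two
  for (int i = 0; i < 10; i++) {
    q.PushFront(i);                 // owner-side push; returns the element if full
  }
  int last = q.PopFront();          // owner pops the most recent element: 9
  std::vector<int> stolen;
  q.PopBackHalf(&stolen);           // a thief grabs ~half from the back: 0, 1, ...
  (void)last;
  while (!q.Empty()) q.PopFront();  // ~RunQueue() asserts the queue is empty
}
```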
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
new file mode 100644
index 000000000..17fd1658b
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
@@ -0,0 +1,127 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2014 Benoit Steiner
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
+#define EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
+
+namespace Eigen {
+
+// The implementation of the ThreadPool type ensures that the Schedule method
+// runs the functions it is provided in FIFO order when the scheduling is done
+// by a single thread.
+// Environment provides a way to create threads and also allows intercepting
+// task submission and execution.
+template <typename Environment>
+class SimpleThreadPoolTempl : public ThreadPoolInterface {
+ public:
+  // Construct a pool that contains "num_threads" threads.
+  explicit SimpleThreadPoolTempl(int num_threads,
+                                 Environment env = Environment())
+      : env_(env), threads_(num_threads), waiters_(num_threads) {
+    for (int i = 0; i < num_threads; i++) {
+      threads_.push_back(env.CreateThread([this]() { WorkerLoop(); }));
+    }
+  }
+
+  // Wait until all scheduled work has finished and then destroy the
+  // set of threads.
+  ~SimpleThreadPoolTempl() {
+    {
+      // Wait for all work to get done.
+      std::unique_lock<std::mutex> l(mu_);
+      while (!pending_.empty()) {
+        empty_.wait(l);
+      }
+      exiting_ = true;
+
+      // Wake up all waiters.
+      for (auto w : waiters_) {
+        w->ready = true;
+        w->task.f = nullptr;
+        w->cv.notify_one();
+      }
+    }
+
+    // Wait for threads to finish.
+    for (auto t : threads_) {
+      delete t;
+    }
+  }
+
+  // Schedule fn() for execution in the pool of threads. The functions are
+  // executed in the order in which they are scheduled.
+  void Schedule(std::function<void()> fn) {
+    Task t = env_.CreateTask(std::move(fn));
+    std::unique_lock<std::mutex> l(mu_);
+    if (waiters_.empty()) {
+      pending_.push_back(std::move(t));
+    } else {
+      Waiter* w = waiters_.back();
+      waiters_.pop_back();
+      w->ready = true;
+      w->task = std::move(t);
+      w->cv.notify_one();
+    }
+  }
+
+ protected:
+  void WorkerLoop() {
+    std::unique_lock<std::mutex> l(mu_);
+    Waiter w;
+    Task t;
+    while (!exiting_) {
+      if (pending_.empty()) {
+        // Wait for work to be assigned to me
+        w.ready = false;
+        waiters_.push_back(&w);
+        while (!w.ready) {
+          w.cv.wait(l);
+        }
+        t = w.task;
+        w.task.f = nullptr;
+      } else {
+        // Pick up pending work
+        t = std::move(pending_.front());
+        pending_.pop_front();
+        if (pending_.empty()) {
+          empty_.notify_all();
+        }
+      }
+      if (t.f) {
+        mu_.unlock();
+        env_.ExecuteTask(t);
+        t.f = nullptr;
+        mu_.lock();
+      }
+    }
+  }
+
+ private:
+  typedef typename Environment::Task Task;
+  typedef typename Environment::EnvThread Thread;
+
+  struct Waiter {
+    std::condition_variable cv;
+    Task task;
+    bool ready;
+  };
+
+  Environment env_;
+  std::mutex mu_;
+  MaxSizeVector<Thread*> threads_;  // All threads
+  MaxSizeVector<Waiter*> waiters_;  // Stack of waiting threads.
+  std::deque<Task> pending_;        // Queue of pending work
+  std::condition_variable empty_;   // Signaled on pending_.empty()
+  bool exiting_ = false;
+};
+
+typedef SimpleThreadPoolTempl<StlThreadEnvironment> SimpleThreadPool;
+
+}  // namespace Eigen
+
+#endif  // EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
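(Not part of the diff.) The FIFO guarantee stated in the header comment, observed on a single-worker pool; the pool size and task bodies are illustrative:

```cpp
#include <cstdio>

void fifo_example() {
  Eigen::SimpleThreadPool pool(1);  // one worker makes the ordering observable
  for (int i = 0; i < 3; i++) {
    pool.Schedule([i]() { std::printf("task %d\n", i); });  // prints 0, 1, 2
  }
  // ~SimpleThreadPool() drains pending work before joining the thread.
}
```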
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
new file mode 100644
index 000000000..d2204ad5b
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
@@ -0,0 +1,38 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2014 Benoit Steiner
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_THREAD_ENVIRONMENT_H
+#define EIGEN_CXX11_THREADPOOL_THREAD_ENVIRONMENT_H
+
+namespace Eigen {
+
+struct StlThreadEnvironment {
+  struct Task {
+    std::function<void()> f;
+  };
+
+  // EnvThread constructor must start the thread,
+  // destructor must join the thread.
+  class EnvThread {
+   public:
+    EnvThread(std::function<void()> f) : thr_(std::move(f)) {}
+    ~EnvThread() { thr_.join(); }
+
+   private:
+    std::thread thr_;
+  };
+
+  EnvThread* CreateThread(std::function<void()> f) {
+    return new EnvThread(std::move(f));
+  }
+  Task CreateTask(std::function<void()> f) { return Task{std::move(f)}; }
+  void ExecuteTask(const Task& t) { t.f(); }
+};
+
+}  // namespace Eigen
+
+#endif  // EIGEN_CXX11_THREADPOOL_THREAD_ENVIRONMENT_H
diff --git a/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
new file mode 100644
index 000000000..38b40aceb
--- /dev/null
+++ b/unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
@@ -0,0 +1,26 @@
+// This file is part of Eigen, a lightweight C++ template library
+// for linear algebra.
+//
+// Copyright (C) 2014 Benoit Steiner
+//
+// This Source Code Form is subject to the terms of the Mozilla
+// Public License v. 2.0. If a copy of the MPL was not distributed
+// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#ifndef EIGEN_CXX11_THREADPOOL_THREAD_POOL_INTERFACE_H
+#define EIGEN_CXX11_THREADPOOL_THREAD_POOL_INTERFACE_H
+
+namespace Eigen {
+
+// This defines an interface that ThreadPoolDevice can take to use
+// custom thread pools underneath.
+class ThreadPoolInterface {
+ public:
+  virtual void Schedule(std::function<void()> fn) = 0;
+
+  virtual ~ThreadPoolInterface() {}
+};
+
+}  // namespace Eigen
+
+#endif  // EIGEN_CXX11_THREADPOOL_THREAD_POOL_INTERFACE_H
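(Not part of the diff.) ThreadPoolInterface is the extension point mentioned in the comment above; a hypothetical implementation that runs tasks inline on the calling thread, sometimes convenient in tests:

```cpp
#include <functional>

class InlineThreadPool : public Eigen::ThreadPoolInterface {
 public:
  // Executes the task immediately instead of handing it to a worker.
  void Schedule(std::function<void()> fn) override { fn(); }
};
```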