mirror of
https://gitlab.com/libeigen/eigen.git
synced 2024-12-15 07:10:37 +08:00
Added a more scalable non blocking thread pool
This commit is contained in:
parent
7718749fee
commit
78a51abc12
69
unsupported/Eigen/CXX11/ThreadPool
Normal file
69
unsupported/Eigen/CXX11/ThreadPool
Normal file
@ -0,0 +1,69 @@
|
||||
// This file is part of Eigen, a lightweight C++ template library
|
||||
// for linear algebra.
|
||||
//
|
||||
// Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla
|
||||
// Public License v. 2.0. If a copy of the MPL was not distributed
|
||||
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
#ifndef EIGEN_CXX11_THREADPOOL_MODULE
|
||||
#define EIGEN_CXX11_THREADPOOL_MODULE
|
||||
|
||||
//#include <Eigen/Core>
|
||||
#include "Core"
|
||||
|
||||
#include <Eigen/src/Core/util/DisableStupidWarnings.h>
|
||||
|
||||
/** \defgroup CXX11_ThreadPool_Module C++11 ThreadPool Module
|
||||
*
|
||||
* This module provides 2 threadpool implementations
|
||||
* - a simple reference implementation
|
||||
* - a faster non blocking implementation
|
||||
*
|
||||
* This module requires C++11.
|
||||
*
|
||||
* \code
|
||||
* #include <Eigen/CXX11/ThreadPool>
|
||||
* \endcode
|
||||
*/
|
||||
|
||||
//#include <vector>
|
||||
|
||||
//#include "src/Core/util/EmulateArray.h"
|
||||
//#include "src/Core/util/MaxSizeVector.h"
|
||||
//#include "third_party/eigen3/Eigen/src/Core/util/Macros.h"
|
||||
|
||||
|
||||
// Emulate the cxx11 functionality that we need if the compiler doesn't support it.
|
||||
// Visual studio 2015 doesn't advertise itself as cxx11 compliant, although it
|
||||
// supports enough of the standard for our needs
|
||||
|
||||
#if __cplusplus > 199711L || EIGEN_COMP_MSVC >= 1900
|
||||
#include <cstddef>
|
||||
#include <cstring>
|
||||
#include <stdint.h>
|
||||
#include <time.h>
|
||||
|
||||
#include <vector>
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <deque>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
|
||||
#include "src/ThreadPool/EventCount.h"
|
||||
#include "src/ThreadPool/RunQueue.h"
|
||||
#include "src/ThreadPool/ThreadPoolInterface.h"
|
||||
#include "src/ThreadPool/ThreadEnvironment.h"
|
||||
#include "src/ThreadPool/SimpleThreadPool.h"
|
||||
#include "src/ThreadPool/NonBlockingThreadPool.h"
|
||||
|
||||
#endif
|
||||
|
||||
#include <Eigen/src/Core/util/ReenableStupidWarnings.h>
|
||||
|
||||
#endif // EIGEN_CXX11_THREADPOOL_MODULE
|
||||
|
234
unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
Normal file
234
unsupported/Eigen/CXX11/src/ThreadPool/EventCount.h
Normal file
@ -0,0 +1,234 @@
|
||||
// This file is part of Eigen, a lightweight C++ template library
|
||||
// for linear algebra.
|
||||
//
|
||||
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla
|
||||
// Public License v. 2.0. If a copy of the MPL was not distributed
|
||||
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
#ifndef EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
|
||||
#define EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
|
||||
|
||||
namespace Eigen {
|
||||
|
||||
// EventCount allows to wait for arbitrary predicates in non-blocking
|
||||
// algorithms. Think of condition variable, but wait predicate does not need to
|
||||
// be protected by a mutex. Usage:
|
||||
// Waiting thread does:
|
||||
//
|
||||
// if (predicate)
|
||||
// return act();
|
||||
// EventCount::Waiter& w = waiters[my_index];
|
||||
// ec.Prewait(&w);
|
||||
// if (predicate) {
|
||||
// ec.CancelWait(&w);
|
||||
// return act();
|
||||
// }
|
||||
// ec.CommitWait(&w);
|
||||
//
|
||||
// Notifying thread does:
|
||||
//
|
||||
// predicate = true;
|
||||
// ec.Notify(true);
|
||||
//
|
||||
// Notify is cheap if there are no waiting threads. Prewait/CommitWait are not
|
||||
// cheap, but they are executed only if the preceeding predicate check has
|
||||
// failed.
|
||||
//
|
||||
// Algorihtm outline:
|
||||
// There are two main variables: predicate (managed by user) and state_.
|
||||
// Operation closely resembles Dekker mutual algorithm:
|
||||
// https://en.wikipedia.org/wiki/Dekker%27s_algorithm
|
||||
// Waiting thread sets state_ then checks predicate, Notifying thread sets
|
||||
// predicate then checks state_. Due to seq_cst fences in between these
|
||||
// operations it is guaranteed than either waiter will see predicate change
|
||||
// and won't block, or notifying thread will see state_ change and will unblock
|
||||
// the waiter, or both. But it can't happen that both threads don't see each
|
||||
// other changes, which would lead to deadlock.
|
||||
class EventCount {
|
||||
public:
|
||||
class Waiter;
|
||||
|
||||
EventCount(std::vector<Waiter>& waiters) : waiters_(waiters) {
|
||||
eigen_assert(waiters.size() < (1 << kWaiterBits) - 1);
|
||||
// Initialize epoch to something close to overflow to test overflow.
|
||||
state_ = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2);
|
||||
}
|
||||
|
||||
~EventCount() {
|
||||
// Ensure there are no waiters.
|
||||
eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask);
|
||||
}
|
||||
|
||||
// Prewait prepares for waiting.
|
||||
// After calling this function the thread must re-check the wait predicate
|
||||
// and call either CancelWait or CommitWait passing the same Waiter object.
|
||||
void Prewait(Waiter* w) {
|
||||
w->epoch = state_.fetch_add(kWaiterInc, std::memory_order_relaxed);
|
||||
std::atomic_thread_fence(std::memory_order_seq_cst);
|
||||
}
|
||||
|
||||
// CommitWait commits waiting.
|
||||
void CommitWait(Waiter* w) {
|
||||
w->state = Waiter::kNotSignaled;
|
||||
// Modification epoch of this waiter.
|
||||
uint64_t epoch =
|
||||
(w->epoch & kEpochMask) +
|
||||
(((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
|
||||
uint64_t state = state_.load(std::memory_order_seq_cst);
|
||||
for (;;) {
|
||||
if (int64_t((state & kEpochMask) - epoch) < 0) {
|
||||
// The preceeding waiter has not decided on its fate. Wait until it
|
||||
// calls either CancelWait or CommitWait, or is notified.
|
||||
std::this_thread::yield();
|
||||
state = state_.load(std::memory_order_seq_cst);
|
||||
continue;
|
||||
}
|
||||
// We've already been notified.
|
||||
if (int64_t((state & kEpochMask) - epoch) > 0) return;
|
||||
// Remove this thread from prewait counter and add it to the waiter list.
|
||||
eigen_assert((state & kWaiterMask) != 0);
|
||||
uint64_t newstate = state - kWaiterInc + kEpochInc;
|
||||
newstate = (newstate & ~kStackMask) | (w - &waiters_[0]);
|
||||
if ((state & kStackMask) == kStackMask)
|
||||
w->next.store(nullptr, std::memory_order_relaxed);
|
||||
else
|
||||
w->next.store(&waiters_[state & kStackMask], std::memory_order_relaxed);
|
||||
if (state_.compare_exchange_weak(state, newstate,
|
||||
std::memory_order_release))
|
||||
break;
|
||||
}
|
||||
Park(w);
|
||||
}
|
||||
|
||||
// CancelWait cancels effects of the previous Prewait call.
|
||||
void CancelWait(Waiter* w) {
|
||||
uint64_t epoch =
|
||||
(w->epoch & kEpochMask) +
|
||||
(((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
|
||||
uint64_t state = state_.load(std::memory_order_relaxed);
|
||||
for (;;) {
|
||||
if (int64_t((state & kEpochMask) - epoch) < 0) {
|
||||
// The preceeding waiter has not decided on its fate. Wait until it
|
||||
// calls either CancelWait or CommitWait, or is notified.
|
||||
std::this_thread::yield();
|
||||
state = state_.load(std::memory_order_relaxed);
|
||||
continue;
|
||||
}
|
||||
// We've already been notified.
|
||||
if (int64_t((state & kEpochMask) - epoch) > 0) return;
|
||||
// Remove this thread from prewait counter.
|
||||
eigen_assert((state & kWaiterMask) != 0);
|
||||
if (state_.compare_exchange_weak(state, state - kWaiterInc + kEpochInc,
|
||||
std::memory_order_relaxed))
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Notify wakes one or all waiting threads.
|
||||
// Must be called after changing the associated wait predicate.
|
||||
void Notify(bool all) {
|
||||
std::atomic_thread_fence(std::memory_order_seq_cst);
|
||||
uint64_t state = state_.load(std::memory_order_acquire);
|
||||
for (;;) {
|
||||
// Easy case: no waiters.
|
||||
if ((state & kStackMask) == kStackMask && (state & kWaiterMask) == 0)
|
||||
return;
|
||||
uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
|
||||
uint64_t newstate;
|
||||
if (all) {
|
||||
// Reset prewait counter and empty wait list.
|
||||
newstate = (state & kEpochMask) + (kEpochInc * waiters) + kStackMask;
|
||||
} else if (waiters) {
|
||||
// There is a thread in pre-wait state, unblock it.
|
||||
newstate = state + kEpochInc - kWaiterInc;
|
||||
} else {
|
||||
// Pop a waiter from list and unpark it.
|
||||
Waiter* w = &waiters_[state & kStackMask];
|
||||
Waiter* wnext = w->next.load(std::memory_order_relaxed);
|
||||
uint64_t next = kStackMask;
|
||||
if (wnext != nullptr) next = wnext - &waiters_[0];
|
||||
// Note: we don't add kEpochInc here. ABA problem on the lock-free stack
|
||||
// can't happen because a waiter is re-pushed onto the stack only after
|
||||
// it was in the pre-wait state which inevitably leads to epoch
|
||||
// increment.
|
||||
newstate = (state & kEpochMask) + next;
|
||||
}
|
||||
if (state_.compare_exchange_weak(state, newstate,
|
||||
std::memory_order_acquire)) {
|
||||
if (!all && waiters) return; // unblocked pre-wait thread
|
||||
if ((state & kStackMask) == kStackMask) return;
|
||||
Waiter* w = &waiters_[state & kStackMask];
|
||||
if (!all) w->next.store(nullptr, std::memory_order_relaxed);
|
||||
Unpark(w);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class Waiter {
|
||||
friend class EventCount;
|
||||
std::atomic<Waiter*> next;
|
||||
std::mutex mu;
|
||||
std::condition_variable cv;
|
||||
uint64_t epoch;
|
||||
unsigned state;
|
||||
enum {
|
||||
kNotSignaled,
|
||||
kWaiting,
|
||||
kSignaled,
|
||||
};
|
||||
// Prevent false sharing with other Waiter objects in the same vector.
|
||||
char pad_[128];
|
||||
};
|
||||
|
||||
private:
|
||||
// State_ layout:
|
||||
// - low kStackBits is a stack of waiters committed wait.
|
||||
// - next kWaiterBits is count of waiters in prewait state.
|
||||
// - next kEpochBits is modification counter.
|
||||
static const uint64_t kStackBits = 16;
|
||||
static const uint64_t kStackMask = (1ull << kStackBits) - 1;
|
||||
static const uint64_t kWaiterBits = 16;
|
||||
static const uint64_t kWaiterShift = 16;
|
||||
static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1)
|
||||
<< kWaiterShift;
|
||||
static const uint64_t kWaiterInc = 1ull << kWaiterBits;
|
||||
static const uint64_t kEpochBits = 32;
|
||||
static const uint64_t kEpochShift = 32;
|
||||
static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
|
||||
static const uint64_t kEpochInc = 1ull << kEpochShift;
|
||||
std::atomic<uint64_t> state_;
|
||||
std::vector<Waiter>& waiters_;
|
||||
|
||||
void Park(Waiter* w) {
|
||||
std::unique_lock<std::mutex> lock(w->mu);
|
||||
while (w->state != Waiter::kSignaled) {
|
||||
w->state = Waiter::kWaiting;
|
||||
w->cv.wait(lock);
|
||||
}
|
||||
}
|
||||
|
||||
void Unpark(Waiter* waiters) {
|
||||
Waiter* next = nullptr;
|
||||
for (Waiter* w = waiters; w; w = next) {
|
||||
next = w->next.load(std::memory_order_relaxed);
|
||||
unsigned state;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(w->mu);
|
||||
state = w->state;
|
||||
w->state = Waiter::kSignaled;
|
||||
}
|
||||
// Avoid notifying if it wasn't waiting.
|
||||
if (state == Waiter::kWaiting) w->cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
EventCount(const EventCount&) = delete;
|
||||
void operator=(const EventCount&) = delete;
|
||||
};
|
||||
|
||||
} // namespace Eigen
|
||||
|
||||
#endif // EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
|
232
unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
Normal file
232
unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h
Normal file
@ -0,0 +1,232 @@
|
||||
// This file is part of Eigen, a lightweight C++ template library
|
||||
// for linear algebra.
|
||||
//
|
||||
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla
|
||||
// Public License v. 2.0. If a copy of the MPL was not distributed
|
||||
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
|
||||
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
|
||||
|
||||
|
||||
namespace Eigen {
|
||||
|
||||
template <typename Environment>
|
||||
class NonBlockingThreadPoolTempl : public Eigen::ThreadPoolInterface {
|
||||
public:
|
||||
typedef typename Environment::Task Task;
|
||||
typedef RunQueue<Task, 1024> Queue;
|
||||
|
||||
NonBlockingThreadPoolTempl(int num_threads, Environment env = Environment())
|
||||
: env_(env),
|
||||
threads_(num_threads),
|
||||
queues_(num_threads),
|
||||
waiters_(num_threads),
|
||||
blocked_(),
|
||||
spinning_(),
|
||||
done_(),
|
||||
ec_(waiters_) {
|
||||
for (int i = 0; i < num_threads; i++) queues_.push_back(new Queue());
|
||||
for (int i = 0; i < num_threads; i++)
|
||||
threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
|
||||
}
|
||||
|
||||
~NonBlockingThreadPoolTempl() {
|
||||
done_.store(true, std::memory_order_relaxed);
|
||||
// Now if all threads block without work, they will start exiting.
|
||||
// But note that threads can continue to work arbitrary long,
|
||||
// block, submit new work, unblock and otherwise live full life.
|
||||
ec_.Notify(true);
|
||||
|
||||
// Join threads explicitly to avoid destruction order issues.
|
||||
for (size_t i = 0; i < threads_.size(); i++) delete threads_[i];
|
||||
for (size_t i = 0; i < threads_.size(); i++) delete queues_[i];
|
||||
}
|
||||
|
||||
void Schedule(std::function<void()> fn) {
|
||||
Task t = env_.CreateTask(std::move(fn));
|
||||
PerThread* pt = GetPerThread();
|
||||
if (pt->pool == this) {
|
||||
// Worker thread of this pool, push onto the thread's queue.
|
||||
Queue* q = queues_[pt->index];
|
||||
t = q->PushFront(std::move(t));
|
||||
} else {
|
||||
// A free-standing thread (or worker of another pool), push onto a random
|
||||
// queue.
|
||||
Queue* q = queues_[Rand(&pt->rand) % queues_.size()];
|
||||
t = q->PushBack(std::move(t));
|
||||
}
|
||||
// Note: below we touch this after making w available to worker threads.
|
||||
// Strictly speaking, this can lead to a racy-use-after-free. Consider that
|
||||
// Schedule is called from a thread that is neither main thread nor a worker
|
||||
// thread of this pool. Then, execution of w directly or indirectly
|
||||
// completes overall computations, which in turn leads to destruction of
|
||||
// this. We expect that such scenario is prevented by program, that is,
|
||||
// this is kept alive while any threads can potentially be in Schedule.
|
||||
if (!t.f)
|
||||
ec_.Notify(false);
|
||||
else
|
||||
env_.ExecuteTask(t); // Push failed, execute directly.
|
||||
}
|
||||
|
||||
private:
|
||||
typedef typename Environment::EnvThread Thread;
|
||||
|
||||
struct PerThread {
|
||||
bool inited;
|
||||
NonBlockingThreadPoolTempl* pool; // Parent pool, or null for normal threads.
|
||||
unsigned index; // Worker thread index in pool.
|
||||
unsigned rand; // Random generator state.
|
||||
};
|
||||
|
||||
Environment env_;
|
||||
MaxSizeVector<Thread*> threads_;
|
||||
MaxSizeVector<Queue*> queues_;
|
||||
std::vector<EventCount::Waiter> waiters_;
|
||||
std::atomic<unsigned> blocked_;
|
||||
std::atomic<bool> spinning_;
|
||||
std::atomic<bool> done_;
|
||||
EventCount ec_;
|
||||
|
||||
// Main worker thread loop.
|
||||
void WorkerLoop(unsigned index) {
|
||||
PerThread* pt = GetPerThread();
|
||||
pt->pool = this;
|
||||
pt->index = index;
|
||||
Queue* q = queues_[index];
|
||||
EventCount::Waiter* waiter = &waiters_[index];
|
||||
std::vector<Task> stolen;
|
||||
for (;;) {
|
||||
Task t;
|
||||
if (!stolen.empty()) {
|
||||
t = std::move(stolen.back());
|
||||
stolen.pop_back();
|
||||
}
|
||||
if (!t.f) t = q->PopFront();
|
||||
if (!t.f) {
|
||||
if (Steal(&stolen)) {
|
||||
t = std::move(stolen.back());
|
||||
stolen.pop_back();
|
||||
while (stolen.size()) {
|
||||
Task t1 = q->PushFront(std::move(stolen.back()));
|
||||
stolen.pop_back();
|
||||
if (t1.f) {
|
||||
// There is not much we can do in this case. Just execute the
|
||||
// remaining directly.
|
||||
stolen.push_back(std::move(t1));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (t.f) {
|
||||
env_.ExecuteTask(t);
|
||||
continue;
|
||||
}
|
||||
// Leave one thread spinning. This reduces latency.
|
||||
if (!spinning_ && !spinning_.exchange(true)) {
|
||||
bool nowork = true;
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
if (!OutOfWork()) {
|
||||
nowork = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
spinning_ = false;
|
||||
if (!nowork) continue;
|
||||
}
|
||||
if (!WaitForWork(waiter)) return;
|
||||
}
|
||||
}
|
||||
|
||||
// Steal tries to steal work from other worker threads in best-effort manner.
|
||||
bool Steal(std::vector<Task>* stolen) {
|
||||
if (queues_.size() == 1) return false;
|
||||
PerThread* pt = GetPerThread();
|
||||
unsigned lastq = pt->index;
|
||||
for (unsigned i = queues_.size(); i > 0; i--) {
|
||||
unsigned victim = Rand(&pt->rand) % queues_.size();
|
||||
if (victim == lastq && queues_.size() > 2) {
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
// Steal half of elements from a victim queue.
|
||||
// It is typical to steal just one element, but that assumes that work is
|
||||
// recursively subdivided in halves so that the stolen element is exactly
|
||||
// half of work. If work elements are equally-sized, then is makes sense
|
||||
// to steal half of elements at once and then work locally for a while.
|
||||
if (queues_[victim]->PopBackHalf(stolen)) return true;
|
||||
lastq = victim;
|
||||
}
|
||||
// Just to make sure that we did not miss anything.
|
||||
for (unsigned i = queues_.size(); i > 0; i--)
|
||||
if (queues_[i - 1]->PopBackHalf(stolen)) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
// WaitForWork blocks until new work is available, or if it is time to exit.
|
||||
bool WaitForWork(EventCount::Waiter* waiter) {
|
||||
// We already did best-effort emptiness check in Steal, so prepare blocking.
|
||||
ec_.Prewait(waiter);
|
||||
// Now do reliable emptiness check.
|
||||
if (!OutOfWork()) {
|
||||
ec_.CancelWait(waiter);
|
||||
return true;
|
||||
}
|
||||
// Number of blocked threads is used as termination condition.
|
||||
// If we are shutting down and all worker threads blocked without work,
|
||||
// that's we are done.
|
||||
blocked_++;
|
||||
if (done_ && blocked_ == threads_.size()) {
|
||||
ec_.CancelWait(waiter);
|
||||
// Almost done, but need to re-check queues.
|
||||
// Consider that all queues are empty and all worker threads are preempted
|
||||
// right after incrementing blocked_ above. Now a free-standing thread
|
||||
// submits work and calls destructor (which sets done_). If we don't
|
||||
// re-check queues, we will exit leaving the work unexecuted.
|
||||
if (!OutOfWork()) {
|
||||
// Note: we must not pop from queues before we decrement blocked_,
|
||||
// otherwise the following scenario is possible. Consider that instead
|
||||
// of checking for emptiness we popped the only element from queues.
|
||||
// Now other worker threads can start exiting, which is bad if the
|
||||
// work item submits other work. So we just check emptiness here,
|
||||
// which ensures that all worker threads exit at the same time.
|
||||
blocked_--;
|
||||
return true;
|
||||
}
|
||||
// Reached stable termination state.
|
||||
ec_.Notify(true);
|
||||
return false;
|
||||
}
|
||||
ec_.CommitWait(waiter);
|
||||
blocked_--;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool OutOfWork() {
|
||||
for (unsigned i = 0; i < queues_.size(); i++)
|
||||
if (!queues_[i]->Empty()) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
PerThread* GetPerThread() {
|
||||
static thread_local PerThread per_thread_;
|
||||
PerThread* pt = &per_thread_;
|
||||
if (pt->inited) return pt;
|
||||
pt->inited = true;
|
||||
pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
|
||||
return pt;
|
||||
}
|
||||
|
||||
static unsigned Rand(unsigned* state) {
|
||||
return *state = *state * 1103515245 + 12345;
|
||||
}
|
||||
};
|
||||
|
||||
typedef NonBlockingThreadPoolTempl<StlThreadEnvironment> NonBlockingThreadPool;
|
||||
|
||||
} // namespace Eigen
|
||||
|
||||
#endif // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
|
210
unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
Normal file
210
unsupported/Eigen/CXX11/src/ThreadPool/RunQueue.h
Normal file
@ -0,0 +1,210 @@
|
||||
// This file is part of Eigen, a lightweight C++ template library
|
||||
// for linear algebra.
|
||||
//
|
||||
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla
|
||||
// Public License v. 2.0. If a copy of the MPL was not distributed
|
||||
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
|
||||
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
|
||||
|
||||
|
||||
namespace Eigen {
|
||||
|
||||
// RunQueue is a fixed-size, partially non-blocking deque or Work items.
|
||||
// Operations on front of the queue must be done by a single thread (owner),
|
||||
// operations on back of the queue can be done by multiple threads concurrently.
|
||||
//
|
||||
// Algorithm outline:
|
||||
// All remote threads operating on the queue back are serialized by a mutex.
|
||||
// This ensures that at most two threads access state: owner and one remote
|
||||
// thread (Size aside). The algorithm ensures that the occupied region of the
|
||||
// underlying array is logically continuous (can wraparound, but no stray
|
||||
// occupied elements). Owner operates on one end of this region, remote thread
|
||||
// operates on the other end. Synchronization between these threads
|
||||
// (potential consumption of the last element and take up of the last empty
|
||||
// element) happens by means of state variable in each element. States are:
|
||||
// empty, busy (in process of insertion of removal) and ready. Threads claim
|
||||
// elements (empty->busy and ready->busy transitions) by means of a CAS
|
||||
// operation. The finishing transition (busy->empty and busy->ready) are done
|
||||
// with plain store as the element is exclusively owned by the current thread.
|
||||
//
|
||||
// Note: we could permit only pointers as elements, then we would not need
|
||||
// separate state variable as null/non-null pointer value would serve as state,
|
||||
// but that would require malloc/free per operation for large, complex values
|
||||
// (and this is designed to store std::function<()>).
|
||||
template <typename Work, unsigned kSize>
|
||||
class RunQueue {
|
||||
public:
|
||||
RunQueue() : front_(), back_() {
|
||||
// require power-of-two for fast masking
|
||||
eigen_assert((kSize & (kSize - 1)) == 0);
|
||||
eigen_assert(kSize > 2); // why would you do this?
|
||||
eigen_assert(kSize <= (64 << 10)); // leave enough space for counter
|
||||
for (unsigned i = 0; i < kSize; i++)
|
||||
array_[i].state.store(kEmpty, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
~RunQueue() { eigen_assert(Size() == 0); }
|
||||
|
||||
// PushFront inserts w at the beginning of the queue.
|
||||
// If queue is full returns w, otherwise returns default-constructed Work.
|
||||
Work PushFront(Work w) {
|
||||
unsigned front = front_.load(std::memory_order_relaxed);
|
||||
Elem* e = &array_[front & kMask];
|
||||
uint8_t s = e->state.load(std::memory_order_relaxed);
|
||||
if (s != kEmpty ||
|
||||
!e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
|
||||
return w;
|
||||
front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
|
||||
e->w = std::move(w);
|
||||
e->state.store(kReady, std::memory_order_release);
|
||||
return Work();
|
||||
}
|
||||
|
||||
// PopFront removes and returns the first element in the queue.
|
||||
// If the queue was empty returns default-constructed Work.
|
||||
Work PopFront() {
|
||||
unsigned front = front_.load(std::memory_order_relaxed);
|
||||
Elem* e = &array_[(front - 1) & kMask];
|
||||
uint8_t s = e->state.load(std::memory_order_relaxed);
|
||||
if (s != kReady ||
|
||||
!e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
|
||||
return Work();
|
||||
Work w = std::move(e->w);
|
||||
e->state.store(kEmpty, std::memory_order_release);
|
||||
front = ((front - 1) & kMask2) | (front & ~kMask2);
|
||||
front_.store(front, std::memory_order_relaxed);
|
||||
return w;
|
||||
}
|
||||
|
||||
// PushBack adds w at the end of the queue.
|
||||
// If queue is full returns w, otherwise returns default-constructed Work.
|
||||
Work PushBack(Work w) {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
unsigned back = back_.load(std::memory_order_relaxed);
|
||||
Elem* e = &array_[(back - 1) & kMask];
|
||||
uint8_t s = e->state.load(std::memory_order_relaxed);
|
||||
if (s != kEmpty ||
|
||||
!e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
|
||||
return w;
|
||||
back = ((back - 1) & kMask2) | (back & ~kMask2);
|
||||
back_.store(back, std::memory_order_relaxed);
|
||||
e->w = std::move(w);
|
||||
e->state.store(kReady, std::memory_order_release);
|
||||
return Work();
|
||||
}
|
||||
|
||||
// PopBack removes and returns the last elements in the queue.
|
||||
// Can fail spuriously.
|
||||
Work PopBack() {
|
||||
if (Empty()) return 0;
|
||||
std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
|
||||
if (!lock) return Work();
|
||||
unsigned back = back_.load(std::memory_order_relaxed);
|
||||
Elem* e = &array_[back & kMask];
|
||||
uint8_t s = e->state.load(std::memory_order_relaxed);
|
||||
if (s != kReady ||
|
||||
!e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
|
||||
return Work();
|
||||
Work w = std::move(e->w);
|
||||
e->state.store(kEmpty, std::memory_order_release);
|
||||
back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
|
||||
return w;
|
||||
}
|
||||
|
||||
// PopBackHalf removes and returns half last elements in the queue.
|
||||
// Returns number of elements removed. But can also fail spuriously.
|
||||
unsigned PopBackHalf(std::vector<Work>* result) {
|
||||
if (Empty()) return 0;
|
||||
std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
|
||||
if (!lock) return 0;
|
||||
unsigned back = back_.load(std::memory_order_relaxed);
|
||||
unsigned size = Size();
|
||||
unsigned mid = back;
|
||||
if (size > 1) mid = back + (size - 1) / 2;
|
||||
unsigned n = 0;
|
||||
unsigned start = 0;
|
||||
for (; static_cast<int>(mid - back) >= 0; mid--) {
|
||||
Elem* e = &array_[mid & kMask];
|
||||
uint8_t s = e->state.load(std::memory_order_relaxed);
|
||||
if (n == 0) {
|
||||
if (s != kReady ||
|
||||
!e->state.compare_exchange_strong(s, kBusy,
|
||||
std::memory_order_acquire))
|
||||
continue;
|
||||
start = mid;
|
||||
} else {
|
||||
// Note: no need to store temporal kBusy, we exclusively own these
|
||||
// elements.
|
||||
eigen_assert(s == kReady);
|
||||
}
|
||||
result->push_back(std::move(e->w));
|
||||
e->state.store(kEmpty, std::memory_order_release);
|
||||
n++;
|
||||
}
|
||||
if (n != 0)
|
||||
back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
|
||||
return n;
|
||||
}
|
||||
|
||||
// Size returns current queue size.
|
||||
// Can be called by any thread at any time.
|
||||
unsigned Size() const {
|
||||
// Emptiness plays critical role in thread pool blocking. So we go to great
|
||||
// effort to not produce false positives (claim non-empty queue as empty).
|
||||
for (;;) {
|
||||
// Capture a consistent snapshot of front/tail.
|
||||
unsigned front = front_.load(std::memory_order_acquire);
|
||||
unsigned back = back_.load(std::memory_order_acquire);
|
||||
unsigned front1 = front_.load(std::memory_order_relaxed);
|
||||
if (front != front1) continue;
|
||||
int size = (front & kMask2) - (back & kMask2);
|
||||
// Fix overflow.
|
||||
if (size < 0) size += 2 * kSize;
|
||||
// Order of modification in push/pop is crafted to make the queue look
|
||||
// larger than it is during concurrent modifications. E.g. pop can
|
||||
// decrement size before the corresponding push has incremented it.
|
||||
// So the computed size can be up to kSize + 1, fix it.
|
||||
if (size > kSize) size = kSize;
|
||||
return size;
|
||||
}
|
||||
}
|
||||
|
||||
// Empty tests whether container is empty.
|
||||
// Can be called by any thread at any time.
|
||||
bool Empty() const { return Size() == 0; }
|
||||
|
||||
private:
|
||||
static const unsigned kMask = kSize - 1;
|
||||
static const unsigned kMask2 = (kSize << 1) - 1;
|
||||
struct Elem {
|
||||
std::atomic<uint8_t> state;
|
||||
Work w;
|
||||
};
|
||||
enum {
|
||||
kEmpty,
|
||||
kBusy,
|
||||
kReady,
|
||||
};
|
||||
std::mutex mutex_;
|
||||
// Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
|
||||
// front/back, repsectively. The remaining bits contain modification counters
|
||||
// that are incremented on Push operations. This allows us to (1) distinguish
|
||||
// between empty and full conditions (if we would use log(kSize) bits for
|
||||
// position, these conditions would be indistinguishable); (2) obtain
|
||||
// consistent snapshot of front_/back_ for Size operation using the
|
||||
// modification counters.
|
||||
std::atomic<unsigned> front_;
|
||||
std::atomic<unsigned> back_;
|
||||
Elem array_[kSize];
|
||||
|
||||
RunQueue(const RunQueue&) = delete;
|
||||
void operator=(const RunQueue&) = delete;
|
||||
};
|
||||
|
||||
} // namespace Eigen
|
||||
|
||||
#endif // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
|
127
unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
Normal file
127
unsupported/Eigen/CXX11/src/ThreadPool/SimpleThreadPool.h
Normal file
@ -0,0 +1,127 @@
|
||||
// This file is part of Eigen, a lightweight C++ template library
|
||||
// for linear algebra.
|
||||
//
|
||||
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla
|
||||
// Public License v. 2.0. If a copy of the MPL was not distributed
|
||||
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
#ifndef EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
|
||||
#define EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
|
||||
|
||||
namespace Eigen {
|
||||
|
||||
// The implementation of the ThreadPool type ensures that the Schedule method
|
||||
// runs the functions it is provided in FIFO order when the scheduling is done
|
||||
// by a single thread.
|
||||
// Environment provides a way to create threads and also allows to intercept
|
||||
// task submission and execution.
|
||||
template <typename Environment>
|
||||
class SimpleThreadPoolTempl : public ThreadPoolInterface {
|
||||
public:
|
||||
// Construct a pool that contains "num_threads" threads.
|
||||
explicit SimpleThreadPoolTempl(int num_threads, Environment env = Environment())
|
||||
: env_(env), threads_(num_threads), waiters_(num_threads) {
|
||||
for (int i = 0; i < num_threads; i++) {
|
||||
threads_.push_back(env.CreateThread([this]() { WorkerLoop(); }));
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until all scheduled work has finished and then destroy the
|
||||
// set of threads.
|
||||
~SimpleThreadPoolTempl() {
|
||||
{
|
||||
// Wait for all work to get done.
|
||||
std::unique_lock<std::mutex> l(mu_);
|
||||
while (!pending_.empty()) {
|
||||
empty_.wait(l);
|
||||
}
|
||||
exiting_ = true;
|
||||
|
||||
// Wakeup all waiters.
|
||||
for (auto w : waiters_) {
|
||||
w->ready = true;
|
||||
w->task.f = nullptr;
|
||||
w->cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for threads to finish.
|
||||
for (auto t : threads_) {
|
||||
delete t;
|
||||
}
|
||||
}
|
||||
|
||||
// Schedule fn() for execution in the pool of threads. The functions are
|
||||
// executed in the order in which they are scheduled.
|
||||
void Schedule(std::function<void()> fn) {
|
||||
Task t = env_.CreateTask(std::move(fn));
|
||||
std::unique_lock<std::mutex> l(mu_);
|
||||
if (waiters_.empty()) {
|
||||
pending_.push_back(std::move(t));
|
||||
} else {
|
||||
Waiter* w = waiters_.back();
|
||||
waiters_.pop_back();
|
||||
w->ready = true;
|
||||
w->task = std::move(t);
|
||||
w->cv.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
void WorkerLoop() {
|
||||
std::unique_lock<std::mutex> l(mu_);
|
||||
Waiter w;
|
||||
Task t;
|
||||
while (!exiting_) {
|
||||
if (pending_.empty()) {
|
||||
// Wait for work to be assigned to me
|
||||
w.ready = false;
|
||||
waiters_.push_back(&w);
|
||||
while (!w.ready) {
|
||||
w.cv.wait(l);
|
||||
}
|
||||
t = w.task;
|
||||
w.task.f = nullptr;
|
||||
} else {
|
||||
// Pick up pending work
|
||||
t = std::move(pending_.front());
|
||||
pending_.pop_front();
|
||||
if (pending_.empty()) {
|
||||
empty_.notify_all();
|
||||
}
|
||||
}
|
||||
if (t.f) {
|
||||
mu_.unlock();
|
||||
env_.ExecuteTask(t);
|
||||
t.f = nullptr;
|
||||
mu_.lock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
typedef typename Environment::Task Task;
|
||||
typedef typename Environment::EnvThread Thread;
|
||||
|
||||
struct Waiter {
|
||||
std::condition_variable cv;
|
||||
Task task;
|
||||
bool ready;
|
||||
};
|
||||
|
||||
Environment env_;
|
||||
std::mutex mu_;
|
||||
MaxSizeVector<Thread*> threads_; // All threads
|
||||
MaxSizeVector<Waiter*> waiters_; // Stack of waiting threads.
|
||||
std::deque<Task> pending_; // Queue of pending work
|
||||
std::condition_variable empty_; // Signaled on pending_.empty()
|
||||
bool exiting_ = false;
|
||||
};
|
||||
|
||||
typedef SimpleThreadPoolTempl<StlThreadEnvironment> SimpleThreadPool;
|
||||
|
||||
} // namespace Eigen
|
||||
|
||||
#endif // EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
|
38
unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
Normal file
38
unsupported/Eigen/CXX11/src/ThreadPool/ThreadEnvironment.h
Normal file
@ -0,0 +1,38 @@
|
||||
// This file is part of Eigen, a lightweight C++ template library
|
||||
// for linear algebra.
|
||||
//
|
||||
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla
|
||||
// Public License v. 2.0. If a copy of the MPL was not distributed
|
||||
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
#ifndef EIGEN_CXX11_THREADPOOL_THREAD_ENVIRONMENT_H
|
||||
#define EIGEN_CXX11_THREADPOOL_THREAD_ENVIRONMENT_H
|
||||
|
||||
namespace Eigen {
|
||||
|
||||
struct StlThreadEnvironment {
|
||||
struct Task {
|
||||
std::function<void()> f;
|
||||
};
|
||||
|
||||
// EnvThread constructor must start the thread,
|
||||
// destructor must join the thread.
|
||||
class EnvThread {
|
||||
public:
|
||||
EnvThread(std::function<void()> f) : thr_(f) {}
|
||||
~EnvThread() { thr_.join(); }
|
||||
|
||||
private:
|
||||
std::thread thr_;
|
||||
};
|
||||
|
||||
EnvThread* CreateThread(std::function<void()> f) { return new EnvThread(f); }
|
||||
Task CreateTask(std::function<void()> f) { return Task{std::move(f)}; }
|
||||
void ExecuteTask(const Task& t) { t.f(); }
|
||||
};
|
||||
|
||||
} // namespace Eigen
|
||||
|
||||
#endif // EIGEN_CXX11_THREADPOOL_THREAD_ENVIRONMENT_H
|
26
unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
Normal file
26
unsupported/Eigen/CXX11/src/ThreadPool/ThreadPoolInterface.h
Normal file
@ -0,0 +1,26 @@
|
||||
// This file is part of Eigen, a lightweight C++ template library
|
||||
// for linear algebra.
|
||||
//
|
||||
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla
|
||||
// Public License v. 2.0. If a copy of the MPL was not distributed
|
||||
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
#ifndef EIGEN_CXX11_THREADPOOL_THREAD_POOL_INTERFACE_H
|
||||
#define EIGEN_CXX11_THREADPOOL_THREAD_POOL_INTERFACE_H
|
||||
|
||||
namespace Eigen {
|
||||
|
||||
// This defines an interface that ThreadPoolDevice can take to use
|
||||
// custom thread pools underneath.
|
||||
class ThreadPoolInterface {
|
||||
public:
|
||||
virtual void Schedule(std::function<void()> fn) = 0;
|
||||
|
||||
virtual ~ThreadPoolInterface() {}
|
||||
};
|
||||
|
||||
} // namespace Eigen
|
||||
|
||||
#endif // EIGEN_CXX11_THREADPOOL_THREAD_POOL_INTERFACE_H
|
Loading…
Reference in New Issue
Block a user