Moved away from std::async and std::future as the underlying mechanism for the thread pool device. On several platforms, the functions passed to std::async are not scheduled in the order in which they are submitted, which leads to massive performance issues in the contraction code.

Instead we now have a custom thread pool that ensures the worker threads pick up functions in the order in which they are enqueued.
Benoit Steiner 2015-05-20 13:52:07 -07:00
parent 48f6b274e2
commit 6b800744ce
5 changed files with 212 additions and 48 deletions
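
For reference, a minimal sketch of the new calling convention that the updated tests below exercise: the pool of worker threads is now constructed explicitly and handed to the device by pointer. The tensor sizes and thread count here are arbitrary.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

int main() {
  Eigen::Tensor<float, 1> in1(1024), in2(1024), out(1024);
  in1.setRandom();
  in2.setRandom();

  // The ThreadPool owns the worker threads; the device only references it.
  Eigen::ThreadPool tp(4);
  Eigen::ThreadPoolDevice thread_pool_device(&tp, 4);

  // Work evaluated on the device is split into blocks that are scheduled
  // on the pool in FIFO order.
  out.device(thread_pool_device) = in1 + in2 * 3.14f;
  return 0;
}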


@ -35,7 +35,10 @@
#endif
#ifdef EIGEN_USE_THREADS
#include <future>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#endif
#ifdef EIGEN_USE_GPU


@ -46,8 +46,8 @@ struct packRhsAndKernelArg {
const Index n_block_idx;
const Index m_blocks;
const Index n_blocks;
std::vector<Promise>* kernel_promises;
const std::vector<Future>* lhs_futures;
std::vector<Notification*>* kernel_notifications;
const std::vector<Notification*>* lhs_notifications;
const bool need_to_pack;
};
@ -219,17 +219,13 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
blockBs.push_back(static_cast<RhsScalar *>(this->m_device.allocate(sizeB * sizeof(RhsScalar))));
}
// lhs_futures starts with all null futures
std::vector<Future> lhs_futures(num_threads);
// lhs_notifications starts with all null Notifications
std::vector<Notification*> lhs_notifications(num_threads, nullptr);
// this should really be numBlockAs * n_blocks;
const Index num_kernel_promises = num_threads * n_blocks;
std::vector<Promise> kernel_promises(num_kernel_promises);
std::vector<Future> kernel_futures(num_kernel_promises);
for (std::size_t i = 0; i < kernel_promises.size(); ++i) {
kernel_promises[i].set_value();
kernel_futures[i] = kernel_promises[i].get_future();
}
const Index num_kernel_notifications = num_threads * n_blocks;
std::vector<Notification*> kernel_notifications(num_kernel_notifications,
nullptr);
for (Index k_block_idx = 0; k_block_idx < k_blocks; k_block_idx++) {
const Index k_start = k_block_idx * kc;
@ -245,11 +241,16 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
eigen_assert(actual_mc > 0);
Index blockAId = (k_block_idx * m_blocks + mt_block_idx) % num_threads;
for (int i = 0; i < n_blocks; ++i) {
Index future_id = (blockAId * n_blocks + i);
wait_until_ready(&kernel_futures[future_id]);
kernel_promises[future_id] = Promise();
kernel_futures[future_id] = kernel_promises[future_id].get_future();
Index notification_id = (blockAId * n_blocks + i);
// Wait for any current kernels using this slot to complete
// before using it.
if (kernel_notifications[notification_id]) {
wait_until_ready(kernel_notifications[notification_id]);
delete kernel_notifications[notification_id];
}
kernel_notifications[notification_id] = new Notification();
}
const packLArg arg = {
blockAs[blockAId], // blockA
@ -260,8 +261,12 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
actual_kc, // kc
};
lhs_futures[blockAId] =
this->m_device.enqueue(&Self::packLhs<packLArg, LhsPacker>, arg);
// Delete any existing notification since we may be
// replacing it. The algorithm should ensure that there are
// no existing waiters on this notification.
delete lhs_notifications[blockAId];
lhs_notifications[blockAId] =
this->m_device.enqueue(&Self::packLhs<packLArg, LhsPacker>, arg);
}
// now start kernels.
@ -278,7 +283,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
for (Index i = num_blocks; i < num_threads; ++i) {
Index blockAId = (k_block_idx * m_blocks + i + m_block_idx) % num_threads;
Index future_id = (blockAId * n_blocks + n_block_idx);
wait_until_ready(&kernel_futures[future_id]);
wait_until_ready(kernel_notifications[future_id]);
}
}
@ -301,19 +306,29 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
n_block_idx, // n_block_idx
m_blocks, // m_blocks
n_blocks, // n_blocks
&kernel_promises, // kernel_promises
&lhs_futures, // lhs_futures
&kernel_notifications, // kernel notifications
&lhs_notifications, // lhs notifications
need_to_pack, // need_to_pack
};
this->m_device.enqueueNoFuture(&Self::packRhsAndKernel<packRKArg, RhsPacker, GebpKernel>, arg);
// We asynchronously kick off this function, which ends up
// notifying the appropriate kernel_notifications objects,
// which this thread waits on before exiting.
this->m_device.enqueueNoNotification(&Self::packRhsAndKernel<packRKArg, RhsPacker, GebpKernel>, arg);
}
}
}
// Make sure all the kernels are done.
for (size_t i = 0; i < kernel_futures.size(); ++i) {
wait_until_ready(&kernel_futures[i]);
for (size_t i = 0; i < kernel_notifications.size(); ++i) {
wait_until_ready(kernel_notifications[i]);
delete kernel_notifications[i];
}
// No need to wait for lhs notifications since they should have
// already been waited on. Just clean them up.
for (size_t i = 0; i < lhs_notifications.size(); ++i) {
delete lhs_notifications[i];
}
// deallocate all of the memory for both A and B's
@ -360,15 +375,15 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
const Index m_base_start = arg.m + arg.mc*mt_block_idx;
if (m_base_start < arg.max_m) {
Index blockAId = (arg.k_block_idx * arg.m_blocks + mt_block_idx + arg.m_block_idx) % arg.num_threads;
wait_until_ready(&(*arg.lhs_futures)[blockAId]);
wait_until_ready((*arg.lhs_notifications)[blockAId]);
const Index actual_mc = (std::min)(m_base_start + arg.mc, arg.max_m) - m_base_start;
gebp(arg.output.getSubMapper(m_base_start, arg.n),
(*arg.blockAs)[blockAId], arg.blockB,
actual_mc, arg.kc, arg.nc, 1.0, -1, -1, 0, 0);
// Notify that the kernel is done.
const Index set_idx = blockAId * arg.n_blocks + arg.n_block_idx;
(*arg.kernel_promises)[set_idx].set_value();
(*arg.kernel_notifications)[set_idx]->Notify();
}
}
}
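
The hunk above boils down to recycling a fixed set of Notification slots: a slot is reused only after the task that previously owned it has been waited on and deleted, and a final drain loop disposes of whatever is left. Here is a minimal sketch of that idiom, with a hypothetical work() function standing in for the packing and kernel tasks (Notification and the device types are assumed visible in the Eigen namespace, as the updated tests suggest):

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <vector>

// Hypothetical stand-in for packLhs / packRhsAndKernel.
static void work(int step, int slot) { (void)step; (void)slot; }

int main() {
  Eigen::ThreadPool tp(4);
  Eigen::ThreadPoolDevice device(&tp, 4);

  const int num_slots = 4;
  std::vector<Eigen::Notification*> slots(num_slots, nullptr);

  for (int step = 0; step < 16; ++step) {
    const int slot = step % num_slots;
    // Wait for any task still using this slot, then recycle it.
    if (slots[slot]) {
      slots[slot]->WaitForNotification();
      delete slots[slot];
    }
    slots[slot] = device.enqueue(&work, step, slot);
  }

  // Drain: every outstanding task must be waited on before the slots
  // (and the pool) go away.
  for (Eigen::Notification* n : slots) {
    if (n) {
      n->WaitForNotification();
      delete n;
    }
  }
  return 0;
}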


@ -38,19 +38,151 @@ struct DefaultDevice {
// We should really use a thread pool here but first we need to find a portable thread pool library.
#ifdef EIGEN_USE_THREADS
typedef std::future<void> Future;
typedef std::promise<void> Promise;
// The implementation of the ThreadPool type ensures that the Schedule method
// runs the functions it is provided in FIFO order when the scheduling is done
// by a single thread.
class ThreadPool {
public:
// Construct a pool that contains "num_threads" threads.
explicit ThreadPool(int num_threads) {
for (int i = 0; i < num_threads; i++) {
threads_.push_back(new std::thread([this]() { WorkerLoop(); }));
}
}
static EIGEN_STRONG_INLINE void wait_until_ready(const Future* f) {
f->wait();
}
static EIGEN_STRONG_INLINE void get_when_ready(Future* f) {
f->get();
// Wait until all scheduled work has finished and then destroy the
// set of threads.
~ThreadPool()
{
{
// Wait for all work to get done.
std::unique_lock<std::mutex> l(mu_);
empty_.wait(l, [this]() { return pending_.empty(); });
exiting_ = true;
// Wake up all waiters.
for (auto w : waiters_) {
w->ready = true;
w->work = nullptr;
w->cv.notify_one();
}
}
// Wait for threads to finish.
for (auto t : threads_) {
t->join();
delete t;
}
}
// Schedule fn() for execution in the pool of threads. The functions are
// executed in the order in which they are scheduled.
void Schedule(std::function<void()> fn) {
std::unique_lock<std::mutex> l(mu_);
if (waiters_.empty()) {
pending_.push_back(fn);
} else {
Waiter* w = waiters_.back();
waiters_.pop_back();
w->ready = true;
w->work = fn;
w->cv.notify_one();
}
}
protected:
void WorkerLoop() {
std::unique_lock<std::mutex> l(mu_);
Waiter w;
while (!exiting_) {
std::function<void()> fn;
if (pending_.empty()) {
// Wait for work to be assigned to me
w.ready = false;
waiters_.push_back(&w);
w.cv.wait(l, [&w]() { return w.ready; });
fn = w.work;
w.work = nullptr;
} else {
// Pick up pending work
fn = pending_.front();
pending_.pop_front();
if (pending_.empty()) {
empty_.notify_all();
}
}
if (fn) {
mu_.unlock();
fn();
mu_.lock();
}
}
}
private:
struct Waiter {
std::condition_variable cv;
std::function<void()> work;
bool ready;
};
std::mutex mu_;
std::vector<std::thread*> threads_; // All threads
std::vector<Waiter*> waiters_; // Stack of waiting threads.
std::deque<std::function<void()>> pending_; // Queue of pending work
std::condition_variable empty_; // Signaled on pending_.empty()
bool exiting_ = false;
};
// Notification is an object that allows a user to wait for another
// thread to signal that an event has occurred.
//
// Multiple threads can wait on the same Notification object,
// but only one caller may call Notify() on the object.
class Notification {
public:
Notification() : notified_(false) {}
~Notification() {}
void Notify() {
std::unique_lock<std::mutex> l(mu_);
eigen_assert(!notified_);
notified_ = true;
cv_.notify_all();
}
void WaitForNotification() {
std::unique_lock<std::mutex> l(mu_);
cv_.wait(l, [this]() { return notified_; } );
}
private:
std::mutex mu_;
std::condition_variable cv_;
bool notified_;
};
// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapper
{
static void run(Notification* n, Function f, Args... args) {
f(args...);
n->Notify();
}
};
static EIGEN_STRONG_INLINE void wait_until_ready(Notification* n) {
if (n) {
n->WaitForNotification();
}
}
// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
ThreadPoolDevice(size_t num_cores) : num_threads_(num_cores) { }
ThreadPoolDevice(ThreadPool* pool, size_t num_cores) : pool_(pool), num_threads_(num_cores) { }
EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
return internal::aligned_malloc(num_bytes);
@ -73,15 +205,21 @@ struct ThreadPoolDevice {
}
template <class Function, class... Args>
EIGEN_STRONG_INLINE Future enqueue(Function&& f, Args&&... args) const {
return std::async(std::launch::async, f, args...);
EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
Notification* n = new Notification();
std::function<void()> func =
std::bind(&FunctionWrapper<Function, Args...>::run, n, f, args...);
pool_->Schedule(func);
return n;
}
template <class Function, class... Args>
EIGEN_STRONG_INLINE void enqueueNoFuture(Function&& f, Args&&... args) const {
std::async(std::launch::async, f, args...);
EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
std::function<void()> func = std::bind(f, args...);
pool_->Schedule(func);
}
private:
ThreadPool* pool_;
size_t num_threads_;
};
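
A rough sketch of how these primitives compose, assuming (as the updated tests already do for ThreadPool and ThreadPoolDevice) that the types in this header are visible in the Eigen namespace: with a single worker, closures passed to Schedule() from one thread run in exactly that order, and a Notification lets the caller block until a particular closure has finished.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <iostream>

int main() {
  // Declared before the pool so it outlives the worker threads.
  Eigen::Notification done;
  Eigen::ThreadPool pool(1);  // a single worker makes the FIFO order observable

  // All three closures are scheduled from this thread, so the worker picks
  // them up in exactly this order.
  pool.Schedule([] { std::cout << "pack lhs\n"; });
  pool.Schedule([] { std::cout << "pack rhs + run kernel\n"; });
  pool.Schedule([&done] { std::cout << "cleanup\n"; done.Notify(); });

  done.WaitForNotification();  // blocks until the last closure has run
  return 0;
}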


@ -131,7 +131,7 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable>
const Index blocksize = std::max<Index>(PacketSize, (blocksz - (blocksz % PacketSize)));
const Index numblocks = size / blocksize;
std::vector<Future> results;
std::vector<Notification*> results;
results.reserve(numblocks);
for (int i = 0; i < numblocks; ++i) {
results.push_back(device.enqueue(&EvalRange<Evaluator, Index>::run, evaluator, i*blocksize, (i+1)*blocksize));
@ -142,7 +142,8 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable>
}
for (int i = 0; i < numblocks; ++i) {
get_when_ready(&results[i]);
wait_until_ready(results[i]);
delete results[i];
}
}
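
The executor change is a scatter/gather over the new API: enqueue one task per block, evaluate any remainder on the calling thread, then wait on and delete every returned Notification. A sketch of the same pattern applied to a plain float kernel rather than an Eigen evaluator (the kernel and sizes are illustrative, not part of this commit):

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <vector>

// Illustrative per-block kernel, analogous to EvalRange<Evaluator, Index>::run.
static void eval_range(float* data, int first, int last) {
  for (int i = first; i < last; ++i) data[i] = 2.0f * i;
}

int main() {
  std::vector<float> data(1000);  // declared first so it outlives the pool

  Eigen::ThreadPool tp(4);
  Eigen::ThreadPoolDevice device(&tp, 4);

  const int size = static_cast<int>(data.size());
  const int blocksize = 256;
  const int numblocks = size / blocksize;

  std::vector<Eigen::Notification*> results;
  results.reserve(numblocks);
  for (int i = 0; i < numblocks; ++i) {
    results.push_back(device.enqueue(&eval_range, data.data(),
                                     i * blocksize, (i + 1) * blocksize));
  }

  // The remainder is evaluated on the calling thread, as in the executor.
  if (numblocks * blocksize < size) {
    eval_range(data.data(), numblocks * blocksize, size);
  }

  // Gather: wait on and delete each notification.
  for (int i = 0; i < numblocks; ++i) {
    results[i]->WaitForNotification();
    delete results[i];
  }
  return 0;
}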


@ -26,7 +26,8 @@ static void test_multithread_elementwise()
in1.setRandom();
in2.setRandom();
Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(3, 11));
Eigen::ThreadPool tp(internal::random<int>(3, 11));
Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11));
out.device(thread_pool_device) = in1 + in2 * 3.14f;
for (int i = 0; i < 2; ++i) {
@ -48,7 +49,8 @@ static void test_multithread_compound_assignment()
in1.setRandom();
in2.setRandom();
Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(3, 11));
Eigen::ThreadPool tp(internal::random<int>(3, 11));
Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11));
out.device(thread_pool_device) = in1;
out.device(thread_pool_device) += in2 * 3.14f;
@ -80,7 +82,8 @@ static void test_multithread_contraction()
MapXf m_right(t_right.data(), 1147, 1400);
Matrix<float, Dynamic, Dynamic, DataLayout> m_result(1500, 1400);
Eigen::ThreadPoolDevice thread_pool_device(4);
Eigen::ThreadPool tp(4);
Eigen::ThreadPoolDevice thread_pool_device(&tp, 4);
// compute results by separate methods
t_result.device(thread_pool_device) = t_left.contract(t_right, dims);
@ -115,7 +118,8 @@ static void test_contraction_corner_cases()
MapXf m_right(t_right.data(), 32, 28*28);
Matrix<float, Dynamic, Dynamic, DataLayout> m_result(500, 28*28);
Eigen::ThreadPoolDevice thread_pool_device(12);
Eigen::ThreadPool tp(12);
Eigen::ThreadPoolDevice thread_pool_device(&tp, 12);
// compute results by separate methods
t_result.device(thread_pool_device) = t_left.contract(t_right, dims);
@ -204,7 +208,8 @@ static void test_multithread_contraction_agrees_with_singlethread() {
typedef Tensor<float, 1>::DimensionPair DimPair;
Eigen::array<DimPair, 1> dims({{DimPair(1, 2)}});
Eigen::ThreadPoolDevice thread_pool_device(internal::random<int>(2, 11));
Eigen::ThreadPool tp(internal::random<int>(2, 11));
Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(2, 11));
Tensor<float, 5, DataLayout> st_result;
st_result = left.contract(right, dims);
@ -227,7 +232,8 @@ static void test_memcpy() {
for (int i = 0; i < 5; ++i) {
const int num_threads = internal::random<int>(3, 11);
Eigen::ThreadPoolDevice thread_pool_device(num_threads);
Eigen::ThreadPool tp(num_threads);
Eigen::ThreadPoolDevice thread_pool_device(&tp, num_threads);
const int size = internal::random<int>(13, 7632);
Tensor<float, 1> t1(size);
@ -243,7 +249,8 @@ static void test_memcpy() {
static void test_multithread_random()
{
Eigen::ThreadPoolDevice device(2);
Eigen::ThreadPool tp(2);
Eigen::ThreadPoolDevice device(&tp, 2);
Tensor<float, 1> t(1 << 20);
t.device(device) = t.random<Eigen::internal::NormalRandomGenerator<float>>();
}