Merged in rmlarsen/eigen3 (pull request PR-468)

Add support for emulating thread local.
This commit is contained in:
Christoph Hertzberg 2018-08-24 17:29:03 +00:00
commit 949b0ad9cb
8 changed files with 172 additions and 82 deletions

View File

@ -125,7 +125,7 @@ inline void on_temporary_creation(long int size) {
if(nb_temporaries!=(N)) { std::cerr << "nb_temporaries == " << nb_temporaries << "\n"; }\
VERIFY( (#XPR) && nb_temporaries==(N) ); \
}
#endif
#include "split_test_helper.h"
@ -328,7 +328,7 @@ namespace Eigen
#define VERIFY_RAISES_STATIC_ASSERT(a) \
std::cout << "Can't VERIFY_RAISES_STATIC_ASSERT( " #a " ) with exceptions disabled\n";
#endif
#if !defined(__CUDACC__) && !defined(__HIPCC__) && !defined(__SYCL_DEVICE_ONLY__)
#define EIGEN_USE_CUSTOM_ASSERT
#endif

View File

@ -44,17 +44,27 @@
#include <thread>
#include <functional>
#include <memory>
#include "src/util/CXX11Meta.h"
#include "src/util/MaxSizeVector.h"
#include "src/ThreadPool/ThreadLocal.h"
#ifndef EIGEN_THREAD_LOCAL
// There are non-parenthesized calls to "max" in the <unordered_map> header,
// which trigger a check in test/main.h causing compilation to fail.
// We work around the check here by removing the check for max in
// the case where we have to emulate thread_local.
#ifdef max
#undef max
#endif
#include <unordered_map>
#endif
#include "src/ThreadPool/ThreadYield.h"
#include "src/ThreadPool/ThreadCancel.h"
#include "src/ThreadPool/EventCount.h"
#include "src/ThreadPool/RunQueue.h"
#include "src/ThreadPool/ThreadPoolInterface.h"
#include "src/ThreadPool/ThreadEnvironment.h"
#include "src/ThreadPool/Barrier.h"
#include "src/ThreadPool/NonBlockingThreadPool.h"
#endif
@ -62,4 +72,3 @@
#include <Eigen/src/Core/util/ReenableStupidWarnings.h>
#endif // EIGEN_CXX11_THREADPOOL_MODULE

View File

@ -12,56 +12,6 @@
namespace Eigen {
// Barrier is an object that allows one or more threads to wait until
// Notify has been called a specified number of times.
class Barrier {
public:
Barrier(unsigned int count) : state_(count << 1), notified_(false) {
eigen_assert(((count << 1) >> 1) == count);
}
~Barrier() {
eigen_assert((state_>>1) == 0);
}
void Notify() {
unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
if (v != 1) {
eigen_assert(((v + 2) & ~1) != 0);
return; // either count has not dropped to 0, or waiter is not waiting
}
std::unique_lock<std::mutex> l(mu_);
eigen_assert(!notified_);
notified_ = true;
cv_.notify_all();
}
void Wait() {
unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
if ((v >> 1) == 0) return;
std::unique_lock<std::mutex> l(mu_);
while (!notified_) {
cv_.wait(l);
}
}
private:
std::mutex mu_;
std::condition_variable cv_;
std::atomic<unsigned int> state_; // low bit is waiter flag
bool notified_;
};
// Notification is an object that allows a user to to wait for another
// thread to signal a notification that an event has occurred.
//
// Multiple threads can wait on the same Notification object,
// but only one caller must call Notify() on the object.
struct Notification : Barrier {
Notification() : Barrier(1) {};
};
// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapperWithNotification

View File

@ -0,0 +1,64 @@
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2018 Rasmus Munk Larsen <rmlarsen@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
// Barrier is an object that allows one or more threads to wait until
// Notify has been called a specified number of times.
#ifndef EIGEN_CXX11_THREADPOOL_BARRIER_H
#define EIGEN_CXX11_THREADPOOL_BARRIER_H
namespace Eigen {
class Barrier {
public:
Barrier(unsigned int count) : state_(count << 1), notified_(false) {
eigen_assert(((count << 1) >> 1) == count);
}
~Barrier() { eigen_plain_assert((state_ >> 1) == 0); }
void Notify() {
unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
if (v != 1) {
eigen_assert(((v + 2) & ~1) != 0);
return; // either count has not dropped to 0, or waiter is not waiting
}
std::unique_lock<std::mutex> l(mu_);
eigen_assert(!notified_);
notified_ = true;
cv_.notify_all();
}
void Wait() {
unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
if ((v >> 1) == 0) return;
std::unique_lock<std::mutex> l(mu_);
while (!notified_) {
cv_.wait(l);
}
}
private:
std::mutex mu_;
std::condition_variable cv_;
std::atomic<unsigned int> state_; // low bit is waiter flag
bool notified_;
};
// Notification is an object that allows a user to to wait for another
// thread to signal a notification that an event has occurred.
//
// Multiple threads can wait on the same Notification object,
// but only one caller must call Notify() on the object.
struct Notification : Barrier {
Notification() : Barrier(1){};
};
} // namespace Eigen
#endif // EIGEN_CXX11_THREADPOOL_BARRIER_H

View File

@ -58,7 +58,7 @@ class EventCount {
~EventCount() {
// Ensure there are no waiters.
eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask);
eigen_plain_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask);
}
// Prewait prepares for waiting.
@ -169,7 +169,8 @@ class EventCount {
class Waiter {
friend class EventCount;
// Align to 128 byte boundary to prevent false sharing with other Waiter objects in the same vector.
// Align to 128 byte boundary to prevent false sharing with other Waiter
// objects in the same vector.
EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<Waiter*> next;
std::mutex mu;
std::condition_variable cv;

View File

@ -10,7 +10,6 @@
#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
namespace Eigen {
template <typename Environment>
@ -23,7 +22,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
: ThreadPoolTempl(num_threads, true, env) {}
ThreadPoolTempl(int num_threads, bool allow_spinning,
Environment env = Environment())
Environment env = Environment())
: env_(env),
num_threads_(num_threads),
allow_spinning_(allow_spinning),
@ -59,9 +58,17 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
}
}
queues_.resize(num_threads_);
#ifndef EIGEN_THREAD_LOCAL
init_barrier_.reset(new Barrier(num_threads_));
#endif
for (int i = 0; i < num_threads_; i++) {
threads_.emplace_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
}
#ifndef EIGEN_THREAD_LOCAL
// Wait for workers to initialize per_thread_map_. Otherwise we might race
// with them in Schedule or CurrentThreadId.
init_barrier_->Wait();
#endif
}
~ThreadPoolTempl() {
@ -82,6 +89,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// Join threads explicitly to avoid destruction order issues.
threads_.resize(0);
queues_.resize(0);
}
void Schedule(std::function<void()> fn) {
@ -106,8 +114,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// this is kept alive while any threads can potentially be in Schedule.
if (!t.f) {
ec_.Notify(false);
}
else {
} else {
env_.ExecuteTask(t); // Push failed, execute directly.
}
}
@ -127,13 +134,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
ec_.Notify(true);
}
int NumThreads() const final {
return num_threads_;
}
int NumThreads() const final { return num_threads_; }
int CurrentThreadId() const final {
const PerThread* pt =
const_cast<ThreadPoolTempl*>(this)->GetPerThread();
const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
if (pt->pool == this) {
return pt->thread_id;
} else {
@ -145,10 +149,14 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
typedef typename Environment::EnvThread Thread;
struct PerThread {
constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { }
constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
uint64_t rand; // Random generator state.
int thread_id; // Worker thread index in pool.
uint64_t rand; // Random generator state.
int thread_id; // Worker thread index in pool.
#ifndef EIGEN_THREAD_LOCAL
// Prevent false sharing.
char pad_[128];
#endif
};
Environment env_;
@ -163,12 +171,25 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
std::atomic<bool> done_;
std::atomic<bool> cancelled_;
EventCount ec_;
#ifndef EIGEN_THREAD_LOCAL
std::unique_ptr<Barrier> init_barrier_;
std::mutex per_thread_map_mutex_; // Protects per_thread_map_.
std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
#endif
// Main worker thread loop.
void WorkerLoop(int thread_id) {
#ifndef EIGEN_THREAD_LOCAL
std::unique_ptr<PerThread> new_pt(new PerThread());
per_thread_map_mutex_.lock();
eigen_assert(per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second);
per_thread_map_mutex_.unlock();
init_barrier_->Notify();
init_barrier_->Wait();
#endif
PerThread* pt = GetPerThread();
pt->pool = this;
pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
pt->rand = GlobalThreadIdHash();
pt->thread_id = thread_id;
Queue& q = queues_[thread_id];
EventCount::Waiter* waiter = &waiters_[thread_id];
@ -320,10 +341,24 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
return -1;
}
static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
return std::hash<std::thread::id>()(std::this_thread::get_id());
}
EIGEN_STRONG_INLINE PerThread* GetPerThread() {
#ifndef EIGEN_THREAD_LOCAL
static PerThread dummy;
auto it = per_thread_map_.find(GlobalThreadIdHash());
if (it == per_thread_map_.end()) {
return &dummy;
} else {
return it->second.get();
}
#else
EIGEN_THREAD_LOCAL PerThread per_thread_;
PerThread* pt = &per_thread_;
return pt;
#endif
}
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
@ -331,7 +366,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// Update the internal state
*state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
// Generate the random output (using the PCG-XSH-RS scheme)
return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
return static_cast<unsigned>((current ^ (current >> 22)) >>
(22 + (current >> 61)));
}
};

View File

@ -10,7 +10,6 @@
#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
namespace Eigen {
// RunQueue is a fixed-size, partially non-blocking deque or Work items.
@ -47,7 +46,7 @@ class RunQueue {
array_[i].state.store(kEmpty, std::memory_order_relaxed);
}
~RunQueue() { eigen_assert(Size() == 0); }
~RunQueue() { eigen_plain_assert(Size() == 0); }
// PushFront inserts w at the beginning of the queue.
// If queue is full returns w, otherwise returns default-constructed Work.
@ -131,9 +130,8 @@ class RunQueue {
Elem* e = &array_[mid & kMask];
uint8_t s = e->state.load(std::memory_order_relaxed);
if (n == 0) {
if (s != kReady ||
!e->state.compare_exchange_strong(s, kBusy,
std::memory_order_acquire))
if (s != kReady || !e->state.compare_exchange_strong(
s, kBusy, std::memory_order_acquire))
continue;
start = mid;
} else {

View File

@ -10,13 +10,45 @@
#ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
#define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
// Try to come up with a portable implementation of thread local variables
#if EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7)
#define EIGEN_THREAD_LOCAL static __thread
#elif EIGEN_COMP_CLANG
#define EIGEN_THREAD_LOCAL static __thread
#else
#if EIGEN_MAX_CPP_VER >= 11 && \
((EIGEN_COMP_GNUC && EIGEN_GNUC_AT_LEAST(4, 8)) || \
__has_feature(cxx_thread_local))
#define EIGEN_THREAD_LOCAL static thread_local
#endif
// Disable TLS for Apple and Android builds with older toolchains.
#if defined(__APPLE__)
// Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED,
// __IPHONE_8_0.
#include <Availability.h>
#include <TargetConditionals.h>
#endif
// Checks whether C++11's `thread_local` storage duration specifier is
// supported.
#if defined(__apple_build_version__) && \
((__apple_build_version__ < 8000042) || \
(TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0))
// Notes: Xcode's clang did not support `thread_local` until version
// 8, and even then not for all iOS < 9.0.
#undef EIGEN_THREAD_LOCAL
#elif defined(__ANDROID__) && EIGEN_COMP_CLANG
// There are platforms for which TLS should not be used even though the compiler
// makes it seem like it's supported (Android NDK < r12b for example).
// This is primarily because of linker problems and toolchain misconfiguration:
// TLS isn't supported until NDK r12b per
// https://developer.android.com/ndk/downloads/revision_history.html
// Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in
// <android/ndk-version.h>. For NDK < r16, users should define these macros,
// e.g. `-D__NDK_MAJOR__=11 -D__NKD_MINOR__=0` for NDK r11.
#if __has_include(<android/ndk-version.h>)
#include <android/ndk-version.h>
#endif // __has_include(<android/ndk-version.h>)
#if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && \
defined(__NDK_MINOR__) && \
((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1)))
#undef EIGEN_THREAD_LOCAL
#endif
#endif // defined(__ANDROID__) && defined(__clang__)
#endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H