Add support for thread-local variables on platforms that do not support them, through emulation using a hash map.

Rasmus Munk Larsen 2018-08-13 15:31:23 -07:00
parent 3ec60215df
commit 8278ae6313
5 changed files with 103 additions and 76 deletions
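For orientation, here is a minimal sketch of the emulation strategy (illustrative names only; the real change is in the ThreadPool files below): each worker thread hashes its std::thread::id, registers a heap-allocated per-thread record in a mutex-protected std::unordered_map, and later lookups resolve the calling thread through that map, falling back to a shared dummy record for threads outside the pool.

#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
#include <unordered_map>

// Hypothetical stand-in for the pool's per-thread state (pool pointer,
// RNG state and worker index in the real code).
struct PerThread {
  int thread_id = -1;
  uint64_t rand = 0;
};

class EmulatedThreadLocal {
 public:
  // Each worker calls this exactly once before any lookups happen.
  void Register(PerThread* pt) {
    std::lock_guard<std::mutex> lock(mu_);
    map_[ThreadIdHash()] = pt;
  }

  // Threads that never registered (callers outside the pool) get a
  // shared dummy record, mirroring GetPerThread() in this commit.
  PerThread* Get() {
    static PerThread dummy;
    auto it = map_.find(ThreadIdHash());
    return it == map_.end() ? &dummy : it->second;
  }

 private:
  static uint64_t ThreadIdHash() {
    return std::hash<std::thread::id>()(std::this_thread::get_id());
  }
  std::mutex mu_;  // protects map_ while workers register themselves
  std::unordered_map<uint64_t, PerThread*> map_;
};

Note that Get() reads the map without taking the lock; the commit makes this safe by having every worker register and then wait on an initialization barrier, so all insertions have completed before any unsynchronized lookup can occur.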

View File

@@ -125,7 +125,7 @@ inline void on_temporary_creation(long int size) {
if(nb_temporaries!=(N)) { std::cerr << "nb_temporaries == " << nb_temporaries << "\n"; }\
VERIFY( (#XPR) && nb_temporaries==(N) ); \
}
#endif
#include "split_test_helper.h"
@@ -328,7 +328,7 @@ namespace Eigen
#define VERIFY_RAISES_STATIC_ASSERT(a) \
std::cout << "Can't VERIFY_RAISES_STATIC_ASSERT( " #a " ) with exceptions disabled\n";
#endif
#if !defined(__CUDACC__) && !defined(__HIPCC__) && !defined(__SYCL_DEVICE_ONLY__)
#define EIGEN_USE_CUSTOM_ASSERT
#endif
@@ -845,4 +845,4 @@ int main(int argc, char *argv[])
#ifdef _MSC_VER
// 4503 - decorated name length exceeded, name was truncated
#pragma warning( disable : 4503)
#endif
#endif

View File

@@ -44,6 +44,14 @@
#include <thread>
#include <functional>
#include <memory>
#ifndef EIGEN_THREAD_LOCAL
// There are non-parenthesized calls to "max" in the <unordered_map> header,
// which trigger a check in test/main.h causing compilation to fail.
// We work around the check here by removing the check for max in
// the case where we have to emulate thread_local.
#undef max
#include <unordered_map>
#endif
#include "src/util/CXX11Meta.h"
#include "src/util/MaxSizeVector.h"
@@ -55,6 +63,7 @@
#include "src/ThreadPool/RunQueue.h"
#include "src/ThreadPool/ThreadPoolInterface.h"
#include "src/ThreadPool/ThreadEnvironment.h"
#include "src/ThreadPool/Barrier.h"
#include "src/ThreadPool/NonBlockingThreadPool.h"
#endif
@@ -62,4 +71,3 @@
#include <Eigen/src/Core/util/ReenableStupidWarnings.h>
#endif // EIGEN_CXX11_THREADPOOL_MODULE

View File

@@ -12,56 +12,6 @@
namespace Eigen {
// Barrier is an object that allows one or more threads to wait until
// Notify has been called a specified number of times.
class Barrier {
public:
Barrier(unsigned int count) : state_(count << 1), notified_(false) {
eigen_assert(((count << 1) >> 1) == count);
}
~Barrier() {
eigen_assert((state_>>1) == 0);
}
void Notify() {
unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
if (v != 1) {
eigen_assert(((v + 2) & ~1) != 0);
return; // either count has not dropped to 0, or waiter is not waiting
}
std::unique_lock<std::mutex> l(mu_);
eigen_assert(!notified_);
notified_ = true;
cv_.notify_all();
}
void Wait() {
unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
if ((v >> 1) == 0) return;
std::unique_lock<std::mutex> l(mu_);
while (!notified_) {
cv_.wait(l);
}
}
private:
std::mutex mu_;
std::condition_variable cv_;
std::atomic<unsigned int> state_; // low bit is waiter flag
bool notified_;
};
// Notification is an object that allows a user to wait for another
// thread to signal a notification that an event has occurred.
//
// Multiple threads can wait on the same Notification object,
// but only one caller must call Notify() on the object.
struct Notification : Barrier {
Notification() : Barrier(1) {};
};
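As a usage illustration (not part of this file's diff), the pool constructor added in this commit uses exactly this pattern: the constructing thread blocks in Wait() until every worker has called Notify() once. A hedged sketch, with num_workers, env and threads as assumed placeholders:

// Hypothetical usage sketch of Eigen::Barrier.
Eigen::Barrier init_barrier(num_workers);
for (int i = 0; i < num_workers; ++i) {
  threads.push_back(env.CreateThread([&init_barrier]() {
    // ... per-thread setup, e.g. publishing this thread's PerThread
    // record into per_thread_map_ ...
    init_barrier.Notify();
  }));
}
init_barrier.Wait();  // returns once all num_workers notifications arrived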
// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapperWithNotification

View File

@@ -10,7 +10,6 @@
#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
namespace Eigen {
template <typename Environment>
@@ -23,7 +22,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
: ThreadPoolTempl(num_threads, true, env) {}
ThreadPoolTempl(int num_threads, bool allow_spinning,
Environment env = Environment())
Environment env = Environment())
: env_(env),
num_threads_(num_threads),
allow_spinning_(allow_spinning),
@@ -61,9 +60,17 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
for (int i = 0; i < num_threads_; i++) {
queues_.push_back(new Queue());
}
#ifndef EIGEN_THREAD_LOCAL
init_barrier_.reset(new Barrier(num_threads_));
#endif
for (int i = 0; i < num_threads_; i++) {
threads_.push_back(env_.CreateThread([this, i]() { WorkerLoop(i); }));
}
#ifndef EIGEN_THREAD_LOCAL
// Wait for workers to initialize per_thread_map_. Otherwise we might race
// with them in Schedule or CurrentThreadId.
init_barrier_->Wait();
#endif
}
~ThreadPoolTempl() {
@@ -85,6 +92,9 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// Join threads explicitly to avoid destruction order issues.
for (size_t i = 0; i < num_threads_; i++) delete threads_[i];
for (size_t i = 0; i < num_threads_; i++) delete queues_[i];
#ifndef EIGEN_THREAD_LOCAL
for (auto it : per_thread_map_) delete it.second;
#endif
}
void Schedule(std::function<void()> fn) {
@@ -109,8 +119,7 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// this is kept alive while any threads can potentially be in Schedule.
if (!t.f) {
ec_.Notify(false);
}
else {
} else {
env_.ExecuteTask(t); // Push failed, execute directly.
}
}
@@ -130,13 +139,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
ec_.Notify(true);
}
int NumThreads() const final {
return num_threads_;
}
int NumThreads() const final { return num_threads_; }
int CurrentThreadId() const final {
const PerThread* pt =
const_cast<ThreadPoolTempl*>(this)->GetPerThread();
const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
if (pt->pool == this) {
return pt->thread_id;
} else {
@@ -148,10 +154,10 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
typedef typename Environment::EnvThread Thread;
struct PerThread {
constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) { }
constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
uint64_t rand; // Random generator state.
int thread_id; // Worker thread index in pool.
uint64_t rand; // Random generator state.
int thread_id; // Worker thread index in pool.
};
Environment env_;
@@ -166,12 +172,26 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
std::atomic<bool> done_;
std::atomic<bool> cancelled_;
EventCount ec_;
#ifndef EIGEN_THREAD_LOCAL
std::unique_ptr<Barrier> init_barrier_;
std::mutex mu; // Protects per_thread_map_.
std::unordered_map<uint64_t, PerThread*> per_thread_map_;
#endif
// Main worker thread loop.
void WorkerLoop(int thread_id) {
#ifndef EIGEN_THREAD_LOCAL
PerThread* pt = new PerThread();
mu.lock();
per_thread_map_[GlobalThreadIdHash()] = pt;
mu.unlock();
init_barrier_->Notify();
init_barrier_->Wait();
#else
PerThread* pt = GetPerThread();
#endif
pt->pool = this;
pt->rand = std::hash<std::thread::id>()(std::this_thread::get_id());
pt->rand = GlobalThreadIdHash();
pt->thread_id = thread_id;
Queue* q = queues_[thread_id];
EventCount::Waiter* waiter = &waiters_[thread_id];
@@ -322,10 +342,24 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
return -1;
}
static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
return std::hash<std::thread::id>()(std::this_thread::get_id());
}
EIGEN_STRONG_INLINE PerThread* GetPerThread() {
#ifndef EIGEN_THREAD_LOCAL
static PerThread dummy;
auto it = per_thread_map_.find(GlobalThreadIdHash());
if (it == per_thread_map_.end()) {
return &dummy;
} else {
return it->second;
}
#else
EIGEN_THREAD_LOCAL PerThread per_thread_;
PerThread* pt = &per_thread_;
return pt;
#endif
}
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
@@ -333,7 +367,8 @@ class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
// Update the internal state
*state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
// Generate the random output (using the PCG-XSH-RS scheme)
return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
return static_cast<unsigned>((current ^ (current >> 22)) >>
(22 + (current >> 61)));
}
};

View File

@@ -10,13 +10,47 @@
#ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
#define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
// Try to come up with a portable implementation of thread local variables
#if EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7)
#define EIGEN_THREAD_LOCAL static __thread
#elif EIGEN_COMP_CLANG
#define EIGEN_THREAD_LOCAL static __thread
#else
#define EIGEN_THREAD_LOCAL static thread_local
#undef EIGEN_THREAD_LOCAL
#if EIGEN_MAX_CPP_VER>=11 && (__has_feature(cxx_thread_local))
#define EIGEN_THREAD_LOCAL static thread_local
#elif (EIGEN_COMP_GNUC && EIGEN_GNUC_AT_MOST(4, 7)) || EIGEN_COMP_CLANG
#define EIGEN_THREAD_LOCAL static __thread
#endif
// Disable TLS for Apple and Android builds with older toolchains.
#if defined(__APPLE__)
// Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED,
// __IPHONE_8_0.
#include <Availability.h>
#include <TargetConditionals.h>
#endif
// Checks whether C++11's `thread_local` storage duration specifier is
// supported.
#if defined(__apple_build_version__) && \
((__apple_build_version__ < 8000042) || \
(TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0))
// Notes: Xcode's clang did not support `thread_local` until version
// 8, and even then not for all iOS < 9.0.
#undef EIGEN_THREAD_LOCAL
#elif defined(__ANDROID__) && EIGEN_COMP_CLANG
// There are platforms for which TLS should not be used even though the compiler
// makes it seem like it's supported (Android NDK < r12b for example).
// This is primarily because of linker problems and toolchain misconfiguration:
// TLS isn't supported until NDK r12b per
// https://developer.android.com/ndk/downloads/revision_history.html
// Since NDK r16, `__NDK_MAJOR__` and `__NDK_MINOR__` are defined in
// <android/ndk-version.h>. For NDK < r16, users should define these macros,
// e.g. `-D__NDK_MAJOR__=11 -D__NDK_MINOR__=0` for NDK r11.
#if __has_include(<android/ndk-version.h>)
#include <android/ndk-version.h>
#endif // __has_include(<android/ndk-version.h>)
#if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && \
defined(__NDK_MINOR__) && \
((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1)))
#undef EIGEN_THREAD_LOCAL
#endif
#endif // defined(__ANDROID__) && defined(__clang__)
#endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H