Use ThreadLocal container in TensorContractionThreadPool

Eugene Zhulenev 2019-09-13 12:14:44 -07:00
parent facdec5aa7
commit 553caeb6a3

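This change replaces the manually indexed per-thread packed block vectors with an Eigen::ThreadLocal container parameterized by custom initialize/release callables. Below is a minimal standalone sketch of that container's API, assuming the (capacity, initialize, release) constructor and the local() accessor behave as they are used in the diff; Counter, init and release are illustrative names, not part of Eigen.

#include <iostream>
#include <unsupported/Eigen/CXX11/ThreadPool>

struct Counter {
  int hits = 0;
};

int main() {
  const int num_threads = 4;
  Eigen::ThreadPool pool(num_threads);

  // initialize runs once per unique calling thread on its first local() call;
  // release runs for every initialized value when the container is destroyed.
  auto init = [](Counter& c) { c.hits = 0; };
  auto release = [](Counter& c) { std::cout << "hits: " << c.hits << "\n"; };

  {
    // Mirror the contraction code: reserve 2x the pool size so that threads
    // which steal work but are not managed by the pool still get a slot.
    Eigen::ThreadLocal<Counter, decltype(init), decltype(release)> counters(
        2 * num_threads, init, release);

    Eigen::Barrier barrier(128);
    for (int i = 0; i < 128; ++i) {
      pool.Schedule([&counters, &barrier]() {
        counters.local().hits++;  // fast path for threads managed by the pool
        barrier.Notify();
      });
    }
    barrier.Wait();
  }  // release callbacks fire here, one per thread that touched local()

  return 0;
}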

@@ -379,7 +379,8 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
bool parallel_pack,
bool parallelize_by_sharding_dim_only,
DoneCallback done)
: done_(this, std::move(done)),
: created_by_thread_id_(std::this_thread::get_id()),
done_(this, std::move(done)),
device_(self->m_device),
lhs_(self->m_leftImpl, self->m_left_nocontract_strides,
self->m_i_strides, self->m_left_contracting_strides,
@@ -408,7 +409,20 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
gn_(gn),
nm0_(nm0),
nn0_(nn0),
kernel_(m_, k_, n_, bm_, bk_, bn_) {
kernel_(m_, k_, n_, bm_, bk_, bn_),
num_thread_local_allocations_(0),
// We reserve 2X more capacity for thread local values than the number
// of threads in the pool, to efficiently handle task stealing by
// threads that are not managed by the pool.
thread_local_capacity(2 * (parallelize_by_sharding_dim_only_
? device_.numThreadsInPool()
: 0)),
// Only one of the Lhs/Rhs thread local storages will be used, depending
// on the shard_by_col value, and only when parallelize_by_sharding_dim_only
// is set.
lhs_thread_local_blocks_(shard_by_col_ ? 0 : thread_local_capacity,
{*this}, {*this}),
rhs_thread_local_blocks_(shard_by_col_ ? thread_local_capacity : 0,
{*this}, {*this}) {
// These two options are mutually exclusive.
eigen_assert(!(parallel_pack && parallelize_by_sharding_dim_only));
@@ -455,12 +469,12 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
std::memory_order_relaxed);
Index num_blocks = num_worker_threads * gn_;
thread_local_packed_mem_ = kernel_.allocateSlices( //
device_, //
/*num_lhs=*/0, //
/*num_rhs=*/num_blocks, //
/*num_slices=*/1, //
/*lhs_blocks=*/nullptr, &thread_local_packed_rhs_);
thread_local_pre_alocated_mem_ = kernel_.allocateSlices( //
device_, //
/*num_lhs=*/0, //
/*num_rhs=*/num_blocks, //
/*num_slices=*/1, //
/*lhs_blocks=*/nullptr, &rhs_thread_local_pre_allocated_);
} else {
can_use_thread_local_packed_ = new std::atomic<bool>[nm_];
@@ -469,11 +483,11 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
std::memory_order_relaxed);
Index num_blocks = num_worker_threads * gm_;
thread_local_packed_mem_ = kernel_.allocateSlices( //
device_, //
/*num_lhs=*/num_blocks, //
/*num_rhs=*/0, //
/*num_slices=*/1, &thread_local_packed_lhs_, //
thread_local_pre_alocated_mem_ = kernel_.allocateSlices( //
device_, //
/*num_lhs=*/num_blocks, //
/*num_rhs=*/0, //
/*num_slices=*/1, &lhs_thread_local_pre_allocated_, //
/*rhs_blocks=*/nullptr);
}
}
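For reference, the pre-allocated buffer created above holds num_worker_threads * gm_ (or gn_) blocks, laid out as one contiguous run of grain-sized blocks per worker thread; the reuse path later in this commit hands slot n the range [n * gn_, (n + 1) * gn_). A small arithmetic sketch of that layout with hypothetical sizes (all names are illustrative, not from the Eigen sources):

#include <cassert>
#include <vector>

int main() {
  // Hypothetical sizes, only to illustrate the layout.
  const int num_worker_threads = 8;  // threads in the pool
  const int gn = 3;                  // grains in the Rhs sharding dimension
  std::vector<int> rhs_pre_allocated(num_worker_threads * gn);  // 24 blocks

  // The thread that receives pre-allocation slot `thread_slot` owns the
  // contiguous range [thread_slot * gn, (thread_slot + 1) * gn);
  // grain_index then picks a single block inside that range.
  const int thread_slot = 5;
  const int grain_index = 2;  // plays the role of n1 - n * gn_ in packed_rhs()
  int* block = &rhs_pre_allocated[gn * thread_slot + grain_index];
  assert(block == &rhs_pre_allocated[17]);
  (void)block;
  return 0;
}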
@@ -486,7 +500,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
}
kernel_.deallocate(device_, packed_mem_);
if (parallelize_by_sharding_dim_only_) {
kernel_.deallocate(device_, thread_local_packed_mem_);
kernel_.deallocate(device_, thread_local_pre_alocated_mem_);
delete[] can_use_thread_local_packed_;
}
}
@@ -512,6 +526,8 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
}
private:
std::thread::id created_by_thread_id_;
// This notification is specialized on the type of DoneCallback and can be
// blocking or non-blocking.
EvalParallelNotification<DoneCallback, EvalParallelContext> done_;
@@ -606,11 +622,185 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
// completion of the K-1 kernel, so we have to allocate "global" packed_lhs_
// and packed_rhs_ to allow kernels to be executed later on a thread
// different from the thread that was used for packing.
BlockMemHandle thread_local_packed_mem_;
// Only one of these will be initialized depending on shard_by_col value.
std::vector<LhsBlock> thread_local_packed_lhs_;
std::vector<RhsBlock> thread_local_packed_rhs_;
// Handle for pre-allocated thread local memory buffers.
BlockMemHandle thread_local_pre_alocated_mem_;
// Only one of these will be initialized depending on shard_by_col value
// (the size will be `num_worker_threads * num_grains_in_the_sharding_dim`).
std::vector<LhsBlock> lhs_thread_local_pre_allocated_;
std::vector<RhsBlock> rhs_thread_local_pre_allocated_;
// How many thread local blocks were already allocated.
std::atomic<int> num_thread_local_allocations_;
const int thread_local_capacity;
// We will use the pre-allocated Lhs/Rhs blocks defined above if the number
// of unique threads in the system is less than or equal to the number of
// threads in the thread pool. After that we fall back on dynamic memory
// allocation.
// ThreadLocalBlocks is a container for Lhs or Rhs thread local buffers. Its
// size is equal to the grain size in Lhs/Rhs sharding dimension.
template <typename BlockType>
class ThreadLocalBlocks {
public:
ThreadLocalBlocks() = default;
ThreadLocalBlocks(BlockType* base, size_t grain_size)
: is_pre_allocated_(true),
thread_local_pre_allocated_base_(base),
grain_size_(grain_size) {}
ThreadLocalBlocks(BlockMemHandle mem_handle,
std::vector<BlockType> blocks)
: is_pre_allocated_(false),
mem_handle_(std::move(mem_handle)),
blocks_(std::move(blocks)) {}
BlockType& block(int grain_index) {
eigen_assert(grain_index >= 0);
eigen_assert(static_cast<size_t>(grain_index) < size());
return is_pre_allocated_ ? thread_local_pre_allocated_base_[grain_index]
: blocks_[grain_index];
}
void Release(EvalParallelContext& ctx) const {
if (!is_pre_allocated_) {
ctx.kernel_.deallocate(ctx.device_, mem_handle_);
}
}
size_t size() const {
return is_pre_allocated_ ? grain_size_ : blocks_.size();
}
private:
bool is_pre_allocated_;
// Reuse pre-allocated thread local buffers.
BlockType* thread_local_pre_allocated_base_ = nullptr;
size_t grain_size_;
// These will be initialized only if `is_pre_allocated_ == false`.
BlockMemHandle mem_handle_;
std::vector<BlockType> blocks_;
};
// The ThreadLocalBlocksInitialize callable performs custom thread local
// block initialization: it reuses pre-allocated buffers if possible, and
// otherwise dynamically allocates new memory.
//
// Lhs/Rhs blocks might be of the same type, so we have to state explicitly
// which side we plan to allocate blocks for.
template <typename BlockType, bool is_rhs>
class ThreadLocalBlocksInitialize {
static constexpr bool kIsLhs =
!is_rhs && std::is_same<BlockType, LhsBlock>::value;
static const bool kIsRhs =
is_rhs && std::is_same<BlockType, RhsBlock>::value;
static_assert(kIsLhs || kIsRhs, "Unknown block type");
using Blocks = ThreadLocalBlocks<BlockType>;
public:
ThreadLocalBlocksInitialize(EvalParallelContext& ctx)
: ctx_(ctx),
num_worker_threads_(ctx_.device_.numThreadsInPool()) {}
void operator()(Blocks& blocks) {
const int n = ctx_.num_thread_local_allocations_.fetch_add(
1, std::memory_order_relaxed);
if (n >= num_worker_threads_) {
ThreadLocalBlocksAllocator<is_rhs>::allocate(ctx_, blocks);
} else {
ThreadLocalBlocksAllocator<is_rhs>::reuse(ctx_, n, blocks);
}
}
private:
// NOTE(ezhulenev): Without 'if constexpr' we have to put calls to
// TensorContractionKernel::allocateSlices into template specializations.
// Also explicit specializations are not allowed at class scope in C++03,
// EvalCtx type parameter is just a workaround for that limitation.
template <bool pack_rhs, typename EvalCtx = EvalParallelContext>
struct ThreadLocalBlocksAllocator;
template <typename EvalCtx>
struct ThreadLocalBlocksAllocator</*pack_rhs=*/true, EvalCtx> {
static void allocate(EvalCtx& ctx, Blocks& blocks) {
std::vector<RhsBlock> rhs_blocks;
BlockMemHandle mem_handle = ctx.kernel_.allocateSlices(
ctx.device_,
/*num_lhs=*/0,
/*num_rhs=*/ctx.gn_,
/*num_slices=*/1,
/*lhs_blocks=*/nullptr, /*rhs_blocks=*/&rhs_blocks);
blocks = ThreadLocalBlocks<RhsBlock>(std::move(mem_handle),
std::move(rhs_blocks));
}
static void reuse(EvalCtx& ctx, int index, Blocks& blocks) {
RhsBlock* ptr = &ctx.rhs_thread_local_pre_allocated_[ctx.gn_ * index];
blocks = ThreadLocalBlocks<RhsBlock>(ptr, ctx.gn_);
}
};
template <typename EvalCtx>
struct ThreadLocalBlocksAllocator</*pack_rhs=*/false, EvalCtx> {
static void allocate(EvalCtx& ctx, Blocks& blocks) {
std::vector<LhsBlock> lhs_blocks;
BlockMemHandle mem_handle = ctx.kernel_.allocateSlices(
ctx.device_,
/*num_lhs=*/ctx.gm_,
/*num_rhs=*/0,
/*num_slices=*/1,
/*lhs_blocks=*/&lhs_blocks, /*rhs_blocks=*/nullptr);
blocks = ThreadLocalBlocks<LhsBlock>(std::move(mem_handle),
std::move(lhs_blocks));
}
static void reuse(EvalCtx& ctx, int index, Blocks& blocks) {
LhsBlock* ptr = &ctx.lhs_thread_local_pre_allocated_[ctx.gm_ * index];
blocks = ThreadLocalBlocks<LhsBlock>(ptr, ctx.gm_);
}
};
EvalParallelContext& ctx_;
const int num_worker_threads_;
};
template <typename BlockType>
class ThreadLocalBlocksRelease {
public:
using Blocks = ThreadLocalBlocks<BlockType>;
ThreadLocalBlocksRelease(EvalParallelContext& ctx) : ctx_(ctx) {}
void operator()(Blocks& blocks) { blocks.Release(ctx_); }
private:
EvalParallelContext& ctx_;
};
// ThreadLocalBlocks initialization callables.
using ThreadLocalLhsInit =
ThreadLocalBlocksInitialize<LhsBlock, /*is_rhs=*/false>;
using ThreadLocalRhsInit =
ThreadLocalBlocksInitialize<RhsBlock, /*is_rhs=*/true>;
// ThreadLocalBlocks release callables.
using ThreadLocalLhsRelease = ThreadLocalBlocksRelease<LhsBlock>;
using ThreadLocalRhsRelease = ThreadLocalBlocksRelease<RhsBlock>;
// Thread local containers for Lhs/Rhs block packs. In practice only one of
// them will be used, depending on the shard_by_col value.
Eigen::ThreadLocal<ThreadLocalBlocks<LhsBlock>, ThreadLocalLhsInit,
ThreadLocalLhsRelease>
lhs_thread_local_blocks_;
Eigen::ThreadLocal<ThreadLocalBlocks<RhsBlock>, ThreadLocalRhsInit,
ThreadLocalRhsRelease>
rhs_thread_local_blocks_;
// After a particular shard for Kth slice missed thread local execution
// opportunity (K-1 slice didn't complete kernels execution), we can no
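The NOTE(ezhulenev) comment in the hunk above describes the workaround this code relies on: without C++17 'if constexpr', a member template cannot be explicitly (fully) specialized at class scope, so a defaulted extra template parameter turns each branch into a partial specialization, which is allowed. A standalone sketch of that pattern with hypothetical names:

#include <iostream>

class Context {
  // The primary template is only declared; each branch below is a *partial*
  // specialization. An explicit (full) specialization would be ill-formed at
  // class scope before C++17, which is why the dummy parameter exists.
  template <bool pack_rhs, typename Dummy = void>
  struct Allocator;

  template <typename Dummy>
  struct Allocator</*pack_rhs=*/true, Dummy> {
    static void allocate() { std::cout << "allocate rhs blocks\n"; }
  };

  template <typename Dummy>
  struct Allocator</*pack_rhs=*/false, Dummy> {
    static void allocate() { std::cout << "allocate lhs blocks\n"; }
  };

 public:
  template <bool pack_rhs>
  void Run() {
    Allocator<pack_rhs>::allocate();
  }
};

int main() {
  Context ctx;
  ctx.Run<true>();   // -> allocate rhs blocks
  ctx.Run<false>();  // -> allocate lhs blocks
  return 0;
}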
@@ -630,12 +820,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
LhsBlock& packed_lhs(Index m, Index k, Index m1, bool use_thread_local) {
if (use_thread_local) {
eigen_assert(!shard_by_col_);
ThreadLocalBlocks<LhsBlock>& blocks = lhs_thread_local_blocks_.local();
Index base_idx = gm_ * device_.currentThreadId();
Index grain_idx = m1 - m * gm_;
Index block_idx = base_idx + grain_idx;
return thread_local_packed_lhs_[block_idx];
Index grain_index = m1 - m * gm_;
return blocks.block(grain_index);
} else {
return packed_lhs_[k % (P - 1)][m1];
}
@@ -644,12 +832,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
RhsBlock& packed_rhs(Index n, Index k, Index n1, bool use_thread_local) {
if (use_thread_local) {
eigen_assert(shard_by_col_);
ThreadLocalBlocks<RhsBlock>& blocks = rhs_thread_local_blocks_.local();
Index base_idx = gn_ * device_.currentThreadId();
Index grain_idx = n1 - n * gn_;
Index block_idx = base_idx + grain_idx;
return thread_local_packed_rhs_[block_idx];
Index grain_index = n1 - n * gn_;
return blocks.block(grain_index);
} else {
return packed_rhs_[k % (P - 1)][n1];
}
@@ -877,11 +1063,11 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
// to the execution of the first kernel of the k+1 slice, before
// completing a call to the last kernel of the k slice.
// (2) all pack tasks for sharded dim must be executed in a thread
// pool.
// pool to get pre-allocated thread local buffers.
bool pack_async =
(start == 0) &&
(parallelize_by_sharding_dim_only_ && shard_by_col_ == rhs) &&
(k > 0 || device_.currentThreadId() < 0);
(k > 0 || std::this_thread::get_id() == created_by_thread_id_);
if (pack_async) {
device_.enqueueNoNotification(
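The new pack_async condition above replaces the device_.currentThreadId() < 0 check with a comparison against the stored created_by_thread_id_. A minimal sketch of that thread-identity check in isolation (Context and called_from_creator are illustrative names, not from the Eigen sources):

#include <cassert>
#include <thread>

struct Context {
  // Remember which thread constructed the context (the "external" caller).
  std::thread::id created_by_thread_id_ = std::this_thread::get_id();

  bool called_from_creator() const {
    return std::this_thread::get_id() == created_by_thread_id_;
  }
};

int main() {
  Context ctx;
  assert(ctx.called_from_creator());  // the main thread created ctx

  std::thread worker([&ctx] {
    // A worker thread is not the creator, so in the contraction code packing
    // would be enqueued asynchronously rather than executed inline.
    assert(!ctx.called_from_creator());
  });
  worker.join();
  return 0;
}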