Parallelize tensor contraction only by sharding dimension and use 'thread-local' memory for packing

Eugene Zhulenev 2019-02-04 10:43:16 -08:00
parent 871e2e5339
commit eb21bab769
2 changed files with 212 additions and 17 deletions


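For orientation, here is a minimal sketch (sizes, thread counts, and the example itself are illustrative, not taken from this commit) of the kind of ThreadPoolDevice-backed contraction this code path serves; whether the new sync-mode path actually kicks in depends on the evaluator's internal blocking and sharding decisions.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

int main() {
  // A pool with 8 worker threads; the device uses all of them.
  Eigen::ThreadPool pool(8);
  Eigen::ThreadPoolDevice device(&pool, /*num_cores=*/8);

  // A contraction whose sharding dimension yields many blocks, so there is
  // enough parallelism in that dimension alone for all worker threads.
  Eigen::Tensor<float, 2> lhs(4096, 64);
  Eigen::Tensor<float, 2> rhs(64, 4096);
  Eigen::Tensor<float, 2> out(4096, 4096);
  lhs.setRandom();
  rhs.setRandom();

  Eigen::array<Eigen::IndexPair<int>, 1> dims = {Eigen::IndexPair<int>(1, 0)};
  out.device(device) = lhs.contract(rhs, dims);
  return 0;
}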
@@ -335,6 +335,47 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
mem += rhs_size;
}
}
// If there is enough available parallelism in the sharding dimension, we can
// call kernels in sync mode and use thread-local memory for packed data.
const Index sharding_dim_tasks = shard_by_col ? nn : nm;
if (!parallel_pack_ && sharding_dim_tasks >= device_.numThreadsInPool()) {
parallelize_by_sharding_dim_only_ = true;
int num_worker_threads = device_.numThreadsInPool();
if (shard_by_col) {
can_use_thread_local_packed_ = new std::atomic<bool>[nn_];
for (int i = 0; i < nn_; ++i)
can_use_thread_local_packed_[i].store(true,
std::memory_order_relaxed);
Index num_blocks = num_worker_threads * gn_;
thread_local_packed_mem_ = device_.allocate(num_blocks * rhs_size);
mem = static_cast<char*>(thread_local_packed_mem_);
thread_local_packed_rhs_.resize(num_blocks, nullptr);
for (Index i = 0; i < num_blocks; ++i) {
thread_local_packed_rhs_[i] = reinterpret_cast<RhsScalar*>(mem);
mem += rhs_size;
}
} else {
can_use_thread_local_packed_ = new std::atomic<bool>[nm_];
for (int i = 0; i < nm_; ++i)
can_use_thread_local_packed_[i].store(true,
std::memory_order_relaxed);
Index num_blocks = num_worker_threads * gm_;
thread_local_packed_mem_ = device_.allocate(num_blocks * lhs_size);
mem = static_cast<char*>(thread_local_packed_mem_);
thread_local_packed_lhs_.resize(num_blocks, nullptr);
for (Index i = 0; i < num_blocks; ++i) {
thread_local_packed_lhs_[i] = reinterpret_cast<LhsScalar*>(mem);
mem += lhs_size;
}
}
}
}
~Context() {
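To make the constructor's bookkeeping above concrete, here is a standalone sketch (hypothetical names; a simplified model, not the evaluator's actual types) of the per-thread packed-buffer layout it allocates: one contiguous allocation carved into num_worker_threads groups of gn_ (or gm_) blocks, so a packing task running on worker t only ever touches its own group. The lookup arithmetic matches the packed_rhs()/packed_lhs() helpers added later in this diff.

#include <cstddef>
#include <vector>

// Simplified model of thread_local_packed_rhs_ for the shard_by_col case.
struct ThreadLocalPackedModel {
  std::ptrdiff_t grain;       // plays the role of gn_
  std::vector<char*> blocks;  // num_worker_threads * grain slots

  ThreadLocalPackedModel(int num_worker_threads, std::ptrdiff_t grain_size,
                         std::size_t block_bytes, char* base)
      : grain(grain_size), blocks(num_worker_threads * grain_size) {
    // Carve one contiguous allocation into fixed-size blocks, just like the
    // loop over thread_local_packed_mem_ in the constructor above.
    for (std::size_t i = 0; i < blocks.size(); ++i)
      blocks[i] = base + i * block_bytes;
  }

  // Worker `tid` owns the slot range [tid * grain, (tid + 1) * grain),
  // indexed by the grain offset n1 - n * gn_.
  char* slot(int tid, std::ptrdiff_t n, std::ptrdiff_t n1) const {
    return blocks[tid * grain + (n1 - n * grain)];
  }
};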
@@ -343,6 +384,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
delete[] state_kernel_[x];
}
device_.deallocate(packed_mem_);
if (parallelize_by_sharding_dim_only_) {
device_.deallocate(thread_local_packed_mem_);
delete[] can_use_thread_local_packed_;
}
}
void run() {
@@ -426,6 +471,42 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
void* packed_mem_;
std::vector<LhsScalar*> packed_lhs_[P - 1];
std::vector<RhsScalar*> packed_rhs_[P - 1];
// If there is enough concurrency in the sharding dimension, we choose not
// to parallelize by the other dimension, and execute all kernels in sync
// mode. This reduces parallelism from nm_ x nn_ down to nn_
// (shard_by_col==true) or nm_ (shard_by_col==false).
bool parallelize_by_sharding_dim_only_ = false;
// If we choose to parallelize only by the sharding dimension, each thread
// will have its own "thread local" (not C++ thread_local storage) memory
// for packed_lhs or packed_rhs (for shard_by_col == false or true
// respectively). This memory can't be passed to a kernel that might execute
// on a different thread.
//
// In practice, by the time we are ready to pack memory for the sharding
// dimension (rhs if shard_by_col==true) of the K-th slice, all kernels for
// the K-1 slice have already completed (99% of the time), so we can pack
// data into the thread-local storage and guarantee that all the kernels
// will be executed immediately in the same thread. This significantly
// increases the L1 cache hit ratio and reduces pressure on the memory bus.
//
// It's still possible that a kernel for the K-th slice will be ready before
// completion of a K-1 slice kernel, so we still have to allocate the
// "global" packed_lhs_ and packed_rhs_ to allow kernels to be executed
// later on a thread different from the thread that was used for packing.
void* thread_local_packed_mem_;
// Only one of these will be initialized depending on the shard_by_col value.
std::vector<LhsScalar*> thread_local_packed_lhs_;
std::vector<RhsScalar*> thread_local_packed_rhs_;
// After a particular shard of the K-th slice misses its thread-local
// execution opportunity (the K-1 slice didn't finish executing its
// kernels), we can no longer schedule the K+1 and following slices in
// thread-local mode, because there is no longer a guarantee that the
// previous kernels were executed sequentially in the same thread (the
// array size is nn_ or nm_).
std::atomic<bool>* can_use_thread_local_packed_;
std::atomic<uint8_t>** state_kernel_[P];
// state_switch_ is frequently modified by worker threads, while other
// fields are read-only after constructor. Let's move it to a separate cache
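The can_use_thread_local_packed_ flags behave as one-way latches, as described in the comments above. A minimal sketch (hypothetical, not the evaluator's code) of that behaviour:

#include <atomic>

// One-way latch modeling can_use_thread_local_packed_[shard]: it starts out
// true and is permanently flipped to false the first time a k-slice misses
// the "kernel is ready to run synchronously" condition, so all later slices
// for that shard fall back to the shared packed_lhs_/packed_rhs_ buffers.
class ThreadLocalPackingLatch {
 public:
  bool try_use(bool kernel_ready_in_this_thread) {
    if (!allowed_.load(std::memory_order_relaxed)) return false;
    if (kernel_ready_in_this_thread) return true;  // safe to pack thread-locally
    allowed_.store(false, std::memory_order_relaxed);  // missed: latch off
    return false;
  }

 private:
  std::atomic<bool> allowed_{true};
};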
@@ -434,22 +515,96 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
std::atomic<Index> state_packing_ready_[P];
std::atomic<Index> state_switch_[P];
LhsScalar* packed_lhs(Index m, Index k, Index m1, bool use_thread_local) {
if (use_thread_local) {
eigen_assert(!shard_by_col_);
Index base_idx = gm_ * device_.currentThreadId();
Index grain_idx = m1 - m * gm_;
Index block_idx = base_idx + grain_idx;
return thread_local_packed_lhs_[block_idx];
} else {
return packed_lhs_[k % (P - 1)][m1];
}
}
RhsScalar* packed_rhs(Index n, Index k, Index n1, bool use_thread_local) {
if (use_thread_local) {
eigen_assert(shard_by_col_);
Index base_idx = gn_ * device_.currentThreadId();
Index grain_idx = n1 - n * gn_;
Index block_idx = base_idx + grain_idx;
return thread_local_packed_rhs_[block_idx];
} else {
return packed_rhs_[k % (P - 1)][n1];
}
}
// In the following two methods (pack_lhs and pack_rhs), if we know for sure
// that we'll be able to call a kernel with the packed data immediately,
// instead of submitting it to the thread pool, we can use thread-local
// memory for the packed data.
//
// We can only reliably check this if we are running all kernels in sync
// mode (parallelizing only by the sharding dim). If the kernel for m==0
// (n==0) is ready to run, it's guaranteed that all kernels with larger
// values of m (n) are also ready, because we execute them in the same order
// for all K slices.
void pack_lhs(Index m, Index k) {
bool use_thread_local = false;
if (parallelize_by_sharding_dim_only_ && !shard_by_col_ &&
can_use_thread_local_packed_[m].load(std::memory_order_relaxed)) {
if (state_kernel_[k % P][m][0].load(std::memory_order_relaxed) == 1) {
use_thread_local = true;
} else {
// If we can't guarantee that all kernels in the `k` slice will be
// executed sequentially in the current thread, it's no longer safe to
// use thread-local memory in the following slices along the k dimension.
eigen_assert(k > 0);
can_use_thread_local_packed_[m].store(false,
std::memory_order_relaxed);
}
}
const Index mend = m * gm_ + gm(m);
for (Index m1 = m * gm_; m1 < mend; m1++)
TensorContractionKernel::packLhs(packed_lhs(m, k, m1, use_thread_local),
lhs_.getSubMapper(m1 * bm_, k * bk_),
bk(k), bm(m1));
if (!parallel_pack_ && shard_by_col_) {
assert(!use_thread_local);
signal_packing(k);
} else {
signal_switch(k + 1);
for (Index n = nn_ - 1; n >= 0; n--) {
bool sync = parallelize_by_sharding_dim_only_ || n == 0;
signal_kernel(m, n, k, sync, use_thread_local);
}
}
}
void pack_rhs(Index n, Index k) {
bool use_thread_local = false;
if (parallelize_by_sharding_dim_only_ && shard_by_col_ &&
can_use_thread_local_packed_[n].load(std::memory_order_relaxed)) {
if (state_kernel_[k % P][0][n].load(std::memory_order_relaxed) == 1) {
use_thread_local = true;
} else {
// If we can't guarantee that all kernels in the `k` slice will be
// executed sequentially in the current thread, it's no longer safe to
// use thread-local memory in the following slices along the k dimension.
eigen_assert(k > 0);
can_use_thread_local_packed_[n].store(false,
std::memory_order_relaxed);
}
}
const Index nend = n * gn_ + gn(n);
for (Index n1 = n * gn_; n1 < nend; n1++) {
if (k == 0) {
@@ -462,20 +617,24 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
// deadlocks.
memset(buffer_ + n1 * bn_ * m_, 0, bn(n1) * m_ * sizeof(Scalar));
}
TensorContractionKernel::packRhs(packed_rhs(n, k, n1, use_thread_local),
rhs_.getSubMapper(k * bk_, n1 * bn_),
bk(k), bn(n1));
}
if (parallel_pack_ || shard_by_col_) {
signal_switch(k + 1);
for (Index m = nm_ - 1; m >= 0; m--) {
bool sync = parallelize_by_sharding_dim_only_ || m == 0;
signal_kernel(m, n, k, sync, use_thread_local);
}
} else {
assert(!use_thread_local);
signal_packing(k);
}
}
void kernel(Index m, Index n, Index k, bool use_thread_local) {
// Note: order of iteration matters here. Iteration over m is innermost
// because we want to reuse the same packed rhs in consecutive tasks
// (rhs fits into L2$ while lhs only into L3$).
@@ -486,8 +645,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
for (Index m1 = m * gm_; m1 < mend; m1++) {
const auto output_mapper = output_.getSubMapper(m1 * bm_, n1 * bn_);
TensorContractionKernel::invoke(
output_mapper,
packed_lhs(m, k, m1, !shard_by_col_ && use_thread_local),
packed_rhs(n, k, n1, shard_by_col_ && use_thread_local), bm(m1),
bk(k), bn(n1), Scalar(1));
// We are done with the last task for the [m1, n1] block.
if (k + 1 == nk_) {
@@ -501,8 +662,10 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
for (Index n1 = n * gn_; n1 < nend; n1++) {
const auto output_mapper = output_.getSubMapper(m1 * bm_, n1 * bn_);
TensorContractionKernel::invoke(
output_mapper,
packed_lhs(m, k, m1, !shard_by_col_ && use_thread_local),
packed_rhs(n, k, n1, shard_by_col_ && use_thread_local), bm(m1),
bk(k), bn(n1), Scalar(1));
// We are done with the last task for the [m1, n1] block.
if (k + 1 == nk_) {
@@ -511,7 +674,7 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
}
}
}
signal_kernel(m, n, k + 1, /*sync=*/false, /*use_thread_local=*/false);
signal_switch(k + 2);
}
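To illustrate the cache argument in the loop-order comment above: keeping the packed rhs block fixed in the outer loop and sweeping the lhs blocks in the inner loop means the block that fits in L2 is reused across consecutive kernel invocations. A simplified sketch (hypothetical helper names, not the evaluator's code):

#include <cstddef>
#include <functional>

// Hypothetical sketch of a loop nest that keeps the packed rhs block hot:
// n1 (rhs blocks) is the outer loop and m1 (lhs blocks) the inner one, so
// each L2-resident rhs block is reused against every larger lhs block.
void run_block_kernels(std::ptrdiff_t m_begin, std::ptrdiff_t m_end,
                       std::ptrdiff_t n_begin, std::ptrdiff_t n_end,
                       const std::function<void(std::ptrdiff_t,
                                                std::ptrdiff_t)>& gebp) {
  for (std::ptrdiff_t n1 = n_begin; n1 < n_end; ++n1)
    for (std::ptrdiff_t m1 = m_begin; m1 < m_end; ++m1)
      gebp(m1, n1);  // multiply packed lhs block m1 by packed rhs block n1
}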
@@ -524,16 +687,23 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
enqueue_packing(k, shard_by_col_);
}
void signal_kernel(Index m, Index n, Index k, bool sync,
bool use_thread_local) {
std::atomic<uint8_t>* state = &state_kernel_[k % P][m][n];
Index s = state->load();
eigen_assert(s > 0);
if (s != 1 && state->fetch_sub(1) != 1) {
eigen_assert(!use_thread_local);
return;
}
state->store(parallel_pack_ ? 3 : 2, std::memory_order_relaxed);
if (sync) {
kernel(m, n, k, use_thread_local);
} else {
eigen_assert(!use_thread_local);
device_.enqueueNoNotification(
[=]() { kernel(m, n, k, use_thread_local); });
}
}
void signal_switch(Index k, Index v = 1) {
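A compact model (hypothetical; simplified relative to the evaluator's state machine) of the countdown in signal_kernel above: every packing signal decrements a pending counter (2 or 3 in the evaluator, depending on parallel_pack_), only the caller that delivers the last signal runs the kernel, and a deferred run may land on another thread, which is why it must not use thread-local packed buffers.

#include <atomic>
#include <cstdint>
#include <functional>

// Hypothetical countdown model: `pending` is the number of packing signals a
// kernel still needs before it can run.
void on_packing_signal(
    std::atomic<uint8_t>& pending, bool sync, bool use_thread_local,
    const std::function<void(bool)>& run_kernel,
    const std::function<void(std::function<void()>)>& enqueue) {
  if (pending.fetch_sub(1) != 1) return;  // not the last signal yet
  if (sync) {
    run_kernel(use_thread_local);  // runs on this thread: thread-local data OK
  } else {
    // May execute on a different worker, so thread-local buffers are off-limits.
    enqueue([run_kernel]() { run_kernel(false); });
  }
}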
@@ -589,7 +759,26 @@ struct TensorEvaluator<const TensorContractionOp<Indices, LeftArgType, RightArgT
[=]() { enqueue_packing_helper(mid, end, k, rhs); });
end = mid;
}
// Decide whether to run the first packing task (start == 0) in
// async mode if we parallelize only by the sharding dim:
// (1) pack_lhs and pack_rhs call signal_switch before completing
// all calls to signal_kernel, which in sync mode might lead
// to the execution of the first kernel of the k+1 slice before
// the last kernel of the k slice has completed.
// (2) all pack tasks for the sharded dim must be executed in a
// thread pool.
bool pack_async =
(start == 0) &&
(parallelize_by_sharding_dim_only_ && shard_by_col_ == rhs) &&
(k > 0 || device_.currentThreadId() < 0);
if (pack_async) {
device_.enqueueNoNotification(
[=]() { enqueue_packing_helper(start, end, k, rhs); });
} else {
enqueue_packing_helper(start, end, k, rhs);
}
}
}
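The `device_.currentThreadId() < 0` test in the pack_async condition distinguishes the caller's thread, which is not a pool worker and reports a negative id (the behaviour the condition above relies on), from pool workers. A small illustrative sketch of that distinction:

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <iostream>

int main() {
  Eigen::ThreadPool pool(4);
  Eigen::ThreadPoolDevice device(&pool, 4);

  // From the calling (non-pool) thread the id is negative, which is what the
  // k == 0 packing round keys off: that first round must be pushed into the
  // pool rather than run inline.
  std::cout << "caller id: " << device.currentThreadId() << "\n";

  Eigen::Barrier done(1);
  device.enqueueNoNotification([&device, &done]() {
    // From inside the pool the id is a valid worker index.
    std::cout << "worker id: " << device.currentThreadId() << "\n";
    done.Notify();
  });
  done.Wait();
  return 0;
}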


@@ -122,6 +122,12 @@ struct ThreadPoolDevice {
return num_threads_;
}
// Number of threads available in the underlying thread pool. This number can
// be different from the value returned by numThreads().
EIGEN_STRONG_INLINE int numThreadsInPool() const {
return pool_->NumThreads();
}
EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
return l1CacheSize();
}
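A short illustration (values are made up) of when the two counts differ: the device can be constructed to use fewer cores than the pool it wraps, so numThreads() reflects the device setting while the method added here reports the pool's actual size.

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <iostream>

int main() {
  Eigen::ThreadPool pool(8);                               // pool owns 8 threads
  Eigen::ThreadPoolDevice device(&pool, /*num_cores=*/2);  // device limited to 2

  std::cout << device.numThreads() << "\n";        // prints 2 (device setting)
  std::cout << device.numThreadsInPool() << "\n";  // prints 8 (pool_->NumThreads())
  return 0;
}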