Asynchronous parallelFor in Eigen ThreadPoolDevice

This commit is contained in:
Eugene Zhulenev 2019-08-22 10:50:51 -07:00
parent 2fb24384c9
commit 6901788013

View File

@ -75,9 +75,9 @@ struct ThreadPoolDevice {
EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
deallocate(buffer);
}
template<typename Type>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
return data;
}
@ -181,44 +181,173 @@ struct ThreadPoolDevice {
return pool_->CurrentThreadId();
}
// parallelFor executes f with [0, n) arguments in parallel and waits for
// completion. F accepts a half-open interval [first, last).
// Block size is chosen based on the iteration cost and resulting parallel
// WARNING: This function is synchronous and will block the calling thread.
//
// Synchronous parallelFor executes f with [0, n) arguments in parallel and
// waits for completion. F accepts a half-open interval [first, last). Block
// size is chosen based on the iteration cost and resulting parallel
// efficiency. If block_align is not nullptr, it is called to round up the
// block size.
void parallelFor(Index n, const TensorOpCost& cost,
std::function<Index(Index)> block_align,
std::function<void(Index, Index)> f) const {
typedef TensorCostModel<ThreadPoolDevice> CostModel;
// Compute small problems directly in the caller thread.
if (n <= 1 || numThreads() == 1 ||
CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
f(0, n);
return;
}
// Calculate block size based on (1) the iteration cost and (2) parallel
// efficiency. We want blocks to be not too small to mitigate
// parallelization overheads; not too large to mitigate tail
// effect and potential load imbalance and we also want number
// of blocks to be evenly dividable across threads.
// Compute block size and total count of blocks.
ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
double block_size_f = 1.0 / CostModel::taskSize(1, cost);
// Recursively divide size into halves until we reach block_size.
// Division code rounds mid to block_size, so we are guaranteed to get
// block_count leaves that do actual computations.
Barrier barrier(static_cast<unsigned int>(block.count));
std::function<void(Index, Index)> handleRange;
handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
Index lastIdx) {
while (lastIdx - firstIdx > block.size) {
// Split into halves and schedule the second half on a different thread.
const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
lastIdx = midIdx;
}
// Single block or less, execute directly.
f(firstIdx, lastIdx);
barrier.Notify();
};
if (block.count <= numThreads()) {
// Avoid a thread hop by running the root of the tree and one block on the
// main thread.
handleRange(0, n);
} else {
// Execute the root in the thread pool to avoid running work on more than
// numThreads() threads.
pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
}
barrier.Wait();
}
// Convenience wrapper for parallelFor that does not align blocks.
void parallelFor(Index n, const TensorOpCost& cost,
std::function<void(Index, Index)> f) const {
parallelFor(n, cost, NULL, std::move(f));
}
// WARNING: This function is asynchronous and will not block the calling thread.
//
// Asynchronous parallelFor executes f with [0, n) arguments in parallel
// without waiting for completion. When the last block finished, it will call
// 'done' callback. F accepts a half-open interval [first, last). Block size
// is chosen based on the iteration cost and resulting parallel efficiency. If
// block_align is not nullptr, it is called to round up the block size.
void parallelForAsync(Index n, const TensorOpCost& cost,
std::function<Index(Index)> block_align,
std::function<void(Index, Index)> f,
std::function<void()> done) const {
// Compute block size and total count of blocks.
ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
ParallelForAsyncContext* const ctx =
new ParallelForAsyncContext(block.count, std::move(f), std::move(done));
// Recursively divide size into halves until we reach block_size.
// Division code rounds mid to block_size, so we are guaranteed to get
// block_count leaves that do actual computations.
ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
while (lastIdx - firstIdx > block.size) {
// Split into halves and schedule the second half on a different thread.
const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
pool_->Schedule(
[ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
lastIdx = midIdx;
}
// Single block or less, execute directly.
ctx->f(firstIdx, lastIdx);
// Call 'done' callback if it was the last block.
if (ctx->count.fetch_sub(1) == 1) {
(ctx->done)();
// We can't delete ctx right now, because it will deallocate the closure
// we are currently in.
pool_->Schedule([ctx]() { delete ctx; });
}
};
// Execute the root in the thread pool.
pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
}
// Convenience wrapper for parallelForAsync that does not align blocks.
void parallelForAsync(Index n, const TensorOpCost& cost,
std::function<void(Index, Index)> f,
std::function<void()> done) const {
parallelForAsync(n, cost, NULL, std::move(f), std::move(done));
}
// Thread pool accessor.
ThreadPoolInterface* getPool() const { return pool_; }
// Allocator accessor.
Allocator* allocator() const { return allocator_; }
private:
typedef TensorCostModel<ThreadPoolDevice> CostModel;
// For parallelForAsync we must keep passed in closures on the heap, and
// delete them only after `done` callback finished.
struct ParallelForAsyncContext {
ParallelForAsyncContext(Index count, std::function<void(Index, Index)> f,
std::function<void()> done)
: count(count), f(std::move(f)), done(std::move(done)) {}
std::atomic<Index> count;
std::function<void(Index, Index)> f;
std::function<void()> done;
std::function<void(Index, Index)> handle_range;
};
struct ParallelForBlock {
Index size; // block size
Index count; // number of blocks
};
// Calculates block size based on (1) the iteration cost and (2) parallel
// efficiency. We want blocks to be not too small to mitigate parallelization
// overheads; not too large to mitigate tail effect and potential load
// imbalance and we also want number of blocks to be evenly dividable across
// threads.
ParallelForBlock CalculateParallelForBlock(
const Index n, const TensorOpCost& cost,
std::function<Index(Index)> block_align) const {
const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
const Index max_oversharding_factor = 4;
Index block_size = numext::mini(
n, numext::maxi<Index>(divup<Index>(n, max_oversharding_factor * numThreads()),
block_size_f));
n, numext::maxi<Index>(
divup<Index>(n, max_oversharding_factor * numThreads()),
block_size_f));
const Index max_block_size = numext::mini(n, 2 * block_size);
if (block_align) {
Index new_block_size = block_align(block_size);
eigen_assert(new_block_size >= block_size);
block_size = numext::mini(n, new_block_size);
}
Index block_count = divup(n, block_size);
// Calculate parallel efficiency as fraction of total CPU time used for
// computations:
double max_efficiency =
static_cast<double>(block_count) /
(divup<int>(block_count, numThreads()) * numThreads());
// Now try to increase block size up to max_block_size as long as it
// doesn't decrease parallel efficiency.
for (Index prev_block_count = block_count;
@ -251,47 +380,9 @@ struct ThreadPoolDevice {
}
}
// Recursively divide size into halves until we reach block_size.
// Division code rounds mid to block_size, so we are guaranteed to get
// block_count leaves that do actual computations.
Barrier barrier(static_cast<unsigned int>(block_count));
std::function<void(Index, Index)> handleRange;
handleRange = [=, &handleRange, &barrier, &f](Index firstIdx, Index lastIdx) {
while (lastIdx - firstIdx > block_size) {
// Split into halves and schedule the second half on a different thread.
const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block_size) * block_size;
pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
lastIdx = midIdx;
}
// Single block or less, execute directly.
f(firstIdx, lastIdx);
barrier.Notify();
};
if (block_count <= numThreads()) {
// Avoid a thread hop by running the root of the tree and one block on the
// main thread.
handleRange(0, n);
} else {
// Execute the root in the thread pool to avoid running work on more than
// numThreads() threads.
pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
}
barrier.Wait();
return {block_size, block_count};
}
// Convenience wrapper for parallelFor that does not align blocks.
void parallelFor(Index n, const TensorOpCost& cost,
std::function<void(Index, Index)> f) const {
parallelFor(n, cost, NULL, std::move(f));
}
// Thread pool accessor.
ThreadPoolInterface* getPool() const { return pool_; }
// Allocator accessor.
Allocator* allocator() const { return allocator_; }
private:
ThreadPoolInterface* pool_;
int num_threads_;
Allocator* allocator_;