Add block evaluation V2 to TensorAsyncExecutor.

Add async evaluation to a number of ops.
Rasmus Munk Larsen 2019-10-22 12:42:44 -07:00
parent 668ab3fc47
commit 97c0c5d485
9 changed files with 226 additions and 66 deletions
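The user-facing entry point that all of this serves is the asynchronous device assignment documented in TensorDevice.h below. A minimal end-to-end sketch against this commit's tree (pool size, tensor names, and shapes are illustrative, not part of the change):

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>
#include <utility>

int main() {
  Eigen::ThreadPool pool(4);
  Eigen::ThreadPoolDevice device(&pool, 4);

  Eigen::Tensor<float, 2> A(64, 64), B(64, 64), C(64, 64);
  A.setRandom();
  B.setRandom();

  // Schedule C = A + B on the pool; the callback fires on completion.
  Eigen::Barrier barrier(1);
  auto done = [&barrier]() { barrier.Notify(); };
  C.device(device, std::move(done)) = A + B;

  barrier.Wait();  // block until the asynchronous evaluation finishes
  return 0;
}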

unsupported/Eigen/CXX11/src/Tensor/TensorBase.h

@@ -1129,16 +1129,11 @@ class TensorBase : public TensorBase<Derived, ReadOnlyAccessors> {
return TensorDevice<Derived, DeviceType>(dev, derived());
}
#ifdef EIGEN_USE_THREADS
// Select the async device on which to evaluate the expression.
template <typename DeviceType, typename DoneCallback>
typename internal::enable_if<
internal::is_same<DeviceType, ThreadPoolDevice>::value,
TensorAsyncDevice<Derived, DeviceType, DoneCallback>>::type
device(const DeviceType& dev, DoneCallback done) {
TensorAsyncDevice<Derived, DeviceType, DoneCallback> device(const DeviceType& dev, DoneCallback done) {
return TensorAsyncDevice<Derived, DeviceType, DoneCallback>(dev, derived(), std::move(done));
}
#endif // EIGEN_USE_THREADS
protected:
EIGEN_DEVICE_FUNC

unsupported/Eigen/CXX11/src/Tensor/TensorDevice.h

@@ -63,18 +63,18 @@ template <typename ExpressionType, typename DeviceType> class TensorDevice {
ExpressionType& m_expression;
};
#ifdef EIGEN_USE_THREADS
/** \class TensorAsyncDevice
* \ingroup CXX11_Tensor_Module
*
* \brief Pseudo expression providing an operator = that will evaluate its
* argument asynchronously on the specified device (currently supports only
* ThreadPoolDevice).
*
* Example:
* auto done = []() { ... expression evaluation done ... };
* C.device(EIGEN_THREAD_POOL, std::move(done)) = A + B;
* \ingroup CXX11_Tensor_Module
*
* \brief Pseudo expression providing an operator = that will evaluate its
* argument asynchronously on the specified device. Currently only
* ThreadPoolDevice implements proper asynchronous execution, while the default
* and GPU devices just run the expression synchronously and call m_done() on
* completion.
*
* Example:
* auto done = []() { ... expression evaluation done ... };
* C.device(thread_pool_device, std::move(done)) = A + B;
*/
template <typename ExpressionType, typename DeviceType, typename DoneCallback>
@@ -87,11 +87,11 @@ class TensorAsyncDevice {
template <typename OtherDerived>
EIGEN_STRONG_INLINE TensorAsyncDevice& operator=(const OtherDerived& other) {
typedef TensorAssignOp<ExpressionType, const OtherDerived> Assign;
typedef internal::TensorAsyncExecutor<const Assign, DeviceType, DoneCallback> Executor;
typedef internal::TensorExecutor<const Assign, DeviceType> Executor;
// WARNING: After assignment 'm_done' callback will be in undefined state.
Assign assign(m_expression, other);
Executor::runAsync(assign, m_device, std::move(m_done));
Executor::run(assign, m_device);
m_done();
return *this;
}
@@ -102,7 +102,33 @@ class TensorAsyncDevice {
DoneCallback m_done;
};
#endif // EIGEN_USE_THREADS
#ifdef EIGEN_USE_THREADS
template <typename ExpressionType, typename DoneCallback>
class TensorAsyncDevice<ExpressionType, ThreadPoolDevice, DoneCallback> {
public:
TensorAsyncDevice(const ThreadPoolDevice& device, ExpressionType& expression,
DoneCallback done)
: m_device(device), m_expression(expression), m_done(std::move(done)) {}
template <typename OtherDerived>
EIGEN_STRONG_INLINE TensorAsyncDevice& operator=(const OtherDerived& other) {
typedef TensorAssignOp<ExpressionType, const OtherDerived> Assign;
typedef internal::TensorAsyncExecutor<const Assign, ThreadPoolDevice, DoneCallback> Executor;
// WARNING: After assignment 'm_done' callback will be in undefined state.
Assign assign(m_expression, other);
Executor::runAsync(assign, m_device, std::move(m_done));
return *this;
}
protected:
const ThreadPoolDevice& m_device;
ExpressionType& m_expression;
DoneCallback m_done;
};
#endif
} // end namespace Eigen
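The WARNING comments above exist because operator= hands m_done to the executor via std::move, leaving the member moved-from. A stand-alone toy (std::function as a stand-in for the callback type) showing why a second assignment through the same TensorAsyncDevice would be unsafe:

#include <functional>
#include <iostream>
#include <utility>

int main() {
  std::function<void()> m_done = [] { std::cout << "done\n"; };
  // What operator= effectively does with the callback:
  std::function<void()> stolen = std::move(m_done);
  // m_done is now in a valid but unspecified state; invoking it again
  // would be a bug, so each async device expression is single-use.
  stolen();
  return 0;
}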

unsupported/Eigen/CXX11/src/Tensor/TensorEvalTo.h

@@ -151,6 +151,16 @@ struct TensorEvaluator<const TensorEvalToOp<ArgType, MakePointer_>, Device>
return m_impl.evalSubExprsIfNeeded(m_buffer);
}
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync(
EvaluatorPointerType scalar, EvalSubExprsCallback done) {
EIGEN_UNUSED_VARIABLE(scalar);
eigen_assert(scalar == NULL);
m_impl.evalSubExprsIfNeededAsync(m_buffer, std::move(done));
}
#endif
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalScalar(Index i) {
m_buffer[i] = m_impl.coeff(i);
}
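evalSubExprsIfNeededAsync follows a continuation-passing shape: rather than returning the need-assign flag, an evaluator finishes its own (possibly asynchronous) setup and then invokes the caller's callback with that flag. A reduced stand-alone sketch of the protocol (all names hypothetical):

#include <iostream>

struct ToyEvaluator {
  // Mirrors evalSubExprsIfNeededAsync: do setup, then pass the
  // need-assign flag to the continuation instead of returning it.
  template <typename Done>
  void evalSubExprsIfNeededAsync(void* /*data*/, Done done) {
    // ... setup would be scheduled on a device here ...
    done(/*need_assign=*/true);
  }
};

int main() {
  ToyEvaluator eval;
  eval.evalSubExprsIfNeededAsync(nullptr, [](bool need_assign) {
    std::cout << "setup finished, need_assign=" << need_assign << "\n";
  });
  return 0;
}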

unsupported/Eigen/CXX11/src/Tensor/TensorExecutor.h

@@ -102,7 +102,7 @@ class TensorExecutor {
* available for ThreadPoolDevice (see definition below).
*/
template <typename Expression, typename Device, typename DoneCallback,
bool Vectorizable, bool Tileable>
bool Vectorizable, TiledEvaluation Tiling>
class TensorAsyncExecutor {};
/**
@@ -544,9 +544,9 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable,
};
template <typename Expression, typename DoneCallback, bool Vectorizable,
bool Tileable>
TiledEvaluation Tiling>
class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback,
Vectorizable, Tileable> {
Vectorizable, Tiling> {
public:
typedef typename Expression::Index StorageIndex;
typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
@@ -598,7 +598,7 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback,
template <typename Expression, typename DoneCallback, bool Vectorizable>
class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback,
Vectorizable, /*Tileable*/ true> {
Vectorizable, /*Tileable*/ TiledEvaluation::Legacy> {
public:
typedef typename traits<Expression>::Index StorageIndex;
typedef typename traits<Expression>::Scalar Scalar;
@@ -607,7 +607,9 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback,
static const int NumDims = traits<Expression>::NumDimensions;
typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper;
typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims,
Evaluator::Layout>
BlockMapper;
typedef TensorExecutorTilingContext<BlockMapper> TilingContext;
static EIGEN_STRONG_INLINE void runAsync(const Expression& expr,
@@ -624,7 +626,7 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback,
auto delete_ctx = [ctx]() { delete ctx; };
internal::TensorAsyncExecutor<
Expression, ThreadPoolDevice, decltype(delete_ctx), Vectorizable,
/*Tileable*/ false>::runAsync(expr, device, std::move(delete_ctx));
/*Tileable*/ TiledEvaluation::Off>::runAsync(expr, device, std::move(delete_ctx));
return;
}
@@ -635,22 +637,102 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback,
}
ctx->tiling =
GetTensorExecutorTilingContext<Evaluator, BlockMapper,
Vectorizable>(device, ctx->evaluator);
GetTensorExecutorTilingContext<Evaluator, BlockMapper, Vectorizable>(
device, ctx->evaluator);
device.parallelForAsync(
ctx->tiling.block_mapper.total_block_count(), ctx->tiling.cost,
[ctx](StorageIndex firstIdx, StorageIndex lastIdx) {
ScalarNoConst* thread_buf =
ctx->tiling.template GetCurrentThreadBuffer<ScalarNoConst>(
ctx->device);
for (StorageIndex i = firstIdx; i < lastIdx; ++i) {
auto block =
ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf);
ctx->evaluator.evalBlock(&block);
}
},
[ctx]() { delete ctx; });
auto eval_block = [ctx](StorageIndex firstIdx, StorageIndex lastIdx) {
ScalarNoConst* thread_buf =
ctx->tiling.template GetCurrentThreadBuffer<ScalarNoConst>(
ctx->device);
for (StorageIndex i = firstIdx; i < lastIdx; ++i) {
auto block = ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf);
ctx->evaluator.evalBlock(&block);
}
};
device.parallelForAsync(ctx->tiling.block_mapper.total_block_count(),
ctx->tiling.cost, eval_block,
[ctx]() { delete ctx; });
};
ctx->evaluator.evalSubExprsIfNeededAsync(nullptr, on_eval_subexprs);
}
private:
struct TensorAsyncExecutorContext {
TensorAsyncExecutorContext(const Expression& expr,
const ThreadPoolDevice& thread_pool,
DoneCallback done)
: device(thread_pool),
evaluator(expr, thread_pool),
on_done(std::move(done)) {}
~TensorAsyncExecutorContext() {
on_done();
device.deallocate(tiling.buffer);
evaluator.cleanup();
}
const ThreadPoolDevice& device;
Evaluator evaluator;
TilingContext tiling;
private:
DoneCallback on_done;
};
};
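Note the lifetime scheme in runAsync above: the heap-allocated context is deleted by whichever async callback runs last, and the destructor then invokes the user's done callback before releasing the tiling buffer and the evaluator. A stand-alone sketch of this self-deleting-context pattern (toy names):

#include <functional>
#include <iostream>
#include <utility>

struct Ctx {
  std::function<void()> on_done;
  ~Ctx() { on_done(); }  // the destructor fires the user callback
};

void runAsyncToy(std::function<void()> done) {
  Ctx* ctx = new Ctx{std::move(done)};
  // In Eigen this lambda is handed to parallelForAsync and runs once
  // every block has been evaluated.
  auto all_blocks_done = [ctx]() { delete ctx; };
  all_blocks_done();
}

int main() {
  runAsyncToy([] { std::cout << "expression evaluated\n"; });
  return 0;
}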
template <typename Expression, typename DoneCallback, bool Vectorizable>
class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback,
Vectorizable, /*Tileable*/ TiledEvaluation::On> {
public:
typedef typename traits<Expression>::Index IndexType;
typedef typename traits<Expression>::Scalar Scalar;
typedef typename remove_const<Scalar>::type ScalarNoConst;
static const int NumDims = traits<Expression>::NumDimensions;
typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
typedef TensorBlockMapper<ScalarNoConst, IndexType, NumDims,
Evaluator::Layout>
BlockMapper;
typedef TensorExecutorTilingContext<BlockMapper> TilingContext;
typedef internal::TensorBlockDescriptor<NumDims, IndexType> TensorBlockDesc;
typedef internal::TensorBlockScratchAllocator<ThreadPoolDevice>
TensorBlockScratch;
static EIGEN_STRONG_INLINE void runAsync(const Expression& expr,
const ThreadPoolDevice& device,
DoneCallback done) {
TensorAsyncExecutorContext* const ctx =
new TensorAsyncExecutorContext(expr, device, std::move(done));
const auto on_eval_subexprs = [ctx](bool need_assign) -> void {
if (!need_assign) {
delete ctx;
return;
}
ctx->tiling =
internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper,
Vectorizable>(
ctx->device, ctx->evaluator, /*allocate_buffer=*/false);
auto eval_block = [ctx](IndexType firstBlockIdx, IndexType lastBlockIdx) {
TensorBlockScratch scratch(ctx->device);
for (IndexType block_idx = firstBlockIdx; block_idx < lastBlockIdx;
++block_idx) {
auto block =
ctx->tiling.block_mapper.GetBlockForIndex(block_idx, nullptr);
TensorBlockDesc desc(block.first_coeff_index(), block.block_sizes());
ctx->evaluator.evalBlockV2(desc, scratch);
scratch.reset();
}
};
ctx->device.parallelForAsync(ctx->tiling.block_mapper.total_block_count(),
ctx->tiling.cost, eval_block, [ctx]() { delete ctx; });
};
ctx->evaluator.evalSubExprsIfNeededAsync(nullptr, on_eval_subexprs);
@@ -682,7 +764,6 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback,
#endif // EIGEN_USE_THREADS
// GPU: the evaluation of the expression is offloaded to a GPU.
#if defined(EIGEN_USE_GPU)
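Throughout this file the bool Tileable parameter becomes the three-way TiledEvaluation selector, giving one TensorAsyncExecutor specialization per mode: Off (flat parallelForAsync over coefficients), Legacy (block evaluation V1 via evalBlock), and On (the new block evaluation V2 via evalBlockV2 plus a scratch allocator). A self-contained sketch of the enum-keyed dispatch, with a stand-in enum rather than Eigen's real definition:

#include <cstdio>

enum class TiledEvaluation { Off, On, Legacy };  // stand-in definition

template <TiledEvaluation Tiling>
struct AsyncExecutorSketch;  // primary template, never instantiated

template <>
struct AsyncExecutorSketch<TiledEvaluation::Off> {
  static const char* strategy() { return "flat parallelForAsync"; }
};

template <>
struct AsyncExecutorSketch<TiledEvaluation::Legacy> {
  static const char* strategy() { return "block evaluation V1 (evalBlock)"; }
};

template <>
struct AsyncExecutorSketch<TiledEvaluation::On> {
  static const char* strategy() { return "block evaluation V2 (evalBlockV2)"; }
};

int main() {
  std::printf("%s\n", AsyncExecutorSketch<TiledEvaluation::On>::strategy());
  return 0;
}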

unsupported/Eigen/CXX11/src/Tensor/TensorForcedEval.h

@@ -132,14 +132,6 @@ struct TensorEvaluator<const TensorForcedEvalOp<ArgType_>, Device>
EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(EvaluatorPointerType) {
const Index numValues = internal::array_prod(m_impl.dimensions());
m_buffer = m_device.get((CoeffReturnType*)m_device.allocate_temp(numValues * sizeof(CoeffReturnType)));
#ifndef EIGEN_USE_SYCL
// Should initialize the memory in case we're dealing with non POD types.
if (NumTraits<CoeffReturnType>::RequireInitialization) {
for (Index i = 0; i < numValues; ++i) {
new(m_buffer+i) CoeffReturnType();
}
}
#endif
typedef TensorEvalToOp< const typename internal::remove_const<ArgType>::type > EvalTo;
EvalTo evalToTmp(m_device.get(m_buffer), m_op);
@@ -151,6 +143,29 @@ struct TensorEvaluator<const TensorForcedEvalOp<ArgType_>, Device>
return true;
}
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_STRONG_INLINE EIGEN_DEVICE_FUNC void evalSubExprsIfNeededAsync(
EvaluatorPointerType, EvalSubExprsCallback done) {
const Index numValues = internal::array_prod(m_impl.dimensions());
m_buffer = m_device.get((CoeffReturnType*)m_device.allocate_temp(
numValues * sizeof(CoeffReturnType)));
typedef TensorEvalToOp<const typename internal::remove_const<ArgType>::type>
EvalTo;
EvalTo evalToTmp(m_device.get(m_buffer), m_op);
auto on_done = std::bind([](EvalSubExprsCallback done) { done(true); },
std::move(done));
internal::TensorAsyncExecutor<
const EvalTo, typename internal::remove_const<Device>::type,
decltype(on_done),
/*Vectorizable=*/internal::IsVectorizable<Device, const ArgType>::value,
/*Tiling=*/internal::IsTileable<Device, const ArgType>::value>::
runAsync(evalToTmp, m_device, std::move(on_done));
}
#endif
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() {
m_device.deallocate_temp(m_buffer);
m_buffer = NULL;
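The done callback above is adapted with std::bind rather than a move-capturing lambda, presumably because the tensor module keeps C++11 compatibility, where lambdas cannot move-capture. A stand-alone illustration of the same adapter (toy callback):

#include <functional>
#include <iostream>
#include <utility>

int main() {
  auto done = [](bool need_assign) {
    std::cout << "need_assign=" << need_assign << "\n";
  };
  // C++11-safe move of 'done' into the wrapper; invoking the wrapper
  // forwards the flag, as in evalSubExprsIfNeededAsync above.
  auto on_done = std::bind([](decltype(done) d) { d(true); },
                           std::move(done));
  on_done();
  return 0;
}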

unsupported/Eigen/CXX11/src/Tensor/TensorForwardDeclarations.h

@@ -185,12 +185,12 @@ template <typename Expression, typename Device,
TiledEvaluation Tiling = IsTileable<Device, Expression>::value>
class TensorExecutor;
// TODO(ezhulenev): Add TiledEvaluation support to async executor.
template <typename Expression, typename Device, typename DoneCallback,
bool Vectorizable = IsVectorizable<Device, Expression>::value,
bool Tileable = IsTileable<Device, Expression>::BlockAccess>
TiledEvaluation Tiling = IsTileable<Device, Expression>::value>
class TensorAsyncExecutor;
} // end namespace internal
} // end namespace Eigen

unsupported/Eigen/CXX11/src/Tensor/TensorMorphing.h

@@ -205,6 +205,14 @@ struct TensorEvaluator<const TensorReshapingOp<NewDimensions, ArgType>, Device>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE const Dimensions& dimensions() const { return m_dimensions; }
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void evalSubExprsIfNeededAsync(
EvaluatorPointerType data, EvalSubExprsCallback done) {
m_impl.evalSubExprsIfNeededAsync(data, std::move(done));
}
#endif
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE bool evalSubExprsIfNeeded(EvaluatorPointerType data) {
return m_impl.evalSubExprsIfNeeded(data);
}

unsupported/Eigen/CXX11/src/Tensor/TensorReduction.h

@@ -689,15 +689,14 @@ struct TensorReductionEvaluatorBase<const TensorReductionOp<Op, Dims, ArgType, M
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE const Dimensions& dimensions() const { return m_dimensions; }
EIGEN_STRONG_INLINE
#if !defined(EIGEN_HIPCC)
// Marking this as EIGEN_DEVICE_FUNC for HIPCC requires also doing the same for all the functions
// being called within here, which then leads to proliferation of EIGEN_DEVICE_FUNC markings, one
// of which will eventually result in an NVCC error
EIGEN_DEVICE_FUNC
#endif
bool evalSubExprsIfNeeded(EvaluatorPointerType data) {
m_impl.evalSubExprsIfNeeded(NULL);
#if !defined(EIGEN_HIPCC)
// Marking this as EIGEN_DEVICE_FUNC for HIPCC requires also doing the same
// for all the functions being called within here, which then leads to
// proliferation of EIGEN_DEVICE_FUNC markings, one of which will eventually
// result in an NVCC error
EIGEN_DEVICE_FUNC
#endif
bool evalSubExprsIfNeededCommon(EvaluatorPointerType data) {
// Use the FullReducer if possible.
if ((RunningFullReduction && RunningOnSycl) ||(RunningFullReduction &&
internal::FullReducer<Self, Op, Device>::HasOptimizedImplementation &&
@@ -802,6 +801,34 @@ struct TensorReductionEvaluatorBase<const TensorReductionOp<Op, Dims, ArgType, M
return true;
}
#ifdef EIGEN_USE_THREADS
template <typename EvalSubExprsCallback>
EIGEN_STRONG_INLINE
#if !defined(EIGEN_HIPCC)
EIGEN_DEVICE_FUNC
#endif
void
evalSubExprsIfNeededAsync(EvaluatorPointerType data,
EvalSubExprsCallback done) {
m_impl.evalSubExprsIfNeededAsync(NULL, [this, data, done](bool) {
done(evalSubExprsIfNeededCommon(data));
});
}
#endif
EIGEN_STRONG_INLINE
#if !defined(EIGEN_HIPCC)
// Marking this as EIGEN_DEVICE_FUNC for HIPCC requires also doing the same
// for all the functions being called within here, which then leads to
// proliferation of EIGEN_DEVICE_FUNC markings, one of which will eventually
// result in an NVCC error
EIGEN_DEVICE_FUNC
#endif
bool evalSubExprsIfNeeded(EvaluatorPointerType data) {
m_impl.evalSubExprsIfNeeded(NULL);
return evalSubExprsIfNeededCommon(data);
}
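The refactor above splits the old evalSubExprsIfNeeded into a shared tail, evalSubExprsIfNeededCommon, reused by both entry points; only the child-setup step differs between the synchronous and asynchronous paths. A reduced stand-alone sketch of that split (toy names and logic):

#include <iostream>

struct ReducerSketch {
  bool common(float* data) {   // shared tail: try the FullReducer, etc.
    return data == nullptr;    // toy stand-in for the real decision
  }

  bool evalSync(float* data) {
    // child setup runs inline ...
    return common(data);
  }

  template <typename Done>
  void evalAsync(float* data, Done done) {
    // child setup completes via its own callback, then the tail runs.
    done(common(data));
  }
};

int main() {
  ReducerSketch r;
  std::cout << r.evalSync(nullptr) << "\n";
  r.evalAsync(nullptr, [](bool ok) { std::cout << ok << "\n"; });
  return 0;
}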
EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void cleanup() {
m_impl.cleanup();
if (m_result) {

unsupported/test/cxx11_tensor_executor.cpp

@@ -604,11 +604,10 @@ static void test_async_execute_unary_expr(Device d)
Eigen::Barrier done(1);
auto on_done = [&done]() { done.Notify(); };
static const bool TilingOn = Tiling == TiledEvaluation::Off ? false : true;
using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>;
using DoneCallback = decltype(on_done);
using Executor = internal::TensorAsyncExecutor<const Assign, Device, DoneCallback,
Vectorizable, TilingOn>;
Vectorizable, Tiling>;
Executor::runAsync(Assign(dst, expr), d, on_done);
done.Wait();
@@ -641,11 +640,10 @@ static void test_async_execute_binary_expr(Device d)
Eigen::Barrier done(1);
auto on_done = [&done]() { done.Notify(); };
static const bool TilingOn = Tiling == TiledEvaluation::Off ? false : true;
using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>;
using DoneCallback = decltype(on_done);
using Executor = internal::TensorAsyncExecutor<const Assign, Device, DoneCallback,
Vectorizable, TilingOn>;
Vectorizable, Tiling>;
Executor::runAsync(Assign(dst, expr), d, on_done);
done.Wait();