Allow move-only done callback in TensorAsyncDevice

This commit is contained in:
Eugene Zhulenev 2019-09-03 17:20:56 -07:00
parent a8d264fa9c
commit 47fefa235f
6 changed files with 70 additions and 54 deletions

View File

@ -1065,12 +1065,12 @@ class TensorBase : public TensorBase<Derived, ReadOnlyAccessors> {
#ifdef EIGEN_USE_THREADS
// Select the async device on which to evaluate the expression.
template <typename DeviceType>
template <typename DeviceType, typename DoneCallback>
typename internal::enable_if<
internal::is_same<DeviceType, ThreadPoolDevice>::value,
TensorAsyncDevice<Derived, DeviceType>>::type
device(const DeviceType& dev, std::function<void()> done) {
return TensorAsyncDevice<Derived, DeviceType>(dev, derived(), std::move(done));
TensorAsyncDevice<Derived, DeviceType, DoneCallback>>::type
device(const DeviceType& dev, DoneCallback done) {
return TensorAsyncDevice<Derived, DeviceType, DoneCallback>(dev, derived(), std::move(done));
}
#endif // EIGEN_USE_THREADS

View File

@ -73,21 +73,21 @@ template <typename ExpressionType, typename DeviceType> class TensorDevice {
* ThreadPoolDevice).
*
* Example:
* std::function<void()> done = []() {};
* auto done = []() { ... expression evaluation done ... };
* C.device(EIGEN_THREAD_POOL, std::move(done)) = A + B;
*/
template <typename ExpressionType, typename DeviceType>
template <typename ExpressionType, typename DeviceType, typename DoneCallback>
class TensorAsyncDevice {
public:
TensorAsyncDevice(const DeviceType& device, ExpressionType& expression,
std::function<void()> done)
DoneCallback done)
: m_device(device), m_expression(expression), m_done(std::move(done)) {}
template <typename OtherDerived>
EIGEN_STRONG_INLINE TensorAsyncDevice& operator=(const OtherDerived& other) {
typedef TensorAssignOp<ExpressionType, const OtherDerived> Assign;
typedef internal::TensorAsyncExecutor<const Assign, DeviceType> Executor;
typedef internal::TensorAsyncExecutor<const Assign, DeviceType, DoneCallback> Executor;
// WARNING: After assignment 'm_done' callback will be in undefined state.
Assign assign(m_expression, other);
@ -99,7 +99,7 @@ class TensorAsyncDevice {
protected:
const DeviceType& m_device;
ExpressionType& m_expression;
std::function<void()> m_done;
DoneCallback m_done;
};
#endif // EIGEN_USE_THREADS

View File

@ -101,8 +101,8 @@ class TensorExecutor {
* Default async execution strategy is not implemented. Currently it's only
* available for ThreadPoolDevice (see definition below).
*/
template <typename Expression, typename Device, bool Vectorizable,
bool Tileable>
template <typename Expression, typename Device, typename DoneCallback,
bool Vectorizable, bool Tileable>
class TensorAsyncExecutor {};
/**
@ -419,15 +419,17 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ tr
}
};
template <typename Expression, bool Vectorizable, bool Tileable>
class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> {
template <typename Expression, typename DoneCallback, bool Vectorizable,
bool Tileable>
class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback,
Vectorizable, Tileable> {
public:
typedef typename Expression::Index StorageIndex;
typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
static EIGEN_STRONG_INLINE void runAsync(const Expression& expr,
const ThreadPoolDevice& device,
std::function<void()> done) {
DoneCallback done) {
TensorAsyncExecutorContext* const ctx =
new TensorAsyncExecutorContext(expr, device, std::move(done));
@ -455,7 +457,7 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable>
struct TensorAsyncExecutorContext {
TensorAsyncExecutorContext(const Expression& expr,
const ThreadPoolDevice& thread_pool,
std::function<void()> done)
DoneCallback done)
: evaluator(expr, thread_pool), on_done(std::move(done)) {}
~TensorAsyncExecutorContext() {
@ -466,12 +468,13 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable>
Evaluator evaluator;
private:
std::function<void()> on_done;
DoneCallback on_done;
};
};
template <typename Expression, bool Vectorizable>
class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ true> {
template <typename Expression, typename DoneCallback, bool Vectorizable>
class TensorAsyncExecutor<Expression, ThreadPoolDevice, DoneCallback,
Vectorizable, /*Tileable*/ true> {
public:
typedef typename traits<Expression>::Index StorageIndex;
typedef typename traits<Expression>::Scalar Scalar;
@ -485,7 +488,7 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable
static EIGEN_STRONG_INLINE void runAsync(const Expression& expr,
const ThreadPoolDevice& device,
std::function<void()> done) {
DoneCallback done) {
TensorAsyncExecutorContext* const ctx =
new TensorAsyncExecutorContext(expr, device, std::move(done));
@ -494,9 +497,10 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable
if (total_size < cache_size &&
!ExpressionHasTensorBroadcastingOp<Expression>::value) {
internal::TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable,
/*Tileable*/ false>::runAsync(
expr, device, [ctx]() { delete ctx; });
auto delete_ctx = [ctx]() { delete ctx; };
internal::TensorAsyncExecutor<
Expression, ThreadPoolDevice, decltype(delete_ctx), Vectorizable,
/*Tileable*/ false>::runAsync(expr, device, std::move(delete_ctx));
return;
}
@ -532,7 +536,7 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable
struct TensorAsyncExecutorContext {
TensorAsyncExecutorContext(const Expression& expr,
const ThreadPoolDevice& thread_pool,
std::function<void()> done)
DoneCallback done)
: device(thread_pool),
evaluator(expr, thread_pool),
on_done(std::move(done)) {}
@ -548,7 +552,7 @@ class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable
TilingContext tiling;
private:
std::function<void()> on_done;
DoneCallback on_done;
};
};

View File

@ -94,7 +94,7 @@ template<typename XprType, template <class> class MakePointer_ = MakePointer> cl
template<typename XprType> class TensorForcedEvalOp;
template<typename ExpressionType, typename DeviceType> class TensorDevice;
template<typename ExpressionType, typename DeviceType> class TensorAsyncDevice;
template<typename ExpressionType, typename DeviceType, typename DoneCallback> class TensorAsyncDevice;
template<typename Derived, typename Device> struct TensorEvaluator;
struct NoOpOutputKernel;
@ -168,7 +168,7 @@ template <typename Expression, typename Device,
bool Tileable = IsTileable<Device, Expression>::value>
class TensorExecutor;
template <typename Expression, typename Device,
template <typename Expression, typename Device, typename DoneCallback,
bool Vectorizable = IsVectorizable<Device, Expression>::value,
bool Tileable = IsTileable<Device, Expression>::value>
class TensorAsyncExecutor;

View File

@ -578,11 +578,15 @@ static void test_async_execute_unary_expr(Device d)
src.setRandom();
const auto expr = src.square();
using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>;
using Executor = internal::TensorAsyncExecutor<const Assign, Device,
Vectorizable, Tileable>;
Eigen::Barrier done(1);
Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); });
auto on_done = [&done]() { done.Notify(); };
using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>;
using DoneCallback = decltype(on_done);
using Executor = internal::TensorAsyncExecutor<const Assign, Device, DoneCallback,
Vectorizable, Tileable>;
Executor::runAsync(Assign(dst, expr), d, on_done);
done.Wait();
for (Index i = 0; i < dst.dimensions().TotalSize(); ++i) {
@ -610,12 +614,15 @@ static void test_async_execute_binary_expr(Device d)
const auto expr = lhs + rhs;
Eigen::Barrier done(1);
auto on_done = [&done]() { done.Notify(); };
using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>;
using Executor = internal::TensorAsyncExecutor<const Assign, Device,
using DoneCallback = decltype(on_done);
using Executor = internal::TensorAsyncExecutor<const Assign, Device, DoneCallback,
Vectorizable, Tileable>;
Eigen::Barrier done(1);
Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); });
Executor::runAsync(Assign(dst, expr), d, on_done);
done.Wait();
for (Index i = 0; i < dst.dimensions().TotalSize(); ++i) {

View File

@ -683,34 +683,39 @@ EIGEN_DECLARE_TEST(cxx11_tensor_thread_pool)
CALL_SUBTEST_3(test_multithread_contraction_agrees_with_singlethread<RowMajor>());
CALL_SUBTEST_3(test_multithread_contraction_with_output_kernel<ColMajor>());
CALL_SUBTEST_3(test_multithread_contraction_with_output_kernel<RowMajor>());
CALL_SUBTEST_3(test_async_multithread_contraction_agrees_with_singlethread<ColMajor>());
CALL_SUBTEST_3(test_async_multithread_contraction_agrees_with_singlethread<RowMajor>());
CALL_SUBTEST_4(test_async_multithread_contraction_agrees_with_singlethread<ColMajor>());
CALL_SUBTEST_4(test_async_multithread_contraction_agrees_with_singlethread<RowMajor>());
// Test EvalShardedByInnerDimContext parallelization strategy.
CALL_SUBTEST_4(test_sharded_by_inner_dim_contraction<ColMajor>());
CALL_SUBTEST_4(test_sharded_by_inner_dim_contraction<RowMajor>());
CALL_SUBTEST_4(test_sharded_by_inner_dim_contraction_with_output_kernel<ColMajor>());
CALL_SUBTEST_4(test_sharded_by_inner_dim_contraction_with_output_kernel<RowMajor>());
CALL_SUBTEST_4(test_async_sharded_by_inner_dim_contraction<ColMajor>());
CALL_SUBTEST_4(test_async_sharded_by_inner_dim_contraction<RowMajor>());
CALL_SUBTEST_4(test_async_sharded_by_inner_dim_contraction_with_output_kernel<ColMajor>());
CALL_SUBTEST_4(test_async_sharded_by_inner_dim_contraction_with_output_kernel<RowMajor>());
CALL_SUBTEST_5(test_sharded_by_inner_dim_contraction<ColMajor>());
CALL_SUBTEST_5(test_sharded_by_inner_dim_contraction<RowMajor>());
CALL_SUBTEST_5(test_sharded_by_inner_dim_contraction_with_output_kernel<ColMajor>());
CALL_SUBTEST_5(test_sharded_by_inner_dim_contraction_with_output_kernel<RowMajor>());
CALL_SUBTEST_6(test_async_sharded_by_inner_dim_contraction<ColMajor>());
CALL_SUBTEST_6(test_async_sharded_by_inner_dim_contraction<RowMajor>());
CALL_SUBTEST_6(test_async_sharded_by_inner_dim_contraction_with_output_kernel<ColMajor>());
CALL_SUBTEST_6(test_async_sharded_by_inner_dim_contraction_with_output_kernel<RowMajor>());
// Exercise various cases that have been problematic in the past.
CALL_SUBTEST_5(test_contraction_corner_cases<ColMajor>());
CALL_SUBTEST_5(test_contraction_corner_cases<RowMajor>());
CALL_SUBTEST_7(test_contraction_corner_cases<ColMajor>());
CALL_SUBTEST_7(test_contraction_corner_cases<RowMajor>());
CALL_SUBTEST_6(test_full_contraction<ColMajor>());
CALL_SUBTEST_6(test_full_contraction<RowMajor>());
CALL_SUBTEST_8(test_full_contraction<ColMajor>());
CALL_SUBTEST_8(test_full_contraction<RowMajor>());
CALL_SUBTEST_7(test_multithreaded_reductions<ColMajor>());
CALL_SUBTEST_7(test_multithreaded_reductions<RowMajor>());
CALL_SUBTEST_9(test_multithreaded_reductions<ColMajor>());
CALL_SUBTEST_9(test_multithreaded_reductions<RowMajor>());
CALL_SUBTEST_7(test_memcpy());
CALL_SUBTEST_7(test_multithread_random());
CALL_SUBTEST_10(test_memcpy());
CALL_SUBTEST_10(test_multithread_random());
TestAllocator test_allocator;
CALL_SUBTEST_7(test_multithread_shuffle<ColMajor>(NULL));
CALL_SUBTEST_7(test_multithread_shuffle<RowMajor>(&test_allocator));
CALL_SUBTEST_7(test_threadpool_allocate(&test_allocator));
CALL_SUBTEST_11(test_multithread_shuffle<ColMajor>(NULL));
CALL_SUBTEST_11(test_multithread_shuffle<RowMajor>(&test_allocator));
CALL_SUBTEST_11(test_threadpool_allocate(&test_allocator));
// Force CMake to split this test.
// EIGEN_SUFFIXES;1;2;3;4;5;6;7;8;9;10;11
}