Asynchronous expression evaluation with TensorAsyncDevice

This commit is contained in:
Eugene Zhulenev 2019-08-30 14:49:40 -07:00
parent f6c51d9209
commit 66665e7e76
8 changed files with 414 additions and 64 deletions

View File

@ -43,7 +43,7 @@
#include <mutex>
#include <thread>
#include <functional>
#include <memory>
#include <utility>
#include "src/util/CXX11Meta.h"
#include "src/util/MaxSizeVector.h"

View File

@ -1063,6 +1063,17 @@ class TensorBase : public TensorBase<Derived, ReadOnlyAccessors> {
return TensorDevice<Derived, DeviceType>(dev, derived());
}
#ifdef EIGEN_USE_THREADS
// Select the async device on which to evaluate the expression.
template <typename DeviceType>
typename internal::enable_if<
internal::is_same<DeviceType, ThreadPoolDevice>::value,
TensorAsyncDevice<Derived, DeviceType>>::type
device(const DeviceType& dev, std::function<void()> done) {
return TensorAsyncDevice<Derived, DeviceType>(dev, derived(), std::move(done));
}
#endif // EIGEN_USE_THREADS
protected:
EIGEN_DEVICE_FUNC
EIGEN_STRONG_INLINE Derived& derived() { return *static_cast<Derived*>(this); }

View File

@ -932,6 +932,7 @@ class TensorBlockMapper {
typedef TensorBlock<Scalar, StorageIndex, NumDims, Layout> Block;
typedef DSizes<StorageIndex, NumDims> Dimensions;
TensorBlockMapper() {}
TensorBlockMapper(const Dimensions& dims,
const TensorBlockShapeType block_shape,
Index min_target_size)

View File

@ -63,6 +63,47 @@ template <typename ExpressionType, typename DeviceType> class TensorDevice {
ExpressionType& m_expression;
};
#ifdef EIGEN_USE_THREADS
/** \class TensorAsyncDevice
* \ingroup CXX11_Tensor_Module
*
* \brief Pseudo expression providing an operator = that will evaluate its
* argument asynchronously on the specified device (currently supports only
* ThreadPoolDevice).
*
* Example:
* std::function<void()> done = []() {};
* C.device(EIGEN_THREAD_POOL, std::move(done)) = A + B;
*/
template <typename ExpressionType, typename DeviceType>
class TensorAsyncDevice {
public:
TensorAsyncDevice(const DeviceType& device, ExpressionType& expression,
std::function<void()> done)
: m_device(device), m_expression(expression), m_done(std::move(done)) {}
template <typename OtherDerived>
EIGEN_STRONG_INLINE TensorAsyncDevice& operator=(const OtherDerived& other) {
typedef TensorAssignOp<ExpressionType, const OtherDerived> Assign;
typedef internal::TensorAsyncExecutor<const Assign, DeviceType> Executor;
// WARNING: After assignment 'm_done' callback will be in undefined state.
Assign assign(m_expression, other);
Executor::runAsync(assign, m_device, std::move(m_done));
return *this;
}
protected:
const DeviceType& m_device;
ExpressionType& m_expression;
std::function<void()> m_done;
};
#endif // EIGEN_USE_THREADS
} // end namespace Eigen
#endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_H

View File

@ -84,7 +84,7 @@ class TensorExecutor {
EIGEN_DEVICE_FUNC
static EIGEN_STRONG_INLINE void run(const Expression& expr,
const Device& device = Device()) {
const Device& device = Device()) {
TensorEvaluator<Expression, Device> evaluator(expr, device);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
if (needs_assign) {
@ -97,6 +97,14 @@ class TensorExecutor {
}
};
/**
* Default async execution strategy is not implemented. Currently it's only
* available for ThreadPoolDevice (see definition below).
*/
template <typename Expression, typename Device, bool Vectorizable,
bool Tileable>
class TensorAsyncExecutor {};
/**
* Process all the data with a single cpu thread, using vectorized instructions.
*/
@ -107,8 +115,8 @@ class TensorExecutor<Expression, DefaultDevice, /*Vectorizable*/ true,
typedef typename Expression::Index StorageIndex;
EIGEN_DEVICE_FUNC
static EIGEN_STRONG_INLINE void run(const Expression& expr,
const DefaultDevice& device = DefaultDevice()) {
static EIGEN_STRONG_INLINE void run(
const Expression& expr, const DefaultDevice& device = DefaultDevice()) {
TensorEvaluator<Expression, DefaultDevice> evaluator(expr, device);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
if (needs_assign) {
@ -206,8 +214,81 @@ class TensorExecutor<Expression, DefaultDevice, Vectorizable,
/**
* Multicore strategy: the index space is partitioned and each partition is
* executed on a single core.
*
* (1) TensorExecutor will submit work to the ThreadPoolDevice managed thread
* pool, and will block the caller thread until all tasks are finished.
*
* (2) TensorAsyncExecutor is a non-blocking version, that will submit work to
* the ThreadPoolDevice managed thread pool, and will return immediately.
* It will call 'done' callback after all tasks are finished.
*/
#ifdef EIGEN_USE_THREADS
template <typename TensorBlockMapper>
struct TensorExecutorTilingContext {
typedef typename TensorBlockMapper::Block TensorBlock;
TensorExecutorTilingContext() : buffer(nullptr) {}
TensorExecutorTilingContext(const TensorBlockMapper& b_mapper,
const TensorOpCost& b_cost, void* b_buffer,
size_t b_aligned_size)
: block_mapper(b_mapper),
cost(b_cost),
buffer(b_buffer),
aligned_blocksize(b_aligned_size) {}
template <typename Scalar>
Scalar* GetCurrentThreadBuffer(const ThreadPoolDevice& device) const {
// ThreadPoolDevice::currentThreadId() returns -1 if called from a thread
// not in the thread pool, such as the main thread dispatching Eigen
// expressions.
const int thread_idx = device.currentThreadId();
eigen_assert(thread_idx >= -1 && thread_idx < device.numThreads());
const Index offset = aligned_blocksize * (thread_idx + 1);
return reinterpret_cast<Scalar*>(static_cast<char*>(buffer) + offset);
}
TensorBlockMapper block_mapper; // navigate through blocks
TensorOpCost cost; // cost of computing a single block
void* buffer; // temporary buffer for blocks
size_t aligned_blocksize; // block size after memory alignment
};
// Computes a block evaluation parameters, and allocates temporary memory buffer
// for blocks. See TensorExecutor/TensorAsyncExecutor (Tileable=true) below.
template <typename Evaluator, typename TensorBlockMapper, bool Vectorizable>
TensorExecutorTilingContext<TensorBlockMapper> GetTensorExecutorTilingContext(
const ThreadPoolDevice& device, const Evaluator& evaluator) {
// Prefer blocks skewed toward inner dimension.
TensorBlockShapeType block_shape = kSkewedInnerDims;
Index block_total_size = 0;
// Query expression tree for desired block size/shape.
std::vector<TensorOpResourceRequirements> resources;
evaluator.getResourceRequirements(&resources);
MergeResourceRequirements(resources, &block_shape, &block_total_size);
int num_threads = device.numThreads();
// Estimate minimum block size based on cost.
TensorOpCost cost = evaluator.costPerCoeff(Vectorizable);
double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize(1, cost);
size_t block_size = static_cast<size_t>(1.0 / taskSize);
TensorBlockMapper block_mapper(
typename TensorBlockMapper::Dimensions(evaluator.dimensions()),
block_shape, block_size);
block_size = block_mapper.block_dims_total_size();
const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1);
const size_t aligned_blocksize =
align *
divup<size_t>(block_size * sizeof(typename Evaluator::Scalar), align);
void* buf = device.allocate((num_threads + 1) * aligned_blocksize);
return {block_mapper, cost * block_size, buf, aligned_blocksize};
}
template <typename Evaluator, typename StorageIndex, bool Vectorizable>
struct EvalRange {
static void run(Evaluator* evaluator_in, const StorageIndex firstIdx,
@ -274,7 +355,7 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> {
typedef EvalRange<Evaluator, StorageIndex, Vectorizable> EvalRange;
Evaluator evaluator(expr, device);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
const StorageIndex size = array_prod(evaluator.dimensions());
device.parallelFor(size, evaluator.costPerCoeff(Vectorizable),
@ -290,18 +371,18 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> {
template <typename Expression, bool Vectorizable>
class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ true> {
public:
typedef typename traits<Expression>::Index StorageIndex;
typedef typename traits<Expression>::Scalar Scalar;
typedef typename remove_const<Scalar>::type ScalarNoConst;
typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
typedef typename traits<Expression>::Index StorageIndex;
static const int NumDims = traits<Expression>::NumDimensions;
typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper;
typedef TensorExecutorTilingContext<BlockMapper> TilingContext;
static EIGEN_STRONG_INLINE void run(const Expression& expr,
const ThreadPoolDevice& device) {
typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> TensorBlockMapper;
Evaluator evaluator(expr, device);
Index total_size = array_prod(evaluator.dimensions());
Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar);
@ -315,50 +396,152 @@ class TensorExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ tr
return;
}
const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
TensorBlockShapeType block_shape = kSkewedInnerDims;
Index block_total_size = 0;
// Query expression tree for desired block size/shape.
std::vector<internal::TensorOpResourceRequirements> resources;
evaluator.getResourceRequirements(&resources);
MergeResourceRequirements(resources, &block_shape, &block_total_size);
int num_threads = device.numThreads();
const TilingContext tiling =
internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper,
Vectorizable>(device, evaluator);
// Estimate minimum block size based on cost.
TensorOpCost cost = evaluator.costPerCoeff(Vectorizable);
double taskSize = TensorCostModel<ThreadPoolDevice>::taskSize(1, cost);
size_t block_size = static_cast<size_t>(1.0 / taskSize);
TensorBlockMapper block_mapper(
typename TensorBlockMapper::Dimensions(evaluator.dimensions()),
block_shape, block_size);
block_size = block_mapper.block_dims_total_size();
const size_t align = numext::maxi(EIGEN_MAX_ALIGN_BYTES, 1);
const size_t aligned_blocksize =
align * divup<size_t>(block_size * sizeof(Scalar), align);
void* buf = device.allocate((num_threads + 1) * aligned_blocksize);
device.parallelFor(
block_mapper.total_block_count(), cost * block_size,
[=, &device, &evaluator, &block_mapper](StorageIndex firstIdx,
StorageIndex lastIdx) {
// currentThreadId() returns -1 if called from a thread not in the
// thread pool, such as the main thread dispatching Eigen
// expressions.
const int thread_idx = device.currentThreadId();
eigen_assert(thread_idx >= -1 && thread_idx < num_threads);
ScalarNoConst* thread_buf = reinterpret_cast<ScalarNoConst*>(
static_cast<char*>(buf) + aligned_blocksize * (thread_idx + 1));
tiling.block_mapper.total_block_count(), tiling.cost,
[=, &device, &evaluator, &tiling](StorageIndex firstIdx,
StorageIndex lastIdx) {
ScalarNoConst* thread_buf =
tiling.template GetCurrentThreadBuffer<ScalarNoConst>(device);
for (StorageIndex i = firstIdx; i < lastIdx; ++i) {
auto block = block_mapper.GetBlockForIndex(i, thread_buf);
auto block = tiling.block_mapper.GetBlockForIndex(i, thread_buf);
evaluator.evalBlock(&block);
}
});
device.deallocate(buf);
device.deallocate(tiling.buffer);
}
evaluator.cleanup();
}
};
template <typename Expression, bool Vectorizable, bool Tileable>
class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, Tileable> {
public:
typedef typename Expression::Index StorageIndex;
typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
static EIGEN_STRONG_INLINE void runAsync(const Expression& expr,
const ThreadPoolDevice& device,
std::function<void()> done) {
TensorAsyncExecutorContext* const ctx =
new TensorAsyncExecutorContext(expr, device, std::move(done));
// TODO(ezhulenev): This is a potentially blocking operation. Make it async!
const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr);
typedef EvalRange<Evaluator, StorageIndex, Vectorizable> EvalRange;
if (needs_assign) {
const StorageIndex size = array_prod(ctx->evaluator.dimensions());
device.parallelForAsync(
size, ctx->evaluator.costPerCoeff(Vectorizable),
EvalRange::alignBlockSize,
[ctx](StorageIndex firstIdx, StorageIndex lastIdx) {
EvalRange::run(&ctx->evaluator, firstIdx, lastIdx);
},
[ctx]() { delete ctx; });
}
}
private:
struct TensorAsyncExecutorContext {
TensorAsyncExecutorContext(const Expression& expr,
const ThreadPoolDevice& thread_pool,
std::function<void()> done)
: evaluator(expr, thread_pool), on_done(std::move(done)) {}
~TensorAsyncExecutorContext() {
on_done();
evaluator.cleanup();
}
Evaluator evaluator;
private:
std::function<void()> on_done;
};
};
template <typename Expression, bool Vectorizable>
class TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable, /*Tileable*/ true> {
public:
typedef typename traits<Expression>::Index StorageIndex;
typedef typename traits<Expression>::Scalar Scalar;
typedef typename remove_const<Scalar>::type ScalarNoConst;
static const int NumDims = traits<Expression>::NumDimensions;
typedef TensorEvaluator<Expression, ThreadPoolDevice> Evaluator;
typedef TensorBlockMapper<ScalarNoConst, StorageIndex, NumDims, Evaluator::Layout> BlockMapper;
typedef TensorExecutorTilingContext<BlockMapper> TilingContext;
static EIGEN_STRONG_INLINE void runAsync(const Expression& expr,
const ThreadPoolDevice& device,
std::function<void()> done) {
TensorAsyncExecutorContext* const ctx =
new TensorAsyncExecutorContext(expr, device, std::move(done));
Index total_size = array_prod(ctx->evaluator.dimensions());
Index cache_size = device.firstLevelCacheSize() / sizeof(Scalar);
if (total_size < cache_size &&
!ExpressionHasTensorBroadcastingOp<Expression>::value) {
internal::TensorAsyncExecutor<Expression, ThreadPoolDevice, Vectorizable,
/*Tileable*/ false>::runAsync(
expr, device, [ctx]() { delete ctx; });
return;
}
// TODO(ezhulenev): This is a potentially blocking operation. Make it async!
const bool needs_assign = ctx->evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
ctx->tiling =
internal::GetTensorExecutorTilingContext<Evaluator, BlockMapper,
Vectorizable>(device, ctx->evaluator);
device.parallelForAsync(
ctx->tiling.block_mapper.total_block_count(), ctx->tiling.cost,
[ctx](StorageIndex firstIdx, StorageIndex lastIdx) {
ScalarNoConst* thread_buf =
ctx->tiling.template GetCurrentThreadBuffer<ScalarNoConst>(ctx->device);
for (StorageIndex i = firstIdx; i < lastIdx; ++i) {
auto block = ctx->tiling.block_mapper.GetBlockForIndex(i, thread_buf);
ctx->evaluator.evalBlock(&block);
}
},
[ctx]() { delete ctx; });
}
}
private:
struct TensorAsyncExecutorContext {
TensorAsyncExecutorContext(const Expression& expr,
const ThreadPoolDevice& thread_pool,
std::function<void()> done)
: device(thread_pool),
evaluator(expr, thread_pool),
on_done(std::move(done)) {}
~TensorAsyncExecutorContext() {
on_done();
device.deallocate(tiling.buffer);
evaluator.cleanup();
}
const ThreadPoolDevice& device;
Evaluator evaluator;
TilingContext tiling;
private:
std::function<void()> on_done;
};
};
#endif // EIGEN_USE_THREADS
@ -419,7 +602,7 @@ template <typename Expression, bool Vectorizable, bool Tileable>
EIGEN_STRONG_INLINE void TensorExecutor<Expression, GpuDevice, Vectorizable, Tileable>::run(
const Expression& expr, const GpuDevice& device) {
TensorEvaluator<Expression, GpuDevice> evaluator(expr, device);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
const int block_size = device.maxGpuThreadsPerBlock();
@ -517,10 +700,10 @@ struct ExecExprFunctorKernel<Expr, false, Evaluator>
template <typename Expression, bool Vectorizable, bool Tileable>
class TensorExecutor<Expression, Eigen::SyclDevice, Vectorizable, Tileable> {
public:
typedef typename Expression::Index Index;
typedef typename Expression::Index Index;
static EIGEN_STRONG_INLINE void run(const Expression &expr, const Eigen::SyclDevice &dev) {
Eigen::TensorEvaluator<Expression, Eigen::SyclDevice> evaluator(expr, dev);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(NULL);
const bool needs_assign = evaluator.evalSubExprsIfNeeded(nullptr);
if (needs_assign) {
Index range, GRange, tileSize;
Index total_size = ::Eigen::internal::array_prod(evaluator.dimensions());

View File

@ -94,6 +94,7 @@ template<typename XprType, template <class> class MakePointer_ = MakePointer> cl
template<typename XprType> class TensorForcedEvalOp;
template<typename ExpressionType, typename DeviceType> class TensorDevice;
template<typename ExpressionType, typename DeviceType> class TensorAsyncDevice;
template<typename Derived, typename Device> struct TensorEvaluator;
struct NoOpOutputKernel;
@ -167,6 +168,11 @@ template <typename Expression, typename Device,
bool Tileable = IsTileable<Device, Expression>::value>
class TensorExecutor;
template <typename Expression, typename Device,
bool Vectorizable = IsVectorizable<Device, Expression>::value,
bool Tileable = IsTileable<Device, Expression>::value>
class TensorAsyncExecutor;
} // end namespace internal
} // end namespace Eigen

View File

@ -562,37 +562,112 @@ static void test_execute_reverse_rvalue(Device d)
}
}
template <typename T, int NumDims, typename Device, bool Vectorizable,
bool Tileable, int Layout>
static void test_async_execute_unary_expr(Device d)
{
static constexpr int Options = 0 | Layout;
// Pick a large enough tensor size to bypass small tensor block evaluation
// optimization.
auto dims = RandomDims<NumDims>(50 / NumDims, 100 / NumDims);
Tensor<T, NumDims, Options, Index> src(dims);
Tensor<T, NumDims, Options, Index> dst(dims);
src.setRandom();
const auto expr = src.square();
using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>;
using Executor = internal::TensorAsyncExecutor<const Assign, Device,
Vectorizable, Tileable>;
Eigen::Barrier done(1);
Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); });
done.Wait();
for (Index i = 0; i < dst.dimensions().TotalSize(); ++i) {
T square = src.coeff(i) * src.coeff(i);
VERIFY_IS_EQUAL(square, dst.coeff(i));
}
}
template <typename T, int NumDims, typename Device, bool Vectorizable,
bool Tileable, int Layout>
static void test_async_execute_binary_expr(Device d)
{
static constexpr int Options = 0 | Layout;
// Pick a large enough tensor size to bypass small tensor block evaluation
// optimization.
auto dims = RandomDims<NumDims>(50 / NumDims, 100 / NumDims);
Tensor<T, NumDims, Options, Index> lhs(dims);
Tensor<T, NumDims, Options, Index> rhs(dims);
Tensor<T, NumDims, Options, Index> dst(dims);
lhs.setRandom();
rhs.setRandom();
const auto expr = lhs + rhs;
using Assign = TensorAssignOp<decltype(dst), const decltype(expr)>;
using Executor = internal::TensorAsyncExecutor<const Assign, Device,
Vectorizable, Tileable>;
Eigen::Barrier done(1);
Executor::runAsync(Assign(dst, expr), d, [&done]() { done.Notify(); });
done.Wait();
for (Index i = 0; i < dst.dimensions().TotalSize(); ++i) {
T sum = lhs.coeff(i) + rhs.coeff(i);
VERIFY_IS_EQUAL(sum, dst.coeff(i));
}
}
#ifdef EIGEN_DONT_VECTORIZE
#define VECTORIZABLE(VAL) !EIGEN_DONT_VECTORIZE && VAL
#else
#else
#define VECTORIZABLE(VAL) VAL
#endif
#define CALL_SUBTEST_PART(PART) \
CALL_SUBTEST_##PART
#define CALL_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, ColMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, ColMajor>(default_device))); \
#define CALL_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, ColMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, ColMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), false, ColMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), true, ColMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, RowMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, RowMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, false, RowMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, false, true, RowMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), false, RowMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, DefaultDevice, VECTORIZABLE(true), true, RowMajor>(default_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, RowMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, RowMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, RowMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, RowMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, RowMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, RowMajor>(tp_device)))
// NOTE: Currently only ThreadPoolDevice supports async expression evaluation.
#define CALL_ASYNC_SUBTEST_COMBINATIONS(PART, NAME, T, NUM_DIMS) \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, ColMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, false, RowMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, false, true, RowMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), false, RowMajor>(tp_device))); \
CALL_SUBTEST_PART(PART)((NAME<T, NUM_DIMS, ThreadPoolDevice, VECTORIZABLE(true), true, RowMajor>(tp_device)))
EIGEN_DECLARE_TEST(cxx11_tensor_executor) {
Eigen::DefaultDevice default_device;
// Default device is unused in ASYNC tests.
EIGEN_UNUSED_VARIABLE(default_device);
const auto num_threads = internal::random<int>(1, 24);
const auto num_threads = internal::random<int>(20, 24);
Eigen::ThreadPool tp(num_threads);
Eigen::ThreadPoolDevice tp_device(&tp, num_threads);
@ -660,8 +735,16 @@ EIGEN_DECLARE_TEST(cxx11_tensor_executor) {
CALL_SUBTEST_COMBINATIONS(14, test_execute_reverse_rvalue, float, 4);
CALL_SUBTEST_COMBINATIONS(14, test_execute_reverse_rvalue, float, 5);
CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 3);
CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 4);
CALL_ASYNC_SUBTEST_COMBINATIONS(15, test_async_execute_unary_expr, float, 5);
CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 3);
CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 4);
CALL_ASYNC_SUBTEST_COMBINATIONS(16, test_async_execute_binary_expr, float, 5);
// Force CMake to split this test.
// EIGEN_SUFFIXES;1;2;3;4;5;6;7;8;9;10;11;12;13;14
// EIGEN_SUFFIXES;1;2;3;4;5;6;7;8;9;10;11;12;13;14;15;16
}
#undef CALL_SUBTEST_COMBINATIONS

View File

@ -38,9 +38,9 @@ class TestAllocator : public Allocator {
void test_multithread_elementwise()
{
Tensor<float, 3> in1(2,3,7);
Tensor<float, 3> in2(2,3,7);
Tensor<float, 3> out(2,3,7);
Tensor<float, 3> in1(200, 30, 70);
Tensor<float, 3> in2(200, 30, 70);
Tensor<float, 3> out(200, 30, 70);
in1.setRandom();
in2.setRandom();
@ -49,15 +49,39 @@ void test_multithread_elementwise()
Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11));
out.device(thread_pool_device) = in1 + in2 * 3.14f;
for (int i = 0; i < 2; ++i) {
for (int j = 0; j < 3; ++j) {
for (int k = 0; k < 7; ++k) {
VERIFY_IS_APPROX(out(i,j,k), in1(i,j,k) + in2(i,j,k) * 3.14f);
for (int i = 0; i < 200; ++i) {
for (int j = 0; j < 30; ++j) {
for (int k = 0; k < 70; ++k) {
VERIFY_IS_APPROX(out(i, j, k), in1(i, j, k) + in2(i, j, k) * 3.14f);
}
}
}
}
void test_async_multithread_elementwise()
{
Tensor<float, 3> in1(200, 30, 70);
Tensor<float, 3> in2(200, 30, 70);
Tensor<float, 3> out(200, 30, 70);
in1.setRandom();
in2.setRandom();
Eigen::ThreadPool tp(internal::random<int>(3, 11));
Eigen::ThreadPoolDevice thread_pool_device(&tp, internal::random<int>(3, 11));
Eigen::Barrier b(1);
out.device(thread_pool_device, [&b]() { b.Notify(); }) = in1 + in2 * 3.14f;
b.Wait();
for (int i = 0; i < 200; ++i) {
for (int j = 0; j < 30; ++j) {
for (int k = 0; k < 70; ++k) {
VERIFY_IS_APPROX(out(i, j, k), in1(i, j, k) + in2(i, j, k) * 3.14f);
}
}
}
}
void test_multithread_compound_assignment()
{
@ -516,6 +540,7 @@ void test_threadpool_allocate(TestAllocator* allocator)
EIGEN_DECLARE_TEST(cxx11_tensor_thread_pool)
{
CALL_SUBTEST_1(test_multithread_elementwise());
CALL_SUBTEST_1(test_async_multithread_elementwise());
CALL_SUBTEST_1(test_multithread_compound_assignment());
CALL_SUBTEST_2(test_multithread_contraction<ColMajor>());