Simplified the full reduction code

Benoit Steiner 2016-03-08 16:02:00 -08:00
parent 5a427a94a9
commit 6d6413f768


@@ -221,121 +221,120 @@ struct FullReducer {
 #ifdef EIGEN_USE_THREADS
 // Multithreaded full reducers
-template <typename Eval, typename Op, bool Vectorizable = (Eval::InputPacketAccess & Op::PacketAccess)>
+template <typename Self, typename Op,
+          bool vectorizable = (Self::InputPacketAccess & Op::PacketAccess)>
 struct FullReducerShard {
-  static void run(const Eval& eval, typename Eval::Index firstIndex, typename Eval::Index numValuesToReduce, Op& reducer, FullReducerShard* shard) {
-    shard->saccum = reducer.initialize();
-    for (typename Eval::Index j = 0; j < numValuesToReduce; ++j) {
-      reducer.reduce(eval.m_impl.coeff(firstIndex + j), &shard->saccum);
-    }
+  static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(const Self& self, typename Self::Index firstIndex,
+                                                        typename Self::Index numValuesToReduce, Op& reducer,
+                                                        typename Self::CoeffReturnType* output) {
+    *output = InnerMostDimReducer<Self, Op, vectorizable>::reduce(
+        self, firstIndex, numValuesToReduce, reducer);
   }
-
-  typename Eval::CoeffReturnType saccum;
 };
 
-template <typename Eval, typename Op>
-struct FullReducerShard<Eval, Op, true> {
-  static void run(const Eval& eval, typename Eval::Index firstIndex, typename Eval::Index numValuesToReduce, Op& reducer, FullReducerShard* shard) {
-    const int packetSize = internal::unpacket_traits<typename Eval::PacketReturnType>::size;
-    const typename Eval::Index VectorizedSize = (numValuesToReduce / packetSize) * packetSize;
-    shard->paccum = reducer.template initializePacket<typename Eval::PacketReturnType>();
-    for (typename Eval::Index j = 0; j < VectorizedSize; j += packetSize) {
-      reducer.reducePacket(eval.m_impl.template packet<Unaligned>(firstIndex + j), &shard->paccum);
-    }
-    shard->saccum = reducer.initialize();
-    for (typename Eval::Index j = VectorizedSize; j < numValuesToReduce; ++j) {
-      reducer.reduce(eval.m_impl.coeff(firstIndex + j), &shard->saccum);
-    }
-  }
-
-  typename Eval::PacketReturnType paccum;
-  typename Eval::CoeffReturnType saccum;
-};
-
 template <typename Self, typename Op>
 struct FullReducer<Self, Op, ThreadPoolDevice, false> {
   static const bool HasOptimizedImplementation = !Op::IsStateful;
+  static const int PacketSize =
+      unpacket_traits<typename Self::PacketReturnType>::size;
 
   // launch one reducer per thread and accumulate the result.
-  static void run(const Self& self, Op& reducer, const ThreadPoolDevice& device, typename Self::CoeffReturnType* output) {
+  static void run(const Self& self, Op& reducer, const ThreadPoolDevice& device,
+                  typename Self::CoeffReturnType* output) {
     typedef typename Self::Index Index;
     const Index num_coeffs = array_prod(self.m_impl.dimensions());
-    const Index blocksize = std::floor<Index>(static_cast<float>(num_coeffs)/device.numThreads());
-    const Index numblocks = blocksize > 0 ? num_coeffs / blocksize : 0;
-    eigen_assert(num_coeffs >= numblocks * blocksize);
-    std::vector<Notification*> results;
-    results.reserve(numblocks);
-    std::vector<FullReducerShard<Self, Op, false> > shards;
-    shards.resize(numblocks);
-    for (Index i = 0; i < numblocks; ++i) {
-      results.push_back(device.enqueue(&FullReducerShard<Self, Op, false>::run, self, i*blocksize, blocksize, reducer, &shards[i]));
-    }
-    FullReducerShard<Self, Op, false> finalShard;
-    if (numblocks * blocksize < num_coeffs) {
-      FullReducerShard<Self, Op, false>::run(self, numblocks * blocksize, num_coeffs - numblocks * blocksize, reducer, &finalShard);
-    } else {
-      finalShard.saccum = reducer.initialize();
-    }
-    for (Index i = 0; i < numblocks; ++i) {
-      wait_until_ready(results[i]);
-      delete results[i];
-    }
-    for (Index i = 0; i < numblocks; ++i) {
-      reducer.reduce(shards[i].saccum, &finalShard.saccum);
-    }
-    *output = reducer.finalize(finalShard.saccum);
+    if (num_coeffs == 0) {
+      *output = reducer.finalize(reducer.initialize());
+      return;
+    }
+    const int num_threads = device.numThreads();
+    if (num_threads == 1) {
+      *output = InnerMostDimReducer<Self, Op, false>::reduce(self, 0, num_coeffs, reducer);
+      return;
+    }
+    const Index blocksize = std::floor<Index>(static_cast<float>(num_coeffs) / num_threads);
+    const Index numblocks = blocksize > 0 ? num_coeffs / blocksize : 0;
+    eigen_assert(num_coeffs >= numblocks * blocksize);
+    std::vector<Notification*> results;
+    results.reserve(numblocks);
+    std::vector<typename Self::CoeffReturnType> shards(numblocks, reducer.initialize());
+    for (Index i = 0; i < numblocks; ++i) {
+      results.push_back(
+          device.enqueue(&FullReducerShard<Self, Op, false>::run, self,
+                         i * blocksize, blocksize, reducer, &shards[i]));
+    }
+    typename Self::CoeffReturnType finalShard;
+    if (numblocks * blocksize < num_coeffs) {
+      finalShard = InnerMostDimReducer<Self, Op, false>::reduce(
+          self, numblocks * blocksize, num_coeffs - numblocks * blocksize, reducer);
+    } else {
+      finalShard = reducer.initialize();
+    }
+    for (Index i = 0; i < numblocks; ++i) {
+      wait_until_ready(results[i]);
+      delete results[i];
+    }
+    for (Index i = 0; i < numblocks; ++i) {
+      reducer.reduce(shards[i], &finalShard);
+    }
+    *output = reducer.finalize(finalShard);
   }
 };
 
 template <typename Self, typename Op>
 struct FullReducer<Self, Op, ThreadPoolDevice, true> {
   static const bool HasOptimizedImplementation = !Op::IsStateful;
+  static const int PacketSize =
+      unpacket_traits<typename Self::PacketReturnType>::size;
 
   // launch one reducer per thread and accumulate the result.
-  static void run(const Self& self, Op& reducer, const ThreadPoolDevice& device, typename Self::CoeffReturnType* output) {
+  static void run(const Self& self, Op& reducer, const ThreadPoolDevice& device,
+                  typename Self::CoeffReturnType* output) {
     typedef typename Self::Index Index;
     const Index num_coeffs = array_prod(self.m_impl.dimensions());
-    const Index blocksize = std::floor<Index>(static_cast<float>(num_coeffs)/device.numThreads());
+    if (num_coeffs == 0) {
+      *output = reducer.finalize(reducer.initialize());
+      return;
+    }
+    const int num_threads = device.numThreads();
+    if (num_threads == 1) {
+      *output = InnerMostDimReducer<Self, Op, true>::reduce(self, 0, num_coeffs, reducer);
+      return;
+    }
+    const Index blocksize = std::floor<Index>(static_cast<float>(num_coeffs) / num_threads);
     const Index numblocks = blocksize > 0 ? num_coeffs / blocksize : 0;
     eigen_assert(num_coeffs >= numblocks * blocksize);
     std::vector<Notification*> results;
     results.reserve(numblocks);
-    std::vector<FullReducerShard<Self, Op, true> > shards;
-    shards.resize(numblocks);
+    std::vector<typename Self::CoeffReturnType> shards(numblocks, reducer.initialize());
     for (Index i = 0; i < numblocks; ++i) {
-      results.push_back(device.enqueue(&FullReducerShard<Self, Op, true>::run, self, i*blocksize, blocksize, reducer, &shards[i]));
+      results.push_back(device.enqueue(&FullReducerShard<Self, Op, true>::run,
+                                       self, i * blocksize, blocksize, reducer,
+                                       &shards[i]));
     }
-    FullReducerShard<Self, Op, true> finalShard;
+    typename Self::CoeffReturnType finalShard;
     if (numblocks * blocksize < num_coeffs) {
-      FullReducerShard<Self, Op, true>::run(self, numblocks * blocksize, num_coeffs - numblocks * blocksize, reducer, &finalShard);
+      finalShard = InnerMostDimReducer<Self, Op, true>::reduce(
+          self, numblocks * blocksize, num_coeffs - numblocks * blocksize, reducer);
     } else {
-      finalShard.paccum = reducer.template initializePacket<typename Self::PacketReturnType>();
-      finalShard.saccum = reducer.initialize();
+      finalShard = reducer.initialize();
     }
     for (Index i = 0; i < numblocks; ++i) {
       wait_until_ready(results[i]);
       delete results[i];
     }
     for (Index i = 0; i < numblocks; ++i) {
-      reducer.reducePacket(shards[i].paccum, &finalShard.paccum);
-      reducer.reduce(shards[i].saccum, &finalShard.saccum);
+      reducer.reduce(shards[i], &finalShard);
     }
-    *output = reducer.finalizeBoth(finalShard.saccum, finalShard.paccum);
+    *output = reducer.finalize(finalShard);
   }
 };
 #endif
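
For readers who want the shape of the simplified scheme without Eigen's template machinery, here is a minimal standalone sketch of the same thread-pool full reduction: each worker reduces one contiguous block into a plain scalar shard, the calling thread reduces the leftover tail, and the shards are then folded together and finalized. This is an illustration only, not Eigen code: SumReducer and parallel_full_reduce are hypothetical names for this example, and std::thread stands in for ThreadPoolDevice, Notification, and InnerMostDimReducer.

#include <cstddef>
#include <thread>
#include <vector>

// Toy reducer with the initialize/reduce/finalize interface used above.
struct SumReducer {
  float initialize() const { return 0.0f; }
  void reduce(float v, float* accum) const { *accum += v; }
  float finalize(float accum) const { return accum; }
};

template <typename Reducer>
float parallel_full_reduce(const std::vector<float>& data, Reducer reducer,
                           int num_threads) {
  const std::size_t n = data.size();
  // Empty input: finalize a freshly initialized accumulator.
  if (n == 0) return reducer.finalize(reducer.initialize());
  if (num_threads < 1) num_threads = 1;

  // One contiguous block per worker; each shard is a plain scalar accumulator,
  // which is what the commit replaces the old FullReducerShard structs with.
  const std::size_t blocksize = n / static_cast<std::size_t>(num_threads);
  const std::size_t numblocks = blocksize > 0 ? n / blocksize : 0;
  std::vector<float> shards(numblocks, reducer.initialize());
  std::vector<std::thread> workers;
  for (std::size_t i = 0; i < numblocks; ++i) {
    workers.emplace_back([&, i] {
      for (std::size_t j = i * blocksize; j < (i + 1) * blocksize; ++j)
        reducer.reduce(data[j], &shards[i]);
    });
  }

  // The calling thread reduces the leftover tail, waits for the workers,
  // and then merges every shard into the final accumulator.
  float finalShard = reducer.initialize();
  for (std::size_t j = numblocks * blocksize; j < n; ++j)
    reducer.reduce(data[j], &finalShard);
  for (std::thread& w : workers) w.join();
  for (std::size_t i = 0; i < numblocks; ++i)
    reducer.reduce(shards[i], &finalShard);
  return reducer.finalize(finalShard);
}

// Usage: float total = parallel_full_reduce(values, SumReducer(), 4);

The point of the change is visible in the shard type: because every shard is now just a CoeffReturnType produced by the shared InnerMostDimReducer path, the scalar and vectorized cases no longer need separate FullReducerShard specializations or a paccum/saccum accumulator pair.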