mirror of
https://gitlab.com/libeigen/eigen.git
synced 2025-01-12 14:25:16 +08:00
e5ac8cbd7a
This fixed 2 deadlocks caused by sloppiness in the EventCount logic.
Both most likely were introduced by cl/236729920 which includes the new EventCount algorithm:
01da8caf00
bug #1 (Prewait):
Prewait must not consume existing signals.
Consider the following scenario.
There are 2 thread pool threads (1 and 2) and 1 external thread (3). RunQueue is empty.
Thread 1 checks the queue, calls Prewait, checks RunQueue again and now is going to call CommitWait.
Thread 2 checks the queue and now is going to call Prewait.
Thread 3 submits 2 tasks. The EventCount signal count is set to 1 because only 1 waiter is registered (the second signal is discarded).
Now thread 2 resumes and calls Prewait and takes away the signal.
Thread 1 resumes and calls CommitWait, there are no pending signals anymore, so it blocks.
As a result we have 2 tasks, but only 1 thread is running.
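The interleaving above can be replayed deterministically with a toy, single-threaded model of the two counters involved. This is only a sketch: ToyEventCount, its fields and RunningThreadsAfterBug1Scenario are hypothetical names made up for illustration, and the model is not Eigen's EventCount (no atomics, no real blocking). It only shows why a Prewait that consumes an existing signal leaves thread 1 blocked.

```cpp
#include <cassert>

// Toy single-threaded model of the counters; NOT Eigen's EventCount.
struct ToyEventCount {
  bool prewait_consumes_signal;  // true models the behaviour described as bug #1
  int prewaiters = 0;            // threads that called Prewait and not yet Commit/Cancel
  int signals = 0;               // pending wakeups, never more than prewaiters

  explicit ToyEventCount(bool buggy) : prewait_consumes_signal(buggy) {}

  // Returns true if the caller is now registered as a waiter,
  // false if it took an existing signal instead of registering.
  bool Prewait() {
    if (prewait_consumes_signal && signals > 0) {
      --signals;
      return false;
    }
    ++prewaiters;
    return true;
  }

  // A signal is recorded only if some registered waiter does not have one yet.
  void Notify() {
    if (signals < prewaiters) ++signals;
  }

  // Returns true if the caller would block.
  bool CommitWait() {
    assert(prewaiters > 0);
    --prewaiters;
    if (signals > 0) {
      --signals;
      return false;
    }
    return true;
  }

  // In this interleaving no signal is pending when CancelWait runs,
  // so the model only unregisters the waiter.
  void CancelWait() {
    assert(prewaiters > 0);
    --prewaiters;
  }
};

// Replays the scenario and returns how many pool threads end up running.
int RunningThreadsAfterBug1Scenario(bool prewait_consumes_signal) {
  ToyEventCount ec(prewait_consumes_signal);
  int running = 0;
  ec.Prewait();                       // thread 1: Prewait, RunQueue still empty
  ec.Notify();                        // thread 3: first task, signal recorded
  ec.Notify();                        // thread 3: second task, signal discarded
  bool t2_registered = ec.Prewait();  // thread 2: Prewait
  bool t1_blocked = ec.CommitWait();  // thread 1: CommitWait
  if (!t1_blocked) ++running;
  if (t2_registered) ec.CancelWait(); // thread 2: sees the tasks, cancels
  ++running;                          // thread 2 never blocks in this scenario
  return running;
}
```

Calling RunningThreadsAfterBug1Scenario(true) models the old behaviour and RunningThreadsAfterBug1Scenario(false) models a Prewait that leaves existing signals alone.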
bug #2 (CancelWait):
CancelWait must not take away a signal if it's not sure that the signal was meant for this thread.
When one thread blocks and another submits a new task concurrently, the EventCount protocol guarantees only that one of the following holds (similar to Dekker's algorithm):
(a) the registered waiter notices the new task and does not block
(b) the signaler notices the waiter and wakes it
(c) the waiter notices the new task and the signaler notices the waiter
[the only outcome that must not be possible is that neither notices the other, because that would lead to a deadlock]
CancelWait is called for cases (a) and (c). For case (c) it is OK to take the notification signal away, but it's not OK for (a) because nobody queued a signal for us and we take away a signal meant for somebody else.
Consider:
Thread 1 calls Prewait, checks RunQueue, it's empty, now it's going to call CommitWait.
Thread 3 submits 2 tasks. The EventCount signal count is set to 1 because only 1 waiter is registered (the second signal is discarded).
Thread 2 calls Prewait, checks RunQueue, discovers the tasks, calls CancelWait and consumes the pending signal (meant for thread 1).
Now Thread 1 resumes and calls CommitWait, since there are no signals it blocks.
As a result we have 2 tasks, but only 1 thread is running.
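The same kind of toy, single-threaded replay works for this scenario. It is a sketch under stated assumptions: ToyEventCount and RunningThreadsAfterBug2Scenario are hypothetical names, the model is not Eigen's EventCount, and the "fixed" rule used here (CancelWait takes a signal only when every registered waiter has one, so one of them must be ours) is an assumed conservative rule for illustration, not a statement of what the actual change does.

```cpp
#include <cassert>

// Toy single-threaded model of the counters; NOT Eigen's EventCount.
struct ToyEventCount {
  bool cancel_always_consumes;  // true models the behaviour described as bug #2
  int prewaiters = 0;           // threads that called Prewait and not yet Commit/Cancel
  int signals = 0;              // pending wakeups, never more than prewaiters

  explicit ToyEventCount(bool buggy) : cancel_always_consumes(buggy) {}

  void Prewait() { ++prewaiters; }

  // A signal is recorded only if some registered waiter does not have one yet.
  void Notify() {
    if (signals < prewaiters) ++signals;
  }

  void CancelWait() {
    assert(prewaiters > 0);
    // Assumed rule: if every registered waiter has a signal, one of them is
    // certainly ours; otherwise the signal may belong to another thread.
    bool signal_is_ours = (signals == prewaiters);
    if (signals > 0 && (cancel_always_consumes || signal_is_ours)) --signals;
    --prewaiters;
  }

  // Returns true if the caller would block.
  bool CommitWait() {
    assert(prewaiters > 0);
    --prewaiters;
    if (signals > 0) {
      --signals;
      return false;
    }
    return true;
  }
};

// Replays the scenario and returns how many pool threads end up running.
int RunningThreadsAfterBug2Scenario(bool cancel_always_consumes) {
  ToyEventCount ec(cancel_always_consumes);
  ec.Prewait();     // thread 1: Prewait, RunQueue is empty
  ec.Notify();      // thread 3: first task, signal recorded
  ec.Notify();      // thread 3: second task, signal discarded
  ec.Prewait();     // thread 2: Prewait
  ec.CancelWait();  // thread 2: discovers the tasks and cancels
  bool t1_blocked = ec.CommitWait();  // thread 1: CommitWait
  return 1 + (t1_blocked ? 0 : 1);    // thread 2 always runs
}
```

With cancel_always_consumes set to true the replay ends with one running thread; with false, thread 1 finds its signal still pending and both threads run.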
Both deadlocks are only a problem if the tasks require parallelism. Most computational tasks do not require parallelism, i.e. a single thread will run task 1, finish it and then dequeue and run task 2.
This fix undoes some of the sloppiness in the EventCount that was meant to reduce CPU consumption by idle threads, because we now have more threads running in these corner cases. But we still don't have pthread_yield's and maybe the strictness introduced by this change will actually help to reduce tail latency because we will have threads running when we actually need them running.
B) fix deadlock in thread pool caused by RunQueue
This fixed a deadlock caused by sloppiness in the RunQueue logic.
Most likely this was introduced with the non-blocking thread pool.
The deadlock only affects workloads that require parallelism.
Most computational tasks don't require parallelism.
PopBack must not fail spuriously. If it does, it can effectively lead to a single thread consuming several wake up signals.
Consider the case where 2 worker threads are blocked.
External thread submits a task. One of the threads is woken.
It tries to steal the task, but fails due to a spurious failure in PopBack (external thread submits another task and holds the lock).
The thread executes blocking protocol again (it won't block because NonEmptyQueueIndex is precise and the thread will discover pending work, but it has called PrepareWait).
Now external thread submits another task and signals EventCount again.
The signal is consumed by the first thread again. But now we have 2 tasks pending but only 1 worker thread running.
It may be possible to fix this in a different way: make EventCount::CancelWait forward the wakeup signal to a blocked thread rather than consuming it. But this looks more complex and I am not 100% sure that it will fix the bug.
It's also possible to have 2 versions of PopBack: one will do try_to_lock and another won't. Then worker threads could first opportunistically check all queues with try_to_lock, and only use the blocking version before blocking. But let's first fix the bug with the simpler change.
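The two-version idea could look roughly like the sketch below. Everything in it is an assumption made for illustration: ToyQueue, TryPopBack, PopLocked and FindWork are hypothetical names, and the queue is a plain mutex-protected std::deque rather than Eigen's RunQueue. The point is only the shape: an opportunistic pass with std::try_to_lock that may miss work, followed by a blocking pass that cannot fail spuriously.

```cpp
#include <deque>
#include <mutex>

// Toy mutex-protected queue; NOT Eigen's RunQueue.
struct ToyQueue {
  std::mutex mutex;
  std::deque<int> items;

  void PushFront(int v) {
    std::lock_guard<std::mutex> lock(mutex);
    items.push_front(v);
  }

  // Opportunistic version: gives up if the lock is not acquired, so it can
  // return false even though the queue holds work.
  bool TryPopBack(int* out) {
    std::unique_lock<std::mutex> lock(mutex, std::try_to_lock);
    if (!lock.owns_lock()) return false;
    return PopLocked(out);
  }

  // Blocking version: waits for the lock, so false really means "empty".
  bool PopBack(int* out) {
    std::lock_guard<std::mutex> lock(mutex);
    return PopLocked(out);
  }

 private:
  // Caller must hold the mutex.
  bool PopLocked(int* out) {
    if (items.empty()) return false;
    *out = items.back();
    items.pop_back();
    return true;
  }
};

// Hypothetical worker-side search: cheap pass first, precise pass before
// the thread would go on to the blocking protocol.
bool FindWork(ToyQueue* queues, int num_queues, int* out) {
  for (int i = 0; i < num_queues; i++) {
    if (queues[i].TryPopBack(out)) return true;
  }
  for (int i = 0; i < num_queues; i++) {
    if (queues[i].PopBack(out)) return true;
  }
  return false;
}
```

A worker would call FindWork and start the Prewait/CommitWait sequence only when it returns false, so a lock held by a submitting thread no longer looks like an empty queue.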
143 lines
3.9 KiB
C++
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
// Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#define EIGEN_USE_THREADS
#include "main.h"
#include <Eigen/CXX11/ThreadPool>

// Visual studio doesn't implement a rand_r() function since its
// implementation of rand() is already thread safe
int rand_reentrant(unsigned int* s) {
#ifdef EIGEN_COMP_MSVC_STRICT
  EIGEN_UNUSED_VARIABLE(s);
  return rand();
#else
  return rand_r(s);
#endif
}

static void test_basic_eventcount()
{
  MaxSizeVector<EventCount::Waiter> waiters(1);
  waiters.resize(1);
  EventCount ec(waiters);
  EventCount::Waiter& w = waiters[0];
  ec.Notify(false);
  ec.Prewait();
  ec.Notify(true);
  ec.CommitWait(&w);
  ec.Prewait();
  ec.CancelWait();
}

// Fake bounded counter-based queue.
struct TestQueue {
  std::atomic<int> val_;
  static const int kQueueSize = 10;

  TestQueue() : val_() {}

  ~TestQueue() { VERIFY_IS_EQUAL(val_.load(), 0); }

  bool Push() {
    int val = val_.load(std::memory_order_relaxed);
    for (;;) {
      VERIFY_GE(val, 0);
      VERIFY_LE(val, kQueueSize);
      if (val == kQueueSize) return false;
      if (val_.compare_exchange_weak(val, val + 1, std::memory_order_relaxed))
        return true;
    }
  }

  bool Pop() {
    int val = val_.load(std::memory_order_relaxed);
    for (;;) {
      VERIFY_GE(val, 0);
      VERIFY_LE(val, kQueueSize);
      if (val == 0) return false;
      if (val_.compare_exchange_weak(val, val - 1, std::memory_order_relaxed))
        return true;
    }
  }

  bool Empty() { return val_.load(std::memory_order_relaxed) == 0; }
};

const int TestQueue::kQueueSize;

// A number of producers send messages to a set of consumers using a set of
// fake queues. Ensure that it does not crash, consumers don't deadlock and
// number of blocked and unblocked threads match.
static void test_stress_eventcount()
{
  const int kThreads = std::thread::hardware_concurrency();
  static const int kEvents = 1 << 16;
  static const int kQueues = 10;

  MaxSizeVector<EventCount::Waiter> waiters(kThreads);
  waiters.resize(kThreads);
  EventCount ec(waiters);
  TestQueue queues[kQueues];

  std::vector<std::unique_ptr<std::thread>> producers;
  for (int i = 0; i < kThreads; i++) {
    producers.emplace_back(new std::thread([&ec, &queues]() {
      unsigned int rnd = static_cast<unsigned int>(std::hash<std::thread::id>()(std::this_thread::get_id()));
      for (int j = 0; j < kEvents; j++) {
        unsigned idx = rand_reentrant(&rnd) % kQueues;
        if (queues[idx].Push()) {
          ec.Notify(false);
          continue;
        }
        EIGEN_THREAD_YIELD();
        j--;
      }
    }));
  }

  std::vector<std::unique_ptr<std::thread>> consumers;
  for (int i = 0; i < kThreads; i++) {
    consumers.emplace_back(new std::thread([&ec, &queues, &waiters, i]() {
      EventCount::Waiter& w = waiters[i];
      unsigned int rnd = static_cast<unsigned int>(std::hash<std::thread::id>()(std::this_thread::get_id()));
      for (int j = 0; j < kEvents; j++) {
        unsigned idx = rand_reentrant(&rnd) % kQueues;
        if (queues[idx].Pop()) continue;
        j--;
        ec.Prewait();
        bool empty = true;
        for (int q = 0; q < kQueues; q++) {
          if (!queues[q].Empty()) {
            empty = false;
            break;
          }
        }
        if (!empty) {
          ec.CancelWait();
          continue;
        }
        ec.CommitWait(&w);
      }
    }));
  }

  for (int i = 0; i < kThreads; i++) {
    producers[i]->join();
    consumers[i]->join();
  }
}

EIGEN_DECLARE_TEST(cxx11_eventcount)
{
  CALL_SUBTEST(test_basic_eventcount());
  CALL_SUBTEST(test_stress_eventcount());
}