[libc-commits] [libc] [llvm] [libc][CndVar] reimplmement conditional variable with FIFO ordering (PR #192748)
Alexey Samsonov via libc-commits
libc-commits at lists.llvm.org
Wed Apr 22 10:54:59 PDT 2026
================
@@ -6,49 +6,339 @@
//
//===----------------------------------------------------------------------===//
-#ifndef LLVM_LIBC___SUPPORT_SRC_THREADS_LINUX_CNDVAR_H
-#define LLVM_LIBC___SUPPORT_SRC_THREADS_LINUX_CNDVAR_H
+#ifndef LLVM_LIBC_SRC___SUPPORT_THREADS_CNDVAR_H
+#define LLVM_LIBC_SRC___SUPPORT_THREADS_CNDVAR_H
#include "hdr/stdint_proxy.h" // uint32_t
+#include "src/__support/CPP/limits.h"
+#include "src/__support/CPP/mutex.h"
+#include "src/__support/CPP/new.h"
#include "src/__support/macros/config.h"
-#include "src/__support/threads/futex_utils.h" // Futex
-#include "src/__support/threads/mutex.h" // Mutex
-#include "src/__support/threads/raw_mutex.h" // RawMutex
+#include "src/__support/threads/futex_utils.h" // Futex
+#include "src/__support/threads/mutex.h" // Mutex
+#include "src/__support/threads/raw_mutex.h" // RawMutex
+#include "src/__support/threads/sleep.h"
+
+#ifdef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
+#include "src/__support/time/monotonicity.h"
+#endif
namespace LIBC_NAMESPACE_DECL {
+enum class CndVarResult {
+ Success,
+ MutexError,
+ Timeout,
+};
+
class CndVar {
- enum CndWaiterStatus : uint32_t {
- WS_Waiting = 0xE,
- WS_Signalled = 0x5,
+public:
+ using Timeout = internal::AbsTimeout;
+
+private:
+ // A single-waiter multiple-notifier barrier used to keep
+ // track of cancellation threads. We use this barrier to
+ // ensure in-queue threads that have posted their cancellation
+ // request have finished dequeue themselves.
+ class CancellationBarrier {
+ LIBC_INLINE_VAR static constexpr size_t CANCEL_STEP = 2;
+ LIBC_INLINE_VAR static constexpr size_t SLEEPING_BIT = 1;
+
+ // LSB indicates whether the waiter is in sleeping state.
+ Futex futex;
+
+ public:
+ LIBC_INLINE CancellationBarrier() : futex(0) {}
+ // Add one more notification request.
+ LIBC_INLINE void add_one() {
+ futex.fetch_add(CANCEL_STEP, cpp::MemoryOrder::RELAXED);
+ }
+ // Send notification to one waiter.
+ LIBC_INLINE void notify() {
+ FutexWordType res = futex.fetch_sub(CANCEL_STEP);
+ // Only need to goto syscall if waiter is sleep and we are the last one
+ if (res <= (CANCEL_STEP | SLEEPING_BIT) && (res & SLEEPING_BIT) != 0)
+ futex.notify_one();
+ }
+ LIBC_INLINE void wait() {
+ size_t spin = 0;
+ while (auto remaining = futex.load(cpp::MemoryOrder::RELAXED)) {
+ // Set LSB to 1 to indicate that the waiter is entering sleeping
+ // state.
+ FutexWordType new_val = remaining | SLEEPING_BIT;
+ if (spin > LIBC_COPT_RAW_MUTEX_DEFAULT_SPIN_COUNT &&
+ futex.compare_exchange_strong(remaining, new_val)) {
+ futex.wait(new_val, /*timeout=*/cpp::nullopt, /*is_pshared=*/false);
+ futex.fetch_sub(1);
+ spin = 0;
+ }
+ sleep_briefly();
+ spin++;
+ }
+ }
+ };
+
+ enum WaiterState : uint8_t {
+ Waiting = 0,
+ Signalled = 1,
+ Cancelled = 2,
+ Requeued = 3,
};
- struct CndWaiter {
- Futex futex_word = WS_Waiting;
- CndWaiter *next = nullptr;
+ struct QueueNode {
+ QueueNode *prev;
+ QueueNode *next;
+
+ // We use cyclic dummy node to avoid handing corner cases.
+ LIBC_INLINE void ensure_queue_initialization() {
+ if (LIBC_UNLIKELY(prev == nullptr))
+ prev = next = this;
+ }
+
+ // Assume `this` the dummy node of queue. Push back `waiter` to the queue.
+ LIBC_INLINE void push_back(QueueNode *waiter) {
+ ensure_queue_initialization();
+ waiter->next = this;
+ waiter->prev = prev;
+ waiter->next->prev = waiter;
+ waiter->prev->next = waiter;
+ }
+
+ // Remove `waiter` from the queue.
+ LIBC_INLINE static void remove(QueueNode *waiter) {
+ waiter->next->prev = waiter->prev;
+ waiter->prev->next = waiter->next;
+ waiter->prev = waiter->next = waiter;
+ }
+
+ // Assume `this` the dummy node of queue. Pop the first waiter from the
+ // queue.
+ LIBC_INLINE QueueNode *pop_front() {
+ ensure_queue_initialization();
+ if (next == this)
+ return nullptr;
+ QueueNode *first = next;
+ remove(first);
+ return first;
+ }
};
- CndWaiter *waitq_front;
- CndWaiter *waitq_back;
- RawMutex qmtx;
+ // This node will be on the per-thread stack.
+ struct CndWaiter : QueueNode {
+ cpp::Atomic<CancellationBarrier *> cancellation_barrier;
+ RawMutex barrier;
+ cpp::Atomic<uint8_t> state;
+
+ LIBC_INLINE CndWaiter()
+ : QueueNode{}, cancellation_barrier(nullptr), barrier{},
+ state{Waiting} {
+ // this lock should always success as no contention is possible
+ [[maybe_unused]] bool locked = barrier.try_lock();
+ LIBC_ASSERT(locked);
+ }
+
+ LIBC_INLINE void confirm_cancellation() {
+ if (CancellationBarrier *sender = cancellation_barrier.load())
+ sender->notify();
+ }
+ };
+
+ union {
+ QueueNode waiter_queue;
+ cpp::Atomic<size_t> shared_waiters;
+ };
+
+ union {
+ RawMutex queue_lock;
+ Futex shared_futex;
+ };
+
+ bool is_shared;
+
+ LIBC_INLINE void notify(bool is_broadcast) {
+ if (LIBC_UNLIKELY(is_shared)) {
+ if (shared_waiters.load() == 0)
+ return;
+ // increase the sequence number
+ shared_futex.fetch_add(1);
+ if (is_broadcast)
+ shared_futex.notify_all();
+ else
+ shared_futex.notify_one();
+ return;
+ }
+
+ size_t limit =
+ is_broadcast ? cpp::numeric_limits<size_t>::max() : size_t{1};
+ CancellationBarrier cancellation_barrier{};
+ CndWaiter *head = nullptr;
+ CndWaiter *cursor = nullptr;
+ // Go through the queue, try send signal to waiters.
+ // 1. if signal is sent, we reduce the number of pending signals
+ // 2. if waiter cancelled before signal is sent, we add it
+ // to cancellation barrier and continue
+ // Notice that cancelled sender will not continue before
+ // we release the queue lock, because they also need to
+ // acquire the lock and dequeue themselves.
+ {
+ cpp::lock_guard lock(queue_lock);
+ waiter_queue.ensure_queue_initialization();
+ if (waiter_queue.next == &waiter_queue)
+ return;
+ for (cursor = static_cast<CndWaiter *>(waiter_queue.next);
+ cursor != &waiter_queue;
+ cursor = static_cast<CndWaiter *>(cursor->next)) {
+ if (limit == 0)
+ break;
+ uint8_t expected = Waiting;
+ if (!cursor->state.compare_exchange_strong(expected, Signalled)) {
+ cancellation_barrier.add_one();
+ cursor->cancellation_barrier.store(&cancellation_barrier);
+ continue;
+ }
+ if (!head)
+ head = cursor;
+ limit--;
+ }
+ // remove everything before cursor
+ auto removed_head = waiter_queue.next;
+ auto removed_tail = cursor->prev;
+ waiter_queue.next = cursor;
+ cursor->prev = &waiter_queue;
+ removed_tail->next = removed_head;
+ removed_head->prev = removed_tail;
----------------
vonosmas wrote:
Should this be a part of queue interface?
https://github.com/llvm/llvm-project/pull/192748
More information about the libc-commits
mailing list