[libc-commits] [libc] [llvm] [libc][CndVar] reimplement condition variable with FIFO ordering (PR #192748)

Michael Jones via libc-commits libc-commits at lists.llvm.org
Mon Apr 20 13:52:53 PDT 2026


================
@@ -6,49 +6,406 @@
 //
 //===----------------------------------------------------------------------===//
 
-#ifndef LLVM_LIBC___SUPPORT_SRC_THREADS_LINUX_CNDVAR_H
-#define LLVM_LIBC___SUPPORT_SRC_THREADS_LINUX_CNDVAR_H
+#ifndef LLVM_LIBC_SRC___SUPPORT_THREADS_CNDVAR_H
+#define LLVM_LIBC_SRC___SUPPORT_THREADS_CNDVAR_H
 
 #include "hdr/stdint_proxy.h" // uint32_t
+#include "src/__support/CPP/mutex.h"
+#include "src/__support/CPP/new.h"
 #include "src/__support/macros/config.h"
-#include "src/__support/threads/futex_utils.h"       // Futex
-#include "src/__support/threads/mutex.h"             // Mutex
-#include "src/__support/threads/raw_mutex.h"         // RawMutex
+#include "src/__support/threads/futex_utils.h" // Futex
+#include "src/__support/threads/mutex.h"       // Mutex
+#include "src/__support/threads/raw_mutex.h"   // RawMutex
+
+#ifndef LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
+#define LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY 1
+#endif
 
 namespace LIBC_NAMESPACE_DECL {
 
-class CndVar {
-  enum CndWaiterStatus : uint32_t {
-    WS_Waiting = 0xE,
-    WS_Signalled = 0x5,
+// Outcome of PrivateCndVar::wait. In every case the waiter attempts to
+// re-acquire the mutex before returning.
+enum class CndVarResult {
+  // The waiter consumed a signal and re-acquired the mutex.
+  Success,
+  // Re-acquiring the mutex reported an error. This takes precedence over
+  // Timeout when both apply.
+  MutexError,
+  // The timeout expired before a signal was consumed.
+  Timeout,
+};
+
+class PrivateCndVar {
+  // A single-waiter, multiple-notifier barrier used by a notifier to wait for
+  // waiters that cancelled (e.g. timed out) while still linked into the part
+  // of the queue it detached. Each such waiter is registered with add_one()
+  // and later calls notify() once it has unlinked itself, so wait() returns
+  // only after every registered waiter has finished dequeuing itself.
+  //
+  // Futex word layout: bit 0 (SLEEPING_BIT) is set while the waiter is (about
+  // to be) blocked in futex_wait; the remaining bits count pending
+  // notifications in units of CANCEL_STEP.
+  class CancellationBarrier {
+    // Number of polling rounds before the waiter blocks in the kernel.
+    LIBC_INLINE_VAR static constexpr size_t SPIN_LIMIT = 100;
+    LIBC_INLINE_VAR static constexpr size_t CANCEL_STEP = 2;
+    LIBC_INLINE_VAR static constexpr size_t SLEEPING_BIT = 1;
+
+    Futex futex;
+
+  public:
+    LIBC_INLINE CancellationBarrier() : futex(0) {}
+    // Register one more pending notification. The caller publishes the
+    // barrier to the cancelled waiter while holding the queue lock, so the
+    // increment itself can be relaxed.
+    LIBC_INLINE void add_one() {
+      futex.fetch_add(CANCEL_STEP, cpp::MemoryOrder::RELAXED);
+    }
+    // Deliver one notification to the waiter.
+    // NOTE: wait() relies on this fetch_sub having release semantics; the
+    // default ordering is assumed to be SEQ_CST.
+    LIBC_INLINE void notify() {
+      FutexWordType res = futex.fetch_sub(CANCEL_STEP);
+      // Only enter the kernel if the waiter is asleep and this was the last
+      // pending notification (old value == CANCEL_STEP | SLEEPING_BIT).
+      if (res <= (CANCEL_STEP | SLEEPING_BIT) && (res & SLEEPING_BIT) != 0)
+        futex.notify_one();
+    }
+    // Block until every registered notification has been delivered.
+    LIBC_INLINE void wait() {
+      size_t spin = 0;
+      // ACQUIRE rather than RELAXED: observing zero must synchronize with the
+      // final notify() so that the cancelled waiters' queue unlinking
+      // happens-before whatever the caller does next (handing the detached
+      // list to the first signalled waiter). On the pure spinning path there
+      // is no other acquire operation that would provide this ordering.
+      while (auto remaining = futex.load(cpp::MemoryOrder::ACQUIRE)) {
+        // Set the sleeping bit to announce that the waiter is about to block.
+        FutexWordType new_val = remaining | SLEEPING_BIT;
+        if (spin > SPIN_LIMIT &&
+            futex.compare_exchange_strong(remaining, new_val)) {
+          futex.wait(new_val, /*timeout=*/cpp::nullopt, /*is_pshared=*/false);
+          // Clear the sleeping bit set above.
+          futex.fetch_sub(1);
+          spin = 0;
+        }
+        sleep_briefly();
+        spin++;
+      }
+    }
+  };
+
+  // Lifecycle of a CndWaiter. The transition out of Waiting is made with a
+  // compare-and-swap, so exactly one of a notifier (-> Signalled) or the
+  // waiter itself (-> Cancelled) wins.
+  enum WaiterState : uint8_t {
+    // Initial state after entering the wait queue.
+    Waiting = 0,
+    // A notifier has delivered a signal to this waiter.
+    Signalled = 1,
+    // The waiter gave up before a signal arrived (in practice, after its
+    // timed barrier wait returned without being released).
+    Cancelled = 2,
+    // The waiter was requeued from its barrier futex onto the mutex futex.
+    Requeued = 3,
+  };
+
+  // Intrusive doubly-linked list node. The queue head is a sentinel ("dummy")
+  // node of the same type, linked cyclically so that an empty queue is a
+  // sentinel pointing at itself.
+  struct WaiterHeader {
+    WaiterHeader *prev;
+    WaiterHeader *next;
+
+    // We use a cyclic dummy node to avoid handling corner cases. A
+    // zero-initialized sentinel (prev == nullptr) is lazily turned into an
+    // empty cyclic list on first use.
+    LIBC_INLINE void ensure_queue_initialization() {
+      if (LIBC_UNLIKELY(prev == nullptr))
+        prev = next = this;
+    }
+
+    // Assumes `this` is the dummy node of the queue. Appends `waiter` to the
+    // back of the queue.
+    LIBC_INLINE void push_back(WaiterHeader *waiter) {
+      ensure_queue_initialization();
+      waiter->next = this;
+      waiter->prev = prev;
+      waiter->next->prev = waiter;
+      waiter->prev->next = waiter;
+    }
+
+    // Unlinks `waiter` from whatever cyclic list it is in and leaves it
+    // self-linked, so that `waiter->next == waiter` afterwards.
+    LIBC_INLINE static void remove(WaiterHeader *waiter) {
+      waiter->next->prev = waiter->prev;
+      waiter->prev->next = waiter->next;
+      waiter->prev = waiter->next = waiter;
+    }
+
+    // Assumes `this` is the dummy node of the queue. Unlinks and returns the
+    // first waiter, or nullptr if the queue is empty.
+    LIBC_INLINE WaiterHeader *pop_front() {
+      ensure_queue_initialization();
+      if (next == this)
+        return nullptr;
+      WaiterHeader *first = next;
+      remove(first);
+      return first;
+    }
   };
 
-  struct CndWaiter {
-    Futex futex_word = WS_Waiting;
-    CndWaiter *next = nullptr;
+  // Per-wait queue node; wait() keeps it on the waiting thread's stack.
+  struct CndWaiter : WaiterHeader {
+    // Set (under queue_lock) by a notifier that found this waiter already
+    // Cancelled; the waiter notifies it once it has unlinked itself.
+    cpp::Atomic<CancellationBarrier *> cancellation_barrier;
+    // Locked at construction. The waiter blocks on it until its turn comes;
+    // it is released by notify() (for the first signalled waiter) or by the
+    // preceding waiter in the FIFO hand-off.
+    RawMutex barrier;
+    // Holds a WaiterState value.
+    cpp::Atomic<uint8_t> state;
+
+    LIBC_INLINE CndWaiter()
+        : WaiterHeader{}, cancellation_barrier(nullptr), barrier{},
+          state{Waiting} {
+      // This lock should always succeed, as no contention is possible: the
+      // node is not yet visible to any other thread.
+      (void)barrier.try_lock();
+    }
+
+    // Called by a cancelled waiter after unlinking itself: notify the
+    // notifier waiting on its CancellationBarrier, if one registered us.
+    LIBC_INLINE void confirm_cancellation() {
+      if (CancellationBarrier *sender = cancellation_barrier.load())
+        sender->notify();
+    }
   };
 
-  CndWaiter *waitq_front;
-  CndWaiter *waitq_back;
-  RawMutex qmtx;
+  /*
+  Layout (presumably mirrored by the public condition-variable type; keep
+  the two in sync — TODO confirm):
+
+  struct {
+    void * __wait_queue_prev;
+    void * __wait_queue_next;
+    __futex_word __futex;
+  }
+  */
+  // Sentinel of the cyclic FIFO wait queue. Zero-initialized and lazily made
+  // cyclic by ensure_queue_initialization().
+  WaiterHeader waiter_queue;
+  // Guards linking and unlinking of nodes while they are attached to
+  // waiter_queue.
+  RawMutex queue_lock;
 
 public:
-  LIBC_INLINE static int init(CndVar *cv) {
-    cv->waitq_front = cv->waitq_back = nullptr;
-    RawMutex::init(&cv->qmtx);
-    return 0;
+  // Deadline type accepted by wait().
+  using Timeout = internal::AbsTimeout;
+
+  // Constant-initializable; the queue sentinel is made cyclic on first use.
+  LIBC_INLINE constexpr PrivateCndVar() : waiter_queue{}, queue_lock{} {}
+
+  // Returns the object to its freshly-constructed state.
+  // NOTE(review): presumably only valid while no thread is waiting on or
+  // notifying this condition variable — confirm with callers.
+  LIBC_INLINE void reset() {
+    queue_lock.reset();
+    waiter_queue.prev = nullptr;
+    waiter_queue.next = nullptr;
+  }
+
+  // Enqueues the calling thread, releases `mutex`, and blocks until this
+  // waiter is signalled or `timeout` (if any) expires. `mutex` is re-acquired
+  // before returning in every case.
+  //
+  // Signalled waiters are released in strict FIFO order: each one, once it
+  // holds `mutex`, unlinks itself from the list detached by notify() and
+  // hands off to its successor, requeueing the successor onto the mutex
+  // futex instead of waking it when that is possible.
+  //
+  // Returns MutexError if re-acquiring `mutex` reported an error; otherwise
+  // Timeout if no signal was consumed before the deadline; otherwise Success.
+  //
+  // TODO: register callback for pthread cancellation
+  LIBC_INLINE CndVarResult wait(Mutex *mutex,
+                                cpp::optional<Timeout> timeout = cpp::nullopt) {
+#if LIBC_COPT_TIMEOUT_ENSURE_MONOTONICITY
+    if (timeout)
+      ensure_monotonicity(*timeout);
+#endif
+
+    CndWaiter waiter{};
+    // Register the waiter in the queue before releasing the mutex, so that a
+    // notifier running after the unlock below will find it.
+    {
+      cpp::lock_guard lock(queue_lock);
+      waiter_queue.push_back(&waiter);
+    }
+
+    // Unlock the mutex and wait for the signal.
+    mutex->unlock();
+    // Note that `barrier` is already LOCKED (see the CndWaiter constructor).
+    // We abuse the LOCKED state to mean that the waiter is still pending:
+    // acquiring it means our turn has come.
+    bool locked = waiter.barrier.lock(timeout, /*is_shared=*/false);
+
+    // If we wake up and find that we are still Waiting, no signal has been
+    // consumed, i.e. the timeout has been reached. Claim that outcome by
+    // moving to Cancelled; on failure `old_state` receives the current state.
+    uint8_t old_state = Waiting;
+    if (waiter.state.compare_exchange_strong(old_state, Cancelled,
+                                             cpp::MemoryOrder::ACQ_REL)) {
+      // We did not consume a signal before the timeout expired. Unlink
+      // ourselves, then release any notifier that registered us on its
+      // CancellationBarrier while we were still linked.
+      {
+        cpp::lock_guard lock(queue_lock);
+        WaiterHeader::remove(&waiter);
+      }
+      waiter.confirm_cancellation();
+    } else if (!locked) {
+      // A signal has already been consumed, so we must compete for the mutex
+      // in the FIFO order of the queue. We only relock if the barrier wait
+      // above woke up due to the timeout. Otherwise our turn has already
+      // come, so we don't need to relock.
+      waiter.barrier.lock();
+    }
+
+    MutexError mutex_result = mutex->lock();
+    // If we were requeued, we need to re-establish contention after locking;
+    // otherwise the requeued thread may clear the contention state even
+    // though there are still waiters behind it.
+    if (waiter.state.load(cpp::MemoryOrder::RELAXED) == Requeued)
+      mutex->get_raw_futex().store(RawMutex::IN_CONTENTION);
+    // If there are other waiters after us in the detached list, we need to
+    // wake the next one. If we cancelled, remove() left us self-linked, so
+    // waiter.next == &waiter.
+    if (waiter.next != &waiter) {
+      auto *next_waiter = static_cast<CndWaiter *>(waiter.next);
+      WaiterHeader::remove(&waiter);
+      auto &next_barrier_futex = next_waiter->barrier.get_raw_futex();
+      auto &mutex_futex = mutex->get_raw_futex();
+      // The following is essentially an inlined unlock of the successor's
+      // barrier, but it requeues the successor onto the mutex futex instead
+      // of waking it when that is possible.
+      FutexWordType prev = next_barrier_futex.exchange(
+          RawMutex::UNLOCKED, cpp::MemoryOrder::RELEASE);
+      // If the next waiter in the queue went to sleep, it first marked its
+      // own barrier IN_CONTENTION; only then does it need to be woken or
+      // requeued.
+      if (prev == RawMutex::IN_CONTENTION) {
+        if (mutex->can_be_requeued()) {
+          ErrorOr<int> res = next_barrier_futex.requeue_to(
+              mutex_futex, cpp::nullopt, /*wake_limit=*/0,
+              /*requeue_limit=*/1,
+              /*is_shared=*/false);
+          if (!res.has_value()) // cannot requeue on this system
+            next_waiter->barrier.wake(/*is_shared=*/false);
+          else if (res.value() > 0) {
+            // The successor now sleeps on the mutex futex: record that, and
+            // mark the mutex contended so that (presumably) unlocking it
+            // wakes the successor.
+            next_waiter->state.store(Requeued, cpp::MemoryOrder::RELAXED);
+            mutex->get_raw_futex().store(RawMutex::IN_CONTENTION);
+          }
+        } else { // cannot requeue under special lock mode
+          next_waiter->barrier.wake(/*is_shared=*/false);
+        }
+      }
+    }
+    if (mutex_result != MutexError::NONE)
+      return CndVarResult::MutexError;
+    return old_state == Waiting ? CndVarResult::Timeout : CndVarResult::Success;
   }
 
-  LIBC_INLINE static void destroy(CndVar *cv) {
-    cv->waitq_front = cv->waitq_back = nullptr;
+private:
+  // Signals up to `limit` waiters, oldest first, and starts the FIFO hand-off.
+  //
+  // Under queue_lock, walk the queue from the front:
+  // 1. a Waiting waiter is moved to Signalled and counts against `limit`;
+  // 2. a waiter that already cancelled itself is registered with the local
+  //    CancellationBarrier and skipped (it does not count against `limit`).
+  // Every visited node is then detached into its own cyclic list. Cancelled
+  // waiters cannot unlink themselves from that list before we release
+  // queue_lock, because they need the lock to dequeue themselves.
+  LIBC_INLINE void notify(size_t limit) {
+    CancellationBarrier cancellation_barrier{};
+    CndWaiter *head = nullptr;
+    // Typed as WaiterHeader: the walk can end on the sentinel `waiter_queue`,
+    // which is not a CndWaiter, so downcasting it would be undefined
+    // behavior. Only real queue nodes are cast to CndWaiter below.
+    WaiterHeader *cursor = nullptr;
+    {
+      cpp::lock_guard lock(queue_lock);
+      waiter_queue.ensure_queue_initialization();
+      if (waiter_queue.next == &waiter_queue)
+        return;
+      for (cursor = waiter_queue.next; cursor != &waiter_queue;
+           cursor = cursor->next) {
+        if (limit == 0)
+          break;
+        auto *waiter = static_cast<CndWaiter *>(cursor);
+        uint8_t expected = Waiting;
+        if (!waiter->state.compare_exchange_strong(expected, Signalled)) {
+          cancellation_barrier.add_one();
+          waiter->cancellation_barrier.store(&cancellation_barrier);
+          continue;
+        }
+        if (!head)
+          head = waiter;
+        limit--;
+      }
+      // Detach every node before `cursor` into its own cyclic list (with no
+      // sentinel); `cursor` becomes the new front of the queue.
+      WaiterHeader *removed_head = waiter_queue.next;
+      WaiterHeader *removed_tail = cursor->prev;
+      waiter_queue.next = cursor;
+      cursor->prev = &waiter_queue;
+      removed_tail->next = removed_head;
+      removed_head->prev = removed_tail;
+    }
+    // The hand-off list must contain only waiters that consumed a signal, so
+    // wait until every cancelled waiter has finished unlinking itself.
+    cancellation_barrier.wait();
+    // Start propagating the notification from the first signalled waiter.
+    // Signalled waiters acquire the mutex in strict FIFO order: a successor
+    // is only woken to compete for the mutex once its predecessor holds it.
+    if (head)
+      head->barrier.unlock();
   }
 
-  // Returns 0 on success, -1 on error.
-  int wait(Mutex *m);
-  void notify_one();
-  void broadcast();
+public:
+  // Signals the oldest queued waiter that has not cancelled, if any.
+  LIBC_INLINE void notify_one() { notify(1); }
+  // Signals every non-cancelled waiter currently queued; they re-acquire the
+  // mutex one at a time in FIFO order.
+  LIBC_INLINE void broadcast() { notify(cpp::numeric_limits<size_t>::max()); }
+};
+
+class SharedCndVar {
----------------
michaelrj-google wrote:

why are there two separate, similar CndVar classes? Would it make sense to have one class that can handle both the private and shared cases? The flag could be a template argument to avoid needing a runtime lookup if you already know which type.

https://github.com/llvm/llvm-project/pull/192748


More information about the libc-commits mailing list