[libc-commits] [libc] [libc][arc4random][patch 2/n] add ABA protected MPMCStack (PR #151361)

via libc-commits libc-commits at lists.llvm.org
Fri Aug 8 06:17:43 PDT 2025


llvmbot wrote:


<!--LLVM PR SUMMARY COMMENT-->

@llvm/pr-subscribers-libc

Author: Schrodinger ZHU Yifan (SchrodingerZhu)

<details>
<summary>Changes</summary>



---
Full diff: https://github.com/llvm/llvm-project/pull/151361.diff


5 Files Affected:

- (modified) libc/src/__support/CMakeLists.txt (+24) 
- (added) libc/src/__support/aba_ptr.h (+83) 
- (added) libc/src/__support/mpmc_stack.h (+107) 
- (modified) libc/test/integration/src/__support/CMakeLists.txt (+15) 
- (added) libc/test/integration/src/__support/mpmc_stack_test.cpp (+119) 


``````````diff
diff --git a/libc/src/__support/CMakeLists.txt b/libc/src/__support/CMakeLists.txt
index 2196d9e23bba7..c9d89cf6fc286 100644
--- a/libc/src/__support/CMakeLists.txt
+++ b/libc/src/__support/CMakeLists.txt
@@ -398,6 +398,30 @@ add_header_library(
     libc.src.__support.macros.attributes
 )
 
+add_header_library(
+  aba_ptr
+  HDRS
+    aba_ptr.h
+  DEPENDS
+    libc.hdr.types.size_t
+    libc.src.__support.common
+    libc.src.__support.threads.sleep
+)
+
+add_header_library(
+  mpmc_stack
+  HDRS
+    mpmc_stack.h
+  DEPENDS
+    libc.src.__support.aba_ptr
+    libc.src.__support.common
+    libc.src.__support.CPP.atomic
+    libc.src.__support.CPP.new
+    libc.src.__support.CPP.optional
+    libc.src.__support.CPP.type_traits
+)
+
+
 add_subdirectory(FPUtil)
 add_subdirectory(OSUtil)
 add_subdirectory(StringUtil)
diff --git a/libc/src/__support/aba_ptr.h b/libc/src/__support/aba_ptr.h
new file mode 100644
index 0000000000000..632cc466c295b
--- /dev/null
+++ b/libc/src/__support/aba_ptr.h
@@ -0,0 +1,83 @@
+//===-- Transactional Ptr for ABA prevention --------------------*- C++ -*-===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_LIBC_SRC___SUPPORT_TAGGED_POINTER_H
+#define LLVM_LIBC_SRC___SUPPORT_TAGGED_POINTER_H
+
+#include "hdr/types/size_t.h"
+#include "src/__support/common.h"
+#include "src/__support/threads/sleep.h"
+
+#ifdef __GCC_HAVE_SYNC_COMPARE_AND_SWAP_16
+#define LIBC_ABA_PTR_IS_ATOMIC true
+#else
+#define LIBC_ABA_PTR_IS_ATOMIC false
+#endif
+
+namespace LIBC_NAMESPACE_DECL {
+
+template <class T, bool IsAtomic> struct AbaPtrImpl {
+  union Impl {
+    struct alignas(2 * alignof(void *)) Atomic {
+      T *ptr;
+      size_t tag;
+    } atomic;
+    struct Mutex {
+      T *ptr;
+      bool locked;
+    } mutex;
+  } impl;
+
+  LIBC_INLINE constexpr AbaPtrImpl(T *ptr)
+      : impl(IsAtomic ? Impl{.atomic{ptr, 0}} : Impl{.mutex{ptr, false}}) {}
+
+  /// User must guarantee that operation is redoable.
+  template <class Op> LIBC_INLINE void transaction(Op &&op) {
+    if constexpr (IsAtomic) {
+      for (;;) {
+        typename Impl::Atomic snapshot, next;
+        __atomic_load(&impl.atomic, &snapshot, __ATOMIC_RELAXED);
+        next.ptr = op(snapshot.ptr);
+        // Wrapping add for unsigned integers.
+        next.tag = snapshot.tag + 1;
+        if (__atomic_compare_exchange(&impl.atomic, &snapshot, &next, true,
+                                      __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
+          return;
+      }
+    } else {
+      // Acquire the lock.
+      while (__atomic_exchange_n(&impl.mutex.locked, true, __ATOMIC_ACQUIRE))
+        while (__atomic_load_n(&impl.mutex.locked, __ATOMIC_RELAXED))
+          LIBC_NAMESPACE::sleep_briefly();
+
+      impl.mutex.ptr = op(impl.mutex.ptr);
+      // Release the lock.
+      __atomic_store_n(&impl.mutex.locked, false, __ATOMIC_RELEASE);
+    }
+  }
+
+  LIBC_INLINE T *get() const {
+    if constexpr (IsAtomic) {
+      // Weak micro-architectures typically reguards simultaneous partial word
+      // loading and full word loading as a race condition. While there are
+      // implementations that uses racy read anyway, we still load the whole
+      // word to avoid any complications.
+      typename Impl::Atomic snapshot;
+      __atomic_load(&impl.atomic, &snapshot, __ATOMIC_RELAXED);
+      return snapshot.ptr;
+    } else {
+      return impl.mutex.ptr;
+    }
+  }
+};
+
+template <class T> using AbaPtr = AbaPtrImpl<T, LIBC_ABA_PTR_IS_ATOMIC>;
+} // namespace LIBC_NAMESPACE_DECL
+
+#undef LIBC_ABA_PTR_IS_ATOMIC
+#endif
diff --git a/libc/src/__support/mpmc_stack.h b/libc/src/__support/mpmc_stack.h
new file mode 100644
index 0000000000000..df235c2c1dfac
--- /dev/null
+++ b/libc/src/__support/mpmc_stack.h
@@ -0,0 +1,107 @@
+//===-- Simple Lock-free MPMC Stack -----------------------------*- C++ -*-===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_LIBC_SRC___SUPPORT_MPMC_STACK_H
+#define LLVM_LIBC_SRC___SUPPORT_MPMC_STACK_H
+
+#include "src/__support/CPP/atomic.h"
+#include "src/__support/CPP/new.h"
+#include "src/__support/CPP/optional.h"
+#include "src/__support/aba_ptr.h"
+
+namespace LIBC_NAMESPACE_DECL {
+template <class T> class MPMCStack {
+  struct Node {
+    cpp::Atomic<size_t> visitor;
+    Node *next;
+    T value;
+
+    LIBC_INLINE Node(T val) : visitor(0), next(nullptr), value(val) {}
+  };
+  AbaPtr<Node> head;
+
+public:
+  static_assert(cpp::is_copy_constructible<T>::value,
+                "T must be copy constructible");
+  LIBC_INLINE constexpr MPMCStack() : head(nullptr) {}
+  LIBC_INLINE bool push(T value) {
+    AllocChecker ac;
+    Node *new_node = new (ac) Node(value);
+    if (!ac)
+      return false;
+    head.transaction([new_node](Node *old_head) {
+      new_node->next = old_head;
+      return new_node;
+    });
+    return true;
+  }
+  LIBC_INLINE bool push_all(T values[], size_t count) {
+    struct Guard {
+      size_t count;
+      Node **allocated;
+      LIBC_INLINE Guard(Node *allocated[]) : count(0), allocated(allocated) {}
+      LIBC_INLINE ~Guard() {
+        for (size_t i = 0; i < count; ++i)
+          delete allocated[i];
+      }
+      LIBC_INLINE void add(Node *node) { allocated[count++] = node; }
+      LIBC_INLINE void clear() { count = 0; }
+    };
+    // Variable sized array is a GNU extension.
+    __extension__ Node *allocated[count];
+    {
+      Guard guard(allocated);
+      for (size_t i = 0; i < count; ++i) {
+        AllocChecker ac;
+        Node *new_node = new (ac) Node(values[i]);
+        if (!ac)
+          return false;
+        guard.add(new_node);
+        if (i != 0)
+          new_node->next = allocated[i - 1];
+      }
+      guard.clear();
+    }
+    head.transaction([&allocated, count](Node *old_head) {
+      allocated[0]->next = old_head;
+      return allocated[count - 1];
+    });
+    return true;
+  }
+  LIBC_INLINE cpp::optional<T> pop() {
+    cpp::optional<T> res = cpp::nullopt;
+    Node *node = nullptr;
+    head.transaction([&](Node *current_head) -> Node * {
+      if (!current_head) {
+        res = cpp::nullopt;
+        return nullptr;
+      }
+      node = current_head;
+      node->visitor.fetch_add(1);
+      res = cpp::optional<T>{node->value};
+      Node *next = node->next;
+      node->visitor.fetch_sub(1);
+      return next;
+    });
+    // On a successful transaction, a node is popped by us. So we must delete
+    // it. When we are at here, no one else can acquire
+    // new reference to the node, but we still need to wait until other threads
+    // inside the transaction who may potentially be holding a reference to the
+    // node.
+    if (res) {
+      // Spin until the node is no longer in use.
+      while (node->visitor.load() != 0)
+        LIBC_NAMESPACE::sleep_briefly();
+      delete node;
+    }
+    return res;
+  }
+};
+} // namespace LIBC_NAMESPACE_DECL
+
+#endif
diff --git a/libc/test/integration/src/__support/CMakeLists.txt b/libc/test/integration/src/__support/CMakeLists.txt
index b5b6557e8d689..93f54083f3c00 100644
--- a/libc/test/integration/src/__support/CMakeLists.txt
+++ b/libc/test/integration/src/__support/CMakeLists.txt
@@ -2,3 +2,18 @@ add_subdirectory(threads)
 if(LIBC_TARGET_OS_IS_GPU)
   add_subdirectory(GPU)
 endif()
+
+add_libc_integration_test_suite(libc-support-integration-tests)
+
+add_integration_test(
+  mpmc_stack_test
+  SUITE
+    libc-support-integration-tests
+  SRCS
+    mpmc_stack_test.cpp
+  DEPENDS
+    libc.src.__support.mpmc_stack
+    libc.src.__support.threads.thread
+    libc.src.pthread.pthread_create
+    libc.src.pthread.pthread_join
+)
diff --git a/libc/test/integration/src/__support/mpmc_stack_test.cpp b/libc/test/integration/src/__support/mpmc_stack_test.cpp
new file mode 100644
index 0000000000000..9166a816a74fe
--- /dev/null
+++ b/libc/test/integration/src/__support/mpmc_stack_test.cpp
@@ -0,0 +1,119 @@
+#include "src/__support/CPP/atomic.h"
+#include "src/__support/mpmc_stack.h"
+#include "src/pthread/pthread_create.h"
+#include "src/pthread/pthread_join.h"
+#include "test/IntegrationTest/test.h"
+
+using namespace LIBC_NAMESPACE;
+
+void smoke_test() {
+  MPMCStack<int> stack;
+  for (int i = 0; i <= 100; ++i)
+    if (!stack.push(i))
+      __builtin_trap();
+  for (int i = 100; i >= 0; --i)
+    if (*stack.pop() != i)
+      __builtin_trap();
+  if (stack.pop())
+    __builtin_trap(); // Should be empty now.
+}
+
+void multithread_test() {
+  constexpr static size_t NUM_THREADS = 5;
+  constexpr static size_t NUM_PUSHES = 100;
+  struct State {
+    MPMCStack<size_t> stack;
+    cpp::Atomic<size_t> counter = 0;
+    cpp::Atomic<bool> flags[NUM_PUSHES];
+  } state;
+  pthread_t threads[NUM_THREADS];
+  for (size_t i = 0; i < NUM_THREADS; ++i) {
+    LIBC_NAMESPACE::pthread_create(
+        &threads[i], nullptr,
+        [](void *arg) -> void * {
+          State *state = static_cast<State *>(arg);
+          for (;;) {
+            size_t current = state->counter.fetch_add(1);
+            if (current >= NUM_PUSHES)
+              break;
+            if (!state->stack.push(current))
+              __builtin_trap();
+          }
+          while (auto res = state->stack.pop())
+            state->flags[res.value()].store(true);
+          return nullptr;
+        },
+        &state);
+  }
+  for (pthread_t thread : threads)
+    LIBC_NAMESPACE::pthread_join(thread, nullptr);
+  while (cpp::optional<size_t> res = state.stack.pop())
+    state.flags[res.value()].store(true);
+  for (size_t i = 0; i < NUM_PUSHES; ++i)
+    if (!state.flags[i].load())
+      __builtin_trap();
+}
+
+void multithread_push_all_test() {
+  constexpr static size_t NUM_THREADS = 4;
+  constexpr static size_t BATCH_SIZE = 10;
+  constexpr static size_t NUM_BATCHES = 20;
+  struct State {
+    MPMCStack<size_t> stack;
+    cpp::Atomic<size_t> counter = 0;
+    cpp::Atomic<bool> flags[NUM_THREADS * BATCH_SIZE * NUM_BATCHES];
+  } state;
+  pthread_t threads[NUM_THREADS];
+
+  for (size_t i = 0; i < NUM_THREADS; ++i) {
+    LIBC_NAMESPACE::pthread_create(
+        &threads[i], nullptr,
+        [](void *arg) -> void * {
+          State *state = static_cast<State *>(arg);
+          size_t values[BATCH_SIZE];
+
+          for (size_t batch = 0; batch < NUM_BATCHES; ++batch) {
+            // Prepare batch of values
+            for (size_t j = 0; j < BATCH_SIZE; ++j) {
+              size_t current = state->counter.fetch_add(1);
+              values[j] = current;
+            }
+
+            // Push all values in batch
+            if (!state->stack.push_all(values, BATCH_SIZE))
+              __builtin_trap();
+          }
+
+          // Pop and mark all values
+          while (auto res = state->stack.pop()) {
+            size_t value = res.value();
+            if (value < NUM_THREADS * BATCH_SIZE * NUM_BATCHES)
+              state->flags[value].store(true);
+          }
+          return nullptr;
+        },
+        &state);
+  }
+
+  for (pthread_t thread : threads)
+    LIBC_NAMESPACE::pthread_join(thread, nullptr);
+
+  // Pop any remaining values
+  while (cpp::optional<size_t> res = state.stack.pop()) {
+    size_t value = res.value();
+    if (value < NUM_THREADS * BATCH_SIZE * NUM_BATCHES)
+      state.flags[value].store(true);
+  }
+
+  // Verify all values were processed
+  for (size_t i = 0; i < NUM_THREADS * BATCH_SIZE * NUM_BATCHES; ++i)
+    if (!state.flags[i].load())
+      __builtin_trap();
+}
+
+TEST_MAIN() {
+  smoke_test();
+  multithread_test();
+  multithread_push_all_test();
+  return 0;
+}

``````````

</details>


https://github.com/llvm/llvm-project/pull/151361


More information about the libc-commits mailing list