[libc-commits] [libc] [libc] Introduce MCS-based Flat-Combining Lambda Lock (PR #101916)

Schrodinger ZHU Yifan via libc-commits libc-commits at lists.llvm.org
Tue Aug 6 21:09:00 PDT 2024


https://github.com/SchrodingerZhu updated https://github.com/llvm/llvm-project/pull/101916

>From 64984428904aeb94a9d103b48c922578a38f82ed Mon Sep 17 00:00:00 2001
From: Schrodinger ZHU Yifan <i at zhuyi.fan>
Date: Sun, 4 Aug 2024 14:37:00 -0700
Subject: [PATCH 1/3] [libc] add a MCS-based flat-combining lambda lock

---
 libc/src/__support/CPP/atomic.h               |   3 +-
 libc/src/__support/threads/CMakeLists.txt     |  16 ++
 libc/src/__support/threads/lambda_lock.h      | 239 ++++++++++++++++++
 .../src/__support/threads/CMakeLists.txt      |  13 +
 .../__support/threads/lambda_lock_test.cpp    |  66 +++++
 5 files changed, 336 insertions(+), 1 deletion(-)
 create mode 100644 libc/src/__support/threads/lambda_lock.h
 create mode 100644 libc/test/integration/src/__support/threads/lambda_lock_test.cpp

diff --git a/libc/src/__support/CPP/atomic.h b/libc/src/__support/CPP/atomic.h
index 72e7f2adde6a43..58c9ec03921ed9 100644
--- a/libc/src/__support/CPP/atomic.h
+++ b/libc/src/__support/CPP/atomic.h
@@ -41,7 +41,8 @@ enum class MemoryScope : int {
 
 template <typename T> struct Atomic {
   // For now, we will restrict to only arithmetic types.
-  static_assert(is_arithmetic_v<T>, "Only arithmetic types can be atomic.");
+  static_assert(is_arithmetic_v<T> || is_pointer_v<T>,
+                "Only arithmetic types or pointer types can be atomic.");
 
 private:
   // The value stored should be appropriately aligned so that
diff --git a/libc/src/__support/threads/CMakeLists.txt b/libc/src/__support/threads/CMakeLists.txt
index bd49bbb5ad2fe7..632cb49d4ef7e8 100644
--- a/libc/src/__support/threads/CMakeLists.txt
+++ b/libc/src/__support/threads/CMakeLists.txt
@@ -105,3 +105,19 @@ add_header_library(
     libc.hdr.types.pid_t
     ${identifier_dependency_on_thread}
 )
+
+if(TARGET libc.src.__support.threads.${LIBC_TARGET_OS}.futex_utils)
+  add_header_library(
+    lambda_lock
+    HDRS
+      lambda_lock
+    DEPENDS
+      .sleep
+      .spin_lock
+      libc.src.__support.CPP.atomic
+      libc.src.__support.CPP.optional
+      libc.src.__support.CPP.limits
+      libc.src.__support.libc_assert
+      libc.src.__support.threads.${LIBC_TARGET_OS}.futex_utils
+  )
+endif()
diff --git a/libc/src/__support/threads/lambda_lock.h b/libc/src/__support/threads/lambda_lock.h
new file mode 100644
index 00000000000000..a6428904de08d8
--- /dev/null
+++ b/libc/src/__support/threads/lambda_lock.h
@@ -0,0 +1,239 @@
+//===-- MCS-based Flat-Combining Lambda Lock --------------------*- C++ -*-===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef LLVM_LIBC_SRC_SUPPORT_THREADS_LAMBDA_LOCK_H
+#define LLVM_LIBC_SRC_SUPPORT_THREADS_LAMBDA_LOCK_H
+
+#include "src/__support/CPP/atomic.h"
+#include "src/__support/CPP/limits.h"
+#include "src/__support/common.h"
+#include "src/__support/libc_assert.h"
+#include "src/__support/macros/attributes.h"
+#include "src/__support/threads/linux/futex_utils.h"
+#include "src/__support/threads/linux/futex_word.h"
+#include "src/__support/threads/sleep.h"
+#include "src/__support/threads/spin_lock.h"
+
+namespace LIBC_NAMESPACE_DECL {
+// On stack lock node header for MCS-based flat combining lock.
+class RawLambdaLockHeader {
+  // Status of the futex.
+  Futex status;
+  // Next pointer.
+  cpp::Atomic<RawLambdaLockHeader *> next;
+  // Lambda function to execute.
+  void (*lambda)(RawLambdaLockHeader &);
+
+  // Possible values for the status:
+  // - WAITING: the task asscociated with the node is not finished.
+  // - DONE: the task associated with the node is finished.
+  // - COMBINING: the task is not finished and target thread is the combiner.
+  // - SLEEPING: the task is not finished and target thread is sleeping.
+  LIBC_INLINE_VAR static constexpr FutexWordType WAITING = 0;
+  LIBC_INLINE_VAR static constexpr FutexWordType DONE = 1;
+  LIBC_INLINE_VAR static constexpr FutexWordType COMBINING = 2;
+  LIBC_INLINE_VAR static constexpr FutexWordType SLEEPING = 3;
+
+  friend class RawLambdaLock;
+
+public:
+  LIBC_INLINE constexpr RawLambdaLockHeader(
+      void (*lambda)(RawLambdaLockHeader &))
+      : status(WAITING), next(nullptr), lambda(lambda) {}
+};
+
+// Helper to maintain the lock ownership.
+class RawLambdaLockCombinerToken {
+  bool handed_over;
+  SpinLock &lock;
+
+  LIBC_INLINE RawLambdaLockCombinerToken(SpinLock &lock)
+      : handed_over(false), lock(lock) {}
+
+public:
+  // Notice that the lock ownership is "announced" rather than "acquired".
+  LIBC_INLINE static RawLambdaLockCombinerToken announce(SpinLock &lock) {
+    LIBC_ASSERT(lock.is_locked());
+    return {lock};
+  }
+
+  LIBC_INLINE
+  RawLambdaLockCombinerToken(const RawLambdaLockCombinerToken &) = delete;
+
+  LIBC_INLINE
+  RawLambdaLockCombinerToken(RawLambdaLockCombinerToken &&that)
+      : handed_over(that.handed_over), lock(that.lock) {
+    that.handed_over = true;
+  };
+
+  // Hand over the lock ownership.
+  LIBC_INLINE void hand_over() { handed_over = true; }
+
+  LIBC_INLINE ~RawLambdaLockCombinerToken() {
+    LIBC_ASSERT(handed_over || lock.is_locked());
+    if (!handed_over)
+      lock.unlock();
+  }
+};
+
+// Global status of the lock.
+class RawLambdaLock {
+  LIBC_INLINE_VAR static constexpr int SPIN_COUNT = 100;
+  // tail of the waiting queue.
+  cpp::Atomic<RawLambdaLockHeader *> tail;
+  // spin lock to protect the queue.
+  SpinLock spin_lock;
+
+public:
+  LIBC_INLINE constexpr RawLambdaLock() : tail(nullptr), spin_lock() {}
+
+  LIBC_INLINE cpp::optional<RawLambdaLockCombinerToken>
+  try_lock_without_queueing() {
+    if (tail.load(cpp::MemoryOrder::RELAXED) == nullptr && spin_lock.try_lock())
+      return RawLambdaLockCombinerToken::announce(spin_lock);
+    return cpp::nullopt;
+  }
+
+  LIBC_INLINE void enqueue(RawLambdaLockHeader &header,
+                           size_t combining_limit) {
+    // Set ourself as the tail and acquire previous tail.
+    RawLambdaLockHeader *prev =
+        tail.exchange(&header, cpp::MemoryOrder::ACQ_REL);
+
+    if (prev) {
+      // If there is a previous waiter, we should stay put until hearing back
+      // from the previous node. To do so, we need to register ourself to the
+      // next pointer of the previous node.
+      prev->next.store(&header, cpp::MemoryOrder::RELEASE);
+      int remaining_spins = SPIN_COUNT;
+      // Do spin polling for certain amount of time.
+      while (remaining_spins > 0) {
+        if (header.status.load(cpp::MemoryOrder::RELAXED) !=
+            RawLambdaLockHeader::WAITING)
+          break;
+        sleep_briefly();
+        remaining_spins--;
+      }
+      // If we used up all spins, we may need to go to sleep.
+      if (remaining_spins == 0) {
+        FutexWordType expected = RawLambdaLockHeader::WAITING;
+        if (header.status.compare_exchange_strong(
+                expected, RawLambdaLockHeader::SLEEPING,
+                cpp::MemoryOrder::ACQ_REL, cpp::MemoryOrder::RELAXED))
+          header.status.wait(RawLambdaLockHeader::SLEEPING);
+      }
+      // We are here if and only if the status is modified by previous waiter.
+      FutexWordType status = header.status.load(cpp::MemoryOrder::ACQUIRE);
+      if (status == RawLambdaLockHeader::DONE)
+        return;
+      LIBC_ASSERT(status == RawLambdaLockHeader::COMBINING &&
+                  spin_lock.is_locked());
+      // We are the combiner: the lock ownership is handed over to us.
+    } else
+      // We are the combiner: but we need to acquire the lock on our own.
+      spin_lock.lock();
+
+    // Announce the lock ownership.
+    RawLambdaLockCombinerToken token =
+        RawLambdaLockCombinerToken::announce(spin_lock);
+
+    RawLambdaLockHeader *current = &header;
+    auto wakeup = [](RawLambdaLockHeader *node, FutexWordType new_status) {
+      if (node->status.exchange(new_status, cpp::MemoryOrder::ACQ_REL) ==
+          RawLambdaLockHeader::SLEEPING)
+        node->status.notify_one();
+    };
+    size_t remaining = combining_limit;
+    while (remaining != 0) {
+      // Execute the lambda function.
+      current->lambda(*current);
+      RawLambdaLockHeader *next = current->next.load(cpp::MemoryOrder::ACQUIRE);
+      // Break the loop if there is no next node.
+      if (!next)
+        break;
+      // Wake up the current node if it is sleeping.
+      wakeup(current, RawLambdaLockHeader::DONE);
+      current = next;
+      remaining--;
+    }
+
+    // We are here either because the combining limit is reached or the last
+    // attempt to get the next node failed.
+
+    // Try close current waiting queue.
+    RawLambdaLockHeader *expected = current;
+    if (tail.compare_exchange_strong(expected, nullptr,
+                                     cpp::MemoryOrder::ACQ_REL,
+                                     cpp::MemoryOrder::RELAXED)) {
+      // The queue is closed. We are good to go, before which we need to
+      // wake up the last node if it is sleeping.
+      wakeup(current, RawLambdaLockHeader::DONE);
+      return;
+    }
+
+    // We failed to close the queue, handover the combiner role to the next one
+    // in queue.
+    token.hand_over();
+    while (!current->next.load(cpp::MemoryOrder::RELAXED))
+      sleep_briefly();
+    RawLambdaLockHeader *combiner =
+        current->next.load(cpp::MemoryOrder::ACQUIRE);
+    wakeup(combiner, RawLambdaLockHeader::COMBINING);
+    wakeup(current, RawLambdaLockHeader::DONE);
+  }
+};
+
+template <typename T> class LambdaLock {
+  RawLambdaLock inner;
+  T data;
+
+  template <typename F>
+  [[gnu::cold]] LIBC_INLINE void enqueue_slow(F &&lambda,
+                                              size_t combining_limit) {
+    // Create the waiting node on stack and enqueue it.
+    struct Node {
+      RawLambdaLockHeader header;
+      F lambda;
+      T &data;
+    };
+    auto closure = [](RawLambdaLockHeader &header) {
+      Node *node = reinterpret_cast<Node *>(&header);
+      node->lambda(node->data);
+    };
+    Node node{{closure}, cpp::forward<F>(lambda), data};
+    inner.enqueue(node.header, combining_limit);
+  }
+
+public:
+  template <typename... U>
+  LIBC_INLINE constexpr LambdaLock(U &&...args)
+      : inner(), data(cpp::forward<U>(args)...) {}
+  template <typename F>
+  LIBC_INLINE void
+  enqueue(F &&lambda,
+          size_t combining_limit = cpp::numeric_limits<size_t>::max()) {
+    // First try to lock without queueing. In this situation, we do not even
+    // register ourself to the queue.
+    // The futex sleeping happens only when prev node exists so the local
+    // execution of the lambda function in this case won't put all other threads
+    // to sleep. Hence, if contention really happens, there will be a failed one
+    // to go into the slow path and become the real combiner.
+    if (cpp::optional<RawLambdaLockCombinerToken> token =
+            inner.try_lock_without_queueing()) {
+      lambda(data);
+      return;
+    }
+    enqueue_slow(cpp::forward<F>(lambda), combining_limit);
+  }
+  // This Should only be invoked in single-threaded context or other known
+  // race-free context.
+  T &get_unsafe() { return data; }
+};
+} // namespace LIBC_NAMESPACE_DECL
+
+#endif // LLVM_LIBC_SRC_SUPPORT_THREADS_LAMBDA_LOCK_H
diff --git a/libc/test/integration/src/__support/threads/CMakeLists.txt b/libc/test/integration/src/__support/threads/CMakeLists.txt
index 5a12d28ada3fda..10f9b6545de589 100644
--- a/libc/test/integration/src/__support/threads/CMakeLists.txt
+++ b/libc/test/integration/src/__support/threads/CMakeLists.txt
@@ -25,3 +25,16 @@ add_integration_test(
   DEPENDS
     libc.src.__support.threads.thread
 )
+
+add_integration_test(
+  lambda_lock_test
+  SUITE
+    libc-support-threads-integration-tests
+  SRCS
+    lambda_lock_test.cpp
+  DEPENDS
+    libc.src.__support.threads.lambda_lock
+    libc.src.pthread.pthread_create
+    libc.src.pthread.pthread_join
+    libc.src.__support.CPP.stringstream
+)
diff --git a/libc/test/integration/src/__support/threads/lambda_lock_test.cpp b/libc/test/integration/src/__support/threads/lambda_lock_test.cpp
new file mode 100644
index 00000000000000..f94949ac0a9c67
--- /dev/null
+++ b/libc/test/integration/src/__support/threads/lambda_lock_test.cpp
@@ -0,0 +1,66 @@
+//===-- Test of Lambda Locks ----------------------------------------------===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#include "src/__support/CPP/stringstream.h"
+#include "src/__support/threads/lambda_lock.h"
+#include "src/pthread/pthread_create.h"
+#include "src/pthread/pthread_join.h"
+#include "test/IntegrationTest/test.h"
+
+void simple_addition() {
+  LIBC_NAMESPACE::LambdaLock<int> sum{0};
+  pthread_t threads[10];
+  for (int i = 0; i < 10; ++i) {
+    LIBC_NAMESPACE::pthread_create(
+        &threads[i], nullptr,
+        [](void *arg) -> void * {
+          auto *lock = static_cast<LIBC_NAMESPACE::LambdaLock<int> *>(arg);
+          for (int j = 0; j < 1000; ++j) {
+            lock->enqueue([j](int &sum) { sum += j; });
+          }
+          return nullptr;
+        },
+        &sum);
+  }
+  for (int i = 0; i < 10; ++i) {
+    LIBC_NAMESPACE::pthread_join(threads[i], nullptr);
+  }
+  ASSERT_EQ(sum.get_unsafe(), 4995000);
+}
+
+void string_concat() {
+  static char buffer[10001];
+  LIBC_NAMESPACE::LambdaLock<LIBC_NAMESPACE::cpp::StringStream> shared{buffer};
+  pthread_t threads[10];
+  for (int i = 0; i < 10; ++i) {
+    LIBC_NAMESPACE::pthread_create(
+        &threads[i], nullptr,
+        [](void *arg) -> void * {
+          auto *lock = static_cast<decltype(shared) *>(arg);
+          for (int j = 0; j < 100; ++j) {
+            for (int c = 0; c < 10; ++c) {
+              lock->enqueue(
+                  [c](LIBC_NAMESPACE::cpp::StringStream &data) { data << c; });
+            }
+          }
+          return nullptr;
+        },
+        &shared);
+  }
+  for (int i = 0; i < 10; ++i) {
+    LIBC_NAMESPACE::pthread_join(threads[i], nullptr);
+  }
+  int x = shared.get_unsafe().str().size();
+  ASSERT_EQ(x, 10000);
+}
+
+TEST_MAIN() {
+  simple_addition();
+  string_concat();
+  return 0;
+}

>From db8651eb83d7e9404b18e949d5307935ac8ff60a Mon Sep 17 00:00:00 2001
From: Schrodinger ZHU Yifan <i at zhuyi.fan>
Date: Sun, 4 Aug 2024 14:51:19 -0700
Subject: [PATCH 2/3] [libc] enhance test

---
 .../__support/threads/lambda_lock_test.cpp    | 27 ++++++++++---------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/libc/test/integration/src/__support/threads/lambda_lock_test.cpp b/libc/test/integration/src/__support/threads/lambda_lock_test.cpp
index f94949ac0a9c67..db9860fa3e9462 100644
--- a/libc/test/integration/src/__support/threads/lambda_lock_test.cpp
+++ b/libc/test/integration/src/__support/threads/lambda_lock_test.cpp
@@ -20,16 +20,15 @@ void simple_addition() {
         &threads[i], nullptr,
         [](void *arg) -> void * {
           auto *lock = static_cast<LIBC_NAMESPACE::LambdaLock<int> *>(arg);
-          for (int j = 0; j < 1000; ++j) {
+          for (int j = 0; j < 1000; ++j)
             lock->enqueue([j](int &sum) { sum += j; });
-          }
           return nullptr;
         },
         &sum);
   }
-  for (int i = 0; i < 10; ++i) {
+  for (int i = 0; i < 10; ++i)
     LIBC_NAMESPACE::pthread_join(threads[i], nullptr);
-  }
+
   ASSERT_EQ(sum.get_unsafe(), 4995000);
 }
 
@@ -42,21 +41,25 @@ void string_concat() {
         &threads[i], nullptr,
         [](void *arg) -> void * {
           auto *lock = static_cast<decltype(shared) *>(arg);
-          for (int j = 0; j < 100; ++j) {
-            for (int c = 0; c < 10; ++c) {
+          for (int j = 0; j < 100; ++j)
+            for (int c = 0; c < 10; ++c)
               lock->enqueue(
                   [c](LIBC_NAMESPACE::cpp::StringStream &data) { data << c; });
-            }
-          }
           return nullptr;
         },
         &shared);
   }
-  for (int i = 0; i < 10; ++i) {
+  for (int i = 0; i < 10; ++i)
     LIBC_NAMESPACE::pthread_join(threads[i], nullptr);
-  }
-  int x = shared.get_unsafe().str().size();
-  ASSERT_EQ(x, 10000);
+
+  LIBC_NAMESPACE::cpp::string_view x = shared.get_unsafe().str();
+  ASSERT_EQ(x.size(), 10000);
+  int count[10] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+  for (char c : x)
+    count[c - '0']++;
+
+  for (int i = 0; i < 10; ++i)
+    ASSERT_EQ(count[i], 1000);
 }
 
 TEST_MAIN() {

>From 5bf65cf1c60649c099e9029a163961609e38fd70 Mon Sep 17 00:00:00 2001
From: Schrodinger ZHU Yifan <i at zhuyi.fan>
Date: Sun, 4 Aug 2024 18:21:41 -0700
Subject: [PATCH 3/3] [libc] add comment

---
 libc/src/__support/threads/lambda_lock.h | 83 ++++++++++++++++++++++++
 1 file changed, 83 insertions(+)

diff --git a/libc/src/__support/threads/lambda_lock.h b/libc/src/__support/threads/lambda_lock.h
index a6428904de08d8..650d1d7eda9537 100644
--- a/libc/src/__support/threads/lambda_lock.h
+++ b/libc/src/__support/threads/lambda_lock.h
@@ -19,6 +19,89 @@
 #include "src/__support/threads/sleep.h"
 #include "src/__support/threads/spin_lock.h"
 
+// This file contains an implementation of a flat combining lock based on MCS
+// queue.
+//
+// What is an MCS queue?
+// =====================
+// An MCS queue is a queue-based lock that is designed to be cache-friendly.
+// Each thread only spin on its local node, with minimal traffic across threads.
+// The thread-local node is not nessarily a node inside TLS storage. Rather, the
+// node can be allocated on stack. It is assumed that, during the lifespan of
+// the node, the thread is waiting in its locking routine, thus the stack space
+// is always valid.
+//
+// --------        ----------------------      ----------------------
+// | Tail |        |  Thread 1 (Stack)  |      |  Thread 2 (Stack)  |
+// --------        ----------------------      ----------------------
+//    |            |  ----------------  | next |                    |
+//    *-------------> |    Node N    | <---*   |     ..........     |
+//                 |  ----------------  |  |   |   ---------------  | next
+//                 |     ..........     |  *------ |  Node N-1   | <---- ....
+//                 |                    |      |   ---------------  |
+//                 ----------------------      ----------------------
+//
+// The queue is processed in a FIFO manner. When submitting a task to the lock
+// queue, the thread swaps its own node with the global tail pointer to register
+// itself to the queue. The thread then waits until its own turn. This is
+// usually signaled by the previous node in the queue. Once the thread receives
+// the signal, it executes the task and then signals the next node in the queue.
+//
+// What is a flat combining lock?
+// ==============================
+// Normally, a Mutex maintains the exclusivity using a lock word. Threads
+// polls/parks on that lock word until it finds an opportunity to acquire the
+// lock by a successful posting to the lock word. Then the thread go ahead to do
+// its task on the shared data. In heavy contention, however, the shared data is
+// bouncing among threads, causing a lot of extra traffic.
+// Flat combining is a technique to resolve such problem. The general idea is
+// that when a thread acquires the lock and finished its critical section, its
+// cache has the "ownership" of the shared data. Instead of passing such
+// ownership to the next thread, the current thread can continue to execute the
+// the crtitical section on behalf of the next thread and signal the next thread
+// once the task is done.
+//
+// How to achieve flat combining with MCS queue?
+// =============================================
+// We can use the queue itself specifies the job waiting to be done by adding
+// additional fields to the node such as a function pointer to the critical
+// section. When a thread acquires the lock on the head of the queue, it begins
+// to follow the pointers among the nodes and execute their critical sections.
+// The exclusivity is garanteed by the fact that the combiner thread always
+// holds a global lock. Such lock is passed to the next combiner if needed.
+// Instead of let each individual thread to propagate the finishing signal, the
+// combiner just notify each waiting thread onces their critical section is
+// executed.
+//
+// Pros and Cons
+// =============
+// Pros:
+// - Flat-combing to reduce coherence traffic.
+// - Threads are served in FIFO order.
+// Cons:
+// - Access to thread-local data needs to be carefully managed (e.g. by passing
+// pointers to the desired TLS slot if needed).
+// - The combiner thread may execute mutiple critical sections in a row. One may
+// want limit the combination sometimes.
+// - Stack corruption may corrupt the whole waiting queue. Thread cancellation
+// should not be allowed during flat combining.
+//
+// Why does it matter?
+// ===================
+// Its pros certainly matter. Flat-combining is new synchronization paradigm.
+// Many systems actually want to introduce such mechanism but hindered by
+// existing codebase and many works are done to progressive migration. [1] shows
+// that flat combining can significantly improve the performance inside Linux
+// kernel. SnMalloc recently introduced flat combining to speed up its
+// initialization process [2]. As llvm-libc is still relatively new, we have
+// opportunities to embrace flat-combining from the beginning rather migrating
+// to it later on.
+//
+// [1]: Ship your Critical Section, Not Your Data: Enabling Transparent
+// Delegation with TCLOCKS (OSDI '23)
+// https://www.usenix.org/conference/osdi23/presentation/gupta)
+// [2]: SnMalloc: Implement MCS Combining lock
+// https://github.com/microsoft/snmalloc/commit/6af38acd94d401313b7237bb7c2df5e662e135cf
 namespace LIBC_NAMESPACE_DECL {
 // On stack lock node header for MCS-based flat combining lock.
 class RawLambdaLockHeader {



More information about the libc-commits mailing list