[libcxx-commits] [libcxx] [libc++] `stop_token` uses `mutex` (PR #69600)

via libcxx-commits libcxx-commits at lists.llvm.org
Thu Oct 19 05:42:51 PDT 2023


llvmbot wrote:


<!--LLVM PR SUMMARY COMMENT-->

@llvm/pr-subscribers-libcxx

Author: Hui (huixie90)

<details>
<summary>Changes</summary>

- [libc++][test] add more benchmarks for `stop_token`
- [libc++] use `mutex` in the `stop_token`
- remove unused


---
Full diff: https://github.com/llvm/llvm-project/pull/69600.diff


2 Files Affected:

- (modified) libcxx/benchmarks/stop_token.bench.cpp (+78-3) 
- (modified) libcxx/include/__stop_token/stop_state.h (+60-46) 


``````````diff
diff --git a/libcxx/benchmarks/stop_token.bench.cpp b/libcxx/benchmarks/stop_token.bench.cpp
index 293d55ed82a08cf..e059a1166af16bd 100644
--- a/libcxx/benchmarks/stop_token.bench.cpp
+++ b/libcxx/benchmarks/stop_token.bench.cpp
@@ -14,6 +14,81 @@
 
 using namespace std::chrono_literals;
 
+// We have a single thread created by std::jthread consuming the stop_token:
+// polling for stop_requested.
+void BM_stop_token_single_thread_polling_stop_requested(benchmark::State& state) {
+  auto thread_func = [&](std::stop_token st, std::atomic<std::uint64_t>* loop_count) {
+    while (!st.stop_requested()) {
+      // doing some work
+      loop_count->fetch_add(1, std::memory_order_relaxed);
+    }
+  };
+
+  std::atomic<std::uint64_t> loop_count(0);
+  std::uint64_t total_loop_test_param = state.range(0);
+
+  auto thread = support::make_test_jthread(thread_func, &loop_count);
+
+  for (auto _ : state) {
+    auto start_total = loop_count.load(std::memory_order_relaxed);
+
+    while (loop_count.load(std::memory_order_relaxed) - start_total < total_loop_test_param) {
+      std::this_thread::yield();
+    }
+  }
+}
+
+BENCHMARK(BM_stop_token_single_thread_polling_stop_requested)->RangeMultiplier(2)->Range(1 << 10, 1 << 24);
+
+// We have multiple threads polling for stop_requested of the same stop_token.
+void BM_stop_token_multi_thread_polling_stop_requested(benchmark::State& state) {
+  std::atomic<bool> start{false};
+
+  auto thread_func = [&start](std::atomic<std::uint64_t>* loop_count, std::stop_token st) {
+    start.wait(false);
+    while (!st.stop_requested()) {
+      // doing some work
+      loop_count->fetch_add(1, std::memory_order_relaxed);
+    }
+  };
+
+  constexpr size_t thread_count = 20;
+
+  std::uint64_t total_loop_test_param = state.range(0);
+
+  std::vector<std::atomic<std::uint64_t>> loop_counts(thread_count);
+  std::stop_source ss;
+  std::vector<std::jthread> threads;
+  threads.reserve(thread_count);
+
+  for (size_t i = 0; i < thread_count; ++i) {
+    threads.emplace_back(support::make_test_jthread(thread_func, &loop_counts[i], ss.get_token()));
+  }
+
+  auto get_total_loop = [&loop_counts] {
+    std::uint64_t total = 0;
+    for (const auto& loop_count : loop_counts) {
+      total += loop_count.load(std::memory_order_relaxed);
+    }
+    return total;
+  };
+
+  start = true;
+  start.notify_all();
+
+  for (auto _ : state) {
+    auto start_total = get_total_loop();
+
+    while (get_total_loop() - start_total < total_loop_test_param) {
+      std::this_thread::yield();
+    }
+  }
+
+  ss.request_stop();
+}
+
+BENCHMARK(BM_stop_token_multi_thread_polling_stop_requested)->RangeMultiplier(2)->Range(1 << 10, 1 << 24);
+
 // We have a single thread created by std::jthread consuming the stop_token:
 // registering/deregistering callbacks, one at a time.
 void BM_stop_token_single_thread_reg_unreg_callback(benchmark::State& state) {
@@ -59,11 +134,11 @@ void BM_stop_token_async_reg_unreg_callback(benchmark::State& state) {
   std::atomic<bool> start{false};
 
   std::uint64_t total_reg_test_param = state.range(0);
+  std::vector<std::atomic<std::uint64_t>> reg_counts(thread_count);
 
   std::stop_source ss;
   std::vector<std::jthread> threads;
   threads.reserve(thread_count);
-  std::vector<std::atomic<std::uint64_t>> reg_counts(thread_count);
 
   auto thread_func = [&start](std::atomic<std::uint64_t>* count, std::stop_token st) {
     std::vector<std::optional<std::stop_callback<dummy_stop_callback>>> cbs(concurrent_request_count);
@@ -84,8 +159,8 @@ void BM_stop_token_async_reg_unreg_callback(benchmark::State& state) {
 
   auto get_total_reg = [&] {
     std::uint64_t total = 0;
-    for (const auto& reg_counts : reg_counts) {
-      total += reg_counts.load(std::memory_order_relaxed);
+    for (const auto& reg_count : reg_counts) {
+      total += reg_count.load(std::memory_order_relaxed);
     }
     return total;
   };
diff --git a/libcxx/include/__stop_token/stop_state.h b/libcxx/include/__stop_token/stop_state.h
index 462aa73952b84f9..f3fca6554b378a7 100644
--- a/libcxx/include/__stop_token/stop_state.h
+++ b/libcxx/include/__stop_token/stop_state.h
@@ -12,7 +12,7 @@
 
 #include <__availability>
 #include <__config>
-#include <__stop_token/atomic_unique_lock.h>
+#include <__mutex/mutex.h>
 #include <__stop_token/intrusive_list_view.h>
 #include <__thread/id.h>
 #include <atomic>
@@ -37,10 +37,51 @@ struct __stop_callback_base : __intrusive_node_base<__stop_callback_base> {
   bool* __destroyed_        = nullptr;
 };
 
+// stop_token must take this lock inside noexcept functions, but mutex::lock can throw.
+// Wrap the call in a loop that swallows any exception and retries until the lock is held.
+class __nothrow_mutex_lock {
+  std::mutex& __mutex_;
+  bool __is_locked_;
+
+public:
+  _LIBCPP_HIDE_FROM_ABI explicit __nothrow_mutex_lock(std::mutex& __mutex) noexcept
+      : __mutex_(__mutex), __is_locked_(true) {
+    __lock();
+  }
+
+  __nothrow_mutex_lock(const __nothrow_mutex_lock&)            = delete;
+  __nothrow_mutex_lock(__nothrow_mutex_lock&&)                 = delete;
+  __nothrow_mutex_lock& operator=(const __nothrow_mutex_lock&) = delete;
+  __nothrow_mutex_lock& operator=(__nothrow_mutex_lock&&)      = delete;
+
+  _LIBCPP_HIDE_FROM_ABI ~__nothrow_mutex_lock() {
+    if (__is_locked_) {
+      __unlock();
+    }
+  }
+
+  _LIBCPP_HIDE_FROM_ABI bool __owns_lock() const noexcept { return __is_locked_; }
+
+  _LIBCPP_HIDE_FROM_ABI void __lock() noexcept {
+    while (true) {
+      try {
+        __mutex_.lock();
+        break;
+      } catch (...) {
+      }
+    }
+    __is_locked_ = true;
+  }
+
+  _LIBCPP_HIDE_FROM_ABI void __unlock() noexcept {
+    __mutex_.unlock(); // throws nothing
+    __is_locked_ = false;
+  }
+};
+
 class __stop_state {
   static constexpr uint32_t __stop_requested_bit        = 1;
-  static constexpr uint32_t __callback_list_locked_bit  = 1 << 1;
-  static constexpr uint32_t __stop_source_counter_shift = 2;
+  static constexpr uint32_t __stop_source_counter_shift = 1;
 
   // The "stop_source counter" is not used for lifetime reference counting.
   // When the number of stop_source reaches 0, the remaining stop_tokens's
@@ -49,9 +90,10 @@ class __stop_state {
   // The "callback list locked" bit implements the atomic_unique_lock to
   // guard the operations on the callback list
   //
-  //       31 - 2          |  1                   |    0           |
-  //  stop_source counter  | callback list locked | stop_requested |
+  //       31 - 1          |    0           |
+  //  stop_source counter  | stop_requested |
   atomic<uint32_t> __state_ = 0;
+  std::mutex __mutex_;
 
   // Reference count for stop_token + stop_callback + stop_source
   // When the counter reaches zero, the state is destroyed
@@ -59,7 +101,7 @@ class __stop_state {
   atomic<uint32_t> __ref_count_ = 0;
 
   using __state_t            = uint32_t;
-  using __callback_list_lock = __atomic_unique_lock<__state_t, __callback_list_locked_bit>;
+  using __callback_list_lock = __nothrow_mutex_lock;
   using __callback_list      = __intrusive_list_view<__stop_callback_base>;
 
   __callback_list __callback_list_;
@@ -101,8 +143,9 @@ class __stop_state {
   }
 
   _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __request_stop() noexcept {
-    auto __cb_list_lock = __try_lock_for_request_stop();
-    if (!__cb_list_lock.__owns_lock()) {
+    __callback_list_lock __cb_list_lock(__mutex_);
+    auto __old = __state_.fetch_or(__stop_requested_bit, std::memory_order_release);
+    if ((__old & __stop_requested_bit) == __stop_requested_bit) {
       return false;
     }
     __requesting_thread_ = this_thread::get_id();
@@ -138,20 +181,15 @@ class __stop_state {
   }
 
   _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI bool __add_callback(__stop_callback_base* __cb) noexcept {
-    // If it is already stop_requested. Do not try to request it again.
-    const auto __give_up_trying_to_lock_condition = [__cb](__state_t __state) {
-      if ((__state & __stop_requested_bit) != 0) {
-        // already stop requested, synchronously run the callback and no need to lock the list again
-        __cb->__invoke();
-        return true;
-      }
-      // no stop source. no need to lock the list to add the callback as it can never be invoked
-      return (__state >> __stop_source_counter_shift) == 0;
-    };
-
-    __callback_list_lock __cb_list_lock(__state_, __give_up_trying_to_lock_condition);
+    __callback_list_lock __cb_list_lock(__mutex_);
+    auto __state = __state_.load(std::memory_order_acquire);
+    if ((__state & __stop_requested_bit) != 0) {
+      // stop already requested: invoke the callback synchronously and leave the list untouched
+      __cb->__invoke();
+      return false;
+    }
 
-    if (!__cb_list_lock.__owns_lock()) {
+    if ((__state >> __stop_source_counter_shift) == 0) {
       return false;
     }
 
@@ -165,7 +203,7 @@ class __stop_state {
 
   // called by the destructor of stop_callback
   _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI void __remove_callback(__stop_callback_base* __cb) noexcept {
-    __callback_list_lock __cb_list_lock(__state_);
+    __callback_list_lock __cb_list_lock(__mutex_);
 
     // under below condition, the request_stop call just popped __cb from the list and could execute it now
     bool __potentially_executing_now = __cb->__prev_ == nullptr && !__callback_list_.__is_head(__cb);
@@ -191,30 +229,6 @@ class __stop_state {
     }
   }
 
-private:
-  _LIBCPP_AVAILABILITY_SYNC _LIBCPP_HIDE_FROM_ABI __callback_list_lock __try_lock_for_request_stop() noexcept {
-    // If it is already stop_requested, do not try to request stop or lock the list again.
-    const auto __lock_fail_condition = [](__state_t __state) { return (__state & __stop_requested_bit) != 0; };
-
-    // set locked and requested bit at the same time
-    const auto __after_lock_state = [](__state_t __state) {
-      return __state | __callback_list_locked_bit | __stop_requested_bit;
-    };
-
-    // acq because [thread.stoptoken.intro] Registration of a callback synchronizes with the invocation of that
-    //     callback. We are going to invoke the callback after getting the lock, acquire so that we can see the
-    //     registration of a callback (and other writes that happens-before the add_callback)
-    //     Note: the rel (unlock) in the add_callback syncs with this acq
-    // rel because [thread.stoptoken.intro] A call to request_stop that returns true synchronizes with a call
-    //     to stop_requested on an associated stop_token or stop_source object that returns true.
-    //     We need to make sure that all writes (including user code) before request_stop will be made visible
-    //     to the threads that waiting for `stop_requested == true`
-    //     Note: this rel syncs with the acq in `stop_requested`
-    const auto __locked_ordering = std::memory_order_acq_rel;
-
-    return __callback_list_lock(__state_, __lock_fail_condition, __after_lock_state, __locked_ordering);
-  }
-
   template <class _Tp>
   friend struct __intrusive_shared_ptr_traits;
 };

``````````

</details>


https://github.com/llvm/llvm-project/pull/69600


More information about the libcxx-commits mailing list