[Lldb-commits] [lldb] [lldb] Add timed callbacks to the MainLoop class (PR #112895)

Pavel Labath via lldb-commits lldb-commits at lists.llvm.org
Fri Oct 18 05:24:27 PDT 2024


https://github.com/labath created https://github.com/llvm/llvm-project/pull/112895

The motivating use case is being able to "time out" certain operations (by adding a timed callback which will force the termination of the loop), but the design is flexible enough to accommodate other use cases as well (e.g. running a periodic task in the background).

The implementation builds on the existing "pending callback" mechanism, by associating a time point with each callback -- every time the loop wakes up, it runs all of the callbacks which are past their point, and it also makes sure to sleep only until the next callback is scheduled to run.

I've done some renaming as names like "TriggerPendingCallbacks" were no longer accurate -- the function may no longer cause any callbacks to be called (it may just cause the main loop thread to recalculate the time it wants to sleep).

>From cc3a4a94e9e2eb8b4cf763d4c8c477c71776ee93 Mon Sep 17 00:00:00 2001
From: Pavel Labath <pavel at labath.sk>
Date: Wed, 16 Oct 2024 17:34:53 +0200
Subject: [PATCH] [lldb] Add timed callbacks to the MainLoop class

The motivating use case is being able to "time out" certain operations
(by adding a timed callback which will force the termination of the
loop), but the design is flexible enough to accommodate other use cases
as well (e.g. running a periodic task in the background).

The implementation builds on the existing "pending callback" mechanism,
by associating a time point with each callback -- every time the loop
wakes up, it runs all of the callbacks which are past their point, and
it also makes sure to sleep only until the next callback is scheduled to
run.

I've done some renaming as names like "TriggerPendingCallbacks" were
no longer accurate -- the function may no longer cause any callbacks to be
called (it may just cause the main loop thread to recalculate the time
it wants to sleep).
---
 lldb/include/lldb/Host/MainLoopBase.h         | 33 ++++++++--
 lldb/include/lldb/Host/posix/MainLoopPosix.h  |  6 +-
 .../lldb/Host/windows/MainLoopWindows.h       |  4 +-
 lldb/source/Host/common/MainLoopBase.cpp      | 40 ++++++++----
 lldb/source/Host/posix/MainLoopPosix.cpp      | 63 +++++++++++++-----
 lldb/source/Host/windows/MainLoopWindows.cpp  | 37 +++++++----
 lldb/unittests/Host/MainLoopTest.cpp          | 64 +++++++++++++++++--
 7 files changed, 191 insertions(+), 56 deletions(-)

diff --git a/lldb/include/lldb/Host/MainLoopBase.h b/lldb/include/lldb/Host/MainLoopBase.h
index 7365ee7a65ee64..0b79c52a358f4b 100644
--- a/lldb/include/lldb/Host/MainLoopBase.h
+++ b/lldb/include/lldb/Host/MainLoopBase.h
@@ -13,8 +13,10 @@
 #include "lldb/Utility/Status.h"
 #include "llvm/ADT/DenseMap.h"
 #include "llvm/Support/ErrorHandling.h"
+#include <chrono>
 #include <functional>
 #include <mutex>
+#include <queue>
 
 namespace lldb_private {
 
@@ -38,6 +40,9 @@ class MainLoopBase {
   class ReadHandle;
 
 public:
+  using TimePoint = std::chrono::time_point<std::chrono::steady_clock,
+                                            std::chrono::nanoseconds>;
+
   MainLoopBase() : m_terminate_request(false) {}
   virtual ~MainLoopBase() = default;
 
@@ -52,7 +57,19 @@ class MainLoopBase {
   // Add a pending callback that will be executed once after all the pending
   // events are processed. The callback will be executed even if termination
   // was requested.
-  void AddPendingCallback(const Callback &callback);
+  void AddPendingCallback(const Callback &callback) {
+    AddCallback(callback, std::chrono::steady_clock::time_point());
+  }
+
+  // Add a callback that will be executed after a certain amount of time has
+  // passed.
+  void AddCallback(const Callback &callback,
+                   std::chrono::nanoseconds delay) {
+    AddCallback(callback, std::chrono::steady_clock::now() + delay);
+  }
+
+  // Add a callback that will be executed after a given point in time.
+  void AddCallback(const Callback &callback, TimePoint point);
 
   // Waits for registered events and invoke the proper callbacks. Returns when
   // all callbacks deregister themselves or when someone requests termination.
@@ -69,14 +86,18 @@ class MainLoopBase {
 
   virtual void UnregisterReadObject(IOObject::WaitableHandle handle) = 0;
 
-  // Interrupt the loop that is currently waiting for events and execute
-  // the current pending callbacks immediately.
-  virtual void TriggerPendingCallbacks() = 0;
+  // Interrupt the loop that is currently waiting for events.
+  virtual void Interrupt() = 0;
+
+  void ProcessCallbacks();
 
-  void ProcessPendingCallbacks();
+  std::optional<TimePoint> GetNextWakeupTime();
 
   std::mutex m_callback_mutex;
-  std::vector<Callback> m_pending_callbacks;
+  std::priority_queue<std::pair<TimePoint, Callback>,
+                      std::vector<std::pair<TimePoint, Callback>>,
+                      llvm::on_first<std::greater<TimePoint>>>
+      m_callbacks;
   bool m_terminate_request : 1;
 
 private:
diff --git a/lldb/include/lldb/Host/posix/MainLoopPosix.h b/lldb/include/lldb/Host/posix/MainLoopPosix.h
index 07497b7b8c259a..bff96583bf5424 100644
--- a/lldb/include/lldb/Host/posix/MainLoopPosix.h
+++ b/lldb/include/lldb/Host/posix/MainLoopPosix.h
@@ -54,7 +54,7 @@ class MainLoopPosix : public MainLoopBase {
   void UnregisterReadObject(IOObject::WaitableHandle handle) override;
   void UnregisterSignal(int signo, std::list<Callback>::iterator callback_it);
 
-  void TriggerPendingCallbacks() override;
+  void Interrupt() override;
 
 private:
   void ProcessReadObject(IOObject::WaitableHandle handle);
@@ -87,8 +87,8 @@ class MainLoopPosix : public MainLoopBase {
 
   llvm::DenseMap<IOObject::WaitableHandle, Callback> m_read_fds;
   llvm::DenseMap<int, SignalInfo> m_signals;
-  Pipe m_trigger_pipe;
-  std::atomic<bool> m_triggering;
+  Pipe m_interrupt_pipe;
+  std::atomic<bool> m_interrupting = false;
 #if HAVE_SYS_EVENT_H
   int m_kqueue;
 #endif
diff --git a/lldb/include/lldb/Host/windows/MainLoopWindows.h b/lldb/include/lldb/Host/windows/MainLoopWindows.h
index 33e179e6c1286c..3937a24645d955 100644
--- a/lldb/include/lldb/Host/windows/MainLoopWindows.h
+++ b/lldb/include/lldb/Host/windows/MainLoopWindows.h
@@ -34,7 +34,7 @@ class MainLoopWindows : public MainLoopBase {
 protected:
   void UnregisterReadObject(IOObject::WaitableHandle handle) override;
 
-  void TriggerPendingCallbacks() override;
+  void Interrupt() override;
 
 private:
   void ProcessReadObject(IOObject::WaitableHandle handle);
@@ -45,7 +45,7 @@ class MainLoopWindows : public MainLoopBase {
     Callback callback;
   };
   llvm::DenseMap<IOObject::WaitableHandle, FdInfo> m_read_fds;
-  void *m_trigger_event;
+  void *m_interrupt_event;
 };
 
 } // namespace lldb_private
diff --git a/lldb/source/Host/common/MainLoopBase.cpp b/lldb/source/Host/common/MainLoopBase.cpp
index 030a4f0371681e..64a57e65849e99 100644
--- a/lldb/source/Host/common/MainLoopBase.cpp
+++ b/lldb/source/Host/common/MainLoopBase.cpp
@@ -7,27 +7,43 @@
 //===----------------------------------------------------------------------===//
 
 #include "lldb/Host/MainLoopBase.h"
+#include <chrono>
 
 using namespace lldb;
 using namespace lldb_private;
 
-void MainLoopBase::AddPendingCallback(const Callback &callback) {
+void MainLoopBase::AddCallback(const Callback &callback, TimePoint point) {
+  bool interrupt_needed;
   {
     std::lock_guard<std::mutex> lock{m_callback_mutex};
-    m_pending_callbacks.push_back(callback);
+    // We need to interrupt the main thread if this callback is scheduled to
+    // execute at an earlier time than the earliest callback registered so far.
+    interrupt_needed = m_callbacks.empty() || point < m_callbacks.top().first;
+    m_callbacks.emplace(point, callback);
   }
-  TriggerPendingCallbacks();
+  if (interrupt_needed)
+    Interrupt();
 }
 
-void MainLoopBase::ProcessPendingCallbacks() {
-  // Move the callbacks to a local vector to avoid keeping m_pending_callbacks
-  // locked throughout the calls.
-  std::vector<Callback> pending_callbacks;
-  {
-    std::lock_guard<std::mutex> lock{m_callback_mutex};
-    pending_callbacks = std::move(m_pending_callbacks);
-  }
+void MainLoopBase::ProcessCallbacks() {
+  while (true) {
+    Callback callback;
+    {
+      std::lock_guard<std::mutex> lock{m_callback_mutex};
+      if (m_callbacks.empty() ||
+          std::chrono::steady_clock::now() < m_callbacks.top().first)
+        return;
+      callback = std::move(m_callbacks.top().second);
+      m_callbacks.pop();
+    }
 
-  for (const Callback &callback : pending_callbacks)
     callback(*this);
+  }
+}
+
+std::optional<MainLoopBase::TimePoint> MainLoopBase::GetNextWakeupTime() {
+  std::lock_guard<std::mutex> lock(m_callback_mutex);
+  if (m_callbacks.empty())
+    return std::nullopt;
+  return m_callbacks.top().first;
 }
diff --git a/lldb/source/Host/posix/MainLoopPosix.cpp b/lldb/source/Host/posix/MainLoopPosix.cpp
index 6f8eaa55cfdf09..788c6c23007c41 100644
--- a/lldb/source/Host/posix/MainLoopPosix.cpp
+++ b/lldb/source/Host/posix/MainLoopPosix.cpp
@@ -15,6 +15,7 @@
 #include <algorithm>
 #include <cassert>
 #include <cerrno>
+#include <chrono>
 #include <csignal>
 #include <ctime>
 #include <vector>
@@ -42,6 +43,31 @@ static void SignalHandler(int signo, siginfo_t *info, void *) {
   g_signal_flags[signo] = 1;
 }
 
+
+class ToTimeSpec {
+public:
+  explicit ToTimeSpec(std::optional<MainLoopPosix::TimePoint> point) {
+    using namespace std::chrono;
+
+    if (!point) {
+      m_ts_ptr = nullptr;
+      return;
+    }
+    nanoseconds dur = std::max(*point - steady_clock::now(), nanoseconds(0));
+    m_ts_ptr = &m_ts;
+    m_ts.tv_sec = duration_cast<seconds>(dur).count();
+    m_ts.tv_nsec = (dur % seconds(1)).count();
+  }
+  ToTimeSpec(const ToTimeSpec &) = delete;
+  ToTimeSpec &operator=(const ToTimeSpec &) = delete;
+
+  operator struct timespec *() { return m_ts_ptr; }
+
+private:
+  struct timespec m_ts;
+  struct timespec *m_ts_ptr;
+};
+
 class MainLoopPosix::RunImpl {
 public:
   RunImpl(MainLoopPosix &loop);
@@ -80,8 +106,9 @@ Status MainLoopPosix::RunImpl::Poll() {
   for (auto &fd : loop.m_read_fds)
     EV_SET(&in_events[i++], fd.first, EVFILT_READ, EV_ADD, 0, 0, 0);
 
-  num_events = kevent(loop.m_kqueue, in_events.data(), in_events.size(),
-                      out_events, std::size(out_events), nullptr);
+  num_events =
+      kevent(loop.m_kqueue, in_events.data(), in_events.size(), out_events,
+             std::size(out_events), ToTimeSpec(loop.GetNextWakeupTime()));
 
   if (num_events < 0) {
     if (errno == EINTR) {
@@ -155,7 +182,9 @@ Status MainLoopPosix::RunImpl::Poll() {
     void *sigset_ptr;
     size_t sigset_len;
   } extra_data = {&kernel_sigset, sizeof(kernel_sigset)};
-  if (syscall(__NR_pselect6, nfds, &read_fd_set, nullptr, nullptr, nullptr,
+  if (syscall(__NR_pselect6, nfds, &read_fd_set, /*writefds=*/nullptr,
+              /*exceptfds=*/nullptr,
+              ToTimeSpec(loop.GetNextWakeupTime()).operator struct timespec *(),
               &extra_data) == -1) {
     if (errno != EINTR)
       return Status(errno, eErrorTypePOSIX);
@@ -179,7 +208,8 @@ Status MainLoopPosix::RunImpl::Poll() {
     read_fds.push_back(pfd);
   }
 
-  if (ppoll(read_fds.data(), read_fds.size(), nullptr, &sigmask) == -1 &&
+  if (ppoll(read_fds.data(), read_fds.size(),
+            ToTimeSpec(loop.GetNextWakeupTime()), &sigmask) == -1 &&
       errno != EINTR)
     return Status(errno, eErrorTypePOSIX);
 
@@ -225,14 +255,15 @@ void MainLoopPosix::RunImpl::ProcessEvents() {
 }
 #endif
 
-MainLoopPosix::MainLoopPosix() : m_triggering(false) {
-  Status error = m_trigger_pipe.CreateNew(/*child_process_inherit=*/false);
+MainLoopPosix::MainLoopPosix() {
+  Status error = m_interrupt_pipe.CreateNew(/*child_process_inherit=*/false);
   assert(error.Success());
-  const int trigger_pipe_fd = m_trigger_pipe.GetReadFileDescriptor();
-  m_read_fds.insert({trigger_pipe_fd, [trigger_pipe_fd](MainLoopBase &loop) {
+  const int interrupt_pipe_fd = m_interrupt_pipe.GetReadFileDescriptor();
+  m_read_fds.insert({interrupt_pipe_fd,
+                     [interrupt_pipe_fd](MainLoopBase &loop) {
                        char c;
                        ssize_t bytes_read = llvm::sys::RetryAfterSignal(
-                           -1, ::read, trigger_pipe_fd, &c, 1);
+                           -1, ::read, interrupt_pipe_fd, &c, 1);
                        assert(bytes_read == 1);
                        UNUSED_IF_ASSERT_DISABLED(bytes_read);
                        // NB: This implicitly causes another loop iteration
@@ -248,8 +279,8 @@ MainLoopPosix::~MainLoopPosix() {
 #if HAVE_SYS_EVENT_H
   close(m_kqueue);
 #endif
-  m_read_fds.erase(m_trigger_pipe.GetReadFileDescriptor());
-  m_trigger_pipe.Close();
+  m_read_fds.erase(m_interrupt_pipe.GetReadFileDescriptor());
+  m_interrupt_pipe.Close();
   assert(m_read_fds.size() == 0); 
   assert(m_signals.size() == 0);
 }
@@ -372,8 +403,8 @@ Status MainLoopPosix::Run() {
 
     impl.ProcessEvents();
 
-    m_triggering = false;
-    ProcessPendingCallbacks();
+    m_interrupting = false;
+    ProcessCallbacks();
   }
   return Status();
 }
@@ -396,13 +427,13 @@ void MainLoopPosix::ProcessSignal(int signo) {
   }
 }
 
-void MainLoopPosix::TriggerPendingCallbacks() {
-  if (m_triggering.exchange(true))
+void MainLoopPosix::Interrupt() {
+  if (m_interrupting.exchange(true))
     return;
 
   char c = '.';
   size_t bytes_written;
-  Status error = m_trigger_pipe.Write(&c, 1, bytes_written);
+  Status error = m_interrupt_pipe.Write(&c, 1, bytes_written);
   assert(error.Success());
   UNUSED_IF_ASSERT_DISABLED(error);
   assert(bytes_written == 1);
diff --git a/lldb/source/Host/windows/MainLoopWindows.cpp b/lldb/source/Host/windows/MainLoopWindows.cpp
index c9aa6d339d8f48..2f2249b1086add 100644
--- a/lldb/source/Host/windows/MainLoopWindows.cpp
+++ b/lldb/source/Host/windows/MainLoopWindows.cpp
@@ -21,14 +21,24 @@
 using namespace lldb;
 using namespace lldb_private;
 
+static DWORD ToTimeout(std::optional<MainLoopWindows::TimePoint> point) {
+  using namespace std::chrono;
+
+  if (!point)
+    return WSA_INFINITE;
+
+  nanoseconds dur = (std::max)(*point - steady_clock::now(), nanoseconds(0));
+  return duration_cast<milliseconds>(dur).count();
+}
+
 MainLoopWindows::MainLoopWindows() {
-  m_trigger_event = WSACreateEvent();
-  assert(m_trigger_event != WSA_INVALID_EVENT);
+  m_interrupt_event = WSACreateEvent();
+  assert(m_interrupt_event != WSA_INVALID_EVENT);
 }
 
 MainLoopWindows::~MainLoopWindows() {
   assert(m_read_fds.empty());
-  BOOL result = WSACloseEvent(m_trigger_event);
+  BOOL result = WSACloseEvent(m_interrupt_event);
   assert(result == TRUE);
   UNUSED_IF_ASSERT_DISABLED(result);
 }
@@ -43,10 +53,11 @@ llvm::Expected<size_t> MainLoopWindows::Poll() {
 
     events.push_back(info.event);
   }
-  events.push_back(m_trigger_event);
+  events.push_back(m_interrupt_event);
 
-  DWORD result = WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
-                                          WSA_INFINITE, FALSE);
+  DWORD result =
+      WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
+                               ToTimeout(GetNextWakeupTime()), FALSE);
 
   for (auto &fd : m_read_fds) {
     int result = WSAEventSelect(fd.first, WSA_INVALID_EVENT, 0);
@@ -54,9 +65,13 @@ llvm::Expected<size_t> MainLoopWindows::Poll() {
     UNUSED_IF_ASSERT_DISABLED(result);
   }
 
-  if (result >= WSA_WAIT_EVENT_0 && result <= WSA_WAIT_EVENT_0 + events.size())
+  if (result >= WSA_WAIT_EVENT_0 && result < WSA_WAIT_EVENT_0 + events.size())
     return result - WSA_WAIT_EVENT_0;
 
+  // A timeout is treated as a (premature) signalization of the interrupt event.
+  if (result == WSA_WAIT_TIMEOUT)
+    return events.size() - 1;
+
   return llvm::createStringError(llvm::inconvertibleErrorCode(),
                                  "WSAWaitForMultipleEvents failed");
 }
@@ -127,13 +142,13 @@ Status MainLoopWindows::Run() {
       ProcessReadObject(KV.first);
     } else {
       assert(*signaled_event == m_read_fds.size());
-      WSAResetEvent(m_trigger_event);
+      WSAResetEvent(m_interrupt_event);
     }
-    ProcessPendingCallbacks();
+    ProcessCallbacks();
   }
   return Status();
 }
 
-void MainLoopWindows::TriggerPendingCallbacks() {
-  WSASetEvent(m_trigger_event);
+void MainLoopWindows::Interrupt() {
+  WSASetEvent(m_interrupt_event);
 }
diff --git a/lldb/unittests/Host/MainLoopTest.cpp b/lldb/unittests/Host/MainLoopTest.cpp
index 4688d4fed475b6..70876f694a7acc 100644
--- a/lldb/unittests/Host/MainLoopTest.cpp
+++ b/lldb/unittests/Host/MainLoopTest.cpp
@@ -15,6 +15,7 @@
 #include "llvm/Config/llvm-config.h" // for LLVM_ON_UNIX
 #include "llvm/Testing/Support/Error.h"
 #include "gtest/gtest.h"
+#include <chrono>
 #include <future>
 #include <thread>
 
@@ -106,13 +107,9 @@ TEST_F(MainLoopTest, NoSpuriousReads) {
       error);
   ASSERT_THAT_ERROR(error.ToError(), llvm::Succeeded());
   // Terminate the loop after one second.
-  std::thread terminate_thread([&loop] {
-    std::this_thread::sleep_for(std::chrono::seconds(1));
-    loop.AddPendingCallback(
-        [](MainLoopBase &loop) { loop.RequestTermination(); });
-  });
+  loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); },
+                   std::chrono::seconds(1));
   ASSERT_THAT_ERROR(loop.Run().ToError(), llvm::Succeeded());
-  terminate_thread.join();
 
   // Make sure the callback was called only once.
   ASSERT_EQ(1u, callback_count);
@@ -223,6 +220,61 @@ TEST_F(MainLoopTest, ManyPendingCallbacks) {
   ASSERT_TRUE(loop.Run().Success());
 }
 
+TEST_F(MainLoopTest, CallbackWithTimeout) {
+  MainLoop loop;
+  loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); },
+                   std::chrono::seconds(2));
+  auto start = std::chrono::steady_clock::now();
+  ASSERT_THAT_ERROR(loop.Run().takeError(), llvm::Succeeded());
+  EXPECT_GE(std::chrono::steady_clock::now() - start, std::chrono::seconds(2));
+}
+
+TEST_F(MainLoopTest, TimedCallbacksRunInOrder) {
+  MainLoop loop;
+  auto start = std::chrono::steady_clock::now();
+  std::chrono::milliseconds epsilon(10);
+  std::vector<int> order;
+  auto add_cb = [&](int id) {
+    loop.AddCallback([&order, id](MainLoopBase &) { order.push_back(id); },
+                     start + id * epsilon);
+  };
+  add_cb(3);
+  add_cb(2);
+  add_cb(4);
+  add_cb(1);
+  loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); },
+                   start + 5 * epsilon);
+  ASSERT_THAT_ERROR(loop.Run().takeError(), llvm::Succeeded());
+  EXPECT_GE(std::chrono::steady_clock::now() - start, 5 * epsilon);
+  ASSERT_THAT(order, testing::ElementsAre(1, 2, 3, 4));
+}
+
+TEST_F(MainLoopTest, TimedCallbackShortensSleep) {
+  MainLoop loop;
+  auto start = std::chrono::steady_clock::now();
+  bool long_callback_called = false;
+  loop.AddCallback(
+      [&](MainLoopBase &loop) {
+        long_callback_called = true;
+        loop.RequestTermination();
+      },
+      std::chrono::seconds(30));
+  std::future<Status> async_run =
+      std::async(std::launch::async, &MainLoop::Run, std::ref(loop));
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  bool short_callback_called = false;
+  loop.AddCallback(
+      [&](MainLoopBase &loop) {
+        short_callback_called = true;
+        loop.RequestTermination();
+      },
+      std::chrono::seconds(1));
+  ASSERT_THAT_ERROR(async_run.get().takeError(), llvm::Succeeded());
+  EXPECT_LT(std::chrono::steady_clock::now() - start, std::chrono::seconds(10));
+  EXPECT_TRUE(short_callback_called);
+  EXPECT_FALSE(long_callback_called);
+}
+
 #ifdef LLVM_ON_UNIX
 TEST_F(MainLoopTest, DetectsEOF) {
 



More information about the lldb-commits mailing list