[Lldb-commits] [lldb] [lldb] Add timed callbacks to the MainLoop class (PR #112895)

Pavel Labath via lldb-commits lldb-commits at lists.llvm.org
Mon Nov 18 06:02:21 PST 2024


https://github.com/labath updated https://github.com/llvm/llvm-project/pull/112895

>From bf20e3125480268373a42c678bbf62c8c312227f Mon Sep 17 00:00:00 2001
From: Pavel Labath <pavel at labath.sk>
Date: Wed, 16 Oct 2024 17:34:53 +0200
Subject: [PATCH] [lldb] Add timed callbacks to the MainLoop class

The motivating use case is being able to "time out" certain operations
(by adding a timed callback which will force the termination of the
loop), but the design is flexible enough to accomodate other use cases
as well (e.g. running a periodic task in the background).

The implementation builds on the existing "pending callback" mechanism,
by associating a time point with each callback -- every time the loop
wakes up, it runs all of the callbacks which are past their point, and
it also makes sure to sleep only until the next callback is scheduled to
run.

I've done some renaming as names like "TriggerPendingCallbacks" were
no longer accurate -- the function may no longer cause any callbacks to be
called (it may just cause the main loop thread to recalculate the time
it wants to sleep).
---
 lldb/include/lldb/Host/MainLoopBase.h         | 32 ++++++--
 lldb/include/lldb/Host/posix/MainLoopPosix.h  |  6 +-
 .../lldb/Host/windows/MainLoopWindows.h       |  4 +-
 lldb/source/Host/common/MainLoopBase.cpp      | 40 +++++++---
 lldb/source/Host/posix/MainLoopPosix.cpp      | 79 ++++++++++++-------
 lldb/source/Host/windows/MainLoopWindows.cpp  | 37 ++++++---
 lldb/unittests/Host/MainLoopTest.cpp          | 64 +++++++++++++--
 7 files changed, 194 insertions(+), 68 deletions(-)

diff --git a/lldb/include/lldb/Host/MainLoopBase.h b/lldb/include/lldb/Host/MainLoopBase.h
index 7365ee7a65ee64..be9a2676e7443e 100644
--- a/lldb/include/lldb/Host/MainLoopBase.h
+++ b/lldb/include/lldb/Host/MainLoopBase.h
@@ -13,8 +13,10 @@
 #include "lldb/Utility/Status.h"
 #include "llvm/ADT/DenseMap.h"
 #include "llvm/Support/ErrorHandling.h"
+#include <chrono>
 #include <functional>
 #include <mutex>
+#include <queue>
 
 namespace lldb_private {
 
@@ -38,6 +40,9 @@ class MainLoopBase {
   class ReadHandle;
 
 public:
+  using TimePoint = std::chrono::time_point<std::chrono::steady_clock,
+                                            std::chrono::nanoseconds>;
+
   MainLoopBase() : m_terminate_request(false) {}
   virtual ~MainLoopBase() = default;
 
@@ -52,7 +57,18 @@ class MainLoopBase {
   // Add a pending callback that will be executed once after all the pending
   // events are processed. The callback will be executed even if termination
   // was requested.
-  void AddPendingCallback(const Callback &callback);
+  void AddPendingCallback(const Callback &callback) {
+    AddCallback(callback, std::chrono::steady_clock::time_point());
+  }
+
+  // Add a callback that will be executed after a certain amount of time has
+  // passed.
+  void AddCallback(const Callback &callback, std::chrono::nanoseconds delay) {
+    AddCallback(callback, std::chrono::steady_clock::now() + delay);
+  }
+
+  // Add a callback that will be executed after a given point in time.
+  void AddCallback(const Callback &callback, TimePoint point);
 
   // Waits for registered events and invoke the proper callbacks. Returns when
   // all callbacks deregister themselves or when someone requests termination.
@@ -69,14 +85,18 @@ class MainLoopBase {
 
   virtual void UnregisterReadObject(IOObject::WaitableHandle handle) = 0;
 
-  // Interrupt the loop that is currently waiting for events and execute
-  // the current pending callbacks immediately.
-  virtual void TriggerPendingCallbacks() = 0;
+  // Interrupt the loop that is currently waiting for events.
+  virtual void Interrupt() = 0;
+
+  void ProcessCallbacks();
 
-  void ProcessPendingCallbacks();
+  std::optional<TimePoint> GetNextWakeupTime();
 
   std::mutex m_callback_mutex;
-  std::vector<Callback> m_pending_callbacks;
+  std::priority_queue<std::pair<TimePoint, Callback>,
+                      std::vector<std::pair<TimePoint, Callback>>,
+                      llvm::on_first<std::greater<TimePoint>>>
+      m_callbacks;
   bool m_terminate_request : 1;
 
 private:
diff --git a/lldb/include/lldb/Host/posix/MainLoopPosix.h b/lldb/include/lldb/Host/posix/MainLoopPosix.h
index 1988dde7c65aee..e9ac798b948df9 100644
--- a/lldb/include/lldb/Host/posix/MainLoopPosix.h
+++ b/lldb/include/lldb/Host/posix/MainLoopPosix.h
@@ -54,7 +54,7 @@ class MainLoopPosix : public MainLoopBase {
   void UnregisterReadObject(IOObject::WaitableHandle handle) override;
   void UnregisterSignal(int signo, std::list<Callback>::iterator callback_it);
 
-  void TriggerPendingCallbacks() override;
+  void Interrupt() override;
 
 private:
   void ProcessReadObject(IOObject::WaitableHandle handle);
@@ -88,8 +88,8 @@ class MainLoopPosix : public MainLoopBase {
 
   llvm::DenseMap<IOObject::WaitableHandle, Callback> m_read_fds;
   llvm::DenseMap<int, SignalInfo> m_signals;
-  Pipe m_trigger_pipe;
-  std::atomic<bool> m_triggering;
+  Pipe m_interrupt_pipe;
+  std::atomic<bool> m_interrupting = false;
 #if HAVE_SYS_EVENT_H
   int m_kqueue;
 #endif
diff --git a/lldb/include/lldb/Host/windows/MainLoopWindows.h b/lldb/include/lldb/Host/windows/MainLoopWindows.h
index 33e179e6c1286c..3937a24645d955 100644
--- a/lldb/include/lldb/Host/windows/MainLoopWindows.h
+++ b/lldb/include/lldb/Host/windows/MainLoopWindows.h
@@ -34,7 +34,7 @@ class MainLoopWindows : public MainLoopBase {
 protected:
   void UnregisterReadObject(IOObject::WaitableHandle handle) override;
 
-  void TriggerPendingCallbacks() override;
+  void Interrupt() override;
 
 private:
   void ProcessReadObject(IOObject::WaitableHandle handle);
@@ -45,7 +45,7 @@ class MainLoopWindows : public MainLoopBase {
     Callback callback;
   };
   llvm::DenseMap<IOObject::WaitableHandle, FdInfo> m_read_fds;
-  void *m_trigger_event;
+  void *m_interrupt_event;
 };
 
 } // namespace lldb_private
diff --git a/lldb/source/Host/common/MainLoopBase.cpp b/lldb/source/Host/common/MainLoopBase.cpp
index 030a4f0371681e..64a57e65849e99 100644
--- a/lldb/source/Host/common/MainLoopBase.cpp
+++ b/lldb/source/Host/common/MainLoopBase.cpp
@@ -7,27 +7,43 @@
 //===----------------------------------------------------------------------===//
 
 #include "lldb/Host/MainLoopBase.h"
+#include <chrono>
 
 using namespace lldb;
 using namespace lldb_private;
 
-void MainLoopBase::AddPendingCallback(const Callback &callback) {
+void MainLoopBase::AddCallback(const Callback &callback, TimePoint point) {
+  bool interrupt_needed;
   {
     std::lock_guard<std::mutex> lock{m_callback_mutex};
-    m_pending_callbacks.push_back(callback);
+    // We need to interrupt the main thread if this callback is scheduled to
+    // execute at an earlier time than the earliest callback registered so far.
+    interrupt_needed = m_callbacks.empty() || point < m_callbacks.top().first;
+    m_callbacks.emplace(point, callback);
   }
-  TriggerPendingCallbacks();
+  if (interrupt_needed)
+    Interrupt();
 }
 
-void MainLoopBase::ProcessPendingCallbacks() {
-  // Move the callbacks to a local vector to avoid keeping m_pending_callbacks
-  // locked throughout the calls.
-  std::vector<Callback> pending_callbacks;
-  {
-    std::lock_guard<std::mutex> lock{m_callback_mutex};
-    pending_callbacks = std::move(m_pending_callbacks);
-  }
+void MainLoopBase::ProcessCallbacks() {
+  while (true) {
+    Callback callback;
+    {
+      std::lock_guard<std::mutex> lock{m_callback_mutex};
+      if (m_callbacks.empty() ||
+          std::chrono::steady_clock::now() < m_callbacks.top().first)
+        return;
+      callback = std::move(m_callbacks.top().second);
+      m_callbacks.pop();
+    }
 
-  for (const Callback &callback : pending_callbacks)
     callback(*this);
+  }
+}
+
+std::optional<MainLoopBase::TimePoint> MainLoopBase::GetNextWakeupTime() {
+  std::lock_guard<std::mutex> lock(m_callback_mutex);
+  if (m_callbacks.empty())
+    return std::nullopt;
+  return m_callbacks.top().first;
 }
diff --git a/lldb/source/Host/posix/MainLoopPosix.cpp b/lldb/source/Host/posix/MainLoopPosix.cpp
index 040af480771d42..519347dd0cfad5 100644
--- a/lldb/source/Host/posix/MainLoopPosix.cpp
+++ b/lldb/source/Host/posix/MainLoopPosix.cpp
@@ -15,6 +15,7 @@
 #include <algorithm>
 #include <cassert>
 #include <cerrno>
+#include <chrono>
 #include <csignal>
 #include <ctime>
 #include <fcntl.h>
@@ -67,6 +68,30 @@ static void SignalHandler(int signo, siginfo_t *info, void *) {
   assert(bytes_written == 1 || (bytes_written == -1 && errno == EAGAIN));
 }
 
+class ToTimeSpec {
+public:
+  explicit ToTimeSpec(std::optional<MainLoopPosix::TimePoint> point) {
+    using namespace std::chrono;
+
+    if (!point) {
+      m_ts_ptr = nullptr;
+      return;
+    }
+    nanoseconds dur = std::max(*point - steady_clock::now(), nanoseconds(0));
+    m_ts_ptr = &m_ts;
+    m_ts.tv_sec = duration_cast<seconds>(dur).count();
+    m_ts.tv_nsec = (dur % seconds(1)).count();
+  }
+  ToTimeSpec(const ToTimeSpec &) = delete;
+  ToTimeSpec &operator=(const ToTimeSpec &) = delete;
+
+  operator struct timespec *() { return m_ts_ptr; }
+
+private:
+  struct timespec m_ts;
+  struct timespec *m_ts_ptr;
+};
+
 class MainLoopPosix::RunImpl {
 public:
   RunImpl(MainLoopPosix &loop);
@@ -99,8 +124,9 @@ Status MainLoopPosix::RunImpl::Poll() {
   for (auto &fd : loop.m_read_fds)
     EV_SET(&in_events[i++], fd.first, EVFILT_READ, EV_ADD, 0, 0, 0);
 
-  num_events = kevent(loop.m_kqueue, in_events.data(), in_events.size(),
-                      out_events, std::size(out_events), nullptr);
+  num_events =
+      kevent(loop.m_kqueue, in_events.data(), in_events.size(), out_events,
+             std::size(out_events), ToTimeSpec(loop.GetNextWakeupTime()));
 
   if (num_events < 0) {
     if (errno == EINTR) {
@@ -144,7 +170,7 @@ Status MainLoopPosix::RunImpl::Poll() {
   }
 
   if (ppoll(read_fds.data(), read_fds.size(),
-            /*timeout=*/nullptr,
+            ToTimeSpec(loop.GetNextWakeupTime()),
             /*sigmask=*/nullptr) == -1 &&
       errno != EINTR)
     return Status(errno, eErrorTypePOSIX);
@@ -165,27 +191,28 @@ void MainLoopPosix::RunImpl::ProcessReadEvents() {
 }
 #endif
 
-MainLoopPosix::MainLoopPosix() : m_triggering(false) {
-  Status error = m_trigger_pipe.CreateNew(/*child_process_inherit=*/false);
+MainLoopPosix::MainLoopPosix() {
+  Status error = m_interrupt_pipe.CreateNew(/*child_process_inherit=*/false);
   assert(error.Success());
 
   // Make the write end of the pipe non-blocking.
-  int result = fcntl(m_trigger_pipe.GetWriteFileDescriptor(), F_SETFL,
-                     fcntl(m_trigger_pipe.GetWriteFileDescriptor(), F_GETFL) |
+  int result = fcntl(m_interrupt_pipe.GetWriteFileDescriptor(), F_SETFL,
+                     fcntl(m_interrupt_pipe.GetWriteFileDescriptor(), F_GETFL) |
                          O_NONBLOCK);
   assert(result == 0);
   UNUSED_IF_ASSERT_DISABLED(result);
 
-  const int trigger_pipe_fd = m_trigger_pipe.GetReadFileDescriptor();
-  m_read_fds.insert({trigger_pipe_fd, [trigger_pipe_fd](MainLoopBase &loop) {
-                       char c;
-                       ssize_t bytes_read = llvm::sys::RetryAfterSignal(
-                           -1, ::read, trigger_pipe_fd, &c, 1);
-                       assert(bytes_read == 1);
-                       UNUSED_IF_ASSERT_DISABLED(bytes_read);
-                       // NB: This implicitly causes another loop iteration
-                       // and therefore the execution of pending callbacks.
-                     }});
+  const int interrupt_pipe_fd = m_interrupt_pipe.GetReadFileDescriptor();
+  m_read_fds.insert(
+      {interrupt_pipe_fd, [interrupt_pipe_fd](MainLoopBase &loop) {
+         char c;
+         ssize_t bytes_read =
+             llvm::sys::RetryAfterSignal(-1, ::read, interrupt_pipe_fd, &c, 1);
+         assert(bytes_read == 1);
+         UNUSED_IF_ASSERT_DISABLED(bytes_read);
+         // NB: This implicitly causes another loop iteration
+         // and therefore the execution of pending callbacks.
+       }});
 #if HAVE_SYS_EVENT_H
   m_kqueue = kqueue();
   assert(m_kqueue >= 0);
@@ -196,8 +223,8 @@ MainLoopPosix::~MainLoopPosix() {
 #if HAVE_SYS_EVENT_H
   close(m_kqueue);
 #endif
-  m_read_fds.erase(m_trigger_pipe.GetReadFileDescriptor());
-  m_trigger_pipe.Close();
+  m_read_fds.erase(m_interrupt_pipe.GetReadFileDescriptor());
+  m_interrupt_pipe.Close();
   assert(m_read_fds.size() == 0); 
   assert(m_signals.size() == 0);
 }
@@ -244,11 +271,9 @@ MainLoopPosix::RegisterSignal(int signo, const Callback &callback,
   sigset_t old_set;
 
   // Set signal info before installing the signal handler!
-  g_signal_info[signo].pipe_fd = m_trigger_pipe.GetWriteFileDescriptor();
+  g_signal_info[signo].pipe_fd = m_interrupt_pipe.GetWriteFileDescriptor();
   g_signal_info[signo].flag = 0;
 
-  // Even if using kqueue, the signal handler will still be invoked, so it's
-  // important to replace it with our "benign" handler.
   int ret = sigaction(signo, &new_action, &info.old_action);
   UNUSED_IF_ASSERT_DISABLED(ret);
   assert(ret == 0 && "sigaction failed");
@@ -307,8 +332,8 @@ Status MainLoopPosix::Run() {
 
     ProcessSignals();
 
-    m_triggering = false;
-    ProcessPendingCallbacks();
+    m_interrupting = false;
+    ProcessCallbacks();
   }
   return Status();
 }
@@ -346,13 +371,13 @@ void MainLoopPosix::ProcessSignal(int signo) {
   }
 }
 
-void MainLoopPosix::TriggerPendingCallbacks() {
-  if (m_triggering.exchange(true))
+void MainLoopPosix::Interrupt() {
+  if (m_interrupting.exchange(true))
     return;
 
   char c = '.';
   size_t bytes_written;
-  Status error = m_trigger_pipe.Write(&c, 1, bytes_written);
+  Status error = m_interrupt_pipe.Write(&c, 1, bytes_written);
   assert(error.Success());
   UNUSED_IF_ASSERT_DISABLED(error);
   assert(bytes_written == 1);
diff --git a/lldb/source/Host/windows/MainLoopWindows.cpp b/lldb/source/Host/windows/MainLoopWindows.cpp
index c9aa6d339d8f48..0a5a35e9db9dde 100644
--- a/lldb/source/Host/windows/MainLoopWindows.cpp
+++ b/lldb/source/Host/windows/MainLoopWindows.cpp
@@ -21,14 +21,24 @@
 using namespace lldb;
 using namespace lldb_private;
 
+static DWORD ToTimeout(std::optional<MainLoopWindows::TimePoint> point) {
+  using namespace std::chrono;
+
+  if (!point)
+    return WSA_INFINITE;
+
+  nanoseconds dur = (std::max)(*point - steady_clock::now(), nanoseconds(0));
+  return duration_cast<milliseconds>(dur).count();
+}
+
 MainLoopWindows::MainLoopWindows() {
-  m_trigger_event = WSACreateEvent();
-  assert(m_trigger_event != WSA_INVALID_EVENT);
+  m_interrupt_event = WSACreateEvent();
+  assert(m_interrupt_event != WSA_INVALID_EVENT);
 }
 
 MainLoopWindows::~MainLoopWindows() {
   assert(m_read_fds.empty());
-  BOOL result = WSACloseEvent(m_trigger_event);
+  BOOL result = WSACloseEvent(m_interrupt_event);
   assert(result == TRUE);
   UNUSED_IF_ASSERT_DISABLED(result);
 }
@@ -43,10 +53,11 @@ llvm::Expected<size_t> MainLoopWindows::Poll() {
 
     events.push_back(info.event);
   }
-  events.push_back(m_trigger_event);
+  events.push_back(m_interrupt_event);
 
-  DWORD result = WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
-                                          WSA_INFINITE, FALSE);
+  DWORD result =
+      WSAWaitForMultipleEvents(events.size(), events.data(), FALSE,
+                               ToTimeout(GetNextWakeupTime()), FALSE);
 
   for (auto &fd : m_read_fds) {
     int result = WSAEventSelect(fd.first, WSA_INVALID_EVENT, 0);
@@ -54,9 +65,13 @@ llvm::Expected<size_t> MainLoopWindows::Poll() {
     UNUSED_IF_ASSERT_DISABLED(result);
   }
 
-  if (result >= WSA_WAIT_EVENT_0 && result <= WSA_WAIT_EVENT_0 + events.size())
+  if (result >= WSA_WAIT_EVENT_0 && result < WSA_WAIT_EVENT_0 + events.size())
     return result - WSA_WAIT_EVENT_0;
 
+  // A timeout is treated as a (premature) signalization of the interrupt event.
+  if (result == WSA_WAIT_TIMEOUT)
+    return events.size() - 1;
+
   return llvm::createStringError(llvm::inconvertibleErrorCode(),
                                  "WSAWaitForMultipleEvents failed");
 }
@@ -127,13 +142,11 @@ Status MainLoopWindows::Run() {
       ProcessReadObject(KV.first);
     } else {
       assert(*signaled_event == m_read_fds.size());
-      WSAResetEvent(m_trigger_event);
+      WSAResetEvent(m_interrupt_event);
     }
-    ProcessPendingCallbacks();
+    ProcessCallbacks();
   }
   return Status();
 }
 
-void MainLoopWindows::TriggerPendingCallbacks() {
-  WSASetEvent(m_trigger_event);
-}
+void MainLoopWindows::Interrupt() { WSASetEvent(m_interrupt_event); }
diff --git a/lldb/unittests/Host/MainLoopTest.cpp b/lldb/unittests/Host/MainLoopTest.cpp
index 622a547fa22f04..e7425b737a6dab 100644
--- a/lldb/unittests/Host/MainLoopTest.cpp
+++ b/lldb/unittests/Host/MainLoopTest.cpp
@@ -15,6 +15,7 @@
 #include "llvm/Config/llvm-config.h" // for LLVM_ON_UNIX
 #include "llvm/Testing/Support/Error.h"
 #include "gtest/gtest.h"
+#include <chrono>
 #include <future>
 #include <thread>
 
@@ -106,13 +107,9 @@ TEST_F(MainLoopTest, NoSpuriousReads) {
       error);
   ASSERT_THAT_ERROR(error.ToError(), llvm::Succeeded());
   // Terminate the loop after one second.
-  std::thread terminate_thread([&loop] {
-    std::this_thread::sleep_for(std::chrono::seconds(1));
-    loop.AddPendingCallback(
-        [](MainLoopBase &loop) { loop.RequestTermination(); });
-  });
+  loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); },
+                   std::chrono::seconds(1));
   ASSERT_THAT_ERROR(loop.Run().ToError(), llvm::Succeeded());
-  terminate_thread.join();
 
   // Make sure the callback was called only once.
   ASSERT_EQ(1u, callback_count);
@@ -223,6 +220,61 @@ TEST_F(MainLoopTest, ManyPendingCallbacks) {
   ASSERT_TRUE(loop.Run().Success());
 }
 
+TEST_F(MainLoopTest, CallbackWithTimeout) {
+  MainLoop loop;
+  loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); },
+                   std::chrono::seconds(2));
+  auto start = std::chrono::steady_clock::now();
+  ASSERT_THAT_ERROR(loop.Run().takeError(), llvm::Succeeded());
+  EXPECT_GE(std::chrono::steady_clock::now() - start, std::chrono::seconds(2));
+}
+
+TEST_F(MainLoopTest, TimedCallbacksRunInOrder) {
+  MainLoop loop;
+  auto start = std::chrono::steady_clock::now();
+  std::chrono::milliseconds epsilon(10);
+  std::vector<int> order;
+  auto add_cb = [&](int id) {
+    loop.AddCallback([&order, id](MainLoopBase &) { order.push_back(id); },
+                     start + id * epsilon);
+  };
+  add_cb(3);
+  add_cb(2);
+  add_cb(4);
+  add_cb(1);
+  loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); },
+                   start + 5 * epsilon);
+  ASSERT_THAT_ERROR(loop.Run().takeError(), llvm::Succeeded());
+  EXPECT_GE(std::chrono::steady_clock::now() - start, 5 * epsilon);
+  ASSERT_THAT(order, testing::ElementsAre(1, 2, 3, 4));
+}
+
+TEST_F(MainLoopTest, TimedCallbackShortensSleep) {
+  MainLoop loop;
+  auto start = std::chrono::steady_clock::now();
+  bool long_callback_called = false;
+  loop.AddCallback(
+      [&](MainLoopBase &loop) {
+        long_callback_called = true;
+        loop.RequestTermination();
+      },
+      std::chrono::seconds(30));
+  std::future<Status> async_run =
+      std::async(std::launch::async, &MainLoop::Run, std::ref(loop));
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  bool short_callback_called = false;
+  loop.AddCallback(
+      [&](MainLoopBase &loop) {
+        short_callback_called = true;
+        loop.RequestTermination();
+      },
+      std::chrono::seconds(1));
+  ASSERT_THAT_ERROR(async_run.get().takeError(), llvm::Succeeded());
+  EXPECT_LT(std::chrono::steady_clock::now() - start, std::chrono::seconds(10));
+  EXPECT_TRUE(short_callback_called);
+  EXPECT_FALSE(long_callback_called);
+}
+
 #ifdef LLVM_ON_UNIX
 TEST_F(MainLoopTest, DetectsEOF) {
 



More information about the lldb-commits mailing list