[clang-tools-extra] r365773 - [clangd] Add priorities to background index queue, extract to separate class

Sam McCall via cfe-commits cfe-commits at lists.llvm.org
Thu Jul 11 06:34:39 PDT 2019


Author: sammccall
Date: Thu Jul 11 06:34:38 2019
New Revision: 365773

URL: http://llvm.org/viewvc/llvm-project?rev=365773&view=rev
Log:
[clangd] Add priorities to background index queue, extract to separate class

Reviewers: kadircet

Subscribers: mgorny, ilya-biryukov, MaskRay, jkorous, arphaman, jfb, llvm-commits

Tags: #llvm

Differential Revision: https://reviews.llvm.org/D64560

Added:
    clang-tools-extra/trunk/clangd/index/BackgroundQueue.cpp
Modified:
    clang-tools-extra/trunk/clangd/CMakeLists.txt
    clang-tools-extra/trunk/clangd/index/Background.cpp
    clang-tools-extra/trunk/clangd/index/Background.h
    clang-tools-extra/trunk/clangd/tool/ClangdMain.cpp
    clang-tools-extra/trunk/clangd/unittests/BackgroundIndexTests.cpp

Modified: clang-tools-extra/trunk/clangd/CMakeLists.txt
URL: http://llvm.org/viewvc/llvm-project/clang-tools-extra/trunk/clangd/CMakeLists.txt?rev=365773&r1=365772&r2=365773&view=diff
==============================================================================
--- clang-tools-extra/trunk/clangd/CMakeLists.txt (original)
+++ clang-tools-extra/trunk/clangd/CMakeLists.txt Thu Jul 11 06:34:38 2019
@@ -73,8 +73,9 @@ add_clang_library(clangDaemon
   XRefs.cpp
 
   index/Background.cpp
-  index/BackgroundRebuild.cpp
   index/BackgroundIndexStorage.cpp
+  index/BackgroundQueue.cpp
+  index/BackgroundRebuild.cpp
   index/CanonicalIncludes.cpp
   index/FileIndex.cpp
   index/Index.cpp

Modified: clang-tools-extra/trunk/clangd/index/Background.cpp
URL: http://llvm.org/viewvc/llvm-project/clang-tools-extra/trunk/clangd/index/Background.cpp?rev=365773&r1=365772&r2=365773&view=diff
==============================================================================
--- clang-tools-extra/trunk/clangd/index/Background.cpp (original)
+++ clang-tools-extra/trunk/clangd/index/Background.cpp Thu Jul 11 06:34:38 2019
@@ -9,6 +9,7 @@
 #include "index/Background.h"
 #include "ClangdUnit.h"
 #include "Compiler.h"
+#include "Context.h"
 #include "Headers.h"
 #include "Logger.h"
 #include "Path.h"
@@ -33,8 +34,10 @@
 #include "llvm/ADT/StringRef.h"
 #include "llvm/ADT/StringSet.h"
 #include "llvm/Support/Error.h"
+#include "llvm/Support/Path.h"
 #include "llvm/Support/Threading.h"
 
+#include <algorithm>
 #include <atomic>
 #include <chrono>
 #include <condition_variable>
@@ -50,8 +53,6 @@ namespace clang {
 namespace clangd {
 namespace {
 
-static std::atomic<bool> PreventStarvation = {false};
-
 // Resolves URI to file paths with cache.
 class URIToFileCache {
 public:
@@ -134,8 +135,10 @@ BackgroundIndex::BackgroundIndex(
   assert(ThreadPoolSize > 0 && "Thread pool size can't be zero.");
   assert(this->IndexStorageFactory && "Storage factory can not be null!");
   for (unsigned I = 0; I < ThreadPoolSize; ++I) {
-    ThreadPool.runAsync("background-worker-" + llvm::Twine(I + 1),
-                        [this] { run(); });
+    ThreadPool.runAsync("background-worker-" + llvm::Twine(I + 1), [this] {
+      WithContext Ctx(this->BackgroundContext.clone());
+      Queue.work([&] { Rebuilder.idle(); });
+    });
   }
 }
 
@@ -144,113 +147,42 @@ BackgroundIndex::~BackgroundIndex() {
   ThreadPool.wait();
 }
 
-void BackgroundIndex::stop() {
-  Rebuilder.shutdown();
-  {
-    std::lock_guard<std::mutex> QueueLock(QueueMu);
-    std::lock_guard<std::mutex> IndexLock(IndexMu);
-    ShouldStop = true;
-  }
-  QueueCV.notify_all();
-  IndexCV.notify_all();
-}
-
-void BackgroundIndex::run() {
-  WithContext Background(BackgroundContext.clone());
-  while (true) {
-    llvm::Optional<Task> Task;
-    llvm::ThreadPriority Priority;
-    {
-      std::unique_lock<std::mutex> Lock(QueueMu);
-      QueueCV.wait(Lock, [&] { return ShouldStop || !Queue.empty(); });
-      if (ShouldStop) {
-        Queue.clear();
-        QueueCV.notify_all();
-        return;
-      }
-      ++NumActiveTasks;
-      std::tie(Task, Priority) = std::move(Queue.front());
-      Queue.pop_front();
-    }
-
-    if (Priority != llvm::ThreadPriority::Default && !PreventStarvation.load())
-      llvm::set_thread_priority(Priority);
-    (*Task)();
-    if (Priority != llvm::ThreadPriority::Default)
-      llvm::set_thread_priority(llvm::ThreadPriority::Default);
-
-    {
-      std::unique_lock<std::mutex> Lock(QueueMu);
-      if (NumActiveTasks == 1 && Queue.empty()) {
-        // We just finished the last item, the queue is going idle.
-        Lock.unlock();
-        Rebuilder.idle();
-        Lock.lock();
-      }
-      assert(NumActiveTasks > 0 && "before decrementing");
-      --NumActiveTasks;
-    }
-    QueueCV.notify_all();
-  }
-}
-
-bool BackgroundIndex::blockUntilIdleForTest(
-    llvm::Optional<double> TimeoutSeconds) {
-  std::unique_lock<std::mutex> Lock(QueueMu);
-  return wait(Lock, QueueCV, timeoutSeconds(TimeoutSeconds),
-              [&] { return Queue.empty() && NumActiveTasks == 0; });
-}
-
-void BackgroundIndex::enqueue(const std::vector<std::string> &ChangedFiles) {
-  enqueueTask(
-      [this, ChangedFiles] {
-        trace::Span Tracer("BackgroundIndexEnqueue");
-        // We're doing this asynchronously, because we'll read shards here too.
-        log("Enqueueing {0} commands for indexing", ChangedFiles.size());
-        SPAN_ATTACH(Tracer, "files", int64_t(ChangedFiles.size()));
-
-        auto NeedsReIndexing = loadShards(std::move(ChangedFiles));
-        // Run indexing for files that need to be updated.
-        std::shuffle(NeedsReIndexing.begin(), NeedsReIndexing.end(),
-                     std::mt19937(std::random_device{}()));
-        for (auto &Elem : NeedsReIndexing)
-          enqueue(std::move(Elem.first), Elem.second);
-      },
-      llvm::ThreadPriority::Default);
-}
-
-void BackgroundIndex::enqueue(tooling::CompileCommand Cmd,
-                              BackgroundIndexStorage *Storage) {
-  enqueueTask(Bind(
-                  [this, Storage](tooling::CompileCommand Cmd) {
-                    // We can't use llvm::StringRef here since we are going to
-                    // move from Cmd during the call below.
-                    const std::string FileName = Cmd.Filename;
-                    if (auto Error = index(std::move(Cmd), Storage))
-                      elog("Indexing {0} failed: {1}", FileName,
-                           std::move(Error));
-                  },
-                  std::move(Cmd)),
-              llvm::ThreadPriority::Background);
-}
-
-void BackgroundIndex::enqueueTask(Task T, llvm::ThreadPriority Priority) {
-  {
-    std::lock_guard<std::mutex> Lock(QueueMu);
-    auto I = Queue.end();
-    // We first store the tasks with Normal priority in the front of the queue.
-    // Then we store low priority tasks. Normal priority tasks are pretty rare,
-    // they should not grow beyond single-digit numbers, so it is OK to do
-    // linear search and insert after that.
-    if (Priority == llvm::ThreadPriority::Default) {
-      I = llvm::find_if(
-          Queue, [](const std::pair<Task, llvm::ThreadPriority> &Elem) {
-            return Elem.second == llvm::ThreadPriority::Background;
-          });
-    }
-    Queue.insert(I, {std::move(T), Priority});
-  }
-  QueueCV.notify_all();
+BackgroundQueue::Task BackgroundIndex::changedFilesTask(
+    const std::vector<std::string> &ChangedFiles) {
+  BackgroundQueue::Task T([this, ChangedFiles] {
+    trace::Span Tracer("BackgroundIndexEnqueue");
+    // We're doing this asynchronously, because we'll read shards here too.
+    log("Enqueueing {0} commands for indexing", ChangedFiles.size());
+    SPAN_ATTACH(Tracer, "files", int64_t(ChangedFiles.size()));
+
+    auto NeedsReIndexing = loadShards(std::move(ChangedFiles));
+    // Run indexing for files that need to be updated.
+    std::shuffle(NeedsReIndexing.begin(), NeedsReIndexing.end(),
+                 std::mt19937(std::random_device{}()));
+    std::vector<BackgroundQueue::Task> Tasks;
+    Tasks.reserve(NeedsReIndexing.size());
+    for (auto &Elem : NeedsReIndexing)
+      Tasks.push_back(indexFileTask(std::move(Elem.first), Elem.second));
+    Queue.append(std::move(Tasks));
+  });
+
+  T.QueuePri = LoadShards;
+  T.ThreadPri = llvm::ThreadPriority::Default;
+  return T;
+}
+
+BackgroundQueue::Task
+BackgroundIndex::indexFileTask(tooling::CompileCommand Cmd,
+                               BackgroundIndexStorage *Storage) {
+  BackgroundQueue::Task T([this, Storage, Cmd] {
+    // We can't use llvm::StringRef here since we are going to
+    // move from Cmd during the call below.
+    const std::string FileName = Cmd.Filename;
+    if (auto Error = index(std::move(Cmd), Storage))
+      elog("Indexing {0} failed: {1}", FileName, std::move(Error));
+  });
+  T.QueuePri = IndexFile;
+  return T;
 }
 
 /// Given index results from a TU, only update symbols coming from files that
@@ -649,9 +581,5 @@ BackgroundIndex::loadShards(std::vector<
   return NeedsReIndexing;
 }
 
-void BackgroundIndex::preventThreadStarvationInTests() {
-  PreventStarvation.store(true);
-}
-
 } // namespace clangd
 } // namespace clang

Modified: clang-tools-extra/trunk/clangd/index/Background.h
URL: http://llvm.org/viewvc/llvm-project/clang-tools-extra/trunk/clangd/index/Background.h?rev=365773&r1=365772&r2=365773&view=diff
==============================================================================
--- clang-tools-extra/trunk/clangd/index/Background.h (original)
+++ clang-tools-extra/trunk/clangd/index/Background.h Thu Jul 11 06:34:38 2019
@@ -25,6 +25,7 @@
 #include <condition_variable>
 #include <deque>
 #include <mutex>
+#include <queue>
 #include <string>
 #include <thread>
 #include <vector>
@@ -59,6 +60,47 @@ public:
   static Factory createDiskBackedStorageFactory();
 };
 
+// A priority queue of tasks which can be run on (external) worker threads.
+class BackgroundQueue {
+public:
+  /// A work item on the thread pool's queue.
+  struct Task {
+    template <typename Func>
+    explicit Task(Func &&F) : Run(std::forward<Func>(F)){};
+
+    std::function<void()> Run;
+    llvm::ThreadPriority ThreadPri = llvm::ThreadPriority::Background;
+    // Higher-priority tasks will run first.
+    unsigned QueuePri = 0;
+
+    bool operator<(const Task &O) const { return QueuePri < O.QueuePri; }
+  };
+
+  // Add tasks to the queue.
+  void push(Task);
+  void append(std::vector<Task>);
+
+  // Process items on the queue until the queue is stopped.
+  // If the queue becomes empty, OnIdle will be called (on one worker).
+  void work(std::function<void()> OnIdle = nullptr);
+
+  // Stop processing new tasks, allowing all work() calls to return soon.
+  void stop();
+
+  // Disables thread priority lowering to ensure progress on loaded systems.
+  // Only affects tasks that run after the call.
+  static void preventThreadStarvationInTests();
+  LLVM_NODISCARD bool
+  blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds);
+
+private:
+  std::mutex Mu;
+  unsigned NumActiveTasks = 0; // Only idle when queue is empty *and* no tasks.
+  std::condition_variable CV;
+  bool ShouldStop = false;
+  std::vector<Task> Queue; // max-heap
+};
+
 // Builds an in-memory index by by running the static indexer action over
 // all commands in a compilation database. Indexing happens in the background.
 // FIXME: it should also persist its state on disk for fast start.
@@ -78,19 +120,22 @@ public:
   // Enqueue translation units for indexing.
   // The indexing happens in a background thread, so the symbols will be
   // available sometime later.
-  void enqueue(const std::vector<std::string> &ChangedFiles);
+  void enqueue(const std::vector<std::string> &ChangedFiles) {
+    Queue.push(changedFilesTask(ChangedFiles));
+  }
 
   // Cause background threads to stop after ther current task, any remaining
   // tasks will be discarded.
-  void stop();
+  void stop() {
+    Rebuilder.shutdown();
+    Queue.stop();
+  }
 
   // Wait until the queue is empty, to allow deterministic testing.
   LLVM_NODISCARD bool
-  blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds = 10);
-
-  // Disables thread priority lowering in background index to make sure it can
-  // progress on loaded systems. Only affects tasks that run after the call.
-  static void preventThreadStarvationInTests();
+  blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds = 10) {
+    return Queue.blockUntilIdleForTest(TimeoutSeconds);
+  }
 
 private:
   /// Represents the state of a single file when indexing was performed.
@@ -111,11 +156,8 @@ private:
   const GlobalCompilationDatabase &CDB;
   Context BackgroundContext;
 
-  // index state
   llvm::Error index(tooling::CompileCommand,
                     BackgroundIndexStorage *IndexStorage);
-  std::mutex IndexMu;
-  std::condition_variable IndexCV;
 
   FileSymbols IndexedSymbols;
   BackgroundIndexRebuilder Rebuilder;
@@ -137,19 +179,18 @@ private:
   // Tries to load shards for the ChangedFiles.
   std::vector<std::pair<tooling::CompileCommand, BackgroundIndexStorage *>>
   loadShards(std::vector<std::string> ChangedFiles);
-  void enqueue(tooling::CompileCommand Cmd, BackgroundIndexStorage *Storage);
 
-  // queue management
-  using Task = std::function<void()>;
-  void run(); // Main loop executed by Thread. Runs tasks from Queue.
-  void enqueueTask(Task T, llvm::ThreadPriority Prioirty);
-  void enqueueLocked(tooling::CompileCommand Cmd,
-                     BackgroundIndexStorage *IndexStorage);
-  std::mutex QueueMu;
-  unsigned NumActiveTasks = 0; // Only idle when queue is empty *and* no tasks.
-  std::condition_variable QueueCV;
-  bool ShouldStop = false;
-  std::deque<std::pair<Task, llvm::ThreadPriority>> Queue;
+  BackgroundQueue::Task
+  changedFilesTask(const std::vector<std::string> &ChangedFiles);
+  BackgroundQueue::Task indexFileTask(tooling::CompileCommand Cmd,
+                                      BackgroundIndexStorage *Storage);
+
+  // from lowest to highest priority
+  enum QueuePriority {
+    IndexFile,
+    LoadShards,
+  };
+  BackgroundQueue Queue;
   AsyncTaskRunner ThreadPool;
   GlobalCompilationDatabase::CommandChanged::Subscription CommandsChanged;
 };

Added: clang-tools-extra/trunk/clangd/index/BackgroundQueue.cpp
URL: http://llvm.org/viewvc/llvm-project/clang-tools-extra/trunk/clangd/index/BackgroundQueue.cpp?rev=365773&view=auto
==============================================================================
--- clang-tools-extra/trunk/clangd/index/BackgroundQueue.cpp (added)
+++ clang-tools-extra/trunk/clangd/index/BackgroundQueue.cpp Thu Jul 11 06:34:38 2019
@@ -0,0 +1,93 @@
+//===-- BackgroundQueue.cpp - Task queue for background index -------------===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#include "index/Background.h"
+
+namespace clang {
+namespace clangd {
+
+static std::atomic<bool> PreventStarvation = {false};
+
+void BackgroundQueue::preventThreadStarvationInTests() {
+  PreventStarvation.store(true);
+}
+
+void BackgroundQueue::work(std::function<void()> OnIdle) {
+  while (true) {
+    llvm::Optional<Task> Task;
+    {
+      std::unique_lock<std::mutex> Lock(Mu);
+      CV.wait(Lock, [&] { return ShouldStop || !Queue.empty(); });
+      if (ShouldStop) {
+        Queue.clear();
+        CV.notify_all();
+        return;
+      }
+      ++NumActiveTasks;
+      std::pop_heap(Queue.begin(), Queue.end());
+      Task = std::move(Queue.back());
+      Queue.pop_back();
+    }
+
+    if (Task->ThreadPri != llvm::ThreadPriority::Default &&
+        !PreventStarvation.load())
+      llvm::set_thread_priority(Task->ThreadPri);
+    Task->Run();
+    if (Task->ThreadPri != llvm::ThreadPriority::Default)
+      llvm::set_thread_priority(llvm::ThreadPriority::Default);
+
+    {
+      std::unique_lock<std::mutex> Lock(Mu);
+      if (NumActiveTasks == 1 && Queue.empty() && OnIdle) {
+        // We just finished the last item, the queue is going idle.
+        Lock.unlock();
+        OnIdle();
+        Lock.lock();
+      }
+      assert(NumActiveTasks > 0 && "before decrementing");
+      --NumActiveTasks;
+    }
+    CV.notify_all();
+  }
+}
+
+void BackgroundQueue::stop() {
+  {
+    std::lock_guard<std::mutex> QueueLock(Mu);
+    ShouldStop = true;
+  }
+  CV.notify_all();
+}
+
+void BackgroundQueue::push(Task T) {
+  {
+    std::lock_guard<std::mutex> Lock(Mu);
+    Queue.push_back(std::move(T));
+    std::push_heap(Queue.begin(), Queue.end());
+  }
+  CV.notify_all();
+}
+
+void BackgroundQueue::append(std::vector<Task> Tasks) {
+  {
+    std::lock_guard<std::mutex> Lock(Mu);
+    std::move(Tasks.begin(), Tasks.end(), std::back_inserter(Queue));
+    std::make_heap(Queue.begin(), Queue.end());
+  }
+  CV.notify_all();
+}
+
+bool BackgroundQueue::blockUntilIdleForTest(
+    llvm::Optional<double> TimeoutSeconds) {
+  std::unique_lock<std::mutex> Lock(Mu);
+  return wait(Lock, CV, timeoutSeconds(TimeoutSeconds),
+              [&] { return Queue.empty() && NumActiveTasks == 0; });
+}
+
+} // namespace clangd
+} // namespace clang

Modified: clang-tools-extra/trunk/clangd/tool/ClangdMain.cpp
URL: http://llvm.org/viewvc/llvm-project/clang-tools-extra/trunk/clangd/tool/ClangdMain.cpp?rev=365773&r1=365772&r2=365773&view=diff
==============================================================================
--- clang-tools-extra/trunk/clangd/tool/ClangdMain.cpp (original)
+++ clang-tools-extra/trunk/clangd/tool/ClangdMain.cpp Thu Jul 11 06:34:38 2019
@@ -356,7 +356,7 @@ int main(int argc, char *argv[]) {
     LogLevel = Logger::Verbose;
     PrettyPrint = true;
     // Ensure background index makes progress.
-    BackgroundIndex::preventThreadStarvationInTests();
+    BackgroundQueue::preventThreadStarvationInTests();
   }
   if (Test || EnableTestScheme) {
     static URISchemeRegistry::Add<TestScheme> X(

Modified: clang-tools-extra/trunk/clangd/unittests/BackgroundIndexTests.cpp
URL: http://llvm.org/viewvc/llvm-project/clang-tools-extra/trunk/clangd/unittests/BackgroundIndexTests.cpp?rev=365773&r1=365772&r2=365773&view=diff
==============================================================================
--- clang-tools-extra/trunk/clangd/unittests/BackgroundIndexTests.cpp (original)
+++ clang-tools-extra/trunk/clangd/unittests/BackgroundIndexTests.cpp Thu Jul 11 06:34:38 2019
@@ -82,7 +82,7 @@ public:
 
 class BackgroundIndexTest : public ::testing::Test {
 protected:
-  BackgroundIndexTest() { BackgroundIndex::preventThreadStarvationInTests(); }
+  BackgroundIndexTest() { BackgroundQueue::preventThreadStarvationInTests(); }
 };
 
 TEST_F(BackgroundIndexTest, NoCrashOnErrorFile) {
@@ -646,5 +646,37 @@ TEST_F(BackgroundIndexRebuilderTest, Loa
   EXPECT_TRUE(checkRebuild([&] { Rebuilder.doneLoading(); }));
 }
 
+TEST(BackgroundQueueTest, Priority) {
+  // Create high and low priority tasks.
+  // Once a bunch of high priority tasks have run, the queue is stopped.
+  // So the low priority tasks should never run.
+  BackgroundQueue Q;
+  std::atomic<unsigned> HiRan(0), LoRan(0);
+  BackgroundQueue::Task Lo([&] { ++LoRan; });
+  BackgroundQueue::Task Hi([&] {
+    if (++HiRan >= 10)
+      Q.stop();
+  });
+  Hi.QueuePri = 100;
+
+  // Enqueuing the low-priority ones first shouldn't make them run first.
+  Q.append(std::vector<BackgroundQueue::Task>(30, Lo));
+  for (unsigned I = 0; I < 30; ++I)
+    Q.push(Hi);
+
+  AsyncTaskRunner ThreadPool;
+  for (unsigned I = 0; I < 5; ++I)
+    ThreadPool.runAsync("worker", [&] { Q.work(); });
+  // We should test enqueue with active workers, but it's hard to avoid races.
+  // Just make sure we don't crash.
+  Q.push(Lo);
+  Q.append(std::vector<BackgroundQueue::Task>(2, Hi));
+
+  // After finishing, check the tasks that ran.
+  ThreadPool.wait();
+  EXPECT_GE(HiRan, 10u);
+  EXPECT_EQ(LoRan, 0u);
+}
+
 } // namespace clangd
 } // namespace clang




More information about the cfe-commits mailing list