[llvm] [Offload] Improve `olDestroyQueue` logic (PR #153041)

Ross Brunton via llvm-commits llvm-commits at lists.llvm.org
Fri Aug 15 02:00:44 PDT 2025


https://github.com/RossBrunton updated https://github.com/llvm/llvm-project/pull/153041

>From c36df62b5e7247238cc9987b382de739c4926915 Mon Sep 17 00:00:00 2001
From: Ross Brunton <ross at codeplay.com>
Date: Mon, 11 Aug 2025 16:51:30 +0100
Subject: [PATCH 1/4] [Offload] Improve `olDestroyQueue` logic

Previously, `olDestroyQueue` did not actually destroy the queue;
instead, the queue was left for the device to clean up when the device
itself was destroyed. Now, the queue is released immediately if it has
completed, or placed on a list of "outstanding" queues if it has not.
Whenever a new queue is created, this list is checked for queues that
have since completed; any that have are released and reused instead of
pulling a fresh queue from the pool.

This prevents long-running programs that create and drop many queues
without synchronizing them from steadily leaking memory.
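
For readers skimming the series, here is a minimal, self-contained
sketch of the reuse pattern described above. All names are illustrative
stand-ins (a hypothetical `Handle` with `isBusy()`/`reset()`), not the
liboffload API; the real implementation operates on `__tgt_async_info`
via `hasPendingWork()` and `synchronize()`.

    #include <mutex>
    #include <vector>

    // Hypothetical stand-in for __tgt_async_info.
    struct Handle {
      bool Busy = false;
      bool isBusy() const { return Busy; } // models hasPendingWork()
      void reset() { Busy = false; }       // models the non-releasing sync
    };

    struct DevicePool {
      std::vector<Handle *> Outstanding; // queues dropped while still running
      std::mutex Mutex;

      // "olDestroyQueue": release immediately if idle, otherwise park it.
      void destroyQueue(Handle *H) {
        if (!H->isBusy()) {
          delete H; // models synchronize(..., /*Release=*/true)
          return;
        }
        std::lock_guard<std::mutex> Lock(Mutex);
        Outstanding.push_back(H);
      }

      // "olCreateQueue": prefer recycling a parked queue that has finished.
      Handle *createQueue() {
        {
          std::lock_guard<std::mutex> Lock(Mutex);
          for (auto It = Outstanding.rbegin(); It != Outstanding.rend(); ++It) {
            if (!(*It)->isBusy()) {
              Handle *Done = *It;
              *It = Outstanding.back(); // swap-with-back erase, order not kept
              Outstanding.pop_back();
              Done->reset();            // clean up before handing it back out
              return Done;
            }
          }
        }
        return new Handle(); // nothing reusable; allocate a fresh queue
      }
    };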
---
 offload/liboffload/src/OffloadImpl.cpp        | 69 ++++++++++++++++++-
 .../common/src/PluginInterface.cpp            | 15 ++--
 2 files changed, 76 insertions(+), 8 deletions(-)

diff --git a/offload/liboffload/src/OffloadImpl.cpp b/offload/liboffload/src/OffloadImpl.cpp
index 1c9dfc69d445a..720dfdb7adf8c 100644
--- a/offload/liboffload/src/OffloadImpl.cpp
+++ b/offload/liboffload/src/OffloadImpl.cpp
@@ -43,6 +43,17 @@ using namespace error;
 // we add some additional data here for now to avoid churn in the plugin
 // interface.
 struct ol_device_impl_t {
+  // std::mutex doesn't have a move constructor, so we need to create a new one
+  // in the destination. Since devices don't get moved once the platform has
+  // been created, this is safe.
+  ol_device_impl_t(ol_device_impl_t &&Other)
+      : DeviceNum(Other.DeviceNum), Device(Other.Device),
+        Platform(Other.Platform), Info(std::move(Other.Info)),
+        OutstandingQueues(), OutstandingQueuesMutex() {
+    assert(Other.OutstandingQueuesMutex.try_lock());
+    assert(!Other.OutstandingQueues.size());
+  }
+
   ol_device_impl_t(int DeviceNum, GenericDeviceTy *Device,
                    ol_platform_handle_t Platform, InfoTreeNode &&DevInfo)
       : DeviceNum(DeviceNum), Device(Device), Platform(Platform),
@@ -51,6 +62,9 @@ struct ol_device_impl_t {
   GenericDeviceTy *Device;
   ol_platform_handle_t Platform;
   InfoTreeNode Info;
+
+  llvm::SmallVector<__tgt_async_info *> OutstandingQueues;
+  std::mutex OutstandingQueuesMutex;
 };
 
 struct ol_platform_impl_t {
@@ -474,14 +488,65 @@ Error olMemFree_impl(void *Address) {
 
 Error olCreateQueue_impl(ol_device_handle_t Device, ol_queue_handle_t *Queue) {
   auto CreatedQueue = std::make_unique<ol_queue_impl_t>(nullptr, Device);
-  if (auto Err = Device->Device->initAsyncInfo(&(CreatedQueue->AsyncInfo)))
+
+  // The device may have some outstanding queues created by olDestroyQueue,
+  // check if any of those are finished and can be reused.
+  // Not locking the `size()` access is fine here - In the worst case we either
+  // miss a queue that exists or loop through an empty array after taking the
+  // lock. Both are sub-optimal but not that bad.
+  __tgt_async_info *OutstandingQueue = nullptr;
+  if (Device->OutstandingQueues.size()) {
+    std::lock_guard<std::mutex> Lock(Device->OutstandingQueuesMutex);
+
+    // As queues are pulled and popped from this list, longer running queues
+    // naturally bubble to the start of the array. Hence looping backwards.
+    for (auto Q = Device->OutstandingQueues.rbegin();
+         Q != Device->OutstandingQueues.rend(); Q++) {
+      if (!Device->Device->hasPendingWork(*Q)) {
+        OutstandingQueue = *Q;
+        *Q = Device->OutstandingQueues.back();
+        Device->OutstandingQueues.pop_back();
+      }
+    }
+  }
+
+  if (OutstandingQueue) {
+    // The queue is empty, but we still need to sync it to release any temporary
+    // memory allocations or do other cleanup
+    if (auto Err =
+            Device->Device->synchronize(OutstandingQueue, /*Release=*/false))
+      return Err;
+    CreatedQueue->AsyncInfo = OutstandingQueue;
+  } else if (auto Err =
+                 Device->Device->initAsyncInfo(&(CreatedQueue->AsyncInfo))) {
     return Err;
+  }
 
   *Queue = CreatedQueue.release();
   return Error::success();
 }
 
-Error olDestroyQueue_impl(ol_queue_handle_t Queue) { return olDestroy(Queue); }
+Error olDestroyQueue_impl(ol_queue_handle_t Queue) {
+  // This is safe; as soon as olDestroyQueue is called it is not possible to add
+  // any more work to the queue, so if it's finished now it will remain finished
+  // forever.
+  auto Res = Queue->Device->Device->hasPendingWork(Queue->AsyncInfo);
+  if (!Res)
+    return Res.takeError();
+
+  if (!*Res) {
+    // The queue is complete, so sync it and throw it back into the pool
+    if (auto Err = Queue->Device->Device->synchronize(Queue->AsyncInfo,
+                                                      /*Release=*/true))
+      return Err;
+  } else {
+    // The queue still has outstanding work. Store it so we can check it later
+    std::lock_guard<std::mutex> Lock(Queue->Device->OutstandingQueuesMutex);
+    Queue->Device->OutstandingQueues.push_back(Queue->AsyncInfo);
+  }
+
+  return olDestroy(Queue);
+}
 
 Error olSyncQueue_impl(ol_queue_handle_t Queue) {
   // Host plugin doesn't have a queue set so it's not safe to call synchronize
diff --git a/offload/plugins-nextgen/common/src/PluginInterface.cpp b/offload/plugins-nextgen/common/src/PluginInterface.cpp
index f177c5bc9f487..cd872065b9faa 100644
--- a/offload/plugins-nextgen/common/src/PluginInterface.cpp
+++ b/offload/plugins-nextgen/common/src/PluginInterface.cpp
@@ -1337,16 +1337,19 @@ Error PinnedAllocationMapTy::unlockUnmappedHostBuffer(void *HstPtr) {
 
 Error GenericDeviceTy::synchronize(__tgt_async_info *AsyncInfo,
                                    bool ReleaseQueue) {
+  if (!AsyncInfo)
+    return Plugin::error(ErrorCode::INVALID_ARGUMENT,
+                         "invalid async info queue");
+
   SmallVector<void *> AllocsToDelete{};
   {
     std::lock_guard<std::mutex> AllocationGuard{AsyncInfo->Mutex};
 
-    if (!AsyncInfo || !AsyncInfo->Queue)
-      return Plugin::error(ErrorCode::INVALID_ARGUMENT,
-                           "invalid async info queue");
-
-    if (auto Err = synchronizeImpl(*AsyncInfo, ReleaseQueue))
-      return Err;
+    // This can be false when no work has been added to the AsyncInfo, in
+    // which case the device has nothing to synchronize.
+    if (AsyncInfo->Queue)
+      if (auto Err = synchronizeImpl(*AsyncInfo, ReleaseQueue))
+        return Err;
 
     std::swap(AllocsToDelete, AsyncInfo->AssociatedAllocations);
   }

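
The PluginInterface change above boils down to: reject a null
`AsyncInfo` up front, but treat a null `AsyncInfo->Queue` as "nothing
was ever enqueued" and still reclaim the associated allocations. A
simplified sketch of that control flow (freestanding types and a
comment standing in for `synchronizeImpl`, not the actual plugin
interface):

    #include <mutex>
    #include <utility>
    #include <vector>

    struct AsyncInfo {
      void *Queue = nullptr;           // stays null until work is enqueued
      std::vector<void *> Allocations; // temporaries tied to this queue
      std::mutex Mutex;
    };

    // Hypothetical stand-in for GenericDeviceTy::synchronize().
    bool synchronize(AsyncInfo *AI, bool ReleaseQueue) {
      if (!AI)
        return false; // invalid argument, checked before touching AI->Mutex
      (void)ReleaseQueue; // only forwarded to synchronizeImpl in real code

      std::vector<void *> ToFree;
      {
        std::lock_guard<std::mutex> Guard(AI->Mutex);
        // A null Queue just means nothing was ever enqueued: there is
        // nothing for the device to wait on, but any associated
        // allocations still need to be reclaimed below.
        if (AI->Queue) {
          // ... synchronizeImpl(*AI, ReleaseQueue) would go here ...
        }
        std::swap(ToFree, AI->Allocations);
      }
      for (void *Ptr : ToFree) {
        // ... hand Ptr back to the device allocator here ...
        (void)Ptr;
      }
      return true;
    }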
>From 2f68492f1d8e2002eac3138e7c50489df7534a96 Mon Sep 17 00:00:00 2001
From: Ross Brunton <ross at codeplay.com>
Date: Tue, 12 Aug 2025 10:22:19 +0100
Subject: [PATCH 2/4] Store devices as pointers rather than in the array
 directly

---
 offload/liboffload/src/OffloadImpl.cpp | 24 +++++++-----------------
 1 file changed, 7 insertions(+), 17 deletions(-)

diff --git a/offload/liboffload/src/OffloadImpl.cpp b/offload/liboffload/src/OffloadImpl.cpp
index 720dfdb7adf8c..c7cde6bf44f10 100644
--- a/offload/liboffload/src/OffloadImpl.cpp
+++ b/offload/liboffload/src/OffloadImpl.cpp
@@ -43,17 +43,6 @@ using namespace error;
 // we add some additional data here for now to avoid churn in the plugin
 // interface.
 struct ol_device_impl_t {
-  // std::mutex doesn't have a move constructor, so we need to create a new one
-  // in the destination. Since devices don't get moved once the platform has
-  // been created, this is safe.
-  ol_device_impl_t(ol_device_impl_t &&Other)
-      : DeviceNum(Other.DeviceNum), Device(Other.Device),
-        Platform(Other.Platform), Info(std::move(Other.Info)),
-        OutstandingQueues(), OutstandingQueuesMutex() {
-    assert(Other.OutstandingQueuesMutex.try_lock());
-    assert(!Other.OutstandingQueues.size());
-  }
-
   ol_device_impl_t(int DeviceNum, GenericDeviceTy *Device,
                    ol_platform_handle_t Platform, InfoTreeNode &&DevInfo)
       : DeviceNum(DeviceNum), Device(Device), Platform(Platform),
@@ -72,7 +61,7 @@ struct ol_platform_impl_t {
                      ol_platform_backend_t BackendType)
       : Plugin(std::move(Plugin)), BackendType(BackendType) {}
   std::unique_ptr<GenericPluginTy> Plugin;
-  std::vector<ol_device_impl_t> Devices;
+  llvm::SmallVector<std::unique_ptr<ol_device_impl_t>> Devices;
   ol_platform_backend_t BackendType;
 };
 
@@ -144,7 +133,7 @@ struct OffloadContext {
 
   ol_device_handle_t HostDevice() {
     // The host platform is always inserted last
-    return &Platforms.back().Devices[0];
+    return Platforms.back().Devices[0].get();
   }
 
   static OffloadContext &get() {
@@ -203,8 +192,8 @@ Error initPlugins(OffloadContext &Context) {
         auto Info = Device->obtainInfoImpl();
         if (auto Err = Info.takeError())
           return Err;
-        Platform.Devices.emplace_back(DevNum, Device, &Platform,
-                                      std::move(*Info));
+        Platform.Devices.emplace_back(std::make_unique<ol_device_impl_t>(
+            DevNum, Device, &Platform, std::move(*Info)));
       }
     }
   }
@@ -212,7 +201,8 @@ Error initPlugins(OffloadContext &Context) {
   // Add the special host device
   auto &HostPlatform = Context.Platforms.emplace_back(
       ol_platform_impl_t{nullptr, OL_PLATFORM_BACKEND_HOST});
-  HostPlatform.Devices.emplace_back(-1, nullptr, nullptr, InfoTreeNode{});
+  HostPlatform.Devices.emplace_back(
+      std::make_unique<ol_device_impl_t>(-1, nullptr, nullptr, InfoTreeNode{}));
   Context.HostDevice()->Platform = &HostPlatform;
 
   Context.TracingEnabled = std::getenv("OFFLOAD_TRACE");
@@ -434,7 +424,7 @@ Error olGetDeviceInfoSize_impl(ol_device_handle_t Device,
 Error olIterateDevices_impl(ol_device_iterate_cb_t Callback, void *UserData) {
   for (auto &Platform : OffloadContext::get().Platforms) {
     for (auto &Device : Platform.Devices) {
-      if (!Callback(&Device, UserData)) {
+      if (!Callback(Device.get(), UserData)) {
         break;
       }
     }

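
The reason for switching to pointer storage in this patch: `ol_device_impl_t`
now owns a `std::mutex`, which is neither copyable nor movable, so the
elements can no longer live directly in a resizable container (that is
what the hand-written move constructor removed above was working
around). A minimal illustration with a hypothetical `Impl` type:

    #include <memory>
    #include <mutex>
    #include <vector>

    struct Impl {
      std::mutex M; // neither copyable nor movable
      int Id = 0;
    };

    int main() {
      // std::vector<Impl> Direct;
      // Direct.emplace_back(); // ill-formed: emplace_back on a vector needs a
      //                        // movable element type in case it reallocates

      std::vector<std::unique_ptr<Impl>> ByPointer;
      ByPointer.emplace_back(std::make_unique<Impl>()); // fine: only the
                                                        // unique_ptr moves
      return 0;
    }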
>From 19d5d81849f7ed2e0c8b61c7d08794617efcefce Mon Sep 17 00:00:00 2001
From: Ross Brunton <ross at codeplay.com>
Date: Tue, 12 Aug 2025 10:57:37 +0100
Subject: [PATCH 3/4] Refactoring in response to code review

---
 offload/liboffload/src/OffloadImpl.cpp | 97 +++++++++++++++++++-------
 1 file changed, 70 insertions(+), 27 deletions(-)

diff --git a/offload/liboffload/src/OffloadImpl.cpp b/offload/liboffload/src/OffloadImpl.cpp
index c7cde6bf44f10..05fc34f72bc10 100644
--- a/offload/liboffload/src/OffloadImpl.cpp
+++ b/offload/liboffload/src/OffloadImpl.cpp
@@ -47,6 +47,12 @@ struct ol_device_impl_t {
                    ol_platform_handle_t Platform, InfoTreeNode &&DevInfo)
       : DeviceNum(DeviceNum), Device(Device), Platform(Platform),
         Info(std::forward<InfoTreeNode>(DevInfo)) {}
+
+  ~ol_device_impl_t() {
+    assert(!OutstandingQueues.size() &&
+           "Device object dropped with outstanding queues");
+  }
+
   int DeviceNum;
   GenericDeviceTy *Device;
   ol_platform_handle_t Platform;
@@ -54,6 +60,46 @@ struct ol_device_impl_t {
 
   llvm::SmallVector<__tgt_async_info *> OutstandingQueues;
   std::mutex OutstandingQueuesMutex;
+
+  /// If the device has any outstanding queues that are now complete, remove
+  /// one from the list and return it.
+  ///
+  /// Queues may be added to the outstanding queue list by olDestroyQueue if
+  /// they are destroyed but not completed.
+  std::optional<__tgt_async_info *> GetOutstandingQueue() {
+    // Not locking the `size()` access is fine here - In the worst case we
+    // either miss a queue that exists or loop through an empty array after
+    // taking the lock. Both are sub-optimal but not that bad.
+    if (OutstandingQueues.size()) {
+      std::lock_guard<std::mutex> Lock(OutstandingQueuesMutex);
+
+      // As queues are pulled and popped from this list, longer running queues
+      // naturally bubble to the start of the array. Hence looping backwards.
+      for (auto Q = OutstandingQueues.rbegin(); Q != OutstandingQueues.rend();
+           Q++) {
+        if (!Device->hasPendingWork(*Q)) {
+          auto OutstandingQueue = *Q;
+          *Q = OutstandingQueues.back();
+          OutstandingQueues.pop_back();
+          return OutstandingQueue;
+        }
+      }
+    }
+    return std::nullopt;
+  }
+
+  /// Complete all pending work for this device and perform any needed cleanup.
+  ///
+  /// After calling this function, no liboffload functions should be called with
+  /// this device handle.
+  llvm::Error Destroy() {
+    llvm::Error Result = Plugin::success();
+    for (auto Q : OutstandingQueues)
+      if (auto Err = Device->synchronize(Q, /*Release=*/true))
+        Result = llvm::joinErrors(std::move(Result), std::move(Err));
+    OutstandingQueues.clear();
+    return Result;
+  }
 };
 
 struct ol_platform_impl_t {
@@ -63,6 +109,23 @@ struct ol_platform_impl_t {
   std::unique_ptr<GenericPluginTy> Plugin;
   llvm::SmallVector<std::unique_ptr<ol_device_impl_t>> Devices;
   ol_platform_backend_t BackendType;
+
+  /// Complete all pending work for this platform and perform any needed
+  /// cleanup.
+  ///
+  /// After calling this function, no liboffload functions should be called with
+  /// this platform handle.
+  llvm::Error Destroy() {
+    llvm::Error Result = Plugin::success();
+    for (auto &D : Devices)
+      if (auto Err = D->Destroy())
+        Result = llvm::joinErrors(std::move(Result), std::move(Err));
+
+    if (auto Res = Plugin->deinit())
+      Result = llvm::joinErrors(std::move(Result), std::move(Res));
+
+    return Result;
+  }
 };
 
 struct ol_queue_impl_t {
@@ -243,7 +306,7 @@ Error olShutDown_impl() {
     if (!P.Plugin || !P.Plugin->is_initialized())
       continue;
 
-    if (auto Res = P.Plugin->deinit())
+    if (auto Res = P.Destroy())
       Result = llvm::joinErrors(std::move(Result), std::move(Res));
   }
 
@@ -479,34 +542,14 @@ Error olMemFree_impl(void *Address) {
 Error olCreateQueue_impl(ol_device_handle_t Device, ol_queue_handle_t *Queue) {
   auto CreatedQueue = std::make_unique<ol_queue_impl_t>(nullptr, Device);
 
-  // The device may have some outstanding queues created by olDestroyQueue,
-  // check if any of those are finished and can be reused.
-  // Not locking the `size()` access is fine here - In the worst case we either
-  // miss a queue that exists or loop through an empty array after taking the
-  // lock. Both are sub-optimal but not that bad.
-  __tgt_async_info *OutstandingQueue = nullptr;
-  if (Device->OutstandingQueues.size()) {
-    std::lock_guard<std::mutex> Lock(Device->OutstandingQueuesMutex);
-
-    // As queues are pulled and popped from this list, longer running queues
-    // naturally bubble to the start of the array. Hence looping backwards.
-    for (auto Q = Device->OutstandingQueues.rbegin();
-         Q != Device->OutstandingQueues.rend(); Q++) {
-      if (!Device->Device->hasPendingWork(*Q)) {
-        OutstandingQueue = *Q;
-        *Q = Device->OutstandingQueues.back();
-        Device->OutstandingQueues.pop_back();
-      }
-    }
-  }
-
+  auto OutstandingQueue = Device->GetOutstandingQueue();
   if (OutstandingQueue) {
     // The queue is empty, but we still need to sync it to release any temporary
-    // memory allocations or do other cleanup
+    // memory allocations or do other cleanup.
     if (auto Err =
-            Device->Device->synchronize(OutstandingQueue, /*Release=*/false))
+            Device->Device->synchronize(*OutstandingQueue, /*Release=*/false))
       return Err;
-    CreatedQueue->AsyncInfo = OutstandingQueue;
+    CreatedQueue->AsyncInfo = *OutstandingQueue;
   } else if (auto Err =
                  Device->Device->initAsyncInfo(&(CreatedQueue->AsyncInfo))) {
     return Err;
@@ -525,12 +568,12 @@ Error olDestroyQueue_impl(ol_queue_handle_t Queue) {
     return Res.takeError();
 
   if (!*Res) {
-    // The queue is complete, so sync it and throw it back into the pool
+    // The queue is complete, so sync it and throw it back into the pool.
     if (auto Err = Queue->Device->Device->synchronize(Queue->AsyncInfo,
                                                       /*Release=*/true))
       return Err;
   } else {
-    // The queue still has outstanding work. Store it so we can check it later
+    // The queue still has outstanding work. Store it so we can check it later.
     std::lock_guard<std::mutex> Lock(Queue->Device->OutstandingQueuesMutex);
     Queue->Device->OutstandingQueues.push_back(Queue->AsyncInfo);
   }

>From f0744102a7ca99642731b70f4aa69f3c2fe69c81 Mon Sep 17 00:00:00 2001
From: Ross Brunton <ross at codeplay.com>
Date: Fri, 15 Aug 2025 09:29:21 +0100
Subject: [PATCH 4/4] Respond to feedback

---
 offload/liboffload/src/OffloadImpl.cpp | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/offload/liboffload/src/OffloadImpl.cpp b/offload/liboffload/src/OffloadImpl.cpp
index 05fc34f72bc10..9b748a973350a 100644
--- a/offload/liboffload/src/OffloadImpl.cpp
+++ b/offload/liboffload/src/OffloadImpl.cpp
@@ -66,7 +66,7 @@ struct ol_device_impl_t {
   ///
   /// Queues may be added to the outstanding queue list by olDestroyQueue if
   /// they are destroyed but not completed.
-  std::optional<__tgt_async_info *> GetOutstandingQueue() {
+  __tgt_async_info *getOutstandingQueue() {
     // Not locking the `size()` access is fine here - In the worst case we
     // either miss a queue that exists or loop through an empty array after
     // taking the lock. Both are sub-optimal but not that bad.
@@ -85,14 +85,14 @@ struct ol_device_impl_t {
         }
       }
     }
-    return std::nullopt;
+    return nullptr;
   }
 
   /// Complete all pending work for this device and perform any needed cleanup.
   ///
   /// After calling this function, no liboffload functions should be called with
   /// this device handle.
-  llvm::Error Destroy() {
+  llvm::Error destroy() {
     llvm::Error Result = Plugin::success();
     for (auto Q : OutstandingQueues)
       if (auto Err = Device->synchronize(Q, /*Release=*/true))
@@ -115,10 +115,10 @@ struct ol_platform_impl_t {
   ///
   /// After calling this function, no liboffload functions should be called with
   /// this platform handle.
-  llvm::Error Destroy() {
+  llvm::Error destroy() {
     llvm::Error Result = Plugin::success();
     for (auto &D : Devices)
-      if (auto Err = D->Destroy())
+      if (auto Err = D->destroy())
         Result = llvm::joinErrors(std::move(Result), std::move(Err));
 
     if (auto Res = Plugin->deinit())
@@ -306,7 +306,7 @@ Error olShutDown_impl() {
     if (!P.Plugin || !P.Plugin->is_initialized())
       continue;
 
-    if (auto Res = P.Destroy())
+    if (auto Res = P.destroy())
       Result = llvm::joinErrors(std::move(Result), std::move(Res));
   }
 
@@ -542,14 +542,14 @@ Error olMemFree_impl(void *Address) {
 Error olCreateQueue_impl(ol_device_handle_t Device, ol_queue_handle_t *Queue) {
   auto CreatedQueue = std::make_unique<ol_queue_impl_t>(nullptr, Device);
 
-  auto OutstandingQueue = Device->GetOutstandingQueue();
+  auto OutstandingQueue = Device->getOutstandingQueue();
   if (OutstandingQueue) {
     // The queue is empty, but we still need to sync it to release any temporary
     // memory allocations or do other cleanup.
     if (auto Err =
-            Device->Device->synchronize(*OutstandingQueue, /*Release=*/false))
+            Device->Device->synchronize(OutstandingQueue, /*Release=*/false))
       return Err;
-    CreatedQueue->AsyncInfo = *OutstandingQueue;
+    CreatedQueue->AsyncInfo = OutstandingQueue;
   } else if (auto Err =
                  Device->Device->initAsyncInfo(&(CreatedQueue->AsyncInfo))) {
     return Err;

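The `destroy()` helpers introduced in this series accumulate failures
with `llvm::joinErrors` so teardown keeps going even if an individual
queue fails to synchronize. A small sketch of that pattern against
llvm/Support/Error.h (the `releaseQueue` helper is hypothetical):

    #include "llvm/Support/Error.h"

    // Hypothetical per-queue teardown; fails for one queue to show joining.
    static llvm::Error releaseQueue(int Id) {
      if (Id == 1)
        return llvm::createStringError(llvm::inconvertibleErrorCode(),
                                       "queue %d failed to synchronize", Id);
      return llvm::Error::success();
    }

    // Keep tearing everything down and report every failure at the end,
    // mirroring ol_device_impl_t::destroy() / olShutDown_impl above.
    static llvm::Error destroyAll() {
      llvm::Error Result = llvm::Error::success();
      for (int Id : {0, 1, 2})
        if (llvm::Error Err = releaseQueue(Id))
          Result = llvm::joinErrors(std::move(Result), std::move(Err));
      return Result; // caller must consume, e.g. via llvm::toString()
    }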

