[llvm] r291399 - [Orc][RPC] Lock the pending results data structure when installing new result

Lang Hames via llvm-commits llvm-commits at lists.llvm.org
Sun Jan 8 12:09:36 PST 2017


Author: lhames
Date: Sun Jan  8 14:09:35 2017
New Revision: 291399

URL: http://llvm.org/viewvc/llvm-project?rev=291399&view=rev
Log:
[Orc][RPC] Lock the pending results data structure when installing new result
handlers, make abandonPendingResponses public API.

This should make installing asynchronous result handlers thread safe.

The abandonPendingResponses method is made public so that clients can disconnect
from a remote even if they have asynchronous handlers awaiting results from that
remote. The asynchronous handlers will all receive "abandoned result" errors as
their argument.


Modified:
    llvm/trunk/include/llvm/ExecutionEngine/Orc/RPCUtils.h

Modified: llvm/trunk/include/llvm/ExecutionEngine/Orc/RPCUtils.h
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/include/llvm/ExecutionEngine/Orc/RPCUtils.h?rev=291399&r1=291398&r2=291399&view=diff
==============================================================================
--- llvm/trunk/include/llvm/ExecutionEngine/Orc/RPCUtils.h (original)
+++ llvm/trunk/include/llvm/ExecutionEngine/Orc/RPCUtils.h Sun Jan  8 14:09:35 2017
@@ -788,15 +788,21 @@ public:
       return FnIdOrErr.takeError();
     }
 
-    // Allocate a sequence number.
-    auto SeqNo = SequenceNumberMgr.getSequenceNumber();
-    assert(!PendingResponses.count(SeqNo) &&
-           "Sequence number already allocated");
+    SequenceNumberT SeqNo; // initialized in locked scope below.
+    {
+      // Lock the pending responses map and sequence number manager.
+      std::lock_guard<std::mutex> Lock(ResponsesMutex);
+
+      // Allocate a sequence number.
+      SeqNo = SequenceNumberMgr.getSequenceNumber();
+      assert(!PendingResponses.count(SeqNo) &&
+             "Sequence number already allocated");
 
-    // Install the user handler.
-    PendingResponses[SeqNo] =
+      // Install the user handler.
+      PendingResponses[SeqNo] =
         detail::createResponseHandler<ChannelT, typename Func::ReturnType>(
             std::move(Handler));
+    }
 
     // Open the function call message.
     if (auto Err = C.startSendMessage(FnId, SeqNo)) {
@@ -863,6 +869,24 @@ public:
     return detail::ReadArgs<ArgTs...>(Args...);
   }
 
+  /// Abandon all outstanding result handlers.
+  ///
+  /// This will call all currently registered result handlers to receive an
+  /// "abandoned" error as their argument. This is used internally by the RPC
+  /// in error situations, but can also be called directly by clients who are
+  /// disconnecting from the remote and don't or can't expect responses to their
+  /// outstanding calls. (Especially for outstanding blocking calls, calling
+  /// this function may be necessary to avoid dead threads).
+  void abandonPendingResponses() {
+    // Lock the pending responses map and sequence number manager.
+    std::lock_guard<std::mutex> Lock(ResponsesMutex);
+
+    for (auto &KV : PendingResponses)
+      KV.second->abandon();
+    PendingResponses.clear();
+    SequenceNumberMgr.reset();
+  }
+
 protected:
   // The LaunchPolicy type allows a launch policy to be specified when adding
   // a function handler. See addHandlerImpl.
@@ -888,28 +912,32 @@ protected:
         wrapHandler<Func>(std::move(Handler), std::move(Launch));
   }
 
-  // Abandon all outstanding results.
-  void abandonPendingResponses() {
-    for (auto &KV : PendingResponses)
-      KV.second->abandon();
-    PendingResponses.clear();
-    SequenceNumberMgr.reset();
-  }
-
   Error handleResponse(SequenceNumberT SeqNo) {
-    auto I = PendingResponses.find(SeqNo);
-    if (I == PendingResponses.end()) {
-      abandonPendingResponses();
-      return orcError(OrcErrorCode::UnexpectedRPCResponse);
+    using Handler = typename decltype(PendingResponses)::mapped_type;
+    Handler PRHandler;
+
+    {
+      // Lock the pending responses map and sequence number manager.
+      std::unique_lock<std::mutex> Lock(ResponsesMutex);
+      auto I = PendingResponses.find(SeqNo);
+
+      if (I != PendingResponses.end()) {
+        PRHandler = std::move(I->second);
+        PendingResponses.erase(I);
+        SequenceNumberMgr.releaseSequenceNumber(SeqNo);
+      } else {
+        // Unlock the pending results map to prevent recursive lock.
+        Lock.unlock();
+        abandonPendingResponses();
+        return orcError(OrcErrorCode::UnexpectedRPCResponse);
+      }
     }
 
-    auto PRHandler = std::move(I->second);
-    PendingResponses.erase(I);
-    SequenceNumberMgr.releaseSequenceNumber(SeqNo);
+    assert(PRHandler &&
+           "If we didn't find a response handler we should have bailed out");
 
     if (auto Err = PRHandler->handleResponse(C)) {
       abandonPendingResponses();
-      SequenceNumberMgr.reset();
       return Err;
     }
 
@@ -1016,6 +1044,7 @@ protected:
 
   std::map<FunctionIdT, WrappedHandlerFn> Handlers;
 
+  std::mutex ResponsesMutex;
   detail::SequenceNumberManager<SequenceNumberT> SequenceNumberMgr;
   std::map<SequenceNumberT, std::unique_ptr<detail::ResponseHandler<ChannelT>>>
       PendingResponses;




More information about the llvm-commits mailing list