[llvm] 83051c5 - [mlgo] Make InteractiveModelRunner actually work with named pipes

Mircea Trofin via llvm-commits llvm-commits at lists.llvm.org
Wed Feb 1 15:24:57 PST 2023


Author: Mircea Trofin
Date: 2023-02-01T15:24:44-08:00
New Revision: 83051c5a5f09fe4de5d5b504e5fb864060e9794b

URL: https://github.com/llvm/llvm-project/commit/83051c5a5f09fe4de5d5b504e5fb864060e9794b
DIFF: https://github.com/llvm/llvm-project/commit/83051c5a5f09fe4de5d5b504e5fb864060e9794b.diff

LOG: [mlgo] Make InteractiveModelRunner actually work with named pipes

Turns out raw_fd_stream doesn't work with named pipes, so we just need
to lower the abstraction. Updated the unittest accordingly. Because
mkfifo's path argument requires a certain naming pattern on Windows
(IIUC), restricted the test to Linux only.

Differential Revision: https://reviews.llvm.org/D143110

Added: 
    

Modified: 
    llvm/include/llvm/Analysis/InteractiveModelRunner.h
    llvm/lib/Analysis/InteractiveModelRunner.cpp
    llvm/unittests/Analysis/MLModelRunnerTest.cpp

Removed: 
    


################################################################################
diff  --git a/llvm/include/llvm/Analysis/InteractiveModelRunner.h b/llvm/include/llvm/Analysis/InteractiveModelRunner.h
index a9324f19aceb8..ffcd4a37c5c73 100644
--- a/llvm/include/llvm/Analysis/InteractiveModelRunner.h
+++ b/llvm/include/llvm/Analysis/InteractiveModelRunner.h
@@ -14,6 +14,7 @@
 #include "llvm/Analysis/TensorSpec.h"
 #include "llvm/Analysis/Utils/TrainingLogger.h"
 #include "llvm/Config/llvm-config.h"
+#include "llvm/Support/FileSystem.h"
 #include "llvm/Support/raw_ostream.h"
 #include <system_error>
 
@@ -32,6 +33,11 @@ namespace llvm {
 /// Note that the correctness of the received data is the responsibility of the
 /// host. In particular, if insufficient data were sent, the compiler will block
 /// when waiting for an advice.
+///
+/// Note that the host can either open the pipes RW, or open first the pipe to
+/// the compiler - i.e. the "Inbound" - and then the "Outbound", to avoid
+/// deadlock. This is because the compiler first tries to open the inbound
+/// (which will hang until there's a writer on the other end).
 class InteractiveModelRunner : public MLModelRunner {
 public:
   InteractiveModelRunner(LLVMContext &Ctx,
@@ -43,19 +49,21 @@ class InteractiveModelRunner : public MLModelRunner {
     return R->getKind() == MLModelRunner::Kind::Interactive;
   }
   void switchContext(StringRef Name) {
-    Log.switchContext(Name);
-    Log.flush();
+    Log->switchContext(Name);
+    Log->flush();
   }
 
+  virtual ~InteractiveModelRunner();
+
 private:
   void *evaluateUntyped() override;
   const std::vector<TensorSpec> InputSpecs;
   const TensorSpec OutputSpec;
   std::error_code OutEC;
   std::error_code InEC;
-  raw_fd_stream Inbound;
+  sys::fs::file_t Inbound;
   std::vector<char> OutputBuffer;
-  Logger Log;
+  std::unique_ptr<Logger> Log;
 };
 } // namespace llvm
 #endif // LLVM_ANALYSIS_INTERACTIVEMODELRUNNER_H

diff  --git a/llvm/lib/Analysis/InteractiveModelRunner.cpp b/llvm/lib/Analysis/InteractiveModelRunner.cpp
index a347b49eb0729..c449ab4dffdac 100644
--- a/llvm/lib/Analysis/InteractiveModelRunner.cpp
+++ b/llvm/lib/Analysis/InteractiveModelRunner.cpp
@@ -13,6 +13,7 @@
 #include "llvm/Analysis/TensorSpec.h"
 #include "llvm/Support/CommandLine.h"
 #include "llvm/Support/ErrorHandling.h"
+#include "llvm/Support/FileSystem.h"
 #include "llvm/Support/raw_ostream.h"
 
 using namespace llvm;
@@ -34,44 +35,53 @@ InteractiveModelRunner::InteractiveModelRunner(
     LLVMContext &Ctx, const std::vector<TensorSpec> &Inputs,
     const TensorSpec &Advice, StringRef OutboundName, StringRef InboundName)
     : MLModelRunner(Ctx, MLModelRunner::Kind::Interactive, Inputs.size()),
-      InputSpecs(Inputs), OutputSpec(Advice), Inbound(InboundName, InEC),
-      OutputBuffer(OutputSpec.getTotalTensorBufferSize()),
-      Log(std::make_unique<raw_fd_ostream>(OutboundName, OutEC), InputSpecs,
-          Advice, /*IncludeReward=*/false, Advice) {
+      InputSpecs(Inputs), OutputSpec(Advice),
+      InEC(sys::fs::openFileForRead(InboundName, Inbound)),
+      OutputBuffer(OutputSpec.getTotalTensorBufferSize()) {
   if (InEC) {
     Ctx.emitError("Cannot open inbound file: " + InEC.message());
     return;
   }
-  if (OutEC) {
-    Ctx.emitError("Cannot open outbound file: " + OutEC.message());
-    return;
+  {
+    auto OutStream = std::make_unique<raw_fd_ostream>(OutboundName, OutEC);
+    if (OutEC) {
+      Ctx.emitError("Cannot open outbound file: " + OutEC.message());
+      return;
+    }
+    Log = std::make_unique<Logger>(std::move(OutStream), InputSpecs, Advice,
+                                   /*IncludeReward=*/false, Advice);
   }
   // Just like in the no inference case, this will allocate an appropriately
   // sized buffer.
   for (size_t I = 0; I < InputSpecs.size(); ++I)
     setUpBufferForTensor(I, InputSpecs[I], nullptr);
-  Log.flush();
+  Log->flush();
+}
+
+InteractiveModelRunner::~InteractiveModelRunner() {
+  sys::fs::closeFile(Inbound);
 }
 
 void *InteractiveModelRunner::evaluateUntyped() {
-  Log.startObservation();
+  Log->startObservation();
   for (size_t I = 0; I < InputSpecs.size(); ++I)
-    Log.logTensorValue(I, reinterpret_cast<const char *>(getTensorUntyped(I)));
-  Log.endObservation();
-  Log.flush();
+    Log->logTensorValue(I, reinterpret_cast<const char *>(getTensorUntyped(I)));
+  Log->endObservation();
+  Log->flush();
 
   size_t InsPoint = 0;
   char *Buff = OutputBuffer.data();
   const size_t Limit = OutputBuffer.size();
   while (InsPoint < Limit) {
-    auto Read = Inbound.read(Buff + InsPoint, OutputBuffer.size() - InsPoint);
-    if (Read < 0) {
+    auto ReadOrErr = ::sys::fs::readNativeFile(
+        Inbound, {Buff + InsPoint, OutputBuffer.size() - InsPoint});
+    if (ReadOrErr.takeError()) {
       Ctx.emitError("Failed reading from inbound file");
       break;
     }
-    InsPoint += Read;
+    InsPoint += *ReadOrErr;
   }
   if (DebugReply != TensorType::Invalid)
     dbgs() << tensorValueToString(OutputBuffer.data(), OutputSpec);
   return OutputBuffer.data();
-}
\ No newline at end of file
+}

diff  --git a/llvm/unittests/Analysis/MLModelRunnerTest.cpp b/llvm/unittests/Analysis/MLModelRunnerTest.cpp
index 1f80eb7820983..f953c45cfc318 100644
--- a/llvm/unittests/Analysis/MLModelRunnerTest.cpp
+++ b/llvm/unittests/Analysis/MLModelRunnerTest.cpp
@@ -11,9 +11,12 @@
 #include "llvm/Analysis/NoInferenceModelRunner.h"
 #include "llvm/Analysis/ReleaseModeModelRunner.h"
 #include "llvm/Support/BinaryByteStream.h"
+#include "llvm/Support/FileSystem.h"
 #include "llvm/Support/FileUtilities.h"
 #include "llvm/Support/JSON.h"
+#include "llvm/Support/Path.h"
 #include "llvm/Support/raw_ostream.h"
+#include "llvm/Testing/Support/SupportHelpers.h"
 #include "gtest/gtest.h"
 
 #include <atomic>
@@ -126,6 +129,7 @@ TEST(ReleaseModeRunner, ExtraFeaturesOutOfOrder) {
   EXPECT_EQ(*Evaluator->getTensor<int64_t>(2), -3);
 }
 
+#if defined(LLVM_ON_UNIX)
 TEST(InteractiveModelRunner, Evaluation) {
   LLVMContext Ctx;
   // Test the interaction with an external advisor by asking for advice twice.
@@ -141,68 +145,65 @@ TEST(InteractiveModelRunner, Evaluation) {
   // Create the 2 files. Ideally we'd create them as named pipes, but that's not
   // quite supported by the generic API.
   std::error_code EC;
-  SmallString<64> FromCompilerName;
-  SmallString<64> ToCompilerName;
-  int FromCompilerFD = 0;
-  int ToCompilerFD = 0;
-  ASSERT_EQ(sys::fs::createTemporaryFile("InteractiveModelRunner_Evaluation",
-                                         "temp", FromCompilerFD,
-                                         FromCompilerName),
-            std::error_code());
+  llvm::unittest::TempDir Tmp("tmpdir", /*Unique=*/true);
+  SmallString<128> FromCompilerName(Tmp.path().begin(), Tmp.path().end());
+  SmallString<128> ToCompilerName(Tmp.path().begin(), Tmp.path().end());
+  sys::path::append(FromCompilerName, "InteractiveModelRunner_Evaluation.out");
+  sys::path::append(ToCompilerName, "InteractiveModelRunner_Evaluation.in");
+  EXPECT_EQ(::mkfifo(FromCompilerName.c_str(), 0666), 0);
+  EXPECT_EQ(::mkfifo(ToCompilerName.c_str(), 0666), 0);
 
-  ASSERT_EQ(sys::fs::createTemporaryFile("InteractiveModelRunner_Evaluation",
-                                         "temp", ToCompilerFD, ToCompilerName),
-            std::error_code());
-
-  raw_fd_stream FromCompiler(FromCompilerName, EC);
-  EXPECT_FALSE(EC);
-  raw_fd_ostream ToCompiler(ToCompilerName, EC);
-  EXPECT_FALSE(EC);
   FileRemover Cleanup1(FromCompilerName);
   FileRemover Cleanup2(ToCompilerName);
-  InteractiveModelRunner Evaluator(Ctx, Inputs, AdviceSpec, FromCompilerName,
-                                   ToCompilerName);
-
-  Evaluator.switchContext("hi");
-
-  // Helper to read headers and other json lines.
-  SmallVector<char, 1024> Buffer;
-  auto ReadLn = [&]() {
-    Buffer.clear();
-    while (true) {
-      char Chr = 0;
-      auto Read = FromCompiler.read(&Chr, 1);
-      EXPECT_GE(Read, 0);
-      if (!Read)
-        continue;
-      if (Chr == '\n')
-        return StringRef(Buffer.data(), Buffer.size());
-      Buffer.push_back(Chr);
-    }
-  };
-  // See include/llvm/Analysis/Utils/TrainingLogger.h
-  // First comes the header
-  auto Header = json::parse(ReadLn());
-  EXPECT_FALSE(Header.takeError());
-  EXPECT_NE(Header->getAsObject()->getArray("features"), nullptr);
-  EXPECT_NE(Header->getAsObject()->getObject("advice"), nullptr);
-  // Then comes the context
-  EXPECT_FALSE(json::parse(ReadLn()).takeError());
 
   // Since the evaluator sends the features over and then blocks waiting for
   // an answer, we must spawn a thread playing the role of the advisor / host:
   std::atomic<int> SeenObservations = 0;
+  // Start the host first to make sure the pipes are being prepared. Otherwise
+  // the evaluator will hang.
   std::thread Advisor([&]() {
+    // Open the writer first. This is because the evaluator will try opening
+    // the "input" pipe first. An alternative that avoids ordering is for the
+    // host to open the pipes RW.
+    raw_fd_ostream ToCompiler(ToCompilerName, EC);
+    EXPECT_FALSE(EC);
+    sys::fs::file_t FromCompiler = {};
+    EXPECT_FALSE(sys::fs::openFileForRead(FromCompilerName, FromCompiler));
     EXPECT_EQ(SeenObservations, 0);
+    // Helper to read headers and other json lines.
+    SmallVector<char, 1024> Buffer;
+    auto ReadLn = [&]() {
+      Buffer.clear();
+      while (true) {
+        char Chr = 0;
+        auto ReadOrErr = sys::fs::readNativeFile(FromCompiler, {&Chr, 1});
+        EXPECT_FALSE(ReadOrErr.takeError());
+        if (!*ReadOrErr)
+          continue;
+        if (Chr == '\n')
+          return StringRef(Buffer.data(), Buffer.size());
+        Buffer.push_back(Chr);
+      }
+    };
+    // See include/llvm/Analysis/Utils/TrainingLogger.h
+    // First comes the header
+    auto Header = json::parse(ReadLn());
+    EXPECT_FALSE(Header.takeError());
+    EXPECT_NE(Header->getAsObject()->getArray("features"), nullptr);
+    EXPECT_NE(Header->getAsObject()->getObject("advice"), nullptr);
+    // Then comes the context
+    EXPECT_FALSE(json::parse(ReadLn()).takeError());
+
     int64_t Features[3] = {0};
     auto FullyRead = [&]() {
       size_t InsPt = 0;
       const size_t ToRead = 3 * Inputs[0].getTotalTensorBufferSize();
       char *Buff = reinterpret_cast<char *>(Features);
       while (InsPt < ToRead) {
-        auto Read = FromCompiler.read(Buff + InsPt, ToRead - InsPt);
-        EXPECT_GE(Read, 0);
-        InsPt += Read;
+        auto ReadOrErr = sys::fs::readNativeFile(
+            FromCompiler, {Buff + InsPt, ToRead - InsPt});
+        EXPECT_FALSE(ReadOrErr.takeError());
+        InsPt += *ReadOrErr;
       }
     };
     // Observation
@@ -211,8 +212,15 @@ TEST(InteractiveModelRunner, Evaluation) {
     FullyRead();
     // a "\n"
     char Chr = 0;
-    while (FromCompiler.read(&Chr, 1) == 0) {
-    }
+    auto ReadNL = [&]() {
+      do {
+        auto ReadOrErr = sys::fs::readNativeFile(FromCompiler, {&Chr, 1});
+        EXPECT_FALSE(ReadOrErr.takeError());
+        if (*ReadOrErr == 1)
+          break;
+      } while (true);
+    };
+    ReadNL();
     EXPECT_EQ(Chr, '\n');
     EXPECT_EQ(Features[0], 42);
     EXPECT_EQ(Features[1], 43);
@@ -228,8 +236,7 @@ TEST(InteractiveModelRunner, Evaluation) {
     // Second observation, and same idea as above
     EXPECT_FALSE(json::parse(ReadLn()).takeError());
     FullyRead();
-    while (FromCompiler.read(&Chr, 1) == 0) {
-    }
+    ReadNL();
     EXPECT_EQ(Chr, '\n');
     EXPECT_EQ(Features[0], 10);
     EXPECT_EQ(Features[1], -2);
@@ -239,8 +246,14 @@ TEST(InteractiveModelRunner, Evaluation) {
     ToCompiler.write(reinterpret_cast<const char *>(&Advice),
                      AdviceSpec.getTotalTensorBufferSize());
     ToCompiler.flush();
+    sys::fs::closeFile(FromCompiler);
   });
 
+  InteractiveModelRunner Evaluator(Ctx, Inputs, AdviceSpec, FromCompilerName,
+                                   ToCompilerName);
+
+  Evaluator.switchContext("hi");
+
   EXPECT_EQ(SeenObservations, 0);
   *Evaluator.getTensor<int64_t>(0) = 42;
   *Evaluator.getTensor<int64_t>(1) = 43;
@@ -256,4 +269,5 @@ TEST(InteractiveModelRunner, Evaluation) {
   EXPECT_EQ(SeenObservations, 2);
   EXPECT_FLOAT_EQ(Ret, 50.30);
   Advisor.join();
-}
\ No newline at end of file
+}
+#endif


        


More information about the llvm-commits mailing list