kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [kudu] 03/04: subprocess: use a fifo instead of stdout for IO
Date Mon, 30 Mar 2020 18:53:36 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 697301fb076e9649e7f8eb4556c91e59ab9b185f
Author: Andrew Wong <awong@cloudera.com>
AuthorDate: Fri Mar 27 22:21:21 2020 -0700

    subprocess: use a fifo instead of stdout for IO
    
    Today, communication from a Kudu Java subprocess to the C++
    SubprocessServer is done via the subprocess's stdout (the C++ message
    protocol listens on the stdout FD of the subprocess). This is risky
    because any logging by the subprocess to stdout may corrupt the message
    stream, breaking the message protocol and preventing any calls to the
    subprocess from succeeding.
    
    This updates the Kudu subprocess module on both the Java and C++ sides to
    use a fifo for output instead of the subprocess's stdout. For Ranger,
    this fifo will by default be placed in the Ranger config directory
    (overridable with --ranger_receiver_fifo_dir), which, in some
    distributions (e.g. CDPD), will be unique per Kudu process.
    Additionally, the fifo's file name includes the PID and the ID of the
    creating thread, to avoid collisions with fifos used by other processes
    or threads.
    
    This patch also updates the subprocess to share stdout and stderr with
    the parent process by default, and adds a basic log4j configuration to
    the Java subprocess so that, when an output fifo is specified, its logs
    are written to System.out. Here's a snippet of the output from
    subprocess_proxy-test (which includes both the Java subprocess's log4j
    output and Kudu's glog output):
    
    0 [pool-1-thread-1] DEBUG org.apache.kudu.subprocess.QueueUtil  - Message: org.apache.kudu.subprocess.InboundRequest@48c4ccb9 has been put on the queue
    0 [pool-2-thread-1] DEBUG org.apache.kudu.subprocess.QueueUtil  - Message: org.apache.kudu.subprocess.InboundRequest@48c4ccb9 has been taken from the queue
    1096 [pool-2-thread-1] DEBUG org.apache.kudu.subprocess.QueueUtil  - Message: org.apache.kudu.subprocess.OutboundResponse@6c275a0f has been put on the queue
    1096 [pool-3-thread-1] DEBUG org.apache.kudu.subprocess.QueueUtil  - Message: org.apache.kudu.subprocess.OutboundResponse@6c275a0f has been taken from the queue
    WARNING: Logging before InitGoogleLogging() is written to STDERR
    I0327 23:35:20.583730 244412416 server.cc:279] Received an EOF from the subprocess
    I0327 23:35:20.590333 244948992 server.cc:412] outbound queue shut down: Aborted:
    
    This may not be the final form of our logging story for the subprocess
    (e.g. maybe we'll want to log to a file, or maybe we'll want to try
    folding it into the server's glog). But at the very least, this is
    better than not having logs at all, which is the state of logging today.
    
    I considered using Netty's FileDescriptor[1] to leverage the FDs
    returned by calls to pipe(). I opted not to, since it introduced
    complications around its use of JNI (I saw an UnsatisfiedLinkError
    when I tried writing to the FD).
    
    [1] https://netty.io/4.0/api/io/netty/channel/unix/FileDescriptor.html
    
    Change-Id: I63e91a090fe196713a013d28301c7980e452456c
    Reviewed-on: http://gerrit.cloudera.org:8080/15574
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Attila Bukor <abukor@apache.org>
    Reviewed-by: Hao Hao <hao.hao@cloudera.com>
---
 .../kudu/subprocess/SubprocessConfiguration.java   | 46 ++++++++++++
 .../apache/kudu/subprocess/SubprocessExecutor.java |  4 +-
 src/kudu/master/catalog_manager.cc                 |  3 +-
 src/kudu/master/ranger_authz_provider.cc           |  7 +-
 src/kudu/master/ranger_authz_provider.h            |  3 +-
 src/kudu/ranger/ranger_client-test.cc              |  5 +-
 src/kudu/ranger/ranger_client.cc                   | 44 ++++++++----
 src/kudu/ranger/ranger_client.h                    |  7 +-
 src/kudu/subprocess/server.cc                      | 34 +++++++--
 src/kudu/subprocess/server.h                       | 15 +++-
 src/kudu/subprocess/subprocess_proxy-test.cc       | 12 +++-
 src/kudu/subprocess/subprocess_proxy.h             |  6 +-
 src/kudu/subprocess/subprocess_server-test.cc      | 81 ++++++++++++++++++----
 13 files changed, 221 insertions(+), 46 deletions(-)

diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
index 7234695..e80f3ba 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
@@ -17,6 +17,13 @@
 
 package org.apache.kudu.subprocess;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
@@ -24,6 +31,7 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.log4j.BasicConfigurator;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -40,6 +48,7 @@ public class SubprocessConfiguration {
   private static final String KEYTAB_FILE_DEFAULT = "";
   private String servicePrincipal;
   private static final String SERVICE_PRINCIPAL_DEFAULT = "";
+  private OutputStream outputStream;
 
   @VisibleForTesting
   static final int MAX_MESSAGE_BYTES_DEFAULT = 1024 * 1024;
@@ -49,6 +58,13 @@ public class SubprocessConfiguration {
   }
 
   /**
+   * @return the output stream to output messages to.
+   */
+  OutputStream getOutputStream() {
+    return outputStream;
+  }
+
+  /**
    * @return the size of the message queue, or the default value if not
    * provided
    */
@@ -131,26 +147,56 @@ public class SubprocessConfiguration {
     principalOpt.setRequired(false);
     options.addOption(principalOpt);
 
+    final String outputPipeLongOpt = "outputPipe";
+    Option outputPipeOpt = new Option(
+        "o", outputPipeLongOpt, /* hasArg= */ true,
+        "The pipe to output messages to. If not set, outputs to stdout (this " +
+        "is generally unsafe and should only be used in tests)");
+    outputPipeOpt.setRequired(false);
+    options.addOption(outputPipeOpt);
+
     CommandLineParser parser = new BasicParser();
+    String outputPipePath;
     try {
       CommandLine cmd = parser.parse(options, args);
       String queueSize = cmd.getOptionValue(queueSizeLongOpt);
       this.queueSize = queueSize == null ?
           QUEUE_SIZE_DEFAULT : Integer.parseInt(queueSize);
+
       String maxParserThreads = cmd.getOptionValue(maxMsgParserThreadsLongOpt);
       this.maxMsgParserThreads = maxParserThreads == null ?
           MAX_MSG_PARSER_THREADS_DEFAULT : Integer.parseInt(maxParserThreads);
+
       String maxMsgBytes = cmd.getOptionValue(maxMsgBytesLongOpt);
       this.maxMsgBytes = maxMsgBytes == null ?
           MAX_MESSAGE_BYTES_DEFAULT : Integer.parseInt(maxMsgBytes);
+
       String keytab = cmd.getOptionValue(keytabFileLongOpt);
       this.keytabFile = keytab == null ?
           KEYTAB_FILE_DEFAULT : keytab;
+
       String principal = cmd.getOptionValue(principalLongOpt);
       this.servicePrincipal = principal == null ?
           SERVICE_PRINCIPAL_DEFAULT : principal;
+
+      outputPipePath = cmd.getOptionValue(outputPipeLongOpt);
     } catch (ParseException e) {
       throw new KuduSubprocessException("Cannot parse the subprocess command line", e);
     }
+
+    try {
+      if (outputPipePath == null) {
+        this.outputStream = new SubprocessOutputStream(System.out);
+      } else {
+        // If we're not sending messages to System.out, redirect our logs to it.
+        BasicConfigurator.configure();
+        RandomAccessFile outputFile = new RandomAccessFile(new File(outputPipePath), "rw");
+        this.outputStream = new FileOutputStream(outputFile.getFD());
+      }
+    } catch (FileNotFoundException e) {
+      throw new KuduSubprocessException("Output file not found", e);
+    } catch (IOException e) {
+      throw new KuduSubprocessException("IO error opening file", e);
+    }
   }
 }
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
index e9c06e5..1363ea0 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
@@ -103,8 +103,8 @@ public class SubprocessExecutor {
     // a message to ensure the underlying buffer can hold the entire message before
     // flushing.
     try (BufferedInputStream in = new BufferedInputStream(System.in);
-         BufferedOutputStream out = new BufferedOutputStream(
-             new SubprocessOutputStream(System.out), maxMessageBytes)) {
+         BufferedOutputStream out = new BufferedOutputStream(conf.getOutputStream(),
+                                                             maxMessageBytes)) {
       MessageIO messageIO = new MessageIO(maxMessageBytes, in, out);
 
       // Start a single reader thread and run the task asynchronously.
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 53e7919..14e4173 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -787,7 +787,8 @@ CatalogManager::CatalogManager(Master* master)
   if (SentryAuthzProvider::IsEnabled()) {
     authz_provider_.reset(new SentryAuthzProvider(master_->metric_entity()));
   } else if (RangerAuthzProvider::IsEnabled()) {
-    authz_provider_.reset(new RangerAuthzProvider(master_->metric_entity()));
+    authz_provider_.reset(new RangerAuthzProvider(master_->fs_manager()->env(),
+                                                  master_->metric_entity()));
   } else {
     authz_provider_.reset(new DefaultAuthzProvider);
   }
diff --git a/src/kudu/master/ranger_authz_provider.cc b/src/kudu/master/ranger_authz_provider.cc
index f33d8f7..43ecfe5 100644
--- a/src/kudu/master/ranger_authz_provider.cc
+++ b/src/kudu/master/ranger_authz_provider.cc
@@ -31,7 +31,9 @@
 DECLARE_string(ranger_config_path);
 
 namespace kudu {
+
 class MetricEntity;
+class Env;
 
 namespace master {
 
@@ -43,8 +45,9 @@ using kudu::ranger::RangerClient;
 using std::string;
 using std::unordered_set;
 
-RangerAuthzProvider::RangerAuthzProvider(const scoped_refptr<MetricEntity>& metric_entity)
:
-  client_(metric_entity) {}
+RangerAuthzProvider::RangerAuthzProvider(Env* env,
+    const scoped_refptr<MetricEntity>& metric_entity) :
+  client_(env, metric_entity) {}
 
 Status RangerAuthzProvider::Start() {
   RETURN_NOT_OK(client_.Start());
diff --git a/src/kudu/master/ranger_authz_provider.h b/src/kudu/master/ranger_authz_provider.h
index bdfb11a..6dabf33 100644
--- a/src/kudu/master/ranger_authz_provider.h
+++ b/src/kudu/master/ranger_authz_provider.h
@@ -28,6 +28,7 @@
 
 namespace kudu {
 
+class Env;
 class MetricEntity;
 class SchemaPB;
 
@@ -43,7 +44,7 @@ namespace master {
 class RangerAuthzProvider : public AuthzProvider {
  public:
 
-  explicit RangerAuthzProvider(const scoped_refptr<MetricEntity>& metric_entity);
+  explicit RangerAuthzProvider(Env* env, const scoped_refptr<MetricEntity>& metric_entity);
 
   Status Start() override;
 
diff --git a/src/kudu/ranger/ranger_client-test.cc b/src/kudu/ranger/ranger_client-test.cc
index 9a85aaa..b15be50 100644
--- a/src/kudu/ranger/ranger_client-test.cc
+++ b/src/kudu/ranger/ranger_client-test.cc
@@ -33,6 +33,7 @@
 #include "kudu/ranger/ranger.pb.h"
 #include "kudu/subprocess/server.h"
 #include "kudu/subprocess/subprocess.pb.h"
+#include "kudu/util/env.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -93,7 +94,7 @@ class MockSubprocessServer : public SubprocessServer {
   ~MockSubprocessServer() override {}
 
   MockSubprocessServer()
-      : SubprocessServer({"mock"}, SubprocessMetrics()) {}
+      : SubprocessServer(Env::Default(), "", {"mock"}, SubprocessMetrics()) {}
 
   Status Execute(SubprocessRequestPB* req,
                  SubprocessResponsePB* resp) override {
@@ -123,7 +124,7 @@ class MockSubprocessServer : public SubprocessServer {
 class RangerClientTest : public KuduTest {
  public:
   RangerClientTest() :
-    client_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "ranger_client-test"))
{}
+    client_(env_, METRIC_ENTITY_server.Instantiate(&metric_registry_, "ranger_client-test"))
{}
 
   void SetUp() override {
     std::unique_ptr<MockSubprocessServer> server(new MockSubprocessServer());
diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index a2abee1..53371ee 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -17,8 +17,7 @@
 
 #include "kudu/ranger/ranger_client.h"
 
-#include <stdint.h>
-
+#include <cstdint>
 #include <ostream>
 #include <string>
 #include <utility>
@@ -34,6 +33,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/ranger/ranger.pb.h"
 #include "kudu/security/init.h"
+#include "kudu/subprocess/server.h"
 #include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/flag_validators.h"
@@ -59,9 +59,16 @@ TAG_FLAG(ranger_config_path, experimental);
 DEFINE_string(ranger_jar_path, "",
               "Path to the JAR file containing the Ranger subprocess. "
               "If not set, the default JAR file path is expected to be"
-              "next to the master binary");
+              "next to the master binary.");
 TAG_FLAG(ranger_jar_path, experimental);
 
+DEFINE_string(ranger_receiver_fifo_dir, "",
+              "Directory in which to create a fifo used to receive messages "
+              "from the Ranger subprocess. Existing fifos at this path will be "
+              "overwritten. If not specified, a fifo will be created in the "
+              "--ranger_config_path directory.");
+TAG_FLAG(ranger_receiver_fifo_dir, advanced);
+
 METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms,
     "Ranger subprocess execution time (ms)",
     kudu::MetricUnit::kMilliseconds,
@@ -155,12 +162,21 @@ static string GetJavaClasspath() {
   return Substitute("$0:$1", GetRangerJarPath(), FLAGS_ranger_config_path);
 }
 
-// Builds the arguments for the Ranger subprocess. Specifically pass
-// the principal and keytab file that the Ranger subprocess will log in with
-// if Kerberos is enabled. 'args' has the final arguments.
-// Returns 'OK' if arguments successfully created, error otherwise.
-static Status BuildArgv(vector<string>* argv) {
+
+static string ranger_fifo_base() {
+  DCHECK(!FLAGS_ranger_config_path.empty());
+  const string& fifo_dir = FLAGS_ranger_receiver_fifo_dir.empty() ?
+      FLAGS_ranger_config_path : FLAGS_ranger_receiver_fifo_dir;
+  return JoinPathSegments(fifo_dir, "ranger_receiever_fifo");
+}
+
+// Builds the arguments to start the Ranger subprocess with the given receiver
+// fifo path. Specifically pass the principal and keytab file that the Ranger
+// subprocess will log in with if Kerberos is enabled. 'args' has the final
+// arguments.  Returns 'OK' if arguments successfully created, error otherwise.
+static Status BuildArgv(const string& fifo_path, vector<string>* argv) {
   DCHECK(argv);
+  DCHECK(!FLAGS_ranger_config_path.empty());
   // Pass the required arguments to run the Ranger subprocess.
   vector<string> ret = { FLAGS_ranger_java_path, "-cp", GetJavaClasspath(), kMainClass
};
   // When Kerberos is enabled in Kudu, pass both Kudu principal and keytab file
@@ -174,7 +190,8 @@ static Status BuildArgv(vector<string>* argv) {
     ret.emplace_back("-k");
     ret.emplace_back(FLAGS_keytab_file);
   }
-
+  ret.emplace_back("-o");
+  ret.emplace_back(fifo_path);
   *argv = std::move(ret);
   return Status::OK();
 }
@@ -227,16 +244,17 @@ RangerSubprocessMetrics::RangerSubprocessMetrics(const scoped_refptr<MetricEntit
 }
 #undef HISTINIT
 
-RangerClient::RangerClient(const scoped_refptr<MetricEntity>& metric_entity)
-    : metric_entity_(metric_entity) {
+RangerClient::RangerClient(Env* env, const scoped_refptr<MetricEntity>& metric_entity)
+    : env_(env), metric_entity_(metric_entity) {
   DCHECK(metric_entity);
 }
 
 Status RangerClient::Start() {
   VLOG(1) << "Initializing Ranger subprocess server";
   vector<string> argv;
-  RETURN_NOT_OK(BuildArgv(&argv));
-  subprocess_.reset(new RangerSubprocess(std::move(argv), metric_entity_));
+  const string fifo_path = subprocess::SubprocessServer::FifoPath(ranger_fifo_base());
+  RETURN_NOT_OK(BuildArgv(fifo_path, &argv));
+  subprocess_.reset(new RangerSubprocess(env_, fifo_path, std::move(argv), metric_entity_));
   return subprocess_->Start();
 }
 
diff --git a/src/kudu/ranger/ranger_client.h b/src/kudu/ranger/ranger_client.h
index cdfb8ed..1117785 100644
--- a/src/kudu/ranger/ranger_client.h
+++ b/src/kudu/ranger/ranger_client.h
@@ -32,6 +32,8 @@
 
 namespace kudu {
 
+class Env;
+
 namespace ranger {
 
 struct ActionHash {
@@ -64,7 +66,7 @@ class RangerClient {
   };
 
   // Creates a Ranger client.
-  explicit RangerClient(const scoped_refptr<MetricEntity>& metric_entity);
+  explicit RangerClient(Env* env, const scoped_refptr<MetricEntity>& metric_entity);
 
   // Starts the RangerClient, initializes the subprocess server.
   Status Start() WARN_UNUSED_RESULT;
@@ -105,12 +107,13 @@ class RangerClient {
   void ReplaceServerForTests(std::unique_ptr<subprocess::SubprocessServer> server)
{
     // Creates a dummy RangerSubprocess if it is not initialized.
     if (!subprocess_) {
-      subprocess_.reset(new RangerSubprocess({""}, metric_entity_));
+      subprocess_.reset(new RangerSubprocess(env_, "", {""}, metric_entity_));
     }
     subprocess_->ReplaceServerForTests(std::move(server));
   }
 
  private:
+  Env* env_;
   std::unique_ptr<RangerSubprocess> subprocess_;
   scoped_refptr<MetricEntity> metric_entity_;
 };
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 4f7b8ae..76f3a3d 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -17,6 +17,8 @@
 
 #include "kudu/subprocess/server.h"
 
+#include <unistd.h>
+
 #include <csignal>
 #include <memory>
 #include <ostream>
@@ -33,6 +35,7 @@
 #include "kudu/subprocess/subprocess.pb.h"
 #include "kudu/subprocess/subprocess_protocol.h"
 #include "kudu/util/async_util.h"
+#include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
@@ -83,16 +86,25 @@ using strings::Substitute;
 namespace kudu {
 namespace subprocess {
 
-SubprocessServer::SubprocessServer(vector<string> subprocess_argv, SubprocessMetrics
metrics)
+string SubprocessServer::FifoPath(const string& base) {
+  return Substitute("$0.$1.$2", base, getpid(), Thread::CurrentThreadId());
+}
+
+SubprocessServer::SubprocessServer(Env* env, const string& receiver_file,
+                                   vector<string> subprocess_argv,
+                                   SubprocessMetrics metrics)
     : call_timeout_(MonoDelta::FromSeconds(FLAGS_subprocess_timeout_secs)),
       next_id_(1),
       closing_(1),
+      env_(env),
+      receiver_file_(receiver_file),
       process_(make_shared<Subprocess>(std::move(subprocess_argv))),
       outbound_call_queue_(FLAGS_subprocess_request_queue_size_bytes),
       inbound_response_queue_(FLAGS_subprocess_response_queue_size_bytes),
       metrics_(std::move(metrics)) {
   process_->ShareParentStdin(false);
-  process_->ShareParentStdout(false);
+  process_->ShareParentStdout(true);
+  process_->ShareParentStderr(true);
 }
 
 SubprocessServer::~SubprocessServer() {
@@ -111,6 +123,7 @@ void SubprocessServer::StartSubprocessThread(const StatusCallback&
cb) {
 
 Status SubprocessServer::Init() {
   VLOG(2) << "Starting the subprocess";
+
   Synchronizer sync;
   auto cb = sync.AsStatusCallback();
   RETURN_NOT_OK(Thread::Create("subprocess", "start",
@@ -118,11 +131,20 @@ Status SubprocessServer::Init() {
                                &read_thread_));
   RETURN_NOT_OK_PREPEND(sync.Wait(), "Failed to start subprocess");
 
+  // NOTE: callers should try to ensure each receiver file path is used by a
+  // single subprocess.
+  if (env_->FileExists(receiver_file_)) {
+    RETURN_NOT_OK(env_->DeleteFile(receiver_file_));
+  }
+  // Open the file we'll use for receiving messages.
+  RETURN_NOT_OK(env_->NewFifo(receiver_file_, &receiver_fifo_));
+  RETURN_NOT_OK(receiver_fifo_->OpenForReads());
+
   // Start the message protocol.
   CHECK(!message_protocol_);
   message_protocol_.reset(new SubprocessProtocol(SubprocessProtocol::SerializationMode::PB,
                                                  SubprocessProtocol::CloseMode::CLOSE_ON_DESTROY,
-                                                 process_->ReleaseChildStdoutFd(),
+                                                 receiver_fifo_->read_fd(),
                                                  process_->ReleaseChildStdinFd()));
   const int num_threads = FLAGS_subprocess_num_responder_threads;
   responder_threads_.resize(num_threads);
@@ -195,6 +217,11 @@ void SubprocessServer::Shutdown() {
   for (const auto& t : responder_threads_) {
     t->Join();
   }
+  // Delete the receiver fifo.
+  receiver_fifo_.reset();
+  if (env_->FileExists(receiver_file_)) {
+    WARN_NOT_OK(env_->DeleteFile(receiver_file_), "Error deleting receiver file");
+  }
 
   // Call any of the remaining callbacks.
   std::map<CallId, shared_ptr<SubprocessCall>> calls;
@@ -216,7 +243,6 @@ void SubprocessServer::ReceiveMessagesThread() {
     Status s = message_protocol_->ReceiveMessage(&response);
     if (s.IsEndOfFile()) {
       // The underlying pipe was closed. We're likely shutting down.
-      DCHECK_EQ(0, closing_.count());
       LOG(INFO) << "Received an EOF from the subprocess";
       return;
     }
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index ce0a2df..5d46ef7 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -47,6 +47,8 @@
 
 namespace kudu {
 
+class Env;
+class Fifo;
 class Subprocess;
 class Thread;
 
@@ -216,7 +218,12 @@ typedef BlockingQueue<ResponsePBAndTimer, ResponseLogicalSize>
ResponseQueue;
 // Public methods are virtual so a mock server can be used in tests.
 class SubprocessServer {
  public:
-  SubprocessServer(std::vector<std::string> subprocess_argv, SubprocessMetrics metrics);
+  // Returns a path based on 'base' that can be used as a fifo, avoiding
+  // collisions between subprocesses started in different process and threads.
+  static std::string FifoPath(const std::string& base);
+
+  SubprocessServer(Env* env, const std::string& receiver_file,
+      std::vector<std::string> subprocess_argv, SubprocessMetrics metrics);
   virtual ~SubprocessServer();
 
   // Initialize the server, starting the subprocess and worker threads.
@@ -270,6 +277,12 @@ class SubprocessServer {
   // Latch used to indicate that the server is shutting down.
   CountDownLatch closing_;
 
+  Env* env_;
+  const std::string receiver_file_;
+
+  // The fifo used to receieve messages from the subprocess.
+  std::unique_ptr<Fifo> receiver_fifo_;
+
   // The underlying subprocess.
   std::shared_ptr<Subprocess> process_;
 
diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc
index ac03cc5..ebfe003 100644
--- a/src/kudu/subprocess/subprocess_proxy-test.cc
+++ b/src/kudu/subprocess/subprocess_proxy-test.cc
@@ -30,6 +30,7 @@
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/subprocess/server.h"
 #include "kudu/subprocess/echo_subprocess.h"
 #include "kudu/subprocess/subprocess.pb.h"
 #include "kudu/util/env.h"
@@ -64,7 +65,8 @@ class EchoSubprocessTest : public KuduTest {
  public:
   EchoSubprocessTest()
       : metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
-                                                        "subprocess_proxy-test")) {}
+                                                        "subprocess_proxy-test")),
+        test_dir_(GetTestDataDirectory()) {}
 
   void SetUp() override {
     KuduTest::SetUp();
@@ -77,12 +79,15 @@ class EchoSubprocessTest : public KuduTest {
     const string bin_dir = DirName(exe);
     string java_home;
     RETURN_NOT_OK(FindHomeDir("java", bin_dir, &java_home));
+    const string& pipe_file = SubprocessServer::FifoPath(JoinPathSegments(test_dir_,
"echo_pipe"));
     vector<string> argv = {
       Substitute("$0/bin/java", java_home),
       "-cp", Substitute("$0/kudu-subprocess.jar", bin_dir),
-      "org.apache.kudu.subprocess.echo.EchoSubprocessMain"
+      "org.apache.kudu.subprocess.echo.EchoSubprocessMain",
+      "-o", pipe_file,
     };
-    echo_subprocess_.reset(new EchoSubprocess(std::move(argv), metric_entity_));
+    echo_subprocess_.reset(new EchoSubprocess(env_, pipe_file, std::move(argv),
+                                              metric_entity_));
     return echo_subprocess_->Start();
   }
 
@@ -90,6 +95,7 @@ class EchoSubprocessTest : public KuduTest {
   MetricRegistry metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
   unique_ptr<EchoSubprocess> echo_subprocess_;
+  const string test_dir_;
 };
 
 #define GET_HIST(metric_entity, metric_name) \
diff --git a/src/kudu/subprocess/subprocess_proxy.h b/src/kudu/subprocess/subprocess_proxy.h
index d8cf6c7..76289bd 100644
--- a/src/kudu/subprocess/subprocess_proxy.h
+++ b/src/kudu/subprocess/subprocess_proxy.h
@@ -29,6 +29,7 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
+#include "kudu/util/thread.h"
 
 namespace kudu {
 namespace subprocess {
@@ -39,8 +40,9 @@ namespace subprocess {
 template<class ReqPB, class RespPB, class MetricsPB>
 class SubprocessProxy {
  public:
-  SubprocessProxy(std::vector<std::string> argv, const scoped_refptr<MetricEntity>&
entity)
-      : server_(new SubprocessServer(std::move(argv), MetricsPB(entity))) {}
+  SubprocessProxy(Env* env, const std::string& receiver_file,
+                  std::vector<std::string> argv, const scoped_refptr<MetricEntity>&
entity)
+      : server_(new SubprocessServer(env, receiver_file, std::move(argv), MetricsPB(entity)))
{}
 
   // Starts the underlying subprocess.
   Status Start() {
diff --git a/src/kudu/subprocess/subprocess_server-test.cc b/src/kudu/subprocess/subprocess_server-test.cc
index 90decac..af00559 100644
--- a/src/kudu/subprocess/subprocess_server-test.cc
+++ b/src/kudu/subprocess/subprocess_server-test.cc
@@ -36,6 +36,7 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
@@ -74,6 +75,19 @@ SubprocessRequestPB CreateEchoSubprocessRequestPB(const string& payload,
   return request;
 }
 
+Status CheckMessage(const SubprocessResponsePB& resp, const string& expected_msg)
{
+  EchoResponsePB echo_resp;
+  if (!resp.response().UnpackTo(&echo_resp)) {
+    return Status::Corruption(Substitute("Failed to unpack echo response: $0",
+                                         pb_util::SecureDebugString(resp)));
+  }
+  if (expected_msg != echo_resp.data()) {
+    return Status::Corruption(Substitute("Expected: '$0', got: '$1'",
+                                         expected_msg, echo_resp.data()));
+  }
+  return Status::OK();
+}
+
 const char* kHello = "hello world";
 
 } // anonymous namespace
@@ -81,16 +95,17 @@ const char* kHello = "hello world";
 class SubprocessServerTest : public KuduTest {
  public:
   SubprocessServerTest()
-      : metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
+      : test_dir_(GetTestDataDirectory()),
+        metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
                                                         "subprocess_server-test")) {}
   void SetUp() override {
     KuduTest::SetUp();
     ASSERT_OK(ResetSubprocessServer());
   }
 
-  // Resets the subprocess server to account for any new configuration.
-  Status ResetSubprocessServer(int java_queue_size = 0,
-                               int java_parser_threads = 0) {
+  Status InitSubprocessServer(int java_queue_size,
+                              int java_parser_threads,
+                              shared_ptr<SubprocessServer>* out) {
     // Set up a subprocess server pointing at the kudu-subprocess.jar that
     // contains an echo handler and call EchoSubprocessMain.
     string exe;
@@ -98,25 +113,34 @@ class SubprocessServerTest : public KuduTest {
     const string bin_dir = DirName(exe);
     string java_home;
     RETURN_NOT_OK(FindHomeDir("java", bin_dir, &java_home));
+    const string pipe_path = SubprocessServer::FifoPath(JoinPathSegments(test_dir_, "echo_pipe"));
     vector<string> argv = {
       Substitute("$0/bin/java", java_home),
       "-cp", Substitute("$0/kudu-subprocess.jar", bin_dir),
-      "org.apache.kudu.subprocess.echo.EchoSubprocessMain"
+      "org.apache.kudu.subprocess.echo.EchoSubprocessMain",
+      "-o", pipe_path,
     };
     if (java_queue_size > 0) {
-      argv.emplace_back("q");
+      argv.emplace_back("-q");
       argv.emplace_back(std::to_string(java_queue_size));
     }
     if (java_parser_threads > 0) {
-      argv.emplace_back("p");
+      argv.emplace_back("-p");
       argv.emplace_back(std::to_string(java_parser_threads));
     }
-    server_ = make_shared<SubprocessServer>(std::move(argv),
-                                            EchoSubprocessMetrics(metric_entity_));
-    return server_->Init();
+    *out = make_shared<SubprocessServer>(env_, pipe_path, std::move(argv),
+                                         EchoSubprocessMetrics(metric_entity_));
+    return (*out)->Init();
+  }
+
+  // Resets the subprocess server to account for any new configuration.
+  Status ResetSubprocessServer(int java_queue_size = 0,
+                               int java_parser_threads = 0) {
+    return InitSubprocessServer(java_queue_size, java_parser_threads, &server_);
   }
 
  protected:
+  const string test_dir_;
   MetricRegistry metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
   shared_ptr<SubprocessServer> server_;
@@ -170,11 +194,9 @@ TEST_F(SubprocessServerTest, TestManyConcurrentCalls) {
                           reqs_sent, elapsed_seconds, reqs_sent / elapsed_seconds);
   for (int t = 0; t < kNumThreads; t++) {
     for (int i = 0; i < kNumPerThread; i++) {
-      EchoResponsePB echo_resp;
-      ASSERT_TRUE(responses[t][i].response().UnpackTo(&echo_resp));
       EchoRequestPB echo_req;
       requests[t][i].request().UnpackTo(&echo_req);
-      ASSERT_EQ(echo_resp.data(), echo_req.data());
+      ASSERT_OK(CheckMessage(responses[t][i], echo_req.data()));
     }
   }
 }
@@ -280,6 +302,39 @@ TEST_F(SubprocessServerTest, TestInitFromThread) {
   ASSERT_OK(server_->Execute(&request, &response));
 }
 
+// Test that we've configured out subprocess server such that we can run it
+// from multiple threads without having them collide with each other.
+TEST_F(SubprocessServerTest, TestRunFromMultipleThreads) {
+  const int kNumThreads = 3;
+  vector<thread> threads;
+  vector<Status> results(kNumThreads);
+#define EXIT_NOT_OK(s, n) do { \
+  Status _s = (s); \
+  if (!_s.ok()) { \
+    results[n] = _s; \
+    return; \
+  } \
+} while (0);
+
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&, i] {
+      shared_ptr<SubprocessServer> server;
+      EXIT_NOT_OK(InitSubprocessServer(0, 0, &server), i);
+      const string msg = Substitute("$0 bottles of tea on the wall", i);
+      SubprocessRequestPB req = CreateEchoSubprocessRequestPB(msg);
+      SubprocessResponsePB resp;
+      EXIT_NOT_OK(server->Execute(&req, &resp), i);
+      EXIT_NOT_OK(CheckMessage(resp, msg), i);
+    });
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  for (const auto& r : results) {
+    ASSERT_OK(r);
+  }
+}
+
 } // namespace subprocess
 } // namespace kudu
 


Mime
View raw message