mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject git commit: Introduced io::redirect.
Date Thu, 29 May 2014 00:51:16 GMT
Repository: mesos
Updated Branches:
  refs/heads/master 60865b2f4 -> aa27d9344


Introduced io::redirect.

Review: https://reviews.apache.org/r/21998


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/aa27d934
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/aa27d934
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/aa27d934

Branch: refs/heads/master
Commit: aa27d93441459e3ac0ea8b5d583ba4d5a0a8481b
Parents: 60865b2
Author: Benjamin Hindman <benjamin.hindman@gmail.com>
Authored: Wed May 28 16:46:09 2014 -0700
Committer: Benjamin Hindman <benjamin.hindman@gmail.com>
Committed: Wed May 28 17:50:35 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/io.hpp |   9 ++
 3rdparty/libprocess/src/process.cpp        |  70 +++++++++++++
 3rdparty/libprocess/src/tests/io_tests.cpp | 124 +++++++++++++++++++-----
 3 files changed, 177 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/aa27d934/3rdparty/libprocess/include/process/io.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp
index 7f9b242..c775290 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -64,6 +64,15 @@ Future<Nothing> write(int fd, const std::string& data);
 // both the 'from' and 'to' file descriptors must be non-blocking.
 Future<Nothing> splice(int from, int to, size_t chunk = 4096);
 
+
+// Redirect output from 'from' file descriptor to 'to' file descriptor
+// or /dev/null if 'to' is None. Note that depending on how we
+// redirect output we duplicate the 'from' and 'to' file descriptors
+// so we can control their lifetimes. Returns after EOF has been hit
+// on 'from' or some form of failure has occurred.
+// TODO(benh): Consider subsuming lower-level 'splice'.
+Future<Nothing> redirect(int from, Option<int> to, size_t chunk = 4096);
+
 } // namespace io {
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/aa27d934/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 58bae5b..8501d2f 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -4114,6 +4114,76 @@ Future<Nothing> splice(int from, int to, size_t chunk)
   return future;
 }
 
+
+// Redirects everything read from the 'from' file descriptor into the
+// 'to' file descriptor, or into /dev/null when 'to' is None.
+//
+// Both ends are handled through descriptors owned by this function:
+// 'from' is always duplicated, and 'to' is either a duplicate of the
+// caller's descriptor or a freshly opened /dev/null. The owned
+// descriptors are marked close-on-exec and non-blocking, handed to
+// 'splice', and closed once the future returned by 'splice' completes
+// in any way. Every error path below closes whatever descriptors have
+// been acquired so far before returning the failure.
+//
+// 'chunk' is forwarded unchanged to 'splice'.
+Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
+{
+  // Make sure we've got "valid" file descriptors.
+  if (from < 0 || (to.isSome() && to.get() < 0)) {
+    return Failure(strerror(EBADF));
+  }
+
+  if (to.isNone()) {
+    // Open up /dev/null that we can splice into.
+    Try<int> open = os::open("/dev/null", O_WRONLY);
+
+    if (open.isError()) {
+      return Failure("Failed to open /dev/null for writing: " + open.error());
+    }
+
+    to = open.get();
+  } else {
+    // Duplicate 'to' so that we're in control of its lifetime.
+    int fd = dup(to.get());
+    if (fd == -1) {
+      return Failure(ErrnoError("Failed to duplicate 'to' file descriptor"));
+    }
+
+    to = fd;
+  }
+
+  CHECK_SOME(to);
+
+  // Duplicate 'from' so that we're in control of its lifetime.
+  from = dup(from);
+  if (from == -1) {
+    // Build the error before closing anything so that the errno left
+    // behind by dup() is what gets reported, not one from os::close().
+    ErrnoError error("Failed to duplicate 'from' file descriptor");
+
+    // At this point 'to' is a descriptor we own (either /dev/null or
+    // our duplicate), so it must be closed here or it leaks.
+    os::close(to.get());
+    return Failure(error);
+  }
+
+  // Set the close-on-exec flag (no-op if already set).
+  Try<Nothing> cloexec = os::cloexec(from);
+  if (cloexec.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to set close-on-exec on 'from': " + cloexec.error());
+  }
+
+  cloexec = os::cloexec(to.get());
+  if (cloexec.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to set close-on-exec on 'to': " + cloexec.error());
+  }
+
+  // Make the file descriptors non-blocking (no-op if already set).
+  Try<Nothing> nonblock = os::nonblock(from);
+  if (nonblock.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to make 'from' non-blocking: " + nonblock.error());
+  }
+
+  nonblock = os::nonblock(to.get());
+  if (nonblock.isError()) {
+    os::close(from);
+    os::close(to.get());
+    return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
+  }
+
+  // Whether the splice succeeds, fails or is discarded, release both
+  // of the descriptors we own.
+  return splice(from, to.get(), chunk)
+    .onAny(lambda::bind(&os::close, from))
+    .onAny(lambda::bind(&os::close, to.get()));
+}
+
 } // namespace io {
 
 namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/aa27d934/3rdparty/libprocess/src/tests/io_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp
index 3c7b459..05ea7bb 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -35,8 +35,8 @@ TEST(IO, Poll)
   ASSERT_EQ(3, write(pipes[1], "hi", 3));
   AWAIT_EXPECT_EQ(io::READ, future);
 
-  close(pipes[0]);
-  close(pipes[1]);
+  ASSERT_SOME(os::close(pipes[0]));
+  ASSERT_SOME(os::close(pipes[1]));
 }
 
 
@@ -53,8 +53,8 @@ TEST(IO, Read)
   // Test on a blocking file descriptor.
   AWAIT_EXPECT_FAILED(io::read(pipes[0], data, 3));
 
-  close(pipes[0]);
-  close(pipes[1]);
+  ASSERT_SOME(os::close(pipes[0]));
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Test on a closed file descriptor.
   AWAIT_EXPECT_FAILED(io::read(pipes[0], data, 3));
@@ -100,11 +100,11 @@ TEST(IO, Read)
   future = io::read(pipes[0], data, 3);
   ASSERT_FALSE(future.isReady());
 
-  close(pipes[1]);
+  ASSERT_SOME(os::close(pipes[1]));
 
   AWAIT_ASSERT_EQ(0u, future);
 
-  close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 }
 
 
@@ -135,7 +135,7 @@ TEST(IO, BufferedRead)
   ASSERT_TRUE(os::nonblock(fd.get()).isSome());
   AWAIT_EXPECT_EQ(data, io::read(fd.get()));
 
-  os::close(fd.get());
+  ASSERT_SOME(os::close(fd.get()));
 
   // Now read from pipes.
   int pipes[2];
@@ -146,8 +146,8 @@ TEST(IO, BufferedRead)
   // Test on a blocking pipe.
   AWAIT_EXPECT_FAILED(io::read(pipes[0]));
 
-  close(pipes[0]);
-  close(pipes[1]);
+  ASSERT_SOME(os::close(pipes[0]));
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Test on a closed pipe.
   AWAIT_EXPECT_FAILED(io::read(pipes[0]));
@@ -164,11 +164,11 @@ TEST(IO, BufferedRead)
   ASSERT_FALSE(future.isReady());
 
   ASSERT_SOME(os::write(pipes[1], data));
-  close(pipes[1]);
+  ASSERT_SOME(os::close(pipes[1]));
 
   AWAIT_EXPECT_EQ(data, future);
 
-  close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 
   ASSERT_SOME(os::rm("file"));
 }
@@ -186,8 +186,8 @@ TEST(IO, Write)
   // Test on a blocking file descriptor.
   AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2));
 
-  close(pipes[0]);
-  close(pipes[1]);
+  ASSERT_SOME(os::close(pipes[0]));
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Test on a closed file descriptor.
   AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2));
@@ -208,10 +208,10 @@ TEST(IO, Write)
   EXPECT_EQ("hi", string(data, 2));
 
   // Test write to broken pipe.
-  close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
   AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2));
 
-  close(pipes[1]);
+  ASSERT_SOME(os::close(pipes[1]));
 }
 
 
@@ -235,8 +235,8 @@ TEST(IO, DISABLED_BlockingWrite)
 
   ASSERT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK);
 
-  close(pipes[0]);
-  close(pipes[1]);
+  ASSERT_SOME(os::close(pipes[0]));
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Recreate a nonblocking pipe.
   ASSERT_NE(-1, ::pipe(pipes));
@@ -285,8 +285,8 @@ TEST(IO, DISABLED_BlockingWrite)
 
   AWAIT_EXPECT_READY(future1);
 
-  close(pipes[0]);
-  close(pipes[1]);
+  ASSERT_SOME(os::close(pipes[0]));
+  ASSERT_SOME(os::close(pipes[1]));
 }
 
 
@@ -300,7 +300,9 @@ TEST(IO, splice)
 
   Try<int> fd = os::open(
       path.get(),
-      O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+      O_WRONLY | O_CREAT | O_TRUNC,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
   ASSERT_SOME(fd);
 
   ASSERT_SOME(os::nonblock(fd.get()));
@@ -314,8 +316,8 @@ TEST(IO, splice)
   // Test splicing on a blocking file descriptor.
   AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
 
-  close(pipes[0]);
-  close(pipes[1]);
+  ASSERT_SOME(os::close(pipes[0]));
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Test on a closed file descriptor.
   AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
@@ -326,7 +328,7 @@ TEST(IO, splice)
   ASSERT_SOME(os::nonblock(pipes[1]));
 
   // Test write to broken pipe.
-  close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
   AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get()));
 
   close(pipes[1]);
@@ -363,13 +365,83 @@ TEST(IO, splice)
 
   // Closing the write pipe should cause an EOF on the read end, thus
   // completing 'splice'.
-  close(pipes[1]);
+  ASSERT_SOME(os::close(pipes[1]));
 
   AWAIT_READY(splice);
 
-  close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
+  ASSERT_SOME(os::close(fd.get()));
+
+  // Now make sure all the data is there!
+  Try<string> read = os::read(path.get());
+  ASSERT_SOME(read);
+  EXPECT_EQ(data, read.get());
+}
+
+
+TEST(IO, redirect)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // Start by checking that using "invalid" file descriptors fails.
+  AWAIT_EXPECT_FAILED(io::redirect(-1, 0));
+  AWAIT_EXPECT_FAILED(io::redirect(0, -1));
+
+  // Create a temporary file for redirecting into.
+  Try<string> path = os::mktemp();
+  ASSERT_SOME(path);
+
+  Try<int> fd = os::open(
+      path.get(),
+      O_WRONLY | O_CREAT | O_TRUNC,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+  ASSERT_SOME(fd);
+
+  ASSERT_SOME(os::nonblock(fd.get()));
+
+  // Use a nonblocking pipe for doing the redirection.
+  int pipes[2];
+
+  ASSERT_NE(-1, ::pipe(pipes));
+  ASSERT_SOME(os::nonblock(pipes[0]));
+  ASSERT_SOME(os::nonblock(pipes[1]));
+
+  // Now write data to the pipe and splice to the file.
+  string data =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+    "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+    "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+    "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+    "culpa qui officia deserunt mollit anim id est laborum.";
+
+  // Create more data!
+  while (Bytes(data.size()) < Megabytes(1)) {
+    data.append(data);
+  }
+
+  Future<Nothing> redirect = io::redirect(pipes[0], fd.get());
+
+  // Closing the read end of the pipe and the file should not have any
+  // impact as we dup the file descriptor.
+  ASSERT_SOME(os::close(pipes[0]));
+  ASSERT_SOME(os::close(fd.get()));
+
+  EXPECT_TRUE(redirect.isPending());
+
+  // Writing the data should keep the future pending as it hasn't seen
+  // EOF yet.
+  AWAIT_READY(io::write(pipes[1], data));
+
+  EXPECT_TRUE(redirect.isPending());
+
+  // Now closing the write pipe should cause an EOF on the read end,
+  // thus completing underlying splice in io::redirect.
+  ASSERT_SOME(os::close(pipes[1]));
 
-  os::close(fd.get());
+  AWAIT_READY(redirect);
 
   // Now make sure all the data is there!
   Try<string> read = os::read(path.get());


Mime
View raw message