Return-Path: X-Original-To: apmail-mesos-commits-archive@www.apache.org Delivered-To: apmail-mesos-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AD397108E3 for ; Thu, 29 May 2014 00:51:16 +0000 (UTC) Received: (qmail 91061 invoked by uid 500); 29 May 2014 00:51:16 -0000 Delivered-To: apmail-mesos-commits-archive@mesos.apache.org Received: (qmail 91039 invoked by uid 500); 29 May 2014 00:51:16 -0000 Mailing-List: contact commits-help@mesos.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mesos.apache.org Delivered-To: mailing list commits@mesos.apache.org Received: (qmail 91032 invoked by uid 99); 29 May 2014 00:51:16 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 May 2014 00:51:16 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 5CFD84EBC4; Thu, 29 May 2014 00:51:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: benh@apache.org To: commits@mesos.apache.org Message-Id: <659b2d6f3c3e498d9c11df9206090028@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: Introduced io::redirect. Date: Thu, 29 May 2014 00:51:16 +0000 (UTC) Repository: mesos Updated Branches: refs/heads/master 60865b2f4 -> aa27d9344 Introduced io::redirect. 
Review: https://reviews.apache.org/r/21998 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/aa27d934 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/aa27d934 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/aa27d934 Branch: refs/heads/master Commit: aa27d93441459e3ac0ea8b5d583ba4d5a0a8481b Parents: 60865b2 Author: Benjamin Hindman Authored: Wed May 28 16:46:09 2014 -0700 Committer: Benjamin Hindman Committed: Wed May 28 17:50:35 2014 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/io.hpp | 9 ++ 3rdparty/libprocess/src/process.cpp | 70 +++++++++++++ 3rdparty/libprocess/src/tests/io_tests.cpp | 124 +++++++++++++++++++----- 3 files changed, 177 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/aa27d934/3rdparty/libprocess/include/process/io.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp index 7f9b242..c775290 100644 --- a/3rdparty/libprocess/include/process/io.hpp +++ b/3rdparty/libprocess/include/process/io.hpp @@ -64,6 +64,15 @@ Future write(int fd, const std::string& data); // both the 'from' and 'to' file descriptors must be non-blocking. Future splice(int from, int to, size_t chunk = 4096); + +// Redirect output from 'from' file descriptor to 'to' file descriptor +// or /dev/null if 'to' is None. Note that depending on how we +// redirect output we duplicate the 'from' and 'to' file descriptors +// so we can control their lifetimes. Returns after EOF has been hit +// on 'from' or some form of failure has occurred. +// TODO(benh): Consider subsuming lower-level 'splice'. 
+Future redirect(int from, Option to, size_t chunk = 4096); + } // namespace io { } // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/aa27d934/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 58bae5b..8501d2f 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -4114,6 +4114,76 @@ Future splice(int from, int to, size_t chunk) return future; } + +Future redirect(int from, Option to, size_t chunk) +{ + // Make sure we've got "valid" file descriptors. + if (from < 0 || (to.isSome() && to.get() < 0)) { + return Failure(strerror(EBADF)); + } + + if (to.isNone()) { + // Open up /dev/null that we can splice into. + Try open = os::open("/dev/null", O_WRONLY); + + if (open.isError()) { + return Failure("Failed to open /dev/null for writing: " + open.error()); + } + + to = open.get(); + } else { + // Duplicate 'to' so that we're in control of its lifetime. + int fd = dup(to.get()); + if (fd == -1) { + return Failure(ErrnoError("Failed to duplicate 'to' file descriptor")); + } + + to = fd; + } + + CHECK_SOME(to); + + // Duplicate 'from' so that we're in control of its lifetime. + from = dup(from); + if (from == -1) { + return Failure(ErrnoError("Failed to duplicate 'from' file descriptor")); + } + + // Set the close-on-exec flag (no-op if already set). + Try cloexec = os::cloexec(from); + if (cloexec.isError()) { + os::close(from); + os::close(to.get()); + return Failure("Failed to set close-on-exec on 'from': " + cloexec.error()); + } + + cloexec = os::cloexec(to.get()); + if (cloexec.isError()) { + os::close(from); + os::close(to.get()); + return Failure("Failed to set close-on-exec on 'to': " + cloexec.error()); + } + + // Make the file descriptors non-blocking (no-op if already set). 
+ Try nonblock = os::nonblock(from); + if (nonblock.isError()) { + os::close(from); + os::close(to.get()); + return Failure("Failed to make 'from' non-blocking: " + nonblock.error()); + } + + nonblock = os::nonblock(to.get()); + if (nonblock.isError()) { + os::close(from); + os::close(to.get()); + return Failure("Failed to make 'to' non-blocking: " + nonblock.error()); + } + + return splice(from, to.get(), chunk) + .onAny(lambda::bind(&os::close, from)) + .onAny(lambda::bind(&os::close, to.get())); +} + } // namespace io { namespace internal { http://git-wip-us.apache.org/repos/asf/mesos/blob/aa27d934/3rdparty/libprocess/src/tests/io_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp index 3c7b459..05ea7bb 100644 --- a/3rdparty/libprocess/src/tests/io_tests.cpp +++ b/3rdparty/libprocess/src/tests/io_tests.cpp @@ -35,8 +35,8 @@ TEST(IO, Poll) ASSERT_EQ(3, write(pipes[1], "hi", 3)); AWAIT_EXPECT_EQ(io::READ, future); - close(pipes[0]); - close(pipes[1]); + ASSERT_SOME(os::close(pipes[0])); + ASSERT_SOME(os::close(pipes[1])); } @@ -53,8 +53,8 @@ TEST(IO, Read) // Test on a blocking file descriptor. AWAIT_EXPECT_FAILED(io::read(pipes[0], data, 3)); - close(pipes[0]); - close(pipes[1]); + ASSERT_SOME(os::close(pipes[0])); + ASSERT_SOME(os::close(pipes[1])); // Test on a closed file descriptor. AWAIT_EXPECT_FAILED(io::read(pipes[0], data, 3)); @@ -100,11 +100,11 @@ TEST(IO, Read) future = io::read(pipes[0], data, 3); ASSERT_FALSE(future.isReady()); - close(pipes[1]); + ASSERT_SOME(os::close(pipes[1])); AWAIT_ASSERT_EQ(0u, future); - close(pipes[0]); + ASSERT_SOME(os::close(pipes[0])); } @@ -135,7 +135,7 @@ TEST(IO, BufferedRead) ASSERT_TRUE(os::nonblock(fd.get()).isSome()); AWAIT_EXPECT_EQ(data, io::read(fd.get())); - os::close(fd.get()); + ASSERT_SOME(os::close(fd.get())); // Now read from pipes. 
int pipes[2]; @@ -146,8 +146,8 @@ TEST(IO, BufferedRead) // Test on a blocking pipe. AWAIT_EXPECT_FAILED(io::read(pipes[0])); - close(pipes[0]); - close(pipes[1]); + ASSERT_SOME(os::close(pipes[0])); + ASSERT_SOME(os::close(pipes[1])); // Test on a closed pipe. AWAIT_EXPECT_FAILED(io::read(pipes[0])); @@ -164,11 +164,11 @@ TEST(IO, BufferedRead) ASSERT_FALSE(future.isReady()); ASSERT_SOME(os::write(pipes[1], data)); - close(pipes[1]); + ASSERT_SOME(os::close(pipes[1])); AWAIT_EXPECT_EQ(data, future); - close(pipes[0]); + ASSERT_SOME(os::close(pipes[0])); ASSERT_SOME(os::rm("file")); } @@ -186,8 +186,8 @@ TEST(IO, Write) // Test on a blocking file descriptor. AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2)); - close(pipes[0]); - close(pipes[1]); + ASSERT_SOME(os::close(pipes[0])); + ASSERT_SOME(os::close(pipes[1])); // Test on a closed file descriptor. AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2)); @@ -208,10 +208,10 @@ TEST(IO, Write) EXPECT_EQ("hi", string(data, 2)); // Test write to broken pipe. - close(pipes[0]); + ASSERT_SOME(os::close(pipes[0])); AWAIT_EXPECT_FAILED(io::write(pipes[1], (void*) "hi", 2)); - close(pipes[1]); + ASSERT_SOME(os::close(pipes[1])); } @@ -235,8 +235,8 @@ TEST(IO, DISABLED_BlockingWrite) ASSERT_TRUE(errno == EAGAIN || errno == EWOULDBLOCK); - close(pipes[0]); - close(pipes[1]); + ASSERT_SOME(os::close(pipes[0])); + ASSERT_SOME(os::close(pipes[1])); // Recreate a nonblocking pipe. 
ASSERT_NE(-1, ::pipe(pipes)); @@ -285,8 +285,8 @@ TEST(IO, DISABLED_BlockingWrite) AWAIT_EXPECT_READY(future1); - close(pipes[0]); - close(pipes[1]); + ASSERT_SOME(os::close(pipes[0])); + ASSERT_SOME(os::close(pipes[1])); } @@ -300,7 +300,9 @@ TEST(IO, splice) Try fd = os::open( path.get(), - O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); + O_WRONLY | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); + ASSERT_SOME(fd); ASSERT_SOME(os::nonblock(fd.get())); @@ -314,8 +316,8 @@ TEST(IO, splice) // Test splicing on a blocking file descriptor. AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get())); - close(pipes[0]); - close(pipes[1]); + ASSERT_SOME(os::close(pipes[0])); + ASSERT_SOME(os::close(pipes[1])); // Test on a closed file descriptor. AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get())); @@ -326,7 +328,7 @@ TEST(IO, splice) ASSERT_SOME(os::nonblock(pipes[1])); // Test write to broken pipe. - close(pipes[0]); + ASSERT_SOME(os::close(pipes[0])); AWAIT_EXPECT_FAILED(io::splice(pipes[1], fd.get())); close(pipes[1]); @@ -363,13 +365,83 @@ TEST(IO, splice) // Closing the write pipe should cause an EOF on the read end, thus // completing 'splice'. - close(pipes[1]); + ASSERT_SOME(os::close(pipes[1])); AWAIT_READY(splice); - close(pipes[0]); + ASSERT_SOME(os::close(pipes[0])); + ASSERT_SOME(os::close(fd.get())); + + // Now make sure all the data is there! + Try read = os::read(path.get()); + ASSERT_SOME(read); + EXPECT_EQ(data, read.get()); +} + + +TEST(IO, redirect) +{ + ASSERT_TRUE(GTEST_IS_THREADSAFE); + + // Start by checking that using "invalid" file descriptors fails. + AWAIT_EXPECT_FAILED(io::redirect(-1, 0)); + AWAIT_EXPECT_FAILED(io::redirect(0, -1)); + + // Create a temporary file for redirecting into. 
+ Try path = os::mktemp(); + ASSERT_SOME(path); + + Try fd = os::open( + path.get(), + O_WRONLY | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); + + ASSERT_SOME(fd); + + ASSERT_SOME(os::nonblock(fd.get())); + + // Use a nonblocking pipe for doing the redirection. + int pipes[2]; + + ASSERT_NE(-1, ::pipe(pipes)); + ASSERT_SOME(os::nonblock(pipes[0])); + ASSERT_SOME(os::nonblock(pipes[1])); + + // Now write data to the pipe and splice to the file. + string data = + "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do " + "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim " + "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut " + "aliquip ex ea commodo consequat. Duis aute irure dolor in " + "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla " + "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in " + "culpa qui officia deserunt mollit anim id est laborum."; + + // Create more data! + while (Bytes(data.size()) < Megabytes(1)) { + data.append(data); + } + + Future redirect = io::redirect(pipes[0], fd.get()); + + // Closing the read end of the pipe and the file should not have any + // impact as we dup the file descriptor. + ASSERT_SOME(os::close(pipes[0])); + ASSERT_SOME(os::close(fd.get())); + + EXPECT_TRUE(redirect.isPending()); + + // Writing the data should keep the future pending as it hasn't seen + // EOF yet. + AWAIT_READY(io::write(pipes[1], data)); + + EXPECT_TRUE(redirect.isPending()); + + // Now closing the write pipe should cause an EOF on the read end, + // thus completing underlying splice in io::redirect. + ASSERT_SOME(os::close(pipes[1])); - os::close(fd.get()); + AWAIT_READY(redirect); // Now make sure all the data is there! Try read = os::read(path.get());