Return-Path: X-Original-To: apmail-mesos-commits-archive@www.apache.org Delivered-To: apmail-mesos-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 51A5711C02 for ; Wed, 18 Jun 2014 17:19:56 +0000 (UTC) Received: (qmail 55953 invoked by uid 500); 18 Jun 2014 17:19:56 -0000 Delivered-To: apmail-mesos-commits-archive@mesos.apache.org Received: (qmail 55930 invoked by uid 500); 18 Jun 2014 17:19:56 -0000 Mailing-List: contact commits-help@mesos.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mesos.apache.org Delivered-To: mailing list commits@mesos.apache.org Received: (qmail 55917 invoked by uid 99); 18 Jun 2014 17:19:56 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Jun 2014 17:19:56 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DF4AE83BF3A; Wed, 18 Jun 2014 17:19:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jieyu@apache.org To: commits@mesos.apache.org Date: Wed, 18 Jun 2014 17:19:55 -0000 Message-Id: <725709002ed24a1291936a304564a646@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: Refactored subprocess to support more IO redirect modes. Repository: mesos Updated Branches: refs/heads/master b5708315d -> 83fcc8ca1 Refactored subprocess to support more IO redirect modes. Review: https://reviews.apache.org/r/22688 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9657fe0f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9657fe0f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9657fe0f Branch: refs/heads/master Commit: 9657fe0f94f6c0a7ad00a6c02d21da3b5fbc1e0d Parents: b570831 Author: Jie Yu Authored: Tue Jun 17 11:30:20 2014 -0700 Committer: Jie Yu Committed: Wed Jun 18 10:19:13 2014 -0700 ---------------------------------------------------------------------- .../libprocess/include/process/subprocess.hpp | 103 +++- 3rdparty/libprocess/src/subprocess.cpp | 262 ++++++++-- .../libprocess/src/tests/subprocess_tests.cpp | 491 +++++++++++++++---- 3 files changed, 707 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/9657fe0f/3rdparty/libprocess/include/process/subprocess.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp index e9d7b31..e93608a 100644 --- a/3rdparty/libprocess/include/process/subprocess.hpp +++ b/3rdparty/libprocess/include/process/subprocess.hpp @@ -1,6 +1,8 @@ #ifndef __PROCESS_SUBPROCESS_HPP__ #define __PROCESS_SUBPROCESS_HPP__ +#include + #include #include @@ -22,24 +24,79 @@ namespace process { // 1. The subprocess has terminated, and // 2. There are no longer any references to the associated // Subprocess object. -struct Subprocess +class Subprocess { +public: + // Describes how the I/O is redirected for stdin/stdout/stderr. + // One of the following three modes are supported: + // 1. PIPE: Redirect to a pipe. The pipe will be created + // automatically and the user can read/write the parent side of + // the pipe from in()/out()/err(). + // 2. PATH: Redirect to a file. The file will be created if it + // does not exist. If the file exists, it will be appended. + // 3. FD: Redirect to an open file descriptor. + class IO + { + private: + friend class Subprocess; + + friend Try subprocess( + const std::string& command, + const IO& in, + const IO& out, + const IO& err, + const Option >& environment, + const Option >& setup); + + enum Mode + { + PIPE, // Redirect I/O to a pipe. + PATH, // Redirect I/O to a file. + FD, // Redirect I/O to an open file descriptor. + }; + + IO(Mode _mode, const Option& _fd, const Option& _path) + : mode(_mode), fd(_fd), path(_path) {} + + Mode mode; + Option fd; + Option path; + }; + + // Syntactic sugar to create IO descriptors. + static IO PIPE() + { + return IO(IO::PIPE, None(), None()); + } + + static IO PATH(const std::string& path) + { + return IO(IO::PATH, None(), path); + } + + static IO FD(int fd) + { + return IO(IO::FD, fd, None()); + } + // Returns the pid for the subprocess. pid_t pid() const { return data->pid; } - // File descriptor accessors for input / output. - int in() const { return data->in; } - int out() const { return data->out; } - int err() const { return data->err; } + // The parent side of the pipe for stdin/stdout/stderr. + Option in() const { return data->in; } + Option out() const { return data->out; } + Option err() const { return data->err; } // Returns a future from process::reap of this subprocess. // Discarding this future has no effect on the subprocess. Future > status() const { return data->status; } private: - Subprocess() : data(new Data()) {} friend Try subprocess( const std::string& command, + const Subprocess::IO& in, + const Subprocess::IO& out, + const Subprocess::IO& err, const Option >& environment, const Option >& setup); @@ -47,22 +104,26 @@ private: { ~Data() { - os::close(in); - os::close(out); - os::close(err); + if (in.isSome()) { os::close(in.get()); } + if (out.isSome()) { os::close(out.get()); } + if (err.isSome()) { os::close(err.get()); } } pid_t pid; + // The parent side of the pipe for stdin/stdout/stderr. If the + // mode is not PIPE, None will be stored. // NOTE: stdin, stdout, stderr are macros on some systems, hence // these names instead. - int in; - int out; - int err; + Option in; + Option out; + Option err; Future > status; }; + Subprocess() : data(new Data()) {} + memory::shared_ptr data; }; @@ -78,9 +139,27 @@ private: // TODO(dhamon): Add an option to not combine the two environments. Try subprocess( const std::string& command, + const Subprocess::IO& in, + const Subprocess::IO& out, + const Subprocess::IO& err, const Option >& environment = None(), const Option >& setup = None()); + +inline Try subprocess( + const std::string& command, + const Option >& environment = None(), + const Option >& setup = None()) +{ + return subprocess( + command, + Subprocess::FD(STDIN_FILENO), + Subprocess::FD(STDOUT_FILENO), + Subprocess::FD(STDERR_FILENO), + environment, + setup); +} + } // namespace process { #endif // __PROCESS_SUBPROCESS_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/9657fe0f/3rdparty/libprocess/src/subprocess.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/subprocess.cpp b/3rdparty/libprocess/src/subprocess.cpp index 9f8f37f..78fa1ec 100644 --- a/3rdparty/libprocess/src/subprocess.cpp +++ b/3rdparty/libprocess/src/subprocess.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -27,7 +28,7 @@ namespace process { namespace internal { // See the comment below as to why subprocess is passed to cleanup. -void cleanup( +static void cleanup( const Future >& result, Promise >* promise, const Subprocess& subprocess) @@ -44,33 +45,176 @@ void cleanup( delete promise; } + +static void close(int stdinFd[2], int stdoutFd[2], int stderrFd[2]) +{ + os::close(stdinFd[0]); + os::close(stdinFd[1]); + os::close(stdoutFd[0]); + os::close(stdoutFd[1]); + os::close(stderrFd[0]); + os::close(stderrFd[1]); +} + +// This function will invoke os::cloexec on all file descriptors in +// these pairs that are valid (i.e., >= 0). +static Try cloexec(int stdinFd[2], int stdoutFd[2], int stderrFd[2]) +{ + int fd[6] = { + stdinFd[0], + stdinFd[1], + stdoutFd[0], + stdoutFd[1], + stderrFd[0], + stderrFd[1] + }; + + for (int i = 0; i < 6; i++) { + if (fd[i] >= 0) { + Try cloexec = os::cloexec(fd[i]); + if (cloexec.isError()) { + return Error(cloexec.error()); + } + } + } + + return Nothing(); +} + } // namespace internal { // Runs the provided command in a subprocess. Try subprocess( const string& command, + const Subprocess::IO& in, + const Subprocess::IO& out, + const Subprocess::IO& err, const Option >& environment, const Option >& setup) { - // Create pipes for stdin, stdout, stderr. - // Index 0 is for reading, and index 1 is for writing. - int stdinPipe[2]; - int stdoutPipe[2]; - int stderrPipe[2]; - - if (pipe(stdinPipe) == -1) { - return ErrnoError("Failed to create pipe"); - } else if (pipe(stdoutPipe) == -1) { - os::close(stdinPipe[0]); - os::close(stdinPipe[1]); - return ErrnoError("Failed to create pipe"); - } else if (pipe(stderrPipe) == -1) { - os::close(stdinPipe[0]); - os::close(stdinPipe[1]); - os::close(stdoutPipe[0]); - os::close(stdoutPipe[1]); - return ErrnoError("Failed to create pipe"); + // File descriptors for redirecting stdin/stdout/stderr. These file + // descriptors are used for different purposes depending on the + // specified I/O modes. If the mode is PIPE, the two file + // descriptors represent two ends of a pipe. If the mode is PATH or + // FD, only one of the two file descriptors is used. Our protocol + // here is that index 0 is always for reading, and index 1 is always + // for writing (similar to the pipe semantics). + int stdinFd[2] = { -1, -1 }; + int stdoutFd[2] = { -1, -1 }; + int stderrFd[2] = { -1, -1 }; + + // Prepare the file descriptor(s) for stdin. + switch (in.mode) { + case Subprocess::IO::FD: { + stdinFd[0] = ::dup(in.fd.get()); + if (stdinFd[0] == -1) { + return ErrnoError("Failed to dup"); + } + break; + } + case Subprocess::IO::PIPE: { + if (pipe(stdinFd) == -1) { + return ErrnoError("Failed to create pipe"); + } + break; + } + case Subprocess::IO::PATH: { + Try open = os::open(in.path.get(), O_RDONLY); + if (open.isError()) { + return Error( + "Failed to open '" + in.path.get() + "': " + open.error()); + } + stdinFd[0] = open.get(); + break; + } + default: + return UNREACHABLE(); + } + + // Prepare the file descriptor(s) for stdout. + switch (out.mode) { + case Subprocess::IO::FD: { + stdoutFd[1] = ::dup(out.fd.get()); + if (stdoutFd[1] == -1) { + // Save the errno as 'close' below might overwrite it. + ErrnoError error("Failed to dup"); + internal::close(stdinFd, stdoutFd, stderrFd); + return error; + } + break; + } + case Subprocess::IO::PIPE: { + if (pipe(stdoutFd) == -1) { + // Save the errno as 'close' below might overwrite it. + ErrnoError error("Failed to create pipe"); + internal::close(stdinFd, stdoutFd, stderrFd); + return error; + } + break; + } + case Subprocess::IO::PATH: { + Try open = os::open( + out.path.get(), + O_WRONLY | O_CREAT | O_APPEND, + S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); + + if (open.isError()) { + internal::close(stdinFd, stdoutFd, stderrFd); + return Error( + "Failed to open '" + out.path.get() + "': " + open.error()); + } + stdoutFd[1] = open.get(); + break; + } + default: + return UNREACHABLE(); + } + + // Prepare the file descriptor(s) for stderr. + switch (err.mode) { + case Subprocess::IO::FD: { + stderrFd[1] = ::dup(err.fd.get()); + if (stderrFd[1] == -1) { + // Save the errno as 'close' below might overwrite it. + ErrnoError error("Failed to dup"); + internal::close(stdinFd, stdoutFd, stderrFd); + return error; + } + break; + } + case Subprocess::IO::PIPE: { + if (pipe(stderrFd) == -1) { + // Save the errno as 'close' below might overwrite it. + ErrnoError error("Failed to create pipe"); + internal::close(stdinFd, stdoutFd, stderrFd); + return error; + } + break; + } + case Subprocess::IO::PATH: { + Try open = os::open( + err.path.get(), + O_WRONLY | O_CREAT | O_APPEND, + S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); + + if (open.isError()) { + internal::close(stdinFd, stdoutFd, stderrFd); + return Error( + "Failed to open '" + err.path.get() + "': " + open.error()); + } + stderrFd[1] = open.get(); + break; + } + default: + return UNREACHABLE(); + } + + // TODO(jieyu): Consider using O_CLOEXEC for atomic close-on-exec. + Try cloexec = internal::cloexec(stdinFd, stdoutFd, stderrFd); + if (cloexec.isError()) { + internal::close(stdinFd, stdoutFd, stderrFd); + return Error("Failed to cloexec: " + cloexec.error()); } // We need to do this construction before doing the fork as it @@ -82,13 +226,10 @@ Try subprocess( pid_t pid; if ((pid = fork()) == -1) { - os::close(stdinPipe[0]); - os::close(stdinPipe[1]); - os::close(stdoutPipe[0]); - os::close(stdoutPipe[1]); - os::close(stderrPipe[0]); - os::close(stderrPipe[1]); - return ErrnoError("Failed to fork"); + // Save the errno as 'close' below might overwrite it. + ErrnoError error("Failed to fork"); + internal::close(stdinFd, stdoutFd, stderrFd); + return error; } Subprocess process; @@ -97,19 +238,41 @@ Try subprocess( if (process.data->pid == 0) { // Child. // Close parent's end of the pipes. - os::close(stdinPipe[1]); - os::close(stdoutPipe[0]); - os::close(stderrPipe[0]); + if (in.mode == Subprocess::IO::PIPE) { + while (::close(stdinFd[1]) == -1 && errno == EINTR); + } + if (out.mode == Subprocess::IO::PIPE) { + while (::close(stdoutFd[0]) == -1 && errno == EINTR); + } + if (err.mode == Subprocess::IO::PIPE) { + while (::close(stderrFd[0]) == -1 && errno == EINTR); + } - // Make our pipes look like stdin, stderr, stdout before we exec. - while (dup2(stdinPipe[0], STDIN_FILENO) == -1 && errno == EINTR); - while (dup2(stdoutPipe[1], STDOUT_FILENO) == -1 && errno == EINTR); - while (dup2(stderrPipe[1], STDERR_FILENO) == -1 && errno == EINTR); + // Redirect I/O for stdin/stdout/stderr. + while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR); + while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR); + while (::dup2(stderrFd[1], STDERR_FILENO) == -1 && errno == EINTR); - // Close the copies. - os::close(stdinPipe[0]); - os::close(stdoutPipe[1]); - os::close(stderrPipe[1]); + // Close the copies. We need to make sure that we do not close the + // file descriptor assigned to stdin/stdout/stderr in case the + // parent has closed stdin/stdout/stderr when calling this + // function (in that case, a dup'ed file descriptor may have the + // same file descriptor number as stdin/stdout/stderr). + if (stdinFd[0] != STDIN_FILENO && + stdinFd[0] != STDOUT_FILENO && + stdinFd[0] != STDERR_FILENO) { + while (::close(stdinFd[0]) == -1 && errno == EINTR); + } + if (stdoutFd[1] != STDIN_FILENO && + stdoutFd[1] != STDOUT_FILENO && + stdoutFd[1] != STDERR_FILENO) { + while (::close(stdoutFd[1]) == -1 && errno == EINTR); + } + if (stderrFd[1] != STDIN_FILENO && + stderrFd[1] != STDOUT_FILENO && + stderrFd[1] != STDERR_FILENO) { + while (::close(stderrFd[1]) == -1 && errno == EINTR); + } if (setup.isSome()) { int status = setup.get()(); @@ -124,15 +287,24 @@ Try subprocess( } // Parent. + // Close the file descriptors that are created by this function. For + // pipes, we close the child ends and store the parent ends (see the + // code below). + os::close(stdinFd[0]); + os::close(stdoutFd[1]); + os::close(stderrFd[1]); - // Close the child's end of the pipes. - os::close(stdinPipe[0]); - os::close(stdoutPipe[1]); - os::close(stderrPipe[1]); - - process.data->in = stdinPipe[1]; - process.data->out = stdoutPipe[0]; - process.data->err = stderrPipe[0]; + // If the mode is PIPE, store the parent side of the pipe so that + // the user can communicate with the subprocess. + if (in.mode == Subprocess::IO::PIPE) { + process.data->in = stdinFd[1]; + } + if (out.mode == Subprocess::IO::PIPE) { + process.data->out = stdoutFd[0]; + } + if (err.mode == Subprocess::IO::PIPE) { + process.data->err = stderrFd[0]; + } // Rather than directly exposing the future from process::reap, we // must use an explicit promise so that we can ensure we can receive http://git-wip-us.apache.org/repos/asf/mesos/blob/9657fe0f/3rdparty/libprocess/src/tests/subprocess_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp index 7d890bf..1cb1ce3 100644 --- a/3rdparty/libprocess/src/tests/subprocess_tests.cpp +++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp @@ -15,16 +15,22 @@ #include #include #include -#include #include +#include + +#include + using namespace process; using std::map; using std::string; -TEST(Subprocess, status) +class SubprocessTest: public TemporaryDirectoryTest {}; + + +TEST_F(SubprocessTest, Status) { Clock::pause(); @@ -43,8 +49,8 @@ TEST(Subprocess, status) ASSERT_SOME(s.get().status().get()); int status = s.get().status().get().get(); - ASSERT_TRUE(WIFEXITED(status)); - ASSERT_EQ(0, WEXITSTATUS(status)); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); // Exit 1. s = subprocess("exit 1"); @@ -61,8 +67,8 @@ TEST(Subprocess, status) ASSERT_SOME(s.get().status().get()); status = s.get().status().get().get(); - ASSERT_TRUE(WIFEXITED(status)); - ASSERT_EQ(1, WEXITSTATUS(status)); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(1, WEXITSTATUS(status)); // SIGTERM. s = subprocess("sleep 60"); @@ -81,8 +87,8 @@ TEST(Subprocess, status) ASSERT_SOME(s.get().status().get()); status = s.get().status().get().get(); - ASSERT_TRUE(WIFSIGNALED(status)); - ASSERT_EQ(SIGTERM, WTERMSIG(status)); + EXPECT_TRUE(WIFSIGNALED(status)); + EXPECT_EQ(SIGTERM, WTERMSIG(status)); // SIGKILL. s = subprocess("sleep 60"); @@ -101,25 +107,88 @@ TEST(Subprocess, status) ASSERT_SOME(s.get().status().get()); status = s.get().status().get().get(); - ASSERT_TRUE(WIFSIGNALED(status)); - ASSERT_EQ(SIGKILL, WTERMSIG(status)); + EXPECT_TRUE(WIFSIGNALED(status)); + EXPECT_EQ(SIGKILL, WTERMSIG(status)); Clock::resume(); } -TEST(Subprocess, output) +TEST_F(SubprocessTest, PipeOutput) { Clock::pause(); // Standard out. - Try s = subprocess("echo hello"); + Try s = subprocess( + "echo hello", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE()); ASSERT_SOME(s); + ASSERT_SOME(s.get().out()); + ASSERT_SOME(os::nonblock(s.get().out().get())); + AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get())); + + // Advance time until the internal reaper reaps the subprocess. + while (s.get().status().isPending()) { + Clock::advance(Seconds(1)); + Clock::settle(); + } + + AWAIT_ASSERT_READY(s.get().status()); + ASSERT_SOME(s.get().status().get()); + + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + + // Standard error. + s = subprocess( + "echo hello 1>&2", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE()); + + ASSERT_SOME(s); + ASSERT_SOME(s.get().err()); + ASSERT_SOME(os::nonblock(s.get().err().get())); + AWAIT_EXPECT_EQ("hello\n", io::read(s.get().err().get())); + + // Advance time until the internal reaper reaps the subprocess. + while (s.get().status().isPending()) { + Clock::advance(Seconds(1)); + Clock::settle(); + } + + AWAIT_ASSERT_READY(s.get().status()); + ASSERT_SOME(s.get().status().get()); + + status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); - ASSERT_SOME(os::nonblock(s.get().out())); + Clock::resume(); +} - AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out())); + +TEST_F(SubprocessTest, PipeInput) +{ + Clock::pause(); + + Try s = subprocess( + "read word ; echo $word", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE()); + + ASSERT_SOME(s); + ASSERT_SOME(s.get().in()); + ASSERT_SOME(os::write(s.get().in().get(), "hello\n")); + + ASSERT_SOME(s.get().out()); + ASSERT_SOME(os::nonblock(s.get().out().get())); + AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get())); // Advance time until the internal reaper reaps the subprocess. while (s.get().status().isPending()) { @@ -129,19 +198,41 @@ TEST(Subprocess, output) AWAIT_ASSERT_READY(s.get().status()); ASSERT_SOME(s.get().status().get()); + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); - ASSERT_TRUE(WIFEXITED(status)); - ASSERT_EQ(0, WEXITSTATUS(status)); + Clock::resume(); +} - // Standard error. - s = subprocess("echo hello 1>&2"); + +TEST_F(SubprocessTest, PipeSplice) +{ + Clock::pause(); + + Try s = subprocess( + "echo 'hello world'", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE()); ASSERT_SOME(s); - ASSERT_SOME(os::nonblock(s.get().err())); + // Create a temporary file for splicing into. + string path = path::join(os::getcwd(), "stdout"); - AWAIT_EXPECT_EQ("hello\n", io::read(s.get().err())); + Try fd = os::open( + path, + O_WRONLY | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); + + ASSERT_SOME(fd); + ASSERT_SOME(os::nonblock(fd.get())); + + ASSERT_SOME(s.get().out()); + ASSERT_SOME(os::nonblock(s.get().out().get())); + AWAIT_READY(io::splice(s.get().out().get(), fd.get())); // Advance time until the internal reaper reaps the subprocess. while (s.get().status().isPending()) { @@ -151,28 +242,61 @@ TEST(Subprocess, output) AWAIT_ASSERT_READY(s.get().status()); ASSERT_SOME(s.get().status().get()); - status = s.get().status().get().get(); - ASSERT_TRUE(WIFEXITED(status)); - ASSERT_EQ(0, WEXITSTATUS(status)); + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + + // Now make sure all the data is there! + Try read = os::read(path); + ASSERT_SOME(read); + EXPECT_EQ("hello world\n", read.get()); Clock::resume(); } -TEST(Subprocess, input) +TEST_F(SubprocessTest, PathOutput) { Clock::pause(); - Try s = subprocess("read word ; echo $word"); + string out = path::join(os::getcwd(), "stdout"); + string err = path::join(os::getcwd(), "stderr"); + + // Standard out. + Try s = subprocess( + "echo hello", + Subprocess::PIPE(), + Subprocess::PATH(out), + Subprocess::PIPE()); ASSERT_SOME(s); - ASSERT_SOME(os::write(s.get().in(), "hello\n")); + // Advance time until the internal reaper reaps the subprocess. + while (s.get().status().isPending()) { + Clock::advance(Seconds(1)); + Clock::settle(); + } + + AWAIT_ASSERT_READY(s.get().status()); + ASSERT_SOME(s.get().status().get()); + + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + + Try read = os::read(out); + ASSERT_SOME(read); + EXPECT_EQ("hello\n", read.get()); - ASSERT_SOME(os::nonblock(s.get().out())); + // Standard error. + s = subprocess( + "echo hello 1>&2", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PATH(err)); - AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out())); + ASSERT_SOME(s); // Advance time until the internal reaper reaps the subprocess. while (s.get().status().isPending()) { @@ -182,36 +306,78 @@ TEST(Subprocess, input) AWAIT_ASSERT_READY(s.get().status()); ASSERT_SOME(s.get().status().get()); - int status = s.get().status().get().get(); - ASSERT_TRUE(WIFEXITED(status)); - ASSERT_EQ(0, WEXITSTATUS(status)); + status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + + read = os::read(err); + ASSERT_SOME(read); + EXPECT_EQ("hello\n", read.get()); Clock::resume(); } -TEST(Subprocess, splice) +TEST_F(SubprocessTest, PathInput) { Clock::pause(); - Try s = subprocess("echo 'hello world'"); + string in = path::join(os::getcwd(), "stdin"); + + ASSERT_SOME(os::write(in, "hello\n")); + + Try s = subprocess( + "read word ; echo $word", + Subprocess::PATH(in), + Subprocess::PIPE(), + Subprocess::PIPE()); ASSERT_SOME(s); + ASSERT_SOME(s.get().out()); + ASSERT_SOME(os::nonblock(s.get().out().get())); + AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get())); - // Create a temporary file for splicing into. - Try path = os::mktemp(); - ASSERT_SOME(path); + // Advance time until the internal reaper reaps the subprocess. + while (s.get().status().isPending()) { + Clock::advance(Seconds(1)); + Clock::settle(); + } + + AWAIT_ASSERT_READY(s.get().status()); + ASSERT_SOME(s.get().status().get()); + + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + + Clock::resume(); +} - Try fd = os::open( - path.get(), - O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); - ASSERT_SOME(fd); - ASSERT_SOME(os::nonblock(fd.get())); - ASSERT_SOME(os::nonblock(s.get().out())); +TEST_F(SubprocessTest, FdOutput) +{ + Clock::pause(); + + string out = path::join(os::getcwd(), "stdout"); + string err = path::join(os::getcwd(), "stderr"); + + // Standard out. + Try outFd = os::open( + out, + O_WRONLY | O_CREAT | O_APPEND, + S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); + + ASSERT_SOME(outFd); + + Try s = subprocess( + "echo hello", + Subprocess::PIPE(), + Subprocess::FD(outFd.get()), + Subprocess::PIPE()); - AWAIT_READY(io::splice(s.get().out(), fd.get())); + ASSERT_SOME(os::close(outFd.get())); + ASSERT_SOME(s); // Advance time until the internal reaper reaps the subprocess. while (s.get().status().isPending()) { @@ -221,34 +387,101 @@ TEST(Subprocess, splice) AWAIT_ASSERT_READY(s.get().status()); ASSERT_SOME(s.get().status().get()); + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); - ASSERT_TRUE(WIFEXITED(status)); - ASSERT_EQ(0, WEXITSTATUS(status)); + Try read = os::read(out); + ASSERT_SOME(read); + EXPECT_EQ("hello\n", read.get()); - // Now make sure all the data is there! - Try read = os::read(path.get()); + // Standard error. + Try errFd = os::open( + err, + O_WRONLY | O_CREAT | O_APPEND, + S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO); + + ASSERT_SOME(errFd); + + s = subprocess( + "echo hello 1>&2", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::FD(errFd.get())); + + ASSERT_SOME(os::close(errFd.get())); + ASSERT_SOME(s); + + // Advance time until the internal reaper reaps the subprocess. + while (s.get().status().isPending()) { + Clock::advance(Seconds(1)); + Clock::settle(); + } + + AWAIT_ASSERT_READY(s.get().status()); + ASSERT_SOME(s.get().status().get()); + + status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + + read = os::read(err); ASSERT_SOME(read); - EXPECT_EQ("hello world\n", read.get()); + EXPECT_EQ("hello\n", read.get()); Clock::resume(); } -TEST(Subprocess, environment) +TEST_F(SubprocessTest, FdInput) { Clock::pause(); - // Simple value. - map environment; - environment["MESSAGE"] = "hello"; - Try s = subprocess("echo $MESSAGE", environment); + string in = path::join(os::getcwd(), "stdin"); + + ASSERT_SOME(os::write(in, "hello\n")); + + Try inFd = os::open(in, O_RDONLY, 0); + ASSERT_SOME(inFd); + + Try s = subprocess( + "read word ; echo $word", + Subprocess::FD(inFd.get()), + Subprocess::PIPE(), + Subprocess::PIPE()); + + ASSERT_SOME(os::close(inFd.get())); ASSERT_SOME(s); + ASSERT_SOME(s.get().out()); + ASSERT_SOME(os::nonblock(s.get().out().get())); + AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get())); + + // Advance time until the internal reaper reaps the subprocess. + while (s.get().status().isPending()) { + Clock::advance(Seconds(1)); + Clock::settle(); + } + + AWAIT_ASSERT_READY(s.get().status()); + ASSERT_SOME(s.get().status().get()); + + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + + Clock::resume(); +} + - ASSERT_SOME(os::nonblock(s.get().out())); +TEST_F(SubprocessTest, Default) +{ + Clock::pause(); - AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out())); + Try s = subprocess("echo hello world"); + + ASSERT_SOME(s); // Advance time until the internal reaper reaps the subprocess. while (s.get().status().isPending()) { @@ -258,22 +491,64 @@ TEST(Subprocess, environment) AWAIT_ASSERT_READY(s.get().status()); ASSERT_SOME(s.get().status().get()); + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + + Clock::resume(); +} + + +TEST_F(SubprocessTest, Environment) +{ + Clock::pause(); + + // Simple value. + map environment; + environment["MESSAGE"] = "hello"; + + Try s = subprocess( + "echo $MESSAGE", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE(), + environment); + + ASSERT_SOME(s); + ASSERT_SOME(s.get().out()); + ASSERT_SOME(os::nonblock(s.get().out().get())); + AWAIT_EXPECT_EQ("hello\n", io::read(s.get().out().get())); + + // Advance time until the internal reaper reaps the subprocess. + while (s.get().status().isPending()) { + Clock::advance(Seconds(1)); + Clock::settle(); + } + + AWAIT_ASSERT_READY(s.get().status()); + ASSERT_SOME(s.get().status().get()); - ASSERT_TRUE(WIFEXITED(status)); - ASSERT_EQ(0, WEXITSTATUS(status)); + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); // Multiple key-value pairs. environment.clear(); environment["MESSAGE0"] = "hello"; environment["MESSAGE1"] = "world"; - s = subprocess("echo $MESSAGE0 $MESSAGE1", environment); - ASSERT_SOME(s); - - ASSERT_SOME(os::nonblock(s.get().out())); + s = subprocess( + "echo $MESSAGE0 $MESSAGE1", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE(), + environment); - AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out())); + ASSERT_SOME(s); + ASSERT_SOME(s.get().out()); + ASSERT_SOME(os::nonblock(s.get().out().get())); + AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out().get())); // Advance time until the internal reaper reaps the subprocess. while (s.get().status().isPending()) { @@ -283,27 +558,34 @@ TEST(Subprocess, environment) AWAIT_ASSERT_READY(s.get().status()); ASSERT_SOME(s.get().status().get()); + status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); - ASSERT_TRUE(WIFEXITED(status)); - ASSERT_EQ(0, WEXITSTATUS(status)); + Clock::resume(); } -TEST(Subprocess, environmentWithSpaces) +TEST_F(SubprocessTest, EnvironmentWithSpaces) { Clock::pause(); // Spaces in value. map environment; environment["MESSAGE"] = "hello world"; - Try s = subprocess("echo $MESSAGE", environment); - - ASSERT_SOME(s); - ASSERT_SOME(os::nonblock(s.get().out())); + Try s = subprocess( + "echo $MESSAGE", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE(), + environment); - AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out())); + ASSERT_SOME(s); + ASSERT_SOME(s.get().out()); + ASSERT_SOME(os::nonblock(s.get().out().get())); + AWAIT_EXPECT_EQ("hello world\n", io::read(s.get().out().get())); // Advance time until the internal reaper reaps the subprocess. while (s.get().status().isPending()) { @@ -313,27 +595,34 @@ TEST(Subprocess, environmentWithSpaces) AWAIT_ASSERT_READY(s.get().status()); ASSERT_SOME(s.get().status().get()); + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); - ASSERT_TRUE(WIFEXITED(status)); - ASSERT_EQ(0, WEXITSTATUS(status)); + Clock::resume(); } -TEST(Subprocess, environmentWithSpacesAndQuotes) +TEST_F(SubprocessTest, EnvironmentWithSpacesAndQuotes) { Clock::pause(); // Spaces and quotes in value. map environment; environment["MESSAGE"] = "\"hello world\""; - Try s = subprocess("echo $MESSAGE", environment); - - ASSERT_SOME(s); - ASSERT_SOME(os::nonblock(s.get().out())); + Try s = subprocess( + "echo $MESSAGE", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE(), + environment); - AWAIT_EXPECT_EQ("\"hello world\"\n", io::read(s.get().out())); + ASSERT_SOME(s); + ASSERT_SOME(s.get().out()); + ASSERT_SOME(os::nonblock(s.get().out().get())); + AWAIT_EXPECT_EQ("\"hello world\"\n", io::read(s.get().out().get())); // Advance time until the internal reaper reaps the subprocess. while (s.get().status().isPending()) { @@ -343,14 +632,16 @@ TEST(Subprocess, environmentWithSpacesAndQuotes) AWAIT_ASSERT_READY(s.get().status()); ASSERT_SOME(s.get().status().get()); + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); - ASSERT_TRUE(WIFEXITED(status)); - ASSERT_EQ(0, WEXITSTATUS(status)); + Clock::resume(); } -TEST(Subprocess, environmentOverride) +TEST_F(SubprocessTest, EnvironmentOverride) { Clock::pause(); @@ -359,13 +650,18 @@ TEST(Subprocess, environmentOverride) map environment; environment["MESSAGE"] = "goodbye"; - Try s = subprocess("echo $MESSAGE", environment); - ASSERT_SOME(s); - - ASSERT_SOME(os::nonblock(s.get().out())); + Try s = subprocess( + "echo $MESSAGE", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE(), + environment); - AWAIT_EXPECT_EQ("goodbye\n", io::read(s.get().out())); + ASSERT_SOME(s); + ASSERT_SOME(s.get().out()); + ASSERT_SOME(os::nonblock(s.get().out().get())); + AWAIT_EXPECT_EQ("goodbye\n", io::read(s.get().out().get())); // Advance time until the internal reaper reaps the subprocess. while (s.get().status().isPending()) { @@ -375,14 +671,16 @@ TEST(Subprocess, environmentOverride) AWAIT_ASSERT_READY(s.get().status()); ASSERT_SOME(s.get().status().get()); + int status = s.get().status().get().get(); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); - ASSERT_TRUE(WIFEXITED(status)); - ASSERT_EQ(0, WEXITSTATUS(status)); + Clock::resume(); } -int setupChdir(const string& directory) +static int setupChdir(const string& directory) { // Keep everything async-signal safe. if (::chdir(directory.c_str()) == -1) { @@ -393,7 +691,7 @@ int setupChdir(const string& directory) } -TEST(Subprocess, setup) +TEST_F(SubprocessTest, Setup) { Clock::pause(); @@ -403,6 +701,9 @@ TEST(Subprocess, setup) // chdir(). Try s = subprocess( "echo hello world > file", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE(), None(), lambda::bind(&setupChdir, directory.get())); @@ -428,19 +729,22 @@ TEST(Subprocess, setup) } -int setupStatus(int ret) +static int setupStatus(int ret) { return ret; } -TEST(Subprocess, setupStatus) +TEST_F(SubprocessTest, SetupStatus) { Clock::pause(); // Exit 0 && setup 1. Try s = subprocess( "exit 0", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE(), None(), lambda::bind(&setupStatus, 1)); @@ -464,6 +768,9 @@ TEST(Subprocess, setupStatus) // Exit 1 && setup 0. s = subprocess( "exit 1", + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::PIPE(), None(), lambda::bind(&setupStatus, 0));