From commits-return-8696-archive-asf-public=cust-asf.ponee.io@kudu.apache.org Mon Mar 30 18:53:35 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 9AEFF180680 for ; Mon, 30 Mar 2020 20:53:34 +0200 (CEST) Received: (qmail 70545 invoked by uid 500); 30 Mar 2020 18:53:34 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 70428 invoked by uid 99); 30 Mar 2020 18:53:33 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Mar 2020 18:53:33 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id A19F881A15; Mon, 30 Mar 2020 18:53:33 +0000 (UTC) Date: Mon, 30 Mar 2020 18:53:37 +0000 To: "commits@kudu.apache.org" Subject: [kudu] 04/04: env: add a fifo class MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: awong@apache.org In-Reply-To: <158559441349.25313.16751961858478987098@gitbox.apache.org> References: <158559441349.25313.16751961858478987098@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: kudu X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Rev: 5cd26cfad9a5796080d1242fff2090c05eaa1f4a X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20200330185333.A19F881A15@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git commit 5cd26cfad9a5796080d1242fff2090c05eaa1f4a Author: Andrew Wong AuthorDate: Sat Mar 28 02:40:10 2020 -0700 env: add a fifo class This adds a basic Fifo class and a PosixFifo variant that encapsulates the usage of mkfifo and associated functionality. My immediate need is just to return the FDs of the opened fifo, so the interface is quite bare. That said, this will be good enough to move the SubprocessServer away from using the process stdout FD for IPC. Change-Id: I23920457d695b84509937694daea53d61f445e27 Reviewed-on: http://gerrit.cloudera.org:8080/15584 Reviewed-by: Adar Dembo Reviewed-by: Hao Hao Tested-by: Kudu Jenkins --- src/kudu/util/env-test.cc | 32 ++++++++++++++++ src/kudu/util/env.h | 27 ++++++++++++- src/kudu/util/env_posix.cc | 94 ++++++++++++++++++++++++++++++++++++---------- 3 files changed, 132 insertions(+), 21 deletions(-) diff --git a/src/kudu/util/env-test.cc b/src/kudu/util/env-test.cc index 364a7ba..6dda618 100644 --- a/src/kudu/util/env-test.cc +++ b/src/kudu/util/env-test.cc @@ -42,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -84,6 +85,7 @@ namespace kudu { using std::pair; using std::shared_ptr; using std::string; +using std::thread; using std::unique_ptr; using std::unordered_set; using std::vector; @@ -1210,5 +1212,35 @@ TEST_F(TestEnv, TestCreateSymlink) { ASSERT_TRUE(env_->FileExists(JoinPathSegments(kDst, "foobar"))); } +TEST_F(TestEnv, TestCreateFifo) { + const string kFifo = JoinPathSegments(test_dir_, "fifo"); + unique_ptr fifo; + // Open a fifo for reads and writes. + ASSERT_OK(env_->NewFifo(kFifo, &fifo)); + thread t([&] { + ASSERT_OK(fifo->OpenForReads()); + }); + ASSERT_OK(fifo->OpenForWrites()); + t.join(); + + Slice data("it's the final countdown"); + ssize_t written; + RETRY_ON_EINTR(written, write(fifo->write_fd(), data.data(), data.size())); + ASSERT_EQ(data.size(), written); + char buf[32]; + ssize_t n; + RETRY_ON_EINTR(n, read(fifo->read_fd(), buf, arraysize(buf))); + ASSERT_EQ(data.size(), n); + Slice read_data(buf, n); + ASSERT_EQ(data, read_data); + + // Trying to create the same fifo should fail. + Status s = env_->NewFifo(kFifo, &fifo); + ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString(); + + // Until our fifo gets deleted. + ASSERT_OK(env_->DeleteFile(kFifo)); + ASSERT_OK(env_->NewFifo(kFifo, &fifo)); +} } // namespace kudu diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h index 8d818e9..9a2d7bc 100644 --- a/src/kudu/util/env.h +++ b/src/kudu/util/env.h @@ -28,6 +28,7 @@ namespace kudu { class faststring; +class Fifo; class FileLock; class RandomAccessFile; class RWFile; @@ -142,12 +143,16 @@ class Env { const std::string& fname, std::unique_ptr* result) = 0; - // Same as abovoe for NewTempWritableFile(), but for an RWFile. + // Same as above for NewTempWritableFile(), but for an RWFile. virtual Status NewTempRWFile(const RWFileOptions& opts, const std::string& name_template, std::string* created_filename, std::unique_ptr* res) = 0; + // Creates a new fifo. + virtual Status NewFifo(const std::string& fname, + std::unique_ptr* fifo) = 0; + // Returns true iff the named file exists. virtual bool FileExists(const std::string& fname) = 0; @@ -383,6 +388,26 @@ class File { virtual const std::string& filename() const = 0; }; +// A simple fifo abstraction. +class Fifo : public File { + public: + // Opens the fifo for reads. This will wait until the fifo has also been + // opened for writes. + virtual Status OpenForReads() = 0; + + // Opens the fifo for writes. This will wait until the fifo has also been + // opened for reads. + virtual Status OpenForWrites() = 0; + + // Returns the read fd, set when opened for reads. The fifo must have been + // opened for reads before calling. + virtual int read_fd() const = 0; + + // Returns the write fd, set when opened for writes. The fifo must have been + // opened for writes before calling. + virtual int write_fd() const = 0; +}; + // A file abstraction for reading sequentially through a file class SequentialFile : public File { public: diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc index 1fd4507..3590fd1 100644 --- a/src/kudu/util/env_posix.cc +++ b/src/kudu/util/env_posix.cc @@ -286,6 +286,13 @@ ssize_t pwritev(int fd, const struct iovec* iovec, int count, off_t offset) { } #endif +void DoClose(int fd) { + int err; + RETRY_ON_EINTR(err, close(fd)); + if (PREDICT_FALSE(err != 0)) { + PLOG(WARNING) << "Failed to close fd " << fd; + } +} // Close file descriptor when object goes out of scope. class ScopedFdCloser { @@ -296,11 +303,7 @@ class ScopedFdCloser { ~ScopedFdCloser() { ThreadRestrictions::AssertIOAllowed(); - int err; - RETRY_ON_EINTR(err, ::close(fd_)); - if (PREDICT_FALSE(err != 0)) { - PLOG(WARNING) << "Failed to close fd " << fd_; - } + DoClose(fd_); } private: @@ -348,6 +351,16 @@ Status DoSync(int fd, const string& filename) { return Status::OK(); } +Status DoOpen(const string& filename, int flags, const string& reason, int* fd) { + int f; + RETRY_ON_EINTR(f, open(filename.c_str(), flags)); + if (f == -1) { + return IOError(Substitute("Error opening for $0: $1", reason, filename), errno); + } + *fd = f; + return Status::OK(); +} + Status DoOpen(const string& filename, Env::OpenMode mode, int* fd) { MAYBE_RETURN_EIO(filename, IOError(Env::kInjectedFailureStatusMsg, EIO)); ThreadRestrictions::AssertIOAllowed(); @@ -556,6 +569,49 @@ const char* ResourceLimitTypeToMacosRlimit(Env::ResourceLimitType t) { } #endif +class PosixFifo : public Fifo { + public: + explicit PosixFifo(string fname) : filename_(std::move(fname)) {} + + const string& filename() const override { + return filename_; + } + + Status OpenForReads() override { + CHECK_EQ(-1, read_fd_); + return DoOpen(filename_, O_RDONLY, "reads", &read_fd_); + } + + Status OpenForWrites() override { + CHECK_EQ(-1, write_fd_); + return DoOpen(filename_, O_WRONLY, "writes", &write_fd_); + } + + int read_fd() const override { + CHECK_NE(-1, read_fd_); + return read_fd_; + } + + int write_fd() const override { + CHECK_NE(-1, write_fd_); + return write_fd_; + } + + ~PosixFifo() { + if (read_fd_ != -1) { + DoClose(read_fd_); + } + if (write_fd_ != -1) { + DoClose(write_fd_); + } + } + + private: + const string filename_; + int read_fd_ = -1; + int write_fd_ = -1; +}; + class PosixSequentialFile: public SequentialFile { private: const string filename_; @@ -614,11 +670,7 @@ class PosixRandomAccessFile: public RandomAccessFile { PosixRandomAccessFile(string fname, int fd) : filename_(std::move(fname)), fd_(fd) {} ~PosixRandomAccessFile() { - int err; - RETRY_ON_EINTR(err, close(fd_)); - if (PREDICT_FALSE(err != 0)) { - PLOG(WARNING) << "Failed to close " << filename_; - } + DoClose(fd_); } virtual Status Read(uint64_t offset, Slice result) const OVERRIDE { @@ -1171,6 +1223,16 @@ class PosixEnv : public Env { return Status::OK(); } + virtual Status NewFifo(const string& fname, unique_ptr* fifo) override { + TRACE_EVENT1("io", "PosixEnv::NewFifo", "path", fname); + int m = mkfifo(fname.c_str(), 0666); + if (m != 0) { + return IOError(Substitute("Error creating fifo $0", fname), errno); + } + fifo->reset(new PosixFifo(fname)); + return Status::OK(); + } + virtual bool FileExists(const string& fname) OVERRIDE { TRACE_EVENT1("io", "PosixEnv::FileExists", "path", fname); ThreadRestrictions::AssertIOAllowed(); @@ -1397,11 +1459,7 @@ class PosixEnv : public Env { result = IOError(fname, errno); } else if (LockOrUnlock(fd, true) == -1) { result = IOError("lock " + fname, errno); - int err; - RETRY_ON_EINTR(err, close(fd)); - if (PREDICT_FALSE(err != 0)) { - PLOG(WARNING) << "Failed to close fd " << fd; - } + DoClose(fd); } else { auto my_lock = new PosixFileLock; my_lock->fd_ = fd; @@ -1418,11 +1476,7 @@ class PosixEnv : public Env { if (LockOrUnlock(my_lock->fd_, false) == -1) { result = IOError("unlock", errno); } - int err; - RETRY_ON_EINTR(err, close(my_lock->fd_)); - if (PREDICT_FALSE(err != 0)) { - PLOG(WARNING) << "Failed to close fd " << my_lock->fd_; - } + DoClose(my_lock->fd_); return result; }