kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aw...@apache.org
Subject [kudu] 04/04: env: add a fifo class
Date Mon, 30 Mar 2020 18:53:37 GMT
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5cd26cfad9a5796080d1242fff2090c05eaa1f4a
Author: Andrew Wong <awong@cloudera.com>
AuthorDate: Sat Mar 28 02:40:10 2020 -0700

    env: add a fifo class
    
    This adds a basic Fifo class and a PosixFifo variant that encapsulates
    the usage of mkfifo and associated functionality. My immediate need is
    just to return the FDs of the opened fifo, so the interface is quite
    bare. That said, this will be good enough to move the SubprocessServer
    away from using the process stdout FD for IPC.
    
    Change-Id: I23920457d695b84509937694daea53d61f445e27
    Reviewed-on: http://gerrit.cloudera.org:8080/15584
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Reviewed-by: Hao Hao <hao.hao@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/util/env-test.cc  | 32 ++++++++++++++++
 src/kudu/util/env.h        | 27 ++++++++++++-
 src/kudu/util/env_posix.cc | 94 ++++++++++++++++++++++++++++++++++++----------
 3 files changed, 132 insertions(+), 21 deletions(-)

diff --git a/src/kudu/util/env-test.cc b/src/kudu/util/env-test.cc
index 364a7ba..6dda618 100644
--- a/src/kudu/util/env-test.cc
+++ b/src/kudu/util/env-test.cc
@@ -42,6 +42,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <unordered_set>
 #include <utility>
 #include <vector>
@@ -84,6 +85,7 @@ namespace kudu {
 using std::pair;
 using std::shared_ptr;
 using std::string;
+using std::thread;
 using std::unique_ptr;
 using std::unordered_set;
 using std::vector;
@@ -1210,5 +1212,35 @@ TEST_F(TestEnv, TestCreateSymlink) {
   ASSERT_TRUE(env_->FileExists(JoinPathSegments(kDst, "foobar")));
 }
 
+TEST_F(TestEnv, TestCreateFifo) {
+  const string kFifo = JoinPathSegments(test_dir_, "fifo");
+  unique_ptr<Fifo> fifo;
+  // Open a fifo for reads and writes.
+  ASSERT_OK(env_->NewFifo(kFifo, &fifo));
+  thread t([&] {
+    ASSERT_OK(fifo->OpenForReads());
+  });
+  ASSERT_OK(fifo->OpenForWrites());
+  t.join();
+
+  Slice data("it's the final countdown");
+  ssize_t written;
+  RETRY_ON_EINTR(written, write(fifo->write_fd(), data.data(), data.size()));
+  ASSERT_EQ(data.size(), written);
+  char buf[32];
+  ssize_t n;
+  RETRY_ON_EINTR(n, read(fifo->read_fd(), buf, arraysize(buf)));
+  ASSERT_EQ(data.size(), n);
+  Slice read_data(buf, n);
+  ASSERT_EQ(data, read_data);
+
+  // Trying to create the same fifo should fail.
+  Status s = env_->NewFifo(kFifo, &fifo);
+  ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+
+  // Until our fifo gets deleted.
+  ASSERT_OK(env_->DeleteFile(kFifo));
+  ASSERT_OK(env_->NewFifo(kFifo, &fifo));
+}
 
 }  // namespace kudu
diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h
index 8d818e9..9a2d7bc 100644
--- a/src/kudu/util/env.h
+++ b/src/kudu/util/env.h
@@ -28,6 +28,7 @@
 namespace kudu {
 
 class faststring;
+class Fifo;
 class FileLock;
 class RandomAccessFile;
 class RWFile;
@@ -142,12 +143,16 @@ class Env {
                            const std::string& fname,
                            std::unique_ptr<RWFile>* result) = 0;
 
-  // Same as abovoe for NewTempWritableFile(), but for an RWFile.
+  // Same as above for NewTempWritableFile(), but for an RWFile.
   virtual Status NewTempRWFile(const RWFileOptions& opts,
                                const std::string& name_template,
                                std::string* created_filename,
                                std::unique_ptr<RWFile>* res) = 0;
 
+  // Creates a new fifo.
+  virtual Status NewFifo(const std::string& fname,
+                         std::unique_ptr<Fifo>* fifo) = 0;
+
   // Returns true iff the named file exists.
   virtual bool FileExists(const std::string& fname) = 0;
 
@@ -383,6 +388,26 @@ class File {
   virtual const std::string& filename() const = 0;
 };
 
+// A simple fifo abstraction.
+class Fifo : public File {
+ public:
+  // Opens the fifo for reads. This will wait until the fifo has also been
+  // opened for writes.
+  virtual Status OpenForReads() = 0;
+
+  // Opens the fifo for writes. This will wait until the fifo has also been
+  // opened for reads.
+  virtual Status OpenForWrites() = 0;
+
+  // Returns the read fd, set when opened for reads. The fifo must have been
+  // opened for reads before calling.
+  virtual int read_fd() const = 0;
+
+  // Returns the write fd, set when opened for writes. The fifo must have been
+  // opened for writes before calling.
+  virtual int write_fd() const = 0;
+};
+
 // A file abstraction for reading sequentially through a file
 class SequentialFile : public File {
  public:
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 1fd4507..3590fd1 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -286,6 +286,13 @@ ssize_t pwritev(int fd, const struct iovec* iovec, int count, off_t offset)
{
 }
 #endif
 
+void DoClose(int fd) {
+  int err;
+  RETRY_ON_EINTR(err, close(fd));
+  if (PREDICT_FALSE(err != 0)) {
+    PLOG(WARNING) << "Failed to close fd " << fd;
+  }
+}
 
 // Close file descriptor when object goes out of scope.
 class ScopedFdCloser {
@@ -296,11 +303,7 @@ class ScopedFdCloser {
 
   ~ScopedFdCloser() {
     ThreadRestrictions::AssertIOAllowed();
-    int err;
-    RETRY_ON_EINTR(err, ::close(fd_));
-    if (PREDICT_FALSE(err != 0)) {
-      PLOG(WARNING) << "Failed to close fd " << fd_;
-    }
+    DoClose(fd_);
   }
 
  private:
@@ -348,6 +351,16 @@ Status DoSync(int fd, const string& filename) {
   return Status::OK();
 }
 
+Status DoOpen(const string& filename, int flags, const string& reason, int* fd) {
+  int f;
+  RETRY_ON_EINTR(f, open(filename.c_str(), flags));
+  if (f == -1) {
+    return IOError(Substitute("Error opening for $0: $1", reason, filename), errno);
+  }
+  *fd = f;
+  return Status::OK();
+}
+
 Status DoOpen(const string& filename, Env::OpenMode mode, int* fd) {
   MAYBE_RETURN_EIO(filename, IOError(Env::kInjectedFailureStatusMsg, EIO));
   ThreadRestrictions::AssertIOAllowed();
@@ -556,6 +569,49 @@ const char* ResourceLimitTypeToMacosRlimit(Env::ResourceLimitType t)
{
 }
 #endif
 
+class PosixFifo : public Fifo {
+ public:
+  explicit PosixFifo(string fname) : filename_(std::move(fname)) {}
+
+  const string& filename() const override {
+    return filename_;
+  }
+
+  Status OpenForReads() override {
+    CHECK_EQ(-1, read_fd_);
+    return DoOpen(filename_, O_RDONLY, "reads", &read_fd_);
+  }
+
+  Status OpenForWrites() override {
+    CHECK_EQ(-1, write_fd_);
+    return DoOpen(filename_, O_WRONLY, "writes", &write_fd_);
+  }
+
+  int read_fd() const override {
+    CHECK_NE(-1, read_fd_);
+    return read_fd_;
+  }
+
+  int write_fd() const override {
+    CHECK_NE(-1, write_fd_);
+    return write_fd_;
+  }
+
+  ~PosixFifo() {
+    if (read_fd_ != -1) {
+      DoClose(read_fd_);
+    }
+    if (write_fd_ != -1) {
+      DoClose(write_fd_);
+    }
+  }
+
+ private:
+  const string filename_;
+  int read_fd_ = -1;
+  int write_fd_ = -1;
+};
+
 class PosixSequentialFile: public SequentialFile {
  private:
   const string filename_;
@@ -614,11 +670,7 @@ class PosixRandomAccessFile: public RandomAccessFile {
   PosixRandomAccessFile(string fname, int fd)
       : filename_(std::move(fname)), fd_(fd) {}
   ~PosixRandomAccessFile() {
-    int err;
-    RETRY_ON_EINTR(err, close(fd_));
-    if (PREDICT_FALSE(err != 0)) {
-      PLOG(WARNING) << "Failed to close " << filename_;
-    }
+    DoClose(fd_);
   }
 
   virtual Status Read(uint64_t offset, Slice result) const OVERRIDE {
@@ -1171,6 +1223,16 @@ class PosixEnv : public Env {
     return Status::OK();
   }
 
+  virtual Status NewFifo(const string& fname, unique_ptr<Fifo>* fifo) override
{
+    TRACE_EVENT1("io", "PosixEnv::NewFifo", "path", fname);
+    int m = mkfifo(fname.c_str(), 0666);
+    if (m != 0) {
+      return IOError(Substitute("Error creating fifo $0", fname), errno);
+    }
+    fifo->reset(new PosixFifo(fname));
+    return Status::OK();
+  }
+
   virtual bool FileExists(const string& fname) OVERRIDE {
     TRACE_EVENT1("io", "PosixEnv::FileExists", "path", fname);
     ThreadRestrictions::AssertIOAllowed();
@@ -1397,11 +1459,7 @@ class PosixEnv : public Env {
       result = IOError(fname, errno);
     } else if (LockOrUnlock(fd, true) == -1) {
       result = IOError("lock " + fname, errno);
-      int err;
-      RETRY_ON_EINTR(err, close(fd));
-      if (PREDICT_FALSE(err != 0)) {
-        PLOG(WARNING) << "Failed to close fd " << fd;
-      }
+      DoClose(fd);
     } else {
       auto my_lock = new PosixFileLock;
       my_lock->fd_ = fd;
@@ -1418,11 +1476,7 @@ class PosixEnv : public Env {
     if (LockOrUnlock(my_lock->fd_, false) == -1) {
       result = IOError("unlock", errno);
     }
-    int err;
-    RETRY_ON_EINTR(err, close(my_lock->fd_));
-    if (PREDICT_FALSE(err != 0)) {
-      PLOG(WARNING) << "Failed to close fd " << my_lock->fd_;
-    }
+    DoClose(my_lock->fd_);
     return result;
   }
 


Mime
View raw message