parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-619: Add OutputStream for local files
Date Wed, 18 May 2016 17:55:28 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 45ca7bab9 -> 41981d981


PARQUET-619: Add OutputStream for local files

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #107 from xhochy/parquet-619 and squashes the following commits:

612a2f2 [Uwe L. Korn] Check that the correct data was written
66fb32a [Uwe L. Korn] Add unit test for LocalFileOutputStream
a1179d1 [Uwe L. Korn] Add missing header
6ecb940 [Uwe L. Korn] Add OutputStream for local files


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/41981d98
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/41981d98
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/41981d98

Branch: refs/heads/master
Commit: 41981d981051f7f4f318f6edc6d3d9526d764489
Parents: 45ca7ba
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Wed May 18 10:54:45 2016 -0700
Committer: Wes McKinney <wesm@apache.org>
Committed: Wed May 18 10:54:45 2016 -0700

----------------------------------------------------------------------
 src/parquet/util/input-output-test.cc | 33 ++++++++++++++++++++
 src/parquet/util/output.cc            | 50 ++++++++++++++++++++++++++++++
 src/parquet/util/output.h             | 27 ++++++++++++++++
 3 files changed, 110 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41981d98/src/parquet/util/input-output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc
index a98ae08..72aad9c 100644
--- a/src/parquet/util/input-output-test.cc
+++ b/src/parquet/util/input-output-test.cc
@@ -208,4 +208,37 @@ TYPED_TEST(TestFileReaders, BadSeek) {
   ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException);
 }
 
+class TestFileWriter : public ::testing::Test {
+ public:
+  void SetUp() {
+    test_path_ = "parquet-input-output-test.txt";
+    if (file_exists(test_path_)) { std::remove(test_path_.c_str()); }
+  }
+
+  void TearDown() { DeleteTestFile(); }
+
+  void DeleteTestFile() {
+    if (file_exists(test_path_)) { std::remove(test_path_.c_str()); }
+  }
+
+ protected:
+  std::string test_path_;
+  uint8_t test_data_[4] = {1, 2, 3, 4};
+};
+
+TEST_F(TestFileWriter, Write) {
+  LocalFileOutputStream sink(test_path_);
+  ASSERT_EQ(0, sink.Tell());
+  sink.Write(test_data_, 4);
+  ASSERT_EQ(4, sink.Tell());
+  sink.Close();
+
+  // Check that the correct content was written
+  LocalFileSource source;
+  source.Open(test_path_);
+  std::shared_ptr<Buffer> buffer = source.Read(4);
+  ASSERT_EQ(4, buffer->size());
+  ASSERT_EQ(0, memcmp(test_data_, buffer->data(), 4));
+}
+
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41981d98/src/parquet/util/output.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.cc b/src/parquet/util/output.cc
index 1d27355..4f024a5 100644
--- a/src/parquet/util/output.cc
+++ b/src/parquet/util/output.cc
@@ -26,6 +26,11 @@
 namespace parquet {
 
 // ----------------------------------------------------------------------
+// OutputStream
+
+OutputStream::~OutputStream() {}
+
+// ----------------------------------------------------------------------
 // In-memory output stream
 
 InMemoryOutputStream::InMemoryOutputStream(
@@ -35,6 +40,8 @@ InMemoryOutputStream::InMemoryOutputStream(
   buffer_.reset(new OwnedMutableBuffer(initial_capacity, allocator));
 }
 
+InMemoryOutputStream::~InMemoryOutputStream() {}
+
 uint8_t* InMemoryOutputStream::Head() {
   return buffer_->mutable_data() + size_;
 }
@@ -63,4 +70,47 @@ std::shared_ptr<Buffer> InMemoryOutputStream::GetBuffer() {
   return result;
 }
 
+// ----------------------------------------------------------------------
+// local file output stream
+
+LocalFileOutputStream::LocalFileOutputStream(const std::string& path) : is_open_(true) {
+  file_ = fopen(path.c_str(), "wb");
+  if (file_ == nullptr || ferror(file_)) {
+    std::stringstream ss;
+    ss << "Unable to open file: " << path;
+    throw ParquetException(ss.str());
+  }
+}
+
+LocalFileOutputStream::~LocalFileOutputStream() {
+  CloseFile();
+}
+
+void LocalFileOutputStream::Close() {
+  CloseFile();
+}
+
+int64_t LocalFileOutputStream::Tell() {
+  DCHECK(is_open_);
+  int64_t position = ftell(file_);
+  if (position < 0) { throw ParquetException("ftell failed, did the file disappear?"); }
+  return position;
+}
+
+void LocalFileOutputStream::Write(const uint8_t* data, int64_t length) {
+  DCHECK(is_open_);
+  int64_t bytes_written = fwrite(data, sizeof(uint8_t), length, file_);
+  if (bytes_written != length) {
+    int error_code = ferror(file_);
+    throw ParquetException("fwrite failed, error code: " + std::to_string(error_code));
+  }
+}
+
+void LocalFileOutputStream::CloseFile() {
+  if (is_open_) {
+    fclose(file_);
+    is_open_ = false;
+  }
+}
+
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41981d98/src/parquet/util/output.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h
index 472a9cc..a7cb773 100644
--- a/src/parquet/util/output.h
+++ b/src/parquet/util/output.h
@@ -20,6 +20,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <string>
 
 #include "parquet/util/macros.h"
 #include "parquet/util/mem-allocator.h"
@@ -35,6 +36,8 @@ class ResizableBuffer;
 // Abstract output stream
 class OutputStream {
  public:
+  virtual ~OutputStream();
+
   // Close the output stream
   virtual void Close() = 0;
 
@@ -53,6 +56,8 @@ class InMemoryOutputStream : public OutputStream {
   explicit InMemoryOutputStream(int64_t initial_capacity = IN_MEMORY_DEFAULT_CAPACITY,
       MemoryAllocator* allocator = default_allocator());
 
+  virtual ~InMemoryOutputStream();
+
   // Close is currently a no-op with the in-memory stream
   virtual void Close() {}
 
@@ -74,6 +79,28 @@ class InMemoryOutputStream : public OutputStream {
   DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream);
 };
 
+class LocalFileOutputStream : public OutputStream {
+ public:
+  explicit LocalFileOutputStream(const std::string& path);
+
+  virtual ~LocalFileOutputStream();
+
+  // Close the output stream
+  void Close() override;
+
+  // Return the current position in the output stream relative to the start
+  int64_t Tell() override;
+
+  // Copy bytes into the output stream
+  void Write(const uint8_t* data, int64_t length) override;
+
+ private:
+  void CloseFile();
+
+  FILE* file_;
+  bool is_open_;
+};
+
 }  // namespace parquet
 
 #endif  // PARQUET_UTIL_OUTPUT_H


Mime
View raw message