Repository: parquet-cpp
Updated Branches:
refs/heads/master 45ca7bab9 -> 41981d981
PARQUET-619: Add OutputStream for local files
Author: Uwe L. Korn <uwelk@xhochy.com>
Closes #107 from xhochy/parquet-619 and squashes the following commits:
612a2f2 [Uwe L. Korn] Check that the correct data was written
66fb32a [Uwe L. Korn] Add unit test for LocalFileOutputStream
a1179d1 [Uwe L. Korn] Add missing header
6ecb940 [Uwe L. Korn] Add OutputStream for local files
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/41981d98
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/41981d98
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/41981d98
Branch: refs/heads/master
Commit: 41981d981051f7f4f318f6edc6d3d9526d764489
Parents: 45ca7ba
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Wed May 18 10:54:45 2016 -0700
Committer: Wes McKinney <wesm@apache.org>
Committed: Wed May 18 10:54:45 2016 -0700
----------------------------------------------------------------------
src/parquet/util/input-output-test.cc | 33 ++++++++++++++++++++
src/parquet/util/output.cc | 50 ++++++++++++++++++++++++++++++
src/parquet/util/output.h | 27 ++++++++++++++++
3 files changed, 110 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41981d98/src/parquet/util/input-output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc
index a98ae08..72aad9c 100644
--- a/src/parquet/util/input-output-test.cc
+++ b/src/parquet/util/input-output-test.cc
@@ -208,4 +208,37 @@ TYPED_TEST(TestFileReaders, BadSeek) {
ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException);
}
+// Fixture for the LocalFileOutputStream tests. Every test starts with no file
+// at test_path_, and any file left there is removed again afterwards.
+class TestFileWriter : public ::testing::Test {
+ public:
+  void SetUp() {
+    test_path_ = "parquet-input-output-test.txt";
+    DeleteTestFile();
+  }
+
+  void TearDown() { DeleteTestFile(); }
+
+  // Removes the file at test_path_ when one exists; otherwise does nothing.
+  void DeleteTestFile() {
+    if (!file_exists(test_path_)) { return; }
+    std::remove(test_path_.c_str());
+  }
+
+ protected:
+  // Path of the scratch file the tests write to.
+  std::string test_path_;
+  // Payload written to, and read back from, the scratch file.
+  uint8_t test_data_[4] = {1, 2, 3, 4};
+};
+
+// Writes test_data_ through a LocalFileOutputStream, checking Tell() before
+// and after the write, then reads the file back and compares the bytes.
+TEST_F(TestFileWriter, Write) {
+  const int kNumBytes = 4;
+
+  LocalFileOutputStream sink(test_path_);
+  ASSERT_EQ(0, sink.Tell());
+  sink.Write(test_data_, kNumBytes);
+  ASSERT_EQ(kNumBytes, sink.Tell());
+  sink.Close();
+
+  // Read the file back through the input side and verify its content
+  LocalFileSource source;
+  source.Open(test_path_);
+  auto buffer = source.Read(kNumBytes);
+  ASSERT_EQ(kNumBytes, buffer->size());
+  ASSERT_EQ(0, memcmp(test_data_, buffer->data(), kNumBytes));
+}
+
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41981d98/src/parquet/util/output.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.cc b/src/parquet/util/output.cc
index 1d27355..4f024a5 100644
--- a/src/parquet/util/output.cc
+++ b/src/parquet/util/output.cc
@@ -26,6 +26,11 @@
namespace parquet {
// ----------------------------------------------------------------------
+// OutputStream
+
+// Out-of-line definition of the virtual destructor; it performs no explicit
+// cleanup of its own.
+OutputStream::~OutputStream() = default;
+
+// ----------------------------------------------------------------------
// In-memory output stream
InMemoryOutputStream::InMemoryOutputStream(
@@ -35,6 +40,8 @@ InMemoryOutputStream::InMemoryOutputStream(
buffer_.reset(new OwnedMutableBuffer(initial_capacity, allocator));
}
+// Out-of-line destructor; nothing is released by hand here, members clean up
+// through their own destructors.
+InMemoryOutputStream::~InMemoryOutputStream() = default;
+
// Returns a pointer size_ bytes past the start of the buffer's mutable data
// (presumably the position the next write goes to -- confirm against Write).
uint8_t* InMemoryOutputStream::Head() {
return buffer_->mutable_data() + size_;
}
@@ -63,4 +70,47 @@ std::shared_ptr<Buffer> InMemoryOutputStream::GetBuffer() {
return result;
}
+// ----------------------------------------------------------------------
+// local file output stream
+
+// Opens the file at `path` with fopen mode "wb" (binary write; an existing
+// file is truncated, a missing one is created).
+// Throws ParquetException when the file cannot be opened.
+LocalFileOutputStream::LocalFileOutputStream(const std::string& path)
+    : file_(fopen(path.c_str(), "wb")), is_open_(true) {
+  // ferror is only consulted when fopen returned a usable handle.
+  const bool opened = (file_ != nullptr) && !ferror(file_);
+  if (!opened) { throw ParquetException("Unable to open file: " + path); }
+}
+
+// Closes the underlying file if it is still open. Goes through CloseFile(),
+// which does not throw, so destruction is always safe.
+LocalFileOutputStream::~LocalFileOutputStream() { CloseFile(); }
+
+// Closes the stream, flushing stdio's buffered data to the file. Calling
+// Close() on an already-closed stream is a no-op.
+//
+// Throws ParquetException if fclose reports failure. fclose performs the final
+// flush, so a failure here means bytes accepted by earlier Write() calls may
+// never have reached the file; that must not pass silently.
+void LocalFileOutputStream::Close() {
+  if (!is_open_) { return; }
+  // Mark the stream closed before inspecting the result: the FILE* must not
+  // be used again after fclose, whether or not the call succeeded.
+  is_open_ = false;
+  if (fclose(file_) != 0) {
+    throw ParquetException("fclose failed, buffered data may not have been written");
+  }
+}
+
+// Returns the current position in the file as reported by ftell.
+// The stream must still be open (checked with DCHECK).
+// Throws ParquetException when ftell reports an error (negative result).
+int64_t LocalFileOutputStream::Tell() {
+  DCHECK(is_open_);
+  const int64_t offset = ftell(file_);
+  if (offset >= 0) { return offset; }
+  throw ParquetException("ftell failed, did the file disappear?");
+}
+
+// Writes `length` bytes starting at `data` to the file.
+// The stream must still be open (checked with DCHECK).
+// Throws ParquetException when fwrite stores fewer than `length` bytes. The
+// number appended to the message is ferror's return value (the stream's error
+// indicator), not errno.
+void LocalFileOutputStream::Write(const uint8_t* data, int64_t length) {
+  DCHECK(is_open_);
+  const int64_t num_written = fwrite(data, sizeof(uint8_t), length, file_);
+  if (num_written == length) { return; }
+  const int error_code = ferror(file_);
+  throw ParquetException("fwrite failed, error code: " + std::to_string(error_code));
+}
+
+// Closes the file at most once; later calls are no-ops. The return value of
+// fclose is not inspected here, so this helper never throws.
+void LocalFileOutputStream::CloseFile() {
+  if (!is_open_) { return; }
+  fclose(file_);
+  is_open_ = false;
+}
+
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41981d98/src/parquet/util/output.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h
index 472a9cc..a7cb773 100644
--- a/src/parquet/util/output.h
+++ b/src/parquet/util/output.h
@@ -20,6 +20,7 @@
#include <cstdint>
#include <memory>
+#include <string>
#include "parquet/util/macros.h"
#include "parquet/util/mem-allocator.h"
@@ -35,6 +36,8 @@ class ResizableBuffer;
// Abstract output stream
class OutputStream {
public:
+ virtual ~OutputStream();
+
// Close the output stream
virtual void Close() = 0;
@@ -53,6 +56,8 @@ class InMemoryOutputStream : public OutputStream {
explicit InMemoryOutputStream(int64_t initial_capacity = IN_MEMORY_DEFAULT_CAPACITY,
MemoryAllocator* allocator = default_allocator());
+ virtual ~InMemoryOutputStream();
+
// Close is currently a no-op with the in-memory stream
virtual void Close() {}
@@ -74,6 +79,28 @@ class InMemoryOutputStream : public OutputStream {
DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream);
};
+// Output stream that writes to a file on the local filesystem through a C
+// stdio FILE handle.
+//
+// Not copyable: the object owns file_, and a copy would leave two objects
+// calling fclose on the same FILE*.
+class LocalFileOutputStream : public OutputStream {
+ public:
+  // Opens the file at path for writing; throws ParquetException on failure
+  explicit LocalFileOutputStream(const std::string& path);
+
+  // Closes the file if it is still open
+  virtual ~LocalFileOutputStream();
+
+  // Close the output stream
+  void Close() override;
+
+  // Return the current position in the output stream relative to the start
+  int64_t Tell() override;
+
+  // Copy bytes into the output stream
+  void Write(const uint8_t* data, int64_t length) override;
+
+ private:
+  // Closes file_ once; shared by Close() and the destructor
+  void CloseFile();
+
+  // NOTE(review): FILE comes from <cstdio>, which is not among the includes
+  // visible in this header's diff hunk -- confirm it is included.
+  FILE* file_;
+  // True from successful construction until the file has been closed
+  bool is_open_;
+
+  DISALLOW_COPY_AND_ASSIGN(LocalFileOutputStream);
+};
+
} // namespace parquet
#endif // PARQUET_UTIL_OUTPUT_H
|