parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From n...@apache.org
Subject [1/2] parquet-cpp git commit: PARQUET-451: Add RowGroupReader helper class and refactor parquet_reader.cc into DebugPrint
Date Thu, 28 Jan 2016 05:13:20 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 8fc24f861 -> 1f24e7658


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/reader.h b/src/parquet/reader.h
index 4a40e04..e8a6806 100644
--- a/src/parquet/reader.h
+++ b/src/parquet/reader.h
@@ -19,14 +19,18 @@
 #define PARQUET_FILE_READER_H
 
 #include <cstdint>
+#include <iosfwd>
+#include <memory>
 #include <string>
 #include <stdio.h>
+#include <unordered_map>
 
 #include "parquet/thrift/parquet_types.h"
-#include "parquet/parquet.h"
+#include "parquet/types.h"
 
 namespace parquet_cpp {
 
+class ColumnReader;
+
 class FileLike {
  public:
   virtual ~FileLike() {}
@@ -35,7 +39,9 @@ class FileLike {
   virtual size_t Size() = 0;
   virtual size_t Tell() = 0;
   virtual void Seek(size_t pos) = 0;
-  virtual void Read(size_t nbytes, uint8_t* out, size_t* bytes_read) = 0;
+
+  // Returns actual number of bytes read
+  virtual size_t Read(size_t nbytes, uint8_t* out) = 0;
 };
 
 
@@ -50,36 +56,83 @@ class LocalFile : public FileLike {
   virtual size_t Size();
   virtual size_t Tell();
   virtual void Seek(size_t pos);
-  virtual void Read(size_t nbytes, uint8_t* out, size_t* bytes_read);
+
+  // Returns actual number of bytes read
+  virtual size_t Read(size_t nbytes, uint8_t* out);
 
   bool is_open() const { return is_open_;}
   const std::string& path() const { return path_;}
 
  private:
+  void CloseFile();
+
   std::string path_;
   FILE* file_;
   bool is_open_;
 };
 
+class ParquetFileReader;
+
+// Accessor for one row group of a ParquetFileReader. It keeps non-owning
+// pointers to its parent reader and to the row group's Thrift metadata (the
+// class declares no destructor, so neither is ever freed here), and it owns
+// the ColumnReaders it hands out.
+class RowGroupReader {
+ public:
+  RowGroupReader(ParquetFileReader* parent, parquet::RowGroup* group)
+      : parent_(parent), row_group_(group) {}
+
+  // Number of column chunks listed in this row group's metadata.
+  size_t num_columns() const { return row_group_->columns.size(); }
+
+  // Returns a ColumnReader for column i, where i is relative to this row
+  // group. The RowGroupReader keeps ownership of the returned object, so
+  // callers must not delete it.
+  ColumnReader* Column(size_t i);
+
+ private:
+  friend class ParquetFileReader;
+
+  typedef std::unordered_map<int, std::shared_ptr<ColumnReader> >
+      ColumnReaderMap;
+
+  ParquetFileReader* parent_;     // not owned
+  parquet::RowGroup* row_group_;  // not owned
+
+  // Column index -> ColumnReader
+  ColumnReaderMap column_readers_;
+};
+
 
 class ParquetFileReader {
  public:
-  ParquetFileReader() : buffer_(nullptr) {}
-  ~ParquetFileReader() {}
+  ParquetFileReader();
+  ~ParquetFileReader();
 
-  // The class takes ownership of the passed file-like object
+  // This class does _not_ take ownership of the file. You must manage its
+  // lifetime separately; presumably it must stay alive for as long as this
+  // reader reads from it (TODO confirm against the implementation).
   void Open(FileLike* buffer);
 
   void Close();
 
   void ParseMetaData();
 
+  // Returns the reader for row group i, presumably with i in
+  // [0, num_row_groups()). The RowGroupReader is owned by the FileReader;
+  // callers must not delete it.
+  RowGroupReader* RowGroup(size_t i);
+
+  // Number of row groups listed in the file metadata.
+  size_t num_row_groups() const {
+    return metadata_.row_groups.size();
+  }
+
   const parquet::FileMetaData& metadata() const {
     return metadata_;
   }
 
+  // Prints a debugging description of the file to `stream`; print_values
+  // presumably toggles dumping of column values (TODO confirm).
+  void DebugPrint(std::ostream& stream, bool print_values = true);
+
  private:
+  friend class RowGroupReader;
+
   parquet::FileMetaData metadata_;
+  // Initialized in-class because the constructor is now defined out of line:
+  // the flag can never be read with an indeterminate value.
+  bool parsed_metadata_ = false;
+
+  // Row group index -> RowGroupReader (the readers RowGroup() hands out)
+  std::unordered_map<int, std::shared_ptr<RowGroupReader> > row_group_readers_;
+
+  // Not owned; see Open().
   FileLike* buffer_;
 };
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/types.h
----------------------------------------------------------------------
diff --git a/src/parquet/types.h b/src/parquet/types.h
new file mode 100644
index 0000000..37f538a
--- /dev/null
+++ b/src/parquet/types.h
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_TYPES_H
+#define PARQUET_TYPES_H
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <string>
+
+#include "parquet/thrift/parquet_types.h"
+
+namespace parquet_cpp {
+
+// Non-owning reference to `len` bytes starting at `ptr`. The struct has no
+// destructor, so the bytes belong to someone else and must stay alive for as
+// long as the ByteArray is used. Member order (len, then ptr) is the order
+// brace initialization such as ByteArray{n, p} follows.
+struct ByteArray {
+  uint32_t len;
+  const uint8_t* ptr;
+};
+
+
+// Copies the `a.len` bytes referenced by `a` into a new std::string.
+static inline std::string ByteArrayToString(const ByteArray& a) {
+  const char* const first = reinterpret_cast<const char*>(a.ptr);
+  return std::string(first, first + a.len);
+}
+
+// Lexicographic, memcmp-style comparison of two byte arrays. Returns a
+// negative value, zero, or a positive value when x1 orders before, equal to,
+// or after x2. When one array is a prefix of the other, the shorter one
+// orders first.
+static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) {
+  // The lengths are uint32_t. Keeping the common length unsigned avoids the
+  // signed/unsigned comparisons below, and a length above INT_MAX can no
+  // longer turn into a negative int (and then a huge memcmp size).
+  const uint32_t len = std::min(x1.len, x2.len);
+  // memcmp requires valid pointers even for a zero length, and an empty
+  // ByteArray may carry a null ptr, so only compare bytes when there are any.
+  if (len > 0) {
+    int cmp = memcmp(x1.ptr, x2.ptr, len);
+    if (cmp != 0) return cmp;
+  }
+  if (len < x1.len) return 1;
+  if (len < x2.len) return -1;
+  return 0;
+}
+
+// Compile-time traits for each Parquet physical type, selected by its
+// parquet::Type enum value:
+//   value_type      - C++ type this library uses for one value
+//   value_byte_size - byte width recorded for one value (for BYTE_ARRAY this
+//                     is sizeof(ByteArray), the descriptor, not the payload)
+//   parquet_type    - the enum value itself, as parquet::Type::type
+// The primary template is deliberately empty, so using a member such as
+// value_type for a type without a specialization below fails to compile.
+template <int TYPE>
+struct type_traits {};
+
+template <>
+struct type_traits<parquet::Type::BOOLEAN> {
+  using value_type = bool;
+  static constexpr size_t value_byte_size = 1;
+  static constexpr parquet::Type::type parquet_type = parquet::Type::BOOLEAN;
+};
+
+template <>
+struct type_traits<parquet::Type::INT32> {
+  using value_type = int32_t;
+  static constexpr size_t value_byte_size = 4;
+  static constexpr parquet::Type::type parquet_type = parquet::Type::INT32;
+};
+
+template <>
+struct type_traits<parquet::Type::INT64> {
+  using value_type = int64_t;
+  static constexpr size_t value_byte_size = 8;
+  static constexpr parquet::Type::type parquet_type = parquet::Type::INT64;
+};
+
+template <>
+struct type_traits<parquet::Type::INT96> {
+  // TODO: void* is only a placeholder value_type; INT96 values are 12 bytes.
+  using value_type = void*;
+  static constexpr size_t value_byte_size = 12;
+  static constexpr parquet::Type::type parquet_type = parquet::Type::INT96;
+};
+
+template <>
+struct type_traits<parquet::Type::FLOAT> {
+  using value_type = float;
+  static constexpr size_t value_byte_size = 4;
+  static constexpr parquet::Type::type parquet_type = parquet::Type::FLOAT;
+};
+
+template <>
+struct type_traits<parquet::Type::DOUBLE> {
+  using value_type = double;
+  static constexpr size_t value_byte_size = 8;
+  static constexpr parquet::Type::type parquet_type = parquet::Type::DOUBLE;
+};
+
+template <>
+struct type_traits<parquet::Type::BYTE_ARRAY> {
+  using value_type = ByteArray;
+  static constexpr size_t value_byte_size = sizeof(ByteArray);
+  static constexpr parquet::Type::type parquet_type = parquet::Type::BYTE_ARRAY;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_TYPES_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index 766214b..b3c817d 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -21,8 +21,13 @@ install(FILES
   logging.h
   rle-encoding.h
   stopwatch.h
+  input_stream.h
   DESTINATION include/parquet/util)
 
+# Static library for shared utility code; currently only the InputStream
+# implementations in input_stream.cc.
+add_library(parquet_util STATIC
+  input_stream.cc
+)
+
 add_library(parquet_test_main
   test_main.cc)
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/util/input_stream.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input_stream.cc b/src/parquet/util/input_stream.cc
new file mode 100644
index 0000000..d0e53ed
--- /dev/null
+++ b/src/parquet/util/input_stream.cc
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/util/input_stream.h"
+
+#include <algorithm>
+
+#include "parquet/exception.h"
+
+namespace parquet_cpp {
+
+// Reads `len` bytes starting at `buffer`, beginning at offset 0. Only the
+// pointer is stored (nothing is copied), so the buffer must outlive the stream.
+InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len)
+    : buffer_(buffer),
+      len_(len),
+      offset_(0) {}
+
+// Returns a pointer to the current position without consuming anything.
+// *num_bytes receives how many bytes are available there. That is at most
+// num_to_peek, fewer only near the end of the buffer, and never negative.
+const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) {
+  const int64_t remaining = len_ - offset_;
+  int64_t available = std::min(static_cast<int64_t>(num_to_peek), remaining);
+  // Never report a negative count for a negative request. Read() adds
+  // *num_bytes to offset_, which would otherwise rewind the stream.
+  if (available < 0) available = 0;
+  // Fits in an int: it is bounded above by num_to_peek.
+  *num_bytes = static_cast<int>(available);
+  return buffer_ + offset_;
+}
+
+// Same as Peek(), then moves the read position past the bytes it returned.
+const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) {
+  const uint8_t* const start = Peek(num_to_read, num_bytes);
+  offset_ += *num_bytes;
+  return start;
+}
+
+// Allocates a zero-filled buffer of `len` bytes and a stream that reads it
+// from the start. buffer_ is declared before stream_, so it is already
+// constructed when stream_ captures its data pointer.
+ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len)
+    : buffer_(len),
+      stream_(new InMemoryInputStream(buffer_.data(), buffer_.size())) {}
+
+// Writable pointer to the owned buffer.
+uint8_t* ScopedInMemoryInputStream::data() { return buffer_.data(); }
+
+// Size of the owned buffer in bytes (the length given to the constructor).
+int64_t ScopedInMemoryInputStream::size() {
+  return static_cast<int64_t>(buffer_.size());
+}
+
+// Peek() and Read() forward to the wrapped InMemoryInputStream, which tracks
+// the read position within buffer_.
+const uint8_t* ScopedInMemoryInputStream::Peek(int num_to_peek,
+                                               int* num_bytes) {
+  InMemoryInputStream& inner = *stream_;
+  return inner.Peek(num_to_peek, num_bytes);
+}
+
+const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read,
+                                               int* num_bytes) {
+  InMemoryInputStream& inner = *stream_;
+  return inner.Read(num_to_read, num_bytes);
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/util/input_stream.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input_stream.h b/src/parquet/util/input_stream.h
new file mode 100644
index 0000000..ece2488
--- /dev/null
+++ b/src/parquet/util/input_stream.h
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_INPUT_STREAM_H
+#define PARQUET_INPUT_STREAM_H
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+namespace parquet_cpp {
+
+// Abstract byte source consumed by the column readers. It is a forward-only
+// stream: bytes are delivered in order, and once a byte has been read it does
+// not need to be read again.
+class InputStream {
+ public:
+  virtual ~InputStream() = default;
+
+  // Returns a pointer to the next num_to_peek bytes without advancing the
+  // current position. *num_bytes receives the number of bytes actually
+  // returned, which is less than num_to_peek only at the end of the stream.
+  // Since nothing is consumed, repeated calls return the same bytes. The
+  // returned buffer is still owned by the stream and stays valid only until
+  // the next call to Peek() or Read().
+  virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0;
+
+  // Like Peek(), but also advances the current position by *num_bytes.
+  virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0;
+
+ protected:
+  InputStream() = default;
+};
+
+// InputStream over a caller-supplied buffer that is already entirely in
+// memory. The buffer is neither copied nor owned, so it must outlive the
+// stream.
+class InMemoryInputStream : public InputStream {
+ public:
+  // Streams `len` bytes starting at `buffer`.
+  InMemoryInputStream(const uint8_t* buffer, int64_t len);
+
+  // `override` turns any signature drift from InputStream into a compile
+  // error instead of a silently unrelated overload.
+  const uint8_t* Peek(int num_to_peek, int* num_bytes) override;
+  const uint8_t* Read(int num_to_read, int* num_bytes) override;
+
+ private:
+  const uint8_t* buffer_;  // not owned
+  int64_t len_;            // total number of bytes in buffer_
+  int64_t offset_;         // current read position within buffer_
+};
+
+
+// InputStream that owns its backing memory: it holds a fixed-size buffer and
+// reads it through an internal InMemoryInputStream.
+class ScopedInMemoryInputStream : public InputStream {
+ public:
+  // Allocates `len` bytes for the stream to read.
+  explicit ScopedInMemoryInputStream(int64_t len);
+
+  // Writable access to the owned buffer; presumably callers fill it before
+  // reading from the stream (TODO confirm against callers).
+  uint8_t* data();
+  // Size of the owned buffer in bytes.
+  int64_t size();
+
+  // `override` makes any signature drift from InputStream a compile error.
+  const uint8_t* Peek(int num_to_peek, int* num_bytes) override;
+  const uint8_t* Read(int num_to_read, int* num_bytes) override;
+
+ private:
+  // stream_ reads from buffer_'s storage. Keep buffer_ declared first so it
+  // is constructed before, and destroyed after, the stream that points into it.
+  std::vector<uint8_t> buffer_;
+  std::unique_ptr<InMemoryInputStream> stream_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_INPUT_STREAM_H


Mime
View raw message