arrow-commits mailing list archives

From w...@apache.org
Subject [1/3] arrow git commit: ARROW-1154: [C++] Import miscellaneous computational utility code from parquet-cpp
Date Tue, 27 Jun 2017 12:14:04 GMT
Repository: arrow
Updated Branches:
  refs/heads/master cb5f2b953 -> b06522870


http://git-wip-us.apache.org/repos/asf/arrow/blob/b0652287/cpp/src/arrow/util/hash-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/hash-util.h b/cpp/src/arrow/util/hash-util.h
new file mode 100644
index 0000000..ffe1a9d
--- /dev/null
+++ b/cpp/src/arrow/util/hash-util.h
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// From Apache Impala (incubating) as of 2016-02-22
+
+#ifndef ARROW_UTIL_HASH_UTIL_H
+#define ARROW_UTIL_HASH_UTIL_H
+
+#include <cstdint>
+
+#include "arrow/util/compiler-util.h"
+#include "arrow/util/cpu-info.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/sse-util.h"
+
+namespace arrow {
+
+/// Utility class to compute hash values.
+class HashUtil {
+ public:
+  /// Compute the Crc32 hash for data using SSE4 instructions.  The input hash
+  /// parameter is the current hash/seed value.
+  /// This should only be called if SSE is supported.
+  /// This is ~4x faster than Fnv/Boost Hash.
+  /// TODO: crc32 hashes with different seeds do not result in different hash functions.
+  /// The resulting hashes are correlated.
+  /// TODO: update this to also use SSE4_crc32_u64 and SSE4_crc32_u16 where appropriate.
+  static uint32_t CrcHash(const void* data, int32_t bytes, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    uint32_t words = bytes / sizeof(uint32_t);
+    bytes = bytes % sizeof(uint32_t);
+
+    const uint32_t* p = reinterpret_cast<const uint32_t*>(data);
+    while (words--) {
+      hash = SSE4_crc32_u32(hash, *p);
+      ++p;
+    }
+
+    const uint8_t* s = reinterpret_cast<const uint8_t*>(p);
+    while (bytes--) {
+      hash = SSE4_crc32_u8(hash, *s);
+      ++s;
+    }
+
+    // The lower half of the CRC hash has poor uniformity, so swap the halves
+    // for anyone who only uses the first several bits of the hash.
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 1-byte data
+  static inline uint32_t CrcHash1(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint8_t* s = reinterpret_cast<const uint8_t*>(v);
+    hash = SSE4_crc32_u8(hash, *s);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 2-byte data
+  static inline uint32_t CrcHash2(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint16_t* s = reinterpret_cast<const uint16_t*>(v);
+    hash = SSE4_crc32_u16(hash, *s);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 4-byte data
+  static inline uint32_t CrcHash4(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint32_t* p = reinterpret_cast<const uint32_t*>(v);
+    hash = SSE4_crc32_u32(hash, *p);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 8-byte data
+  static inline uint32_t CrcHash8(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+    hash = SSE4_crc32_u64(hash, *p);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 12-byte data
+  static inline uint32_t CrcHash12(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+    hash = SSE4_crc32_u64(hash, *p);
+    ++p;
+    hash = SSE4_crc32_u32(hash, *reinterpret_cast<const uint32_t*>(p));
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 16-byte data
+  static inline uint32_t CrcHash16(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+    hash = SSE4_crc32_u64(hash, *p);
+    ++p;
+    hash = SSE4_crc32_u64(hash, *p);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  static const uint64_t MURMUR_PRIME = 0xc6a4a7935bd1e995;
+  static const int MURMUR_R = 47;
+
+  /// Murmur2 hash implementation returning 64-bit hashes.
+  static uint64_t MurmurHash2_64(const void* input, int len, uint64_t seed) {
+    uint64_t h = seed ^ (len * MURMUR_PRIME);
+
+    const uint64_t* data = reinterpret_cast<const uint64_t*>(input);
+    const uint64_t* end = data + (len / sizeof(uint64_t));
+
+    while (data != end) {
+      uint64_t k = *data++;
+      k *= MURMUR_PRIME;
+      k ^= k >> MURMUR_R;
+      k *= MURMUR_PRIME;
+      h ^= k;
+      h *= MURMUR_PRIME;
+    }
+
+    const uint8_t* data2 = reinterpret_cast<const uint8_t*>(data);
+    switch (len & 7) {
+      case 7:
+        h ^= uint64_t(data2[6]) << 48;
+      case 6:
+        h ^= uint64_t(data2[5]) << 40;
+      case 5:
+        h ^= uint64_t(data2[4]) << 32;
+      case 4:
+        h ^= uint64_t(data2[3]) << 24;
+      case 3:
+        h ^= uint64_t(data2[2]) << 16;
+      case 2:
+        h ^= uint64_t(data2[1]) << 8;
+      case 1:
+        h ^= uint64_t(data2[0]);
+        h *= MURMUR_PRIME;
+    }
+
+    h ^= h >> MURMUR_R;
+    h *= MURMUR_PRIME;
+    h ^= h >> MURMUR_R;
+    return h;
+  }
+
+  /// default values recommended by http://isthe.com/chongo/tech/comp/fnv/
+  static const uint32_t FNV_PRIME = 0x01000193;  //   16777619
+  static const uint32_t FNV_SEED = 0x811C9DC5;   // 2166136261
+  static const uint64_t FNV64_PRIME = 1099511628211UL;
+  static const uint64_t FNV64_SEED = 14695981039346656037UL;
+
+  /// Implementation of the Fowler-Noll-Vo hash function. This is not as performant
+  /// as boost's hash on int types (2x slower) but has better bit entropy.
+  /// For ints, boost just returns the value of the int which can be pathological.
+  /// For example, if the data is <1000, 2000, 3000, 4000, ..> and then the mod of 1000
+  /// is taken on the hash, all values will collide to the same bucket.
+  /// For string values, Fnv is slightly faster than boost.
+  /// IMPORTANT: FNV hash suffers from poor diffusion of the least significant bit,
+  /// which can lead to poor results when input bytes are duplicated.
+  /// See FnvHash64to32() for how this can be mitigated.
+  static uint64_t FnvHash64(const void* data, int32_t bytes, uint64_t hash) {
+    const uint8_t* ptr = reinterpret_cast<const uint8_t*>(data);
+    while (bytes--) {
+      hash = (*ptr ^ hash) * FNV64_PRIME;
+      ++ptr;
+    }
+    return hash;
+  }
+
+  /// Return a 32-bit hash computed by invoking FNV-64 and folding the result to 32-bits.
+  /// This technique is recommended instead of FNV-32 since the LSB of an FNV hash is the
+  /// XOR of the LSBs of its input bytes, leading to poor results for duplicate inputs.
+  /// The input seed 'hash' is duplicated so the top half of the seed is not all zero.
+  /// Data length must be at least 1 byte: zero-length data should be handled separately,
+  /// for example using HashCombine32 with a unique constant value to avoid returning the
+  /// hash argument. Zero-length data gives terrible results: the two halves of the
+  /// duplicated seed are XORed together, cancelling all bits and returning zero.
+  static uint32_t FnvHash64to32(const void* data, int32_t bytes, uint32_t hash) {
+    // IMPALA-2270: this function should never be used for zero-byte inputs.
+    DCHECK_GT(bytes, 0);
+    uint64_t hash_u64 = hash | ((uint64_t)hash << 32);
+    hash_u64 = FnvHash64(data, bytes, hash_u64);
+    return (hash_u64 >> 32) ^ (hash_u64 & 0xFFFFFFFF);
+  }
+
+  /// Computes the hash value for data.  Will call either CrcHash or MurmurHash
+  /// depending on hardware capabilities.
+  /// Different steps of the query execution should use different seed values to
+  /// prevent accidental key collisions. (See IMPALA-219 for more details).
+  static uint32_t Hash(const void* data, int32_t bytes, uint32_t seed) {
+#ifdef ARROW_USE_SSE
+    if (LIKELY(CpuInfo::IsSupported(CpuInfo::SSE4_2))) {
+      return CrcHash(data, bytes, seed);
+    } else {
+      return static_cast<uint32_t>(MurmurHash2_64(data, bytes, seed));
+    }
+#else
+    return static_cast<uint32_t>(MurmurHash2_64(data, bytes, seed));
+#endif
+  }
+
+  /// The magic number (used in hash_combine()) 0x9e3779b9 = 2^32 / (golden ratio).
+  static const uint32_t HASH_COMBINE_SEED = 0x9e3779b9;
+
+  /// Combine hashes 'value' and 'seed' to get a new hash value.  Similar to
+  /// boost::hash_combine(), but for uint32_t. This function should be used with a
+  /// constant first argument to update the hash value for zero-length values such as
+  /// NULL, boolean, and empty strings.
+  static inline uint32_t HashCombine32(uint32_t value, uint32_t seed) {
+    return seed ^ (HASH_COMBINE_SEED + value + (seed << 6) + (seed >> 2));
+  }
+
+  // Get 32 more bits of randomness from a 32-bit hash:
+  static inline uint32_t Rehash32to32(const uint32_t hash) {
+    // Constants generated by uuidgen(1) with the -r flag
+    static const uint64_t m = 0x7850f11ec6d14889ull, a = 0x6773610597ca4c63ull;
+    // This is strongly universal hashing following Dietzfelbinger's "Universal hashing
+    // and k-wise independent random variables via integer arithmetic without primes". As
+    // such, for any two distinct uint32_t's hash1 and hash2, the probability (over the
+    // randomness of the constants) that any subset of bit positions of
+    // Rehash32to32(hash1) is equal to the same subset of bit positions of
+    // Rehash32to32(hash2) is minimal.
+    return (static_cast<uint64_t>(hash) * m + a) >> 32;
+  }
+
+  static inline uint64_t Rehash32to64(const uint32_t hash) {
+    static const uint64_t m1 = 0x47b6137a44974d91ull, m2 = 0x8824ad5ba2b7289cull,
+                          a1 = 0x705495c62df1424aull, a2 = 0x9efc49475c6bfb31ull;
+    const uint64_t hash1 = (static_cast<uint64_t>(hash) * m1 + a1) >> 32;
+    const uint64_t hash2 = (static_cast<uint64_t>(hash) * m2 + a2) >> 32;
+    return hash1 | (hash2 << 32);
+  }
+};
+
+}  // namespace arrow
+
+#endif  // ARROW_UTIL_HASH_UTIL_H
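
A usage sketch (not part of this commit) of how the helpers above compose; HashColumn and its NULL-sentinel constant are illustrative only, and only the static methods shown in the diff are used:

  #include "arrow/util/hash-util.h"

  // Hash one column value into a running row hash. HashUtil::Hash dispatches to the
  // hardware CRC path when SSE4.2 is available and to MurmurHash2_64 otherwise.
  static uint32_t HashColumn(const void* data, int32_t len, bool is_null, uint32_t seed) {
    if (is_null || len == 0) {
      // Per the HashCombine32 comment above, zero-length values (NULL, empty strings)
      // are handled by combining a constant; the constant here is arbitrary.
      return arrow::HashUtil::HashCombine32(0x71e44ad1, seed);
    }
    return arrow::HashUtil::Hash(data, len, seed);
  }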

http://git-wip-us.apache.org/repos/asf/arrow/blob/b0652287/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index 8a929da..b618121 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -38,6 +38,7 @@ namespace arrow {
 
 #define ARROW_LOG_INTERNAL(level) ::arrow::internal::CerrLog(level)
 #define ARROW_LOG(level) ARROW_LOG_INTERNAL(ARROW_##level)
+#define ARROW_IGNORE_EXPR(expr) ((void)(expr));
 
 #define ARROW_CHECK(condition)                           \
   (condition) ? 0                                        \
@@ -47,25 +48,32 @@ namespace arrow {
 #ifdef NDEBUG
 #define ARROW_DFATAL ARROW_WARNING
 
-#define DCHECK(condition) \
-  while (false)           \
+#define DCHECK(condition)      \
+  ARROW_IGNORE_EXPR(condition) \
+  while (false)                \
   ::arrow::internal::NullLog()
 #define DCHECK_EQ(val1, val2) \
+  ARROW_IGNORE_EXPR(val1)     \
   while (false)               \
   ::arrow::internal::NullLog()
 #define DCHECK_NE(val1, val2) \
+  ARROW_IGNORE_EXPR(val1)     \
   while (false)               \
   ::arrow::internal::NullLog()
 #define DCHECK_LE(val1, val2) \
+  ARROW_IGNORE_EXPR(val1)     \
   while (false)               \
   ::arrow::internal::NullLog()
 #define DCHECK_LT(val1, val2) \
+  ARROW_IGNORE_EXPR(val1)     \
   while (false)               \
   ::arrow::internal::NullLog()
 #define DCHECK_GE(val1, val2) \
+  ARROW_IGNORE_EXPR(val1)     \
   while (false)               \
   ::arrow::internal::NullLog()
 #define DCHECK_GT(val1, val2) \
+  ARROW_IGNORE_EXPR(val1)     \
   while (false)               \
   ::arrow::internal::NullLog()
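
The new ARROW_IGNORE_EXPR wrapper matters because in NDEBUG builds the DCHECK* macros compile away, so a variable used only inside them would otherwise trigger unused-variable warnings. A rough, self-contained sketch of the effect:

  #include "arrow/util/logging.h"

  void Example(int* out) {
    bool ok = (out != nullptr);  // stand-in for a result checked only via DCHECK
    DCHECK(ok);
    // In an NDEBUG build this expands roughly to
    //   ((void)(ok)); while (false) ::arrow::internal::NullLog();
    // so 'ok' still counts as used and no unused-variable warning fires;
    // in a debug build the check runs as before.
  }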
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/b0652287/cpp/src/arrow/util/rle-encoding-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/rle-encoding-test.cc b/cpp/src/arrow/util/rle-encoding-test.cc
new file mode 100644
index 0000000..7c9b33c
--- /dev/null
+++ b/cpp/src/arrow/util/rle-encoding-test.cc
@@ -0,0 +1,460 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// From Apache Impala as of 2016-01-29
+
+#include <gtest/gtest.h>
+#include <math.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <boost/utility.hpp>
+
+#include <cstdint>
+#include <iostream>
+#include <random>
+#include <vector>
+
+#include "arrow/util/bit-stream-utils.h"
+#include "arrow/util/rle-encoding.h"
+
+using std::vector;
+
+namespace arrow {
+
+const int MAX_WIDTH = 32;
+
+TEST(BitArray, TestBool) {
+  const int len = 8;
+  uint8_t buffer[len];
+
+  BitWriter writer(buffer, len);
+
+  // Write alternating 0's and 1's
+  for (int i = 0; i < 8; ++i) {
+    bool result = writer.PutValue(i % 2, 1);
+    EXPECT_TRUE(result);
+  }
+  writer.Flush();
+  EXPECT_EQ((int)buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+
+  // Write 00110011
+  for (int i = 0; i < 8; ++i) {
+    bool result = false;
+    switch (i) {
+      case 0:
+      case 1:
+      case 4:
+      case 5:
+        result = writer.PutValue(false, 1);
+        break;
+      default:
+        result = writer.PutValue(true, 1);
+        break;
+    }
+    EXPECT_TRUE(result);
+  }
+  writer.Flush();
+
+  // Validate the exact bit value
+  EXPECT_EQ((int)buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+  EXPECT_EQ((int)buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
+
+  // Use the reader and validate
+  BitReader reader(buffer, len);
+  for (int i = 0; i < 8; ++i) {
+    bool val = false;
+    bool result = reader.GetValue(1, &val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, (i % 2) != 0);
+  }
+
+  for (int i = 0; i < 8; ++i) {
+    bool val = false;
+    bool result = reader.GetValue(1, &val);
+    EXPECT_TRUE(result);
+    switch (i) {
+      case 0:
+      case 1:
+      case 4:
+      case 5:
+        EXPECT_EQ(val, false);
+        break;
+      default:
+        EXPECT_EQ(val, true);
+        break;
+    }
+  }
+}
+
+// Writes 'num_vals' values with width 'bit_width' and reads them back.
+void TestBitArrayValues(int bit_width, int num_vals) {
+  int len = static_cast<int>(BitUtil::Ceil(bit_width * num_vals, 8));
+  EXPECT_GT(len, 0);
+  const uint64_t mod = bit_width == 64 ? 1 : 1LL << bit_width;
+
+  std::vector<uint8_t> buffer(len);
+  BitWriter writer(buffer.data(), len);
+  for (int i = 0; i < num_vals; ++i) {
+    bool result = writer.PutValue(i % mod, bit_width);
+    EXPECT_TRUE(result);
+  }
+  writer.Flush();
+  EXPECT_EQ(writer.bytes_written(), len);
+
+  BitReader reader(buffer.data(), len);
+  for (int i = 0; i < num_vals; ++i) {
+    int64_t val = 0;
+    bool result = reader.GetValue(bit_width, &val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, i % mod);
+  }
+  EXPECT_EQ(reader.bytes_left(), 0);
+}
+
+TEST(BitArray, TestValues) {
+  for (int width = 1; width <= MAX_WIDTH; ++width) {
+    TestBitArrayValues(width, 1);
+    TestBitArrayValues(width, 2);
+    // Don't write too many values
+    TestBitArrayValues(width, (width < 12) ? (1 << width) : 4096);
+    TestBitArrayValues(width, 1024);
+  }
+}
+
+// Test some mixed values
+TEST(BitArray, TestMixed) {
+  const int len = 1024;
+  uint8_t buffer[len];
+  bool parity = true;
+
+  BitWriter writer(buffer, len);
+  for (int i = 0; i < len; ++i) {
+    bool result;
+    if (i % 2 == 0) {
+      result = writer.PutValue(parity, 1);
+      parity = !parity;
+    } else {
+      result = writer.PutValue(i, 10);
+    }
+    EXPECT_TRUE(result);
+  }
+  writer.Flush();
+
+  parity = true;
+  BitReader reader(buffer, len);
+  for (int i = 0; i < len; ++i) {
+    bool result;
+    if (i % 2 == 0) {
+      bool val;
+      result = reader.GetValue(1, &val);
+      EXPECT_EQ(val, parity);
+      parity = !parity;
+    } else {
+      int val;
+      result = reader.GetValue(10, &val);
+      EXPECT_EQ(val, i);
+    }
+    EXPECT_TRUE(result);
+  }
+}
+
+// Validates encoding of values by encoding and decoding them.  If
+// expected_encoding != NULL, also validates that the encoded buffer is
+// exactly 'expected_encoding'.
+// If expected_len is not -1, it will validate that the encoded size is correct.
+void ValidateRle(const vector<int>& values, int bit_width, uint8_t* expected_encoding,
+    int expected_len) {
+  const int len = 64 * 1024;
+  uint8_t buffer[len];
+  EXPECT_LE(expected_len, len);
+
+  RleEncoder encoder(buffer, len, bit_width);
+  for (size_t i = 0; i < values.size(); ++i) {
+    bool result = encoder.Put(values[i]);
+    EXPECT_TRUE(result);
+  }
+  int encoded_len = encoder.Flush();
+
+  if (expected_len != -1) { EXPECT_EQ(encoded_len, expected_len); }
+  if (expected_encoding != NULL) {
+    EXPECT_EQ(memcmp(buffer, expected_encoding, expected_len), 0);
+  }
+
+  // Verify read
+  {
+    RleDecoder decoder(buffer, len, bit_width);
+    for (size_t i = 0; i < values.size(); ++i) {
+      uint64_t val;
+      bool result = decoder.Get(&val);
+      EXPECT_TRUE(result);
+      EXPECT_EQ(values[i], val);
+    }
+  }
+
+  // Verify batch read
+  {
+    RleDecoder decoder(buffer, len, bit_width);
+    vector<int> values_read(values.size());
+    ASSERT_EQ(values.size(),
+        decoder.GetBatch(values_read.data(), static_cast<int>(values.size())));
+    EXPECT_EQ(values, values_read);
+  }
+}
+
+// A version of ValidateRle that round-trips the values and returns false if
+// the decoded values do not match the originals
+bool CheckRoundTrip(const vector<int>& values, int bit_width) {
+  const int len = 64 * 1024;
+  uint8_t buffer[len];
+  RleEncoder encoder(buffer, len, bit_width);
+  for (size_t i = 0; i < values.size(); ++i) {
+    bool result = encoder.Put(values[i]);
+    if (!result) { return false; }
+  }
+  int encoded_len = encoder.Flush();
+  int out = 0;
+
+  {
+    RleDecoder decoder(buffer, encoded_len, bit_width);
+    for (size_t i = 0; i < values.size(); ++i) {
+      EXPECT_TRUE(decoder.Get(&out));
+      if (values[i] != out) { return false; }
+    }
+  }
+
+  // Verify batch read
+  {
+    RleDecoder decoder(buffer, len, bit_width);
+    vector<int> values_read(values.size());
+    if (static_cast<int>(values.size()) !=
+        decoder.GetBatch(values_read.data(), static_cast<int>(values.size()))) {
+      return false;
+    }
+    if (values != values_read) { return false; }
+  }
+
+  return true;
+}
+
+TEST(Rle, SpecificSequences) {
+  const int len = 1024;
+  uint8_t expected_buffer[len];
+  vector<int> values;
+
+  // Test 50 0's followed by 50 1's
+  values.resize(100);
+  for (int i = 0; i < 50; ++i) {
+    values[i] = 0;
+  }
+  for (int i = 50; i < 100; ++i) {
+    values[i] = 1;
+  }
+
+  // expected_buffer valid for bit width <= 1 byte
+  expected_buffer[0] = (50 << 1);
+  expected_buffer[1] = 0;
+  expected_buffer[2] = (50 << 1);
+  expected_buffer[3] = 1;
+  for (int width = 1; width <= 8; ++width) {
+    ValidateRle(values, width, expected_buffer, 4);
+  }
+
+  for (int width = 9; width <= MAX_WIDTH; ++width) {
+    ValidateRle(values, width, NULL, 2 * (1 + static_cast<int>(BitUtil::Ceil(width, 8))));
+  }
+
+  // Test 100 0's and 1's alternating
+  for (int i = 0; i < 100; ++i) {
+    values[i] = i % 2;
+  }
+  int num_groups = static_cast<int>(BitUtil::Ceil(100, 8));
+  expected_buffer[0] = static_cast<uint8_t>((num_groups << 1) | 1);
+  for (int i = 1; i <= 100 / 8; ++i) {
+    expected_buffer[i] = BOOST_BINARY(1 0 1 0 1 0 1 0);
+  }
+  // Values for the last 4 0's and 1's. The upper 4 bits should be padded to 0.
+  expected_buffer[100 / 8 + 1] = BOOST_BINARY(0 0 0 0 1 0 1 0);
+
+  // num_groups and expected_buffer only valid for bit width = 1
+  ValidateRle(values, 1, expected_buffer, 1 + num_groups);
+  for (int width = 2; width <= MAX_WIDTH; ++width) {
+    int num_values = static_cast<int>(BitUtil::Ceil(100, 8)) * 8;
+    ValidateRle(
+        values, width, NULL, 1 + static_cast<int>(BitUtil::Ceil(width * num_values, 8)));
+  }
+}
+
+// ValidateRle on 'num_vals' values with width 'bit_width'. If 'value' != -1, that value
+// is used, otherwise the values cycle through [0, 2^bit_width).
+void TestRleValues(int bit_width, int num_vals, int value = -1) {
+  const uint64_t mod = (bit_width == 64) ? 1 : 1LL << bit_width;
+  vector<int> values;
+  for (int v = 0; v < num_vals; ++v) {
+    values.push_back((value != -1) ? value : static_cast<int>(v % mod));
+  }
+  ValidateRle(values, bit_width, NULL, -1);
+}
+
+TEST(Rle, TestValues) {
+  for (int width = 1; width <= MAX_WIDTH; ++width) {
+    TestRleValues(width, 1);
+    TestRleValues(width, 1024);
+    TestRleValues(width, 1024, 0);
+    TestRleValues(width, 1024, 1);
+  }
+}
+
+TEST(Rle, BitWidthZeroRepeated) {
+  uint8_t buffer[1];
+  const int num_values = 15;
+  buffer[0] = num_values << 1;  // repeated indicator byte
+  RleDecoder decoder(buffer, sizeof(buffer), 0);
+  uint8_t val;
+  for (int i = 0; i < num_values; ++i) {
+    bool result = decoder.Get(&val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, 0);  // can only encode 0s with bit width 0
+  }
+  EXPECT_FALSE(decoder.Get(&val));
+}
+
+TEST(Rle, BitWidthZeroLiteral) {
+  uint8_t buffer[1];
+  const int num_groups = 4;
+  buffer[0] = num_groups << 1 | 1;  // literal indicator byte
+  RleDecoder decoder(buffer, sizeof(buffer), 0);
+  const int num_values = num_groups * 8;
+  uint8_t val;
+  for (int i = 0; i < num_values; ++i) {
+    bool result = decoder.Get(&val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, 0);  // can only encode 0s with bit width 0
+  }
+  EXPECT_FALSE(decoder.Get(&val));
+}
+
+// Test that writes out a repeated group and then a literal
+// group, flushing before the literal group is finished.
+TEST(BitRle, Flush) {
+  vector<int> values;
+  for (int i = 0; i < 16; ++i)
+    values.push_back(1);
+  values.push_back(0);
+  ValidateRle(values, 1, NULL, -1);
+  values.push_back(1);
+  ValidateRle(values, 1, NULL, -1);
+  values.push_back(1);
+  ValidateRle(values, 1, NULL, -1);
+  values.push_back(1);
+  ValidateRle(values, 1, NULL, -1);
+}
+
+// Test some random sequences.
+TEST(BitRle, Random) {
+  int niters = 50;
+  int ngroups = 1000;
+  int max_group_size = 16;
+  vector<int> values(ngroups + max_group_size);
+
+  // prng setup
+  std::random_device rd;
+  std::uniform_int_distribution<int> dist(1, 20);
+
+  for (int iter = 0; iter < niters; ++iter) {
+    // generate a seed with device entropy
+    uint32_t seed = rd();
+    std::mt19937 gen(seed);
+
+    bool parity = false;
+    values.resize(0);
+
+    for (int i = 0; i < ngroups; ++i) {
+      int group_size = dist(gen);
+      if (group_size > max_group_size) { group_size = 1; }
+      for (int j = 0; j < group_size; ++j) {
+        values.push_back(parity);
+      }
+      parity = !parity;
+    }
+    if (!CheckRoundTrip(values, BitUtil::NumRequiredBits(values.size()))) {
+      FAIL() << "failing seed: " << seed;
+    }
+  }
+}
+
+// Test runs of increasing length with alternating values: 1 one, 2 zeros, 3 ones, etc.
+// e.g. 1 00 111 0000 11111 ...
+TEST(BitRle, RepeatedPattern) {
+  vector<int> values;
+  const int min_run = 1;
+  const int max_run = 32;
+
+  for (int i = min_run; i <= max_run; ++i) {
+    int v = i % 2;
+    for (int j = 0; j < i; ++j) {
+      values.push_back(v);
+    }
+  }
+
+  // And go back down again
+  for (int i = max_run; i >= min_run; --i) {
+    int v = i % 2;
+    for (int j = 0; j < i; ++j) {
+      values.push_back(v);
+    }
+  }
+
+  ValidateRle(values, 1, NULL, -1);
+}
+
+TEST(BitRle, Overflow) {
+  for (int bit_width = 1; bit_width < 32; bit_width += 3) {
+    int len = RleEncoder::MinBufferSize(bit_width);
+    std::vector<uint8_t> buffer(len);
+    int num_added = 0;
+    bool parity = true;
+
+    RleEncoder encoder(buffer.data(), len, bit_width);
+    // Insert alternating true/false until there is no space left
+    while (true) {
+      bool result = encoder.Put(parity);
+      parity = !parity;
+      if (!result) break;
+      ++num_added;
+    }
+
+    int bytes_written = encoder.Flush();
+    EXPECT_LE(bytes_written, len);
+    EXPECT_GT(num_added, 0);
+
+    RleDecoder decoder(buffer.data(), bytes_written, bit_width);
+    parity = true;
+    uint32_t v;
+    for (int i = 0; i < num_added; ++i) {
+      bool result = decoder.Get(&v);
+      EXPECT_TRUE(result);
+      EXPECT_EQ(v != 0, parity);
+      parity = !parity;
+    }
+    // Make sure we get false when reading past the end a couple of times.
+    EXPECT_FALSE(decoder.Get(&v));
+    EXPECT_FALSE(decoder.Get(&v));
+  }
+}
+
+}  // namespace arrow
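
The expected_buffer bytes in SpecificSequences follow directly from the run format documented in rle-encoding.h (repeated-run indicator = varint(run_length << 1); literal-run indicator = varint(num_groups << 1 | 1)):

  // 50 zeros then 50 ones, bit widths 1..8:
  //   varint(50 << 1) = 0x64, value 0x00, varint(50 << 1) = 0x64, value 0x01
  //   -> {0x64, 0x00, 0x64, 0x01}: 4 bytes, matching ValidateRle(values, width, ..., 4).
  // 100 alternating 0/1 values, bit width 1:
  //   13 groups of 8 -> literal indicator varint((13 << 1) | 1) = 0x1B, followed by
  //   13 bit-packed bytes (the last padded with zeros), i.e. 1 + num_groups = 14 bytes.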

http://git-wip-us.apache.org/repos/asf/arrow/blob/b0652287/cpp/src/arrow/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/rle-encoding.h b/cpp/src/arrow/util/rle-encoding.h
new file mode 100644
index 0000000..9ec6235
--- /dev/null
+++ b/cpp/src/arrow/util/rle-encoding.h
@@ -0,0 +1,598 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Imported from Apache Impala (incubating) on 2016-01-29 and modified for use
+// in parquet-cpp and Arrow
+
+#ifndef ARROW_UTIL_RLE_ENCODING_H
+#define ARROW_UTIL_RLE_ENCODING_H
+
+#include <algorithm>
+#include <math.h>
+
+#include "arrow/util/bit-stream-utils.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/compiler-util.h"
+
+namespace arrow {
+
+/// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
+/// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
+/// (literal encoding).
+/// For both types of runs, there is a byte-aligned indicator which encodes the length
+/// of the run and the type of the run.
+/// This encoding has the benefit that when there aren't any long enough runs, values
+/// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
+/// the run length are byte aligned. This allows for very efficient decoding
+/// implementations.
+/// The encoding is:
+///    encoded-block := run*
+///    run := literal-run | repeated-run
+///    literal-run := literal-indicator < literal bytes >
+///    repeated-run := repeated-indicator < repeated value, padded to byte boundary >
+///    literal-indicator := varint_encode( number_of_groups << 1 | 1)
+///    repeated-indicator := varint_encode( number_of_repetitions << 1 )
+//
+/// Each run is preceded by a varint. The varint's least significant bit is
+/// used to indicate whether the run is a literal run or a repeated run. The rest
+/// of the varint is used to determine the length of the run (e.g. how many times the
+/// value repeats).
+//
+/// In the case of literal runs, the run length is always a multiple of 8 (i.e. encoded
+/// in groups of 8), so that no matter the bit-width of the value, the sequence will end
+/// on a byte boundary without padding.
+/// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
+/// the actual number of encoded ints. (This means that the total number of encoded values
+/// cannot be determined from the encoded data, since the number of values in the last
+/// group may not be a multiple of 8). For the last group of literal runs, we pad
+/// the group to 8 with zeros. This allows for 8 at a time decoding on the read side
+/// without the need for additional checks.
+//
+/// There is a break-even point when it is more storage efficient to do run length
+/// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 bytes
+/// for either the repeated encoding or the literal encoding.  This value can always
+/// be computed based on the bit-width.
+/// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
+//
+/// Examples with bit-width 1 (e.g. encoding booleans):
+/// ----------------------------------------
+/// 100 1s followed by 100 0s:
+/// <varint(100 << 1)> <1, padded to 1 byte>  <varint(100 << 1)> <0, padded to 1 byte>
+///  - (total 4 bytes)
+//
+/// alternating 1s and 0s (200 total):
+/// 200 ints = 25 groups of 8
+/// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
+/// (total 26 bytes, 1 byte overhead)
+//
+
+/// Decoder class for RLE encoded data.
+class RleDecoder {
+ public:
+  /// Create a decoder object. buffer/buffer_len is the encoded data.
+  /// bit_width is the width of each value (before encoding).
+  RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
+      : bit_reader_(buffer, buffer_len),
+        bit_width_(bit_width),
+        current_value_(0),
+        repeat_count_(0),
+        literal_count_(0) {
+    DCHECK_GE(bit_width_, 0);
+    DCHECK_LE(bit_width_, 64);
+  }
+
+  RleDecoder() : bit_width_(-1) {}
+
+  void Reset(const uint8_t* buffer, int buffer_len, int bit_width) {
+    DCHECK_GE(bit_width, 0);
+    DCHECK_LE(bit_width, 64);
+    bit_reader_.Reset(buffer, buffer_len);
+    bit_width_ = bit_width;
+    current_value_ = 0;
+    repeat_count_ = 0;
+    literal_count_ = 0;
+  }
+
+  /// Gets the next value.  Returns false if there are no more.
+  template <typename T>
+  bool Get(T* val);
+
+  /// Gets a batch of values.  Returns the number of decoded elements.
+  template <typename T>
+  int GetBatch(T* values, int batch_size);
+
+  /// Like GetBatch but the values are then decoded using the provided dictionary
+  template <typename T>
+  int GetBatchWithDict(const T* dictionary, T* values, int batch_size);
+
+  /// Like GetBatchWithDict but add spacing for null entries
+  template <typename T>
+  int GetBatchWithDictSpaced(const T* dictionary, T* values, int batch_size,
+      int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset);
+
+ protected:
+  BitReader bit_reader_;
+  /// Number of bits needed to encode the value. Must be between 0 and 64.
+  int bit_width_;
+  uint64_t current_value_;
+  uint32_t repeat_count_;
+  uint32_t literal_count_;
+
+ private:
+  /// Fills literal_count_ and repeat_count_ with next values. Returns false if there
+  /// are no more.
+  template <typename T>
+  bool NextCounts();
+};
+
+/// Class to incrementally build the RLE data.  This class does not allocate any memory.
+/// The encoding has two modes: encoding repeated runs and literal runs.
+/// If the run is sufficiently short, it is more efficient to encode as a literal run.
+/// This class does so by buffering 8 values at a time.  If they are not all the same
+/// they are added to the literal run.  If they are the same, they are added to the
+/// repeated run.  When we switch modes, the previous run is flushed out.
+class RleEncoder {
+ public:
+  /// buffer/buffer_len: preallocated output buffer.
+  /// bit_width: max number of bits for value.
+  /// TODO: consider adding a min_repeated_run_length so the caller can control
+  /// when values should be encoded as repeated runs.  Currently this is derived
+  /// from the bit_width, which determines a storage-optimal choice.
+  /// TODO: allow 0 bit_width (and have dict encoder use it)
+  RleEncoder(uint8_t* buffer, int buffer_len, int bit_width)
+      : bit_width_(bit_width), bit_writer_(buffer, buffer_len) {
+    DCHECK_GE(bit_width_, 0);
+    DCHECK_LE(bit_width_, 64);
+    max_run_byte_size_ = MinBufferSize(bit_width);
+    DCHECK_GE(buffer_len, max_run_byte_size_) << "Input buffer not big enough.";
+    Clear();
+  }
+
+  /// Returns the minimum buffer size needed to use the encoder for 'bit_width'.
+  /// This is the maximum length of a single run for 'bit_width'.
+  /// It is not valid to pass a buffer less than this length.
+  static int MinBufferSize(int bit_width) {
+    /// 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values.
+    int max_literal_run_size =
+        1 + static_cast<int>(BitUtil::Ceil(MAX_VALUES_PER_LITERAL_RUN * bit_width, 8));
+    /// Up to MAX_VLQ_BYTE_LEN indicator and a single 'bit_width' value.
+    int max_repeated_run_size =
+        BitReader::MAX_VLQ_BYTE_LEN + static_cast<int>(BitUtil::Ceil(bit_width, 8));
+    return std::max(max_literal_run_size, max_repeated_run_size);
+  }
+
+  /// Returns the maximum byte size it could take to encode 'num_values'.
+  static int MaxBufferSize(int bit_width, int num_values) {
+    // For a bit_width > 1, the worst case is the repetition of "literal run of length 8
+    // and then a repeated run of length 8".
+    // 8 values per smallest run, 8 bits per byte
+    // int bytes_per_run = BitUtil::Ceil(bit_width * 8, 8);
+    int bytes_per_run = bit_width;
+    int num_runs = static_cast<int>(BitUtil::Ceil(num_values, 8));
+    int literal_max_size = num_runs + num_runs * bytes_per_run;
+
+    // In the very worst case scenario, the data is a concatenation of repeated
+    // runs of 8 values. Repeated run has a 1 byte varint followed by the
+    // bit-packed repeated value
+    int min_repeated_run_size = 1 + static_cast<int>(BitUtil::Ceil(bit_width, 8));
+    int repeated_max_size =
+        static_cast<int>(BitUtil::Ceil(num_values, 8)) * min_repeated_run_size;
+
+    return std::max(literal_max_size, repeated_max_size);
+  }
+
+  /// Encode value.  Returns true if the value fits in buffer, false otherwise.
+  /// This value must be representable with bit_width_ bits.
+  bool Put(uint64_t value);
+
+  /// Flushes any pending values to the underlying buffer.
+  /// Returns the total number of bytes written
+  int Flush();
+
+  /// Resets all the state in the encoder.
+  void Clear();
+
+  /// Returns pointer to underlying buffer
+  uint8_t* buffer() { return bit_writer_.buffer(); }
+  int32_t len() { return bit_writer_.bytes_written(); }
+
+ private:
+  /// Flushes any buffered values.  If this is part of a repeated run, this is largely
+  /// a no-op.
+  /// If it is part of a literal run, this will call FlushLiteralRun, which writes
+  /// out the buffered literal values.
+  /// If 'done' is true, the current run would be written even if it would normally
+  /// have been buffered more.  This should only be called at the end, when the
+  /// encoder has received all values even if it would normally continue to be
+  /// buffered.
+  void FlushBufferedValues(bool done);
+
+  /// Flushes literal values to the underlying buffer.  If update_indicator_byte,
+  /// then the current literal run is complete and the indicator byte is updated.
+  void FlushLiteralRun(bool update_indicator_byte);
+
+  /// Flushes a repeated run to the underlying buffer.
+  void FlushRepeatedRun();
+
+  /// Checks and sets buffer_full_. This must be called after flushing a run to
+  /// make sure there are enough bytes remaining to encode the next run.
+  void CheckBufferFull();
+
+  /// The maximum number of values in a single literal run
+  /// (number of groups encodable by a 1-byte indicator * 8)
+  static const int MAX_VALUES_PER_LITERAL_RUN = (1 << 6) * 8;
+
+  /// Number of bits needed to encode the value. Must be between 0 and 64.
+  const int bit_width_;
+
+  /// Underlying buffer.
+  BitWriter bit_writer_;
+
+  /// If true, the buffer is full and subsequent Put()'s will fail.
+  bool buffer_full_;
+
+  /// The maximum byte size a single run can take.
+  int max_run_byte_size_;
+
+  /// We need to buffer at most 8 values for literals.  This happens when the
+  /// bit_width is 1 (so 8 values fit in one byte).
+  /// TODO: generalize this to other bit widths
+  int64_t buffered_values_[8];
+
+  /// Number of values in buffered_values_
+  int num_buffered_values_;
+
+  /// The current (also last) value that was written and the count of how
+  /// many times in a row that value has been seen.  This is maintained even
+  /// if we are in a literal run.  If the repeat_count_ gets high enough, we switch
+  /// to encoding repeated runs.
+  uint64_t current_value_;
+  int repeat_count_;
+
+  /// Number of literals in the current run.  This does not include the literals
+  /// that might be in buffered_values_.  Only after we've got a group big enough
+  /// can we decide if they should be part of the literal_count_ or repeat_count_.
+  int literal_count_;
+
+  /// Pointer to a byte in the underlying buffer that stores the indicator byte.
+  /// This is reserved as soon as we need a literal run but the value is written
+  /// when the literal run is complete.
+  uint8_t* literal_indicator_byte_;
+};
+
+template <typename T>
+inline bool RleDecoder::Get(T* val) {
+  return GetBatch(val, 1) == 1;
+}
+
+template <typename T>
+inline int RleDecoder::GetBatch(T* values, int batch_size) {
+  DCHECK_GE(bit_width_, 0);
+  int values_read = 0;
+
+  while (values_read < batch_size) {
+    if (repeat_count_ > 0) {
+      int repeat_batch =
+          std::min(batch_size - values_read, static_cast<int>(repeat_count_));
+      std::fill(values + values_read, values + values_read + repeat_batch,
+          static_cast<T>(current_value_));
+      repeat_count_ -= repeat_batch;
+      values_read += repeat_batch;
+    } else if (literal_count_ > 0) {
+      int literal_batch =
+          std::min(batch_size - values_read, static_cast<int>(literal_count_));
+      int actual_read =
+          bit_reader_.GetBatch(bit_width_, values + values_read, literal_batch);
+      DCHECK_EQ(actual_read, literal_batch);
+      literal_count_ -= literal_batch;
+      values_read += literal_batch;
+    } else {
+      if (!NextCounts<T>()) return values_read;
+    }
+  }
+
+  return values_read;
+}
+
+template <typename T>
+inline int RleDecoder::GetBatchWithDict(const T* dictionary, T* values, int batch_size) {
+  DCHECK_GE(bit_width_, 0);
+  int values_read = 0;
+
+  while (values_read < batch_size) {
+    if (repeat_count_ > 0) {
+      int repeat_batch =
+          std::min(batch_size - values_read, static_cast<int>(repeat_count_));
+      std::fill(values + values_read, values + values_read + repeat_batch,
+          dictionary[current_value_]);
+      repeat_count_ -= repeat_batch;
+      values_read += repeat_batch;
+    } else if (literal_count_ > 0) {
+      int literal_batch =
+          std::min(batch_size - values_read, static_cast<int>(literal_count_));
+
+      const int buffer_size = 1024;
+      int indices[buffer_size];
+      literal_batch = std::min(literal_batch, buffer_size);
+      int actual_read = bit_reader_.GetBatch(bit_width_, &indices[0], literal_batch);
+      DCHECK_EQ(actual_read, literal_batch);
+      for (int i = 0; i < literal_batch; ++i) {
+        values[values_read + i] = dictionary[indices[i]];
+      }
+      literal_count_ -= literal_batch;
+      values_read += literal_batch;
+    } else {
+      if (!NextCounts<T>()) return values_read;
+    }
+  }
+
+  return values_read;
+}
+
+template <typename T>
+inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary, T* values,
+    int batch_size, int null_count, const uint8_t* valid_bits,
+    int64_t valid_bits_offset) {
+  DCHECK_GE(bit_width_, 0);
+  int values_read = 0;
+  int remaining_nulls = null_count;
+  INIT_BITSET(valid_bits, static_cast<int>(valid_bits_offset));
+
+  while (values_read < batch_size) {
+    bool is_valid = (bitset_valid_bits & (1 << bit_offset_valid_bits)) != 0;
+    READ_NEXT_BITSET(valid_bits);
+
+    if (is_valid) {
+      if ((repeat_count_ == 0) && (literal_count_ == 0)) {
+        if (!NextCounts<T>()) return values_read;
+      }
+      if (repeat_count_ > 0) {
+        T value = dictionary[current_value_];
+        // The current index is already valid, we don't need to check that again
+        int repeat_batch = 1;
+        repeat_count_--;
+
+        while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size) {
+          if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+            repeat_count_--;
+          } else {
+            remaining_nulls--;
+          }
+          repeat_batch++;
+
+          READ_NEXT_BITSET(valid_bits);
+        }
+        std::fill(values + values_read, values + values_read + repeat_batch, value);
+        values_read += repeat_batch;
+      } else if (literal_count_ > 0) {
+        int literal_batch = std::min(
+            batch_size - values_read - remaining_nulls, static_cast<int>(literal_count_));
+
+        // Decode the literals
+        constexpr int kBufferSize = 1024;
+        int indices[kBufferSize];
+        literal_batch = std::min(literal_batch, kBufferSize);
+        int actual_read = bit_reader_.GetBatch(bit_width_, &indices[0], literal_batch);
+        DCHECK_EQ(actual_read, literal_batch);
+
+        int skipped = 0;
+        int literals_read = 1;
+        values[values_read] = dictionary[indices[0]];
+
+        // Read the first bitset to the end
+        while (literals_read < literal_batch) {
+          if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+            values[values_read + literals_read + skipped] =
+                dictionary[indices[literals_read]];
+            literals_read++;
+          } else {
+            skipped++;
+          }
+
+          READ_NEXT_BITSET(valid_bits);
+        }
+        literal_count_ -= literal_batch;
+        values_read += literal_batch + skipped;
+        remaining_nulls -= skipped;
+      }
+    } else {
+      values_read++;
+      remaining_nulls--;
+    }
+  }
+
+  return values_read;
+}
+
+template <typename T>
+bool RleDecoder::NextCounts() {
+  // Read the next run's indicator int; it could be a literal or repeated run.
+  // The int is encoded as a vlq-encoded value.
+  int32_t indicator_value = 0;
+  bool result = bit_reader_.GetVlqInt(&indicator_value);
+  if (!result) return false;
+
+  // lsb indicates if it is a literal run or repeated run
+  bool is_literal = indicator_value & 1;
+  if (is_literal) {
+    literal_count_ = (indicator_value >> 1) * 8;
+  } else {
+    repeat_count_ = indicator_value >> 1;
+    bool result =
+        bit_reader_.GetAligned<T>(static_cast<int>(BitUtil::Ceil(bit_width_, 8)),
+            reinterpret_cast<T*>(&current_value_));
+    DCHECK(result);
+  }
+  return true;
+}
+
+/// This function buffers input values 8 at a time.  After seeing all 8 values,
+/// it decides whether they should be encoded as a literal or repeated run.
+inline bool RleEncoder::Put(uint64_t value) {
+  DCHECK(bit_width_ == 64 || value < (1ULL << bit_width_));
+  if (UNLIKELY(buffer_full_)) return false;
+
+  if (LIKELY(current_value_ == value)) {
+    ++repeat_count_;
+    if (repeat_count_ > 8) {
+      // This is just a continuation of the current run, no need to buffer the
+      // values.
+      // Note that this is the fast path for long repeated runs.
+      return true;
+    }
+  } else {
+    if (repeat_count_ >= 8) {
+      // We had a run that was long enough but it has ended.  Flush the
+      // current repeated run.
+      DCHECK_EQ(literal_count_, 0);
+      FlushRepeatedRun();
+    }
+    repeat_count_ = 1;
+    current_value_ = value;
+  }
+
+  buffered_values_[num_buffered_values_] = value;
+  if (++num_buffered_values_ == 8) {
+    DCHECK_EQ(literal_count_ % 8, 0);
+    FlushBufferedValues(false);
+  }
+  return true;
+}
+
+inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) {
+  if (literal_indicator_byte_ == NULL) {
+    // The literal indicator byte has not been reserved yet, get one now.
+    literal_indicator_byte_ = bit_writer_.GetNextBytePtr();
+    DCHECK(literal_indicator_byte_ != NULL);
+  }
+
+  // Write all the buffered values as bit packed literals
+  for (int i = 0; i < num_buffered_values_; ++i) {
+    bool success = bit_writer_.PutValue(buffered_values_[i], bit_width_);
+    DCHECK(success) << "There is a bug in using CheckBufferFull()";
+  }
+  num_buffered_values_ = 0;
+
+  if (update_indicator_byte) {
+    // At this point we need to write the indicator byte for the literal run.
+    // We only reserve one byte, to allow for streaming writes of literal values.
+    // The logic makes sure we flush literal runs often enough to not overrun
+    // the 1 byte.
+    DCHECK_EQ(literal_count_ % 8, 0);
+    int num_groups = literal_count_ / 8;
+    int32_t indicator_value = (num_groups << 1) | 1;
+    DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
+    *literal_indicator_byte_ = static_cast<uint8_t>(indicator_value);
+    literal_indicator_byte_ = NULL;
+    literal_count_ = 0;
+    CheckBufferFull();
+  }
+}
+
+inline void RleEncoder::FlushRepeatedRun() {
+  DCHECK_GT(repeat_count_, 0);
+  bool result = true;
+  // The lsb of 0 indicates this is a repeated run
+  int32_t indicator_value = repeat_count_ << 1 | 0;
+  result &= bit_writer_.PutVlqInt(indicator_value);
+  result &= bit_writer_.PutAligned(
+      current_value_, static_cast<int>(BitUtil::Ceil(bit_width_, 8)));
+  DCHECK(result);
+  num_buffered_values_ = 0;
+  repeat_count_ = 0;
+  CheckBufferFull();
+}
+
+/// Flush the values that have been buffered.  At this point we decide whether
+/// we need to switch between the run types or continue the current one.
+inline void RleEncoder::FlushBufferedValues(bool done) {
+  if (repeat_count_ >= 8) {
+    // Clear the buffered values.  They are part of the repeated run now and we
+    // don't want to flush them out as literals.
+    num_buffered_values_ = 0;
+    if (literal_count_ != 0) {
+      // There was a current literal run.  All the values in it have been flushed
+      // but we still need to update the indicator byte.
+      DCHECK_EQ(literal_count_ % 8, 0);
+      DCHECK_EQ(repeat_count_, 8);
+      FlushLiteralRun(true);
+    }
+    DCHECK_EQ(literal_count_, 0);
+    return;
+  }
+
+  literal_count_ += num_buffered_values_;
+  DCHECK_EQ(literal_count_ % 8, 0);
+  int num_groups = literal_count_ / 8;
+  if (num_groups + 1 >= (1 << 6)) {
+    // We need to start a new literal run because the indicator byte we've reserved
+    // cannot store more values.
+    DCHECK(literal_indicator_byte_ != NULL);
+    FlushLiteralRun(true);
+  } else {
+    FlushLiteralRun(done);
+  }
+  repeat_count_ = 0;
+}
+
+inline int RleEncoder::Flush() {
+  if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
+    bool all_repeat = literal_count_ == 0 && (repeat_count_ == num_buffered_values_ ||
+                                                 num_buffered_values_ == 0);
+    // There is something pending, figure out if it's a repeated or literal run
+    if (repeat_count_ > 0 && all_repeat) {
+      FlushRepeatedRun();
+    } else {
+      DCHECK_EQ(literal_count_ % 8, 0);
+      // Pad the last group of literals out to 8 values with 0s.
+      for (; num_buffered_values_ != 0 && num_buffered_values_ < 8;
+           ++num_buffered_values_) {
+        buffered_values_[num_buffered_values_] = 0;
+      }
+      literal_count_ += num_buffered_values_;
+      FlushLiteralRun(true);
+      repeat_count_ = 0;
+    }
+  }
+  bit_writer_.Flush();
+  DCHECK_EQ(num_buffered_values_, 0);
+  DCHECK_EQ(literal_count_, 0);
+  DCHECK_EQ(repeat_count_, 0);
+
+  return bit_writer_.bytes_written();
+}
+
+inline void RleEncoder::CheckBufferFull() {
+  int bytes_written = bit_writer_.bytes_written();
+  if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) {
+    buffer_full_ = true;
+  }
+}
+
+inline void RleEncoder::Clear() {
+  buffer_full_ = false;
+  current_value_ = 0;
+  repeat_count_ = 0;
+  num_buffered_values_ = 0;
+  literal_count_ = 0;
+  literal_indicator_byte_ = NULL;
+  bit_writer_.Clear();
+}
+
+}  // namespace arrow
+
+#endif  // ARROW_UTIL_RLE_ENCODING_H
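
A minimal round-trip sketch (not part of this commit) using only the classes declared above; values are assumed non-negative and representable in bit_width bits, as RleEncoder::Put requires:

  #include <algorithm>
  #include <cstdint>
  #include <vector>

  #include "arrow/util/rle-encoding.h"

  std::vector<int> RleRoundTrip(const std::vector<int>& values, int bit_width) {
    int num_values = static_cast<int>(values.size());
    // MaxBufferSize bounds the encoded size; MinBufferSize is the floor the encoder requires.
    int buffer_len = std::max(arrow::RleEncoder::MaxBufferSize(bit_width, num_values),
                              arrow::RleEncoder::MinBufferSize(bit_width));
    std::vector<uint8_t> buffer(buffer_len);

    arrow::RleEncoder encoder(buffer.data(), buffer_len, bit_width);
    for (int v : values) {
      encoder.Put(v);  // cannot fail here: the buffer was sized for the worst case
    }
    int encoded_len = encoder.Flush();

    std::vector<int> decoded(num_values);
    arrow::RleDecoder decoder(buffer.data(), encoded_len, bit_width);
    decoder.GetBatch(decoded.data(), num_values);
    return decoded;
  }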

http://git-wip-us.apache.org/repos/asf/arrow/blob/b0652287/cpp/src/arrow/util/sse-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/sse-util.h b/cpp/src/arrow/util/sse-util.h
new file mode 100644
index 0000000..570c405
--- /dev/null
+++ b/cpp/src/arrow/util/sse-util.h
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// From Apache Impala as of 2016-01-29. Pared down to a minimal set of
+// functions needed for parquet-cpp.
+
+#ifndef ARROW_UTIL_SSE_UTIL_H
+#define ARROW_UTIL_SSE_UTIL_H
+
+#ifdef ARROW_USE_SSE
+#include <emmintrin.h>
+#endif
+
+namespace arrow {
+
+/// This class contains constants useful for text processing with SSE4.2 intrinsics.
+namespace SSEUtil {
+/// Number of characters that fit in 64/128 bit register.  SSE provides instructions
+/// for loading 64 or 128 bits into a register at a time.
+static const int CHARS_PER_64_BIT_REGISTER = 8;
+static const int CHARS_PER_128_BIT_REGISTER = 16;
+
+/// SSE4.2 adds instructions for text processing.  The instructions have a control
+/// byte that determines some of the functionality of the instruction.  (Equivalent to
+/// GCC's _SIDD_CMP_EQUAL_ANY, etc).
+static const int PCMPSTR_EQUAL_ANY = 0x00;     // strchr
+static const int PCMPSTR_EQUAL_EACH = 0x08;    // strcmp
+static const int PCMPSTR_UBYTE_OPS = 0x00;     // unsigned char (8-bits, rather than 16)
+static const int PCMPSTR_NEG_POLARITY = 0x10;  // see Intel SDM chapter 4.1.4.
+
+/// In this mode, SSE text processing functions will return a mask of all the
+/// characters that matched.
+static const int STRCHR_MODE = PCMPSTR_EQUAL_ANY | PCMPSTR_UBYTE_OPS;
+
+/// In this mode, SSE text processing functions will return the number of
+/// bytes that match consecutively from the beginning.
+static const int STRCMP_MODE =
+    PCMPSTR_EQUAL_EACH | PCMPSTR_UBYTE_OPS | PCMPSTR_NEG_POLARITY;
+
+/// Precomputed mask values up to 16 bits.
+static const int SSE_BITMASK[CHARS_PER_128_BIT_REGISTER] = {
+    1 << 0, 1 << 1, 1 << 2, 1 << 3, 1 << 4, 1 << 5, 1 << 6, 1 << 7, 1 << 8, 1 << 9,
+    1 << 10, 1 << 11, 1 << 12, 1 << 13, 1 << 14, 1 << 15,
+};
+}  // namespace SSEUtil
+
+#ifdef ARROW_USE_SSE
+
+/// Define the SSE 4.2 intrinsics.  The caller must first verify at runtime (or codegen
+/// IR load time) that the processor supports SSE 4.2 before calling these.  These are
+/// defined outside the namespace because the IR w/ SSE 4.2 case needs to use macros.
+#ifndef IR_COMPILE
+/// When compiling to native code (i.e. not IR), we cannot use the -msse4.2 compiler
+/// flag.  Otherwise, the compiler will emit SSE 4.2 instructions outside of the runtime
+/// SSE 4.2 checks and Impala will crash on CPUs that don't support SSE 4.2
+/// (IMPALA-1399/1646).  The compiler intrinsics cannot be used without -msse4.2, so we
+/// define our own implementations of the intrinsics instead.
+
+/// The PCMPxSTRy instructions require that the control byte 'mode' be encoded as an
+/// immediate.  So, those need to be always inlined in order to always propagate the
+/// mode constant into the inline asm.
+#define SSE_ALWAYS_INLINE inline __attribute__((__always_inline__))
+
+template <int MODE>
+static inline __m128i SSE4_cmpestrm(__m128i str1, int len1, __m128i str2, int len2) {
+#ifdef __clang__
+  /// Use asm reg rather than Yz output constraint to workaround LLVM bug 13199 -
+  /// clang doesn't support Y-prefixed asm constraints.
+  register volatile __m128i result asm("xmm0");
+  __asm__ volatile("pcmpestrm %5, %2, %1"
+                   : "=x"(result)
+                   : "x"(str1), "xm"(str2), "a"(len1), "d"(len2), "i"(MODE)
+                   : "cc");
+#else
+  __m128i result;
+  __asm__ volatile("pcmpestrm %5, %2, %1"
+                   : "=Yz"(result)
+                   : "x"(str1), "xm"(str2), "a"(len1), "d"(len2), "i"(MODE)
+                   : "cc");
+#endif
+  return result;
+}
+
+template <int MODE>
+static inline int SSE4_cmpestri(__m128i str1, int len1, __m128i str2, int len2) {
+  int result;
+  __asm__("pcmpestri %5, %2, %1"
+          : "=c"(result)
+          : "x"(str1), "xm"(str2), "a"(len1), "d"(len2), "i"(MODE)
+          : "cc");
+  return result;
+}
+
+static inline uint32_t SSE4_crc32_u8(uint32_t crc, uint8_t v) {
+  __asm__("crc32b %1, %0" : "+r"(crc) : "rm"(v));
+  return crc;
+}
+
+static inline uint32_t SSE4_crc32_u16(uint32_t crc, uint16_t v) {
+  __asm__("crc32w %1, %0" : "+r"(crc) : "rm"(v));
+  return crc;
+}
+
+static inline uint32_t SSE4_crc32_u32(uint32_t crc, uint32_t v) {
+  __asm__("crc32l %1, %0" : "+r"(crc) : "rm"(v));
+  return crc;
+}
+
+static inline uint32_t SSE4_crc32_u64(uint32_t crc, uint64_t v) {
+  uint64_t result = crc;
+  __asm__("crc32q %1, %0" : "+r"(result) : "rm"(v));
+  return result;
+}
+
+static inline int64_t POPCNT_popcnt_u64(uint64_t a) {
+  int64_t result;
+  __asm__("popcntq %1, %0" : "=r"(result) : "mr"(a) : "cc");
+  return result;
+}
+
+#undef SSE_ALWAYS_INLINE
+
+#elif defined(__SSE4_2__)  // IR_COMPILE for SSE 4.2.
+/// When cross-compiling to IR, we cannot use inline asm because LLVM JIT does not
+/// support it.  However, the cross-compiled IR is compiled twice: with and without
+/// -msse4.2.  When -msse4.2 is enabled in the cross-compile, we can just use the
+/// compiler intrinsics.
+
+#include <smmintrin.h>
+
+template <int MODE>
+static inline __m128i SSE4_cmpestrm(__m128i str1, int len1, __m128i str2, int len2) {
+  return _mm_cmpestrm(str1, len1, str2, len2, MODE);
+}
+
+template <int MODE>
+static inline int SSE4_cmpestri(__m128i str1, int len1, __m128i str2, int len2) {
+  return _mm_cmpestri(str1, len1, str2, len2, MODE);
+}
+
+#define SSE4_crc32_u8 _mm_crc32_u8
+#define SSE4_crc32_u16 _mm_crc32_u16
+#define SSE4_crc32_u32 _mm_crc32_u32
+#define SSE4_crc32_u64 _mm_crc32_u64
+#define POPCNT_popcnt_u64 _mm_popcnt_u64
+
+#else  // IR_COMPILE without SSE 4.2.
+/// When cross-compiling to IR without SSE 4.2 support (i.e. no -msse4.2), we cannot use
+/// SSE 4.2 instructions.  Otherwise, the IR loading will fail on CPUs that don't
+/// support SSE 4.2.  However, because the caller isn't allowed to call these routines
+/// on CPUs that lack SSE 4.2 anyway, we can implement stubs for this case.
+
+template <int MODE>
+static inline __m128i SSE4_cmpestrm(__m128i str1, int len1, __m128i str2, int len2) {
+  DCHECK(false) << "CPU doesn't support SSE 4.2";
+  return (__m128i){0};  // NOLINT
+}
+
+template <int MODE>
+static inline int SSE4_cmpestri(__m128i str1, int len1, __m128i str2, int len2) {
+  DCHECK(false) << "CPU doesn't support SSE 4.2";
+  return 0;
+}
+
+static inline uint32_t SSE4_crc32_u8(uint32_t crc, uint8_t v) {
+  DCHECK(false) << "CPU doesn't support SSE 4.2";
+  return 0;
+}
+
+static inline uint32_t SSE4_crc32_u16(uint32_t crc, uint16_t v) {
+  DCHECK(false) << "CPU doesn't support SSE 4.2";
+  return 0;
+}
+
+static inline uint32_t SSE4_crc32_u32(uint32_t crc, uint32_t v) {
+  DCHECK(false) << "CPU doesn't support SSE 4.2";
+  return 0;
+}
+
+static inline uint32_t SSE4_crc32_u64(uint32_t crc, uint64_t v) {
+  DCHECK(false) << "CPU doesn't support SSE 4.2";
+  return 0;
+}
+
+static inline int64_t POPCNT_popcnt_u64(uint64_t a) {
+  DCHECK(false) << "CPU doesn't support SSE 4.2";
+  return 0;
+}
+
+#endif  // IR_COMPILE
+
+#else
+
+static inline uint32_t SSE4_crc32_u8(uint32_t crc, uint8_t v) {
+  DCHECK(false) << "SSE support is not enabled";
+  return 0;
+}
+
+static inline uint32_t SSE4_crc32_u16(uint32_t crc, uint16_t v) {
+  DCHECK(false) << "SSE support is not enabled";
+  return 0;
+}
+
+static inline uint32_t SSE4_crc32_u32(uint32_t crc, uint32_t v) {
+  DCHECK(false) << "SSE support is not enabled";
+  return 0;
+}
+
+static inline uint32_t SSE4_crc32_u64(uint32_t crc, uint64_t v) {
+  DCHECK(false) << "SSE support is not enabled";
+  return 0;
+}
+
+static inline int64_t POPCNT_popcnt_u64(uint64_t a) {
+  DCHECK(false) << "SSE support is not enabled";
+  return 0;
+}
+
+#endif  // ARROW_USE_SSE
+
+}  // namespace arrow
+
+#endif  //  ARROW_UTIL_SSE_UTIL_H
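
Since the stubs above DCHECK(false), callers must gate any SSE4_crc32_* use both at compile time (ARROW_USE_SSE) and at runtime (CpuInfo), as HashUtil::Hash does in hash-util.h. A caller-side sketch, with ChecksumWord purely illustrative:

  #include "arrow/util/cpu-info.h"
  #include "arrow/util/hash-util.h"
  #include "arrow/util/sse-util.h"

  uint32_t ChecksumWord(uint32_t word, uint32_t seed) {
  #ifdef ARROW_USE_SSE
    if (arrow::CpuInfo::IsSupported(arrow::CpuInfo::SSE4_2)) {
      return arrow::SSE4_crc32_u32(seed, word);  // hardware CRC32 path
    }
  #endif
    // Software fallback; FnvHash64to32 needs at least one byte, which sizeof(word) satisfies.
    return arrow::HashUtil::FnvHash64to32(&word, static_cast<int32_t>(sizeof(word)), seed);
  }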

