avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From th...@apache.org
Subject avro git commit: Added Snappy compression for the DataFile.
Date Tue, 21 Feb 2017 08:41:31 GMT
Repository: avro
Updated Branches:
  refs/heads/master 0e24e2ab4 -> 5ec94b9af


Added Snappy compression for the DataFile.

Also added additional unit test for the Snappy Compression to the DataFileTests.


Project: http://git-wip-us.apache.org/repos/asf/avro/repo
Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/5ec94b9a
Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/5ec94b9a
Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/5ec94b9a

Branch: refs/heads/master
Commit: 5ec94b9aff523cd0b847af823243f6fb11cf23c4
Parents: 0e24e2a
Author: J Langley <jlangley@cohesionforce.com>
Authored: Tue Oct 13 22:12:20 2015 -0500
Committer: Thiruvalluvan M G <thiru@startsmartlabs.com>
Committed: Tue Feb 21 14:09:52 2017 +0530

----------------------------------------------------------------------
 CHANGES.txt                    |   2 +
 lang/c++/CMakeLists.txt        |  20 +++++--
 lang/c++/FindSnappy.cmake      |  54 +++++++++++++++++
 lang/c++/api/DataFile.hh       |   9 ++-
 lang/c++/impl/DataFile.cc      | 116 ++++++++++++++++++++++++++++++++----
 lang/c++/test/DataFileTests.cc |  36 ++++++++++-
 6 files changed, 219 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 93ea0d7..3204b05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,8 @@ Trunk (not yet released)
 
     AVRO-1960: Add log4j properties for avro-tools
 
+    AVRO-1748. Add Snappy Compression to C++ DataFile (J. Langley via thiru)
+
   BUG FIXES
 
     AVRO-1741: Python3: Fix error when codec is not in the header.

http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/lang/c++/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/lang/c++/CMakeLists.txt b/lang/c++/CMakeLists.txt
index cf18243..be39215 100644
--- a/lang/c++/CMakeLists.txt
+++ b/lang/c++/CMakeLists.txt
@@ -36,6 +36,7 @@ set (AVRO_VERSION_MAJOR ${AVRO_VERSION})
 set (AVRO_VERSION_MINOR "0")
 
 project (Avro-cpp)
+set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_SOURCE_DIR})
 
 if (WIN32 AND NOT CYGWIN AND NOT MSYS)
 add_definitions (/EHa)
@@ -61,6 +62,17 @@ endif ()
 find_package (Boost 1.38 REQUIRED
     COMPONENTS filesystem system program_options iostreams)
 
+find_package(Snappy)
+if (SNAPPY_FOUND)
+    set(SNAPPY_PKG libsnappy)
+    add_definitions(-DSNAPPY_CODEC_AVAILABLE)
+    message("Enabled snappy codec")
+else (SNAPPY_FOUND)
+    set(SNAPPY_PKG "")
+    set(SNAPPY_LIBRARIES "")
+    message("Disabled snappy codec. libsnappy not found.")
+endif (SNAPPY_FOUND)
+
 add_definitions (${Boost_LIB_DIAGNOSTIC_DEFINITIONS})
 
 include_directories (api ${CMAKE_CURRENT_BINARY_DIR} ${Boost_INCLUDE_DIRS})
@@ -98,11 +110,11 @@ set_target_properties (avrocpp PROPERTIES
 set_target_properties (avrocpp_s PROPERTIES
     VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR})
 
-target_link_libraries (avrocpp ${Boost_LIBRARIES})
+target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES})
 
 add_executable (precompile test/precompile.cc)
 
-target_link_libraries (precompile avrocpp_s ${Boost_LIBRARIES})
+target_link_libraries (precompile avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES})
 
 macro (gen file ns)
     add_custom_command (OUTPUT ${file}.hh
@@ -131,13 +143,13 @@ gen (crossref cr)
 gen (primitivetypes pt)
 
 add_executable (avrogencpp impl/avrogencpp.cc)
-target_link_libraries (avrogencpp avrocpp_s ${Boost_LIBRARIES})
+target_link_libraries (avrogencpp avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES})
 
 enable_testing()
 
 macro (unittest name)
     add_executable (${name} test/${name}.cc)
-    target_link_libraries (${name} avrocpp ${Boost_LIBRARIES})
+    target_link_libraries (${name} avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES})
     add_test (NAME ${name} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
         COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${name})
 endmacro (unittest)

http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/lang/c++/FindSnappy.cmake
----------------------------------------------------------------------
diff --git a/lang/c++/FindSnappy.cmake b/lang/c++/FindSnappy.cmake
new file mode 100644
index 0000000..e9053af
--- /dev/null
+++ b/lang/c++/FindSnappy.cmake
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Tries to find Snappy headers and libraries.
+#
+# Usage of this module as follows:
+#
+#  find_package(Snappy)
+#
+# Variables used by this module, they can change the default behaviour and need
+# to be set before calling find_package:
+#
+#  SNAPPY_ROOT_DIR  Set this variable to the root installation of
+#                    Snappy if the module has problems finding
+#                    the proper installation path.
+#
+# Variables defined by this module:
+#
+#  SNAPPY_FOUND              System has Snappy libs/headers
+#  SNAPPY_LIBRARIES          The Snappy libraries
+#  SNAPPY_INCLUDE_DIR        The location of Snappy headers
+
+find_path(SNAPPY_INCLUDE_DIR
+    NAMES snappy.h
+    HINTS ${SNAPPY_ROOT_DIR}/include)
+
+find_library(SNAPPY_LIBRARIES
+    NAMES snappy
+    HINTS ${SNAPPY_ROOT_DIR}/lib)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(Snappy DEFAULT_MSG
+    SNAPPY_LIBRARIES
+    SNAPPY_INCLUDE_DIR)
+
+mark_as_advanced(
+    SNAPPY_ROOT_DIR
+    SNAPPY_LIBRARIES
+    SNAPPY_INCLUDE_DIR)

http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/lang/c++/api/DataFile.hh
----------------------------------------------------------------------
diff --git a/lang/c++/api/DataFile.hh b/lang/c++/api/DataFile.hh
index 98779b6..bff3097 100644
--- a/lang/c++/api/DataFile.hh
+++ b/lang/c++/api/DataFile.hh
@@ -40,7 +40,12 @@ namespace avro {
 /** Specify type of compression to use when writing data files. */
 enum Codec {
   NULL_CODEC,
-  DEFLATE_CODEC
+  DEFLATE_CODEC,
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+  SNAPPY_CODEC
+#endif
+
 };
 
 /**
@@ -185,7 +190,7 @@ class AVRO_DECL DataFileReaderBase : boost::noncopyable {
     // for compressed buffer
     boost::scoped_ptr<boost::iostreams::filtering_istream> os_;
     std::vector<char> compressed_;
-
+    std::string uncompressed;
     void readHeader();
 
     bool readDataBlock();

http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/lang/c++/impl/DataFile.cc
----------------------------------------------------------------------
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index 035dd27..ee8f62c 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -26,6 +26,11 @@
 #include <boost/iostreams/device/file.hpp>
 #include <boost/iostreams/filter/gzip.hpp>
 #include <boost/iostreams/filter/zlib.hpp>
+#include <boost/crc.hpp>  // for boost::crc_32_type
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+#include <snappy.h>
+#endif
 
 namespace avro {
 using std::auto_ptr;
@@ -43,6 +48,10 @@ const string AVRO_CODEC_KEY("avro.codec");
 const string AVRO_NULL_CODEC("null");
 const string AVRO_DEFLATE_CODEC("deflate");
 
+#ifdef SNAPPY_CODEC_AVAILABLE
+const string AVRO_SNAPPY_CODEC = "snappy";
+#endif
+
 const size_t minSyncInterval = 32;
 const size_t maxSyncInterval = 1u << 30;
 const size_t defaultSyncInterval = 64 * 1024;
@@ -83,8 +92,12 @@ DataFileWriterBase::DataFileWriterBase(const char* filename,
       setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
     } else if (codec_ == DEFLATE_CODEC) {
       setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC);
+#ifdef SNAPPY_CODEC_AVAILABLE
+    } else if (codec_ == SNAPPY_CODEC) {
+      setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC);
+#endif
     } else {
-      throw Exception("Unknown codec codec");
+      throw Exception(boost::format("Unknown codec: %1%") % codec);
     }
     setMetadata(AVRO_SCHEMA_KEY, toString(schema));
 
@@ -117,13 +130,11 @@ void DataFileWriterBase::sync()
         encoderPtr_->flush();
         std::auto_ptr<InputStream> in = memoryInputStream(*buffer_);
         copy(*in, *stream_);
-    } else {
+    } else if (codec_ == DEFLATE_CODEC) {
         std::vector<char> buf;
         {
             boost::iostreams::filtering_ostream os;
-            if (codec_ == DEFLATE_CODEC) {
-                os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
-            }
+            os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
             os.push(boost::iostreams::back_inserter(buf));
             const uint8_t* data;
             size_t len;
@@ -139,6 +150,49 @@ void DataFileWriterBase::sync()
         avro::encode(*encoderPtr_, byteCount);
         encoderPtr_->flush();
         copy(*in, *stream_);
+#ifdef SNAPPY_CODEC_AVAILABLE
+    } else if (codec_ == SNAPPY_CODEC) {
+        std::vector<char> temp;
+        std::string compressed;
+        boost::crc_32_type crc;
+        {
+            boost::iostreams::filtering_ostream os;
+            os.push(boost::iostreams::back_inserter(temp));
+            const uint8_t* data;
+            size_t len;
+
+            std::auto_ptr<InputStream> input = memoryInputStream(*buffer_);
+            while (input->next(&data, &len)) {
+                boost::iostreams::write(os, reinterpret_cast<const char*>(data),
+                        len);
+            }
+        } // make sure all is flushed
+
+        crc.process_bytes(reinterpret_cast<const char*>(&temp[0]), temp.size());
+        // For Snappy, add the CRC32 checksum
+        int32_t checksum = crc();
+
+        // Now compress
+        size_t compressed_size = snappy::Compress(
+                reinterpret_cast<const char*>(&temp[0]), temp.size(),
+                &compressed);
+        temp.clear();
+        {
+            boost::iostreams::filtering_ostream os;
+            os.push(boost::iostreams::back_inserter(temp));
+            boost::iostreams::write(os, compressed.c_str(), compressed_size);
+        }
+        temp.push_back((checksum >> 24) & 0xFF);
+        temp.push_back((checksum >> 16) & 0xFF);
+        temp.push_back((checksum >> 8) & 0xFF);
+        temp.push_back(checksum & 0xFF);
+        std::auto_ptr<InputStream> in = memoryInputStream(
+                reinterpret_cast<const uint8_t*>(&temp[0]), temp.size());
+        int64_t byteCount = temp.size();
+        avro::encode(*encoderPtr_, byteCount);
+        encoderPtr_->flush();
+        copy(*in, *stream_);
+#endif
     }
 
     encoderPtr_->init(*stream_);
@@ -320,7 +374,7 @@ bool DataFileReaderBase::readDataBlock()
     if (codec_ == NULL_CODEC) {
         dataDecoder_->init(*st);
         dataStream_ = st;
-    } else {
+    } else if (codec_ == DEFLATE_CODEC) {
         compressed_.clear();
         const uint8_t* data;
         size_t len;
@@ -329,17 +383,52 @@ bool DataFileReaderBase::readDataBlock()
         }
         // boost::iostreams::write(os, reinterpret_cast<const char*>(data), len);
         os_.reset(new boost::iostreams::filtering_istream());
-        if (codec_ == DEFLATE_CODEC) {
-            os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
-        } else {
-            throw Exception("Bad codec");
-        }
+        os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
         os_->push(boost::iostreams::basic_array_source<char>(
             &compressed_[0], compressed_.size()));
 
         std::auto_ptr<InputStream> in = istreamInputStream(*os_);
         dataDecoder_->init(*in);
         dataStream_ = in;
+#ifdef SNAPPY_CODEC_AVAILABLE
+    } else if (codec_ == SNAPPY_CODEC) {
+        boost::crc_32_type crc;
+        uint32_t checksum = 0;
+        compressed_.clear();
+        uncompressed.clear();
+        const uint8_t* data;
+        size_t len;
+        while (st->next(&data, &len)) {
+            compressed_.insert(compressed_.end(), data, data + len);
+        }
+        len = compressed_.size();
+        int b1 = compressed_[len - 4] & 0xFF;
+        int b2 = compressed_[len - 3] & 0xFF;
+        int b3 = compressed_[len - 2] & 0xFF;
+        int b4 = compressed_[len - 1] & 0xFF;
+
+        checksum = (b1 << 24) + (b2 << 16) + (b3 << 8) + (b4);
+        if (!snappy::Uncompress(reinterpret_cast<const char*>(&compressed_[0]),
+                len - 4, &uncompressed)) {
+            throw Exception(
+                    "Snappy Compression reported an error when decompressing");
+        }
+        crc.process_bytes(uncompressed.c_str(), uncompressed.size());
+        uint32_t c = crc();
+        if (checksum != c) {
+            throw Exception(boost::format("Checksum did not match for Snappy compression: Expected: %1%, computed: %2%") % checksum % c);
+        }
+        os_.reset(new boost::iostreams::filtering_istream());
+        os_->push(
+                boost::iostreams::basic_array_source<char>(uncompressed.c_str(),
+                        uncompressed.size()));
+        std::auto_ptr<InputStream> in = istreamInputStream(*os_);
+
+        dataDecoder_->init(*in);
+        dataStream_ = in;
+#endif
+    } else {
+        throw Exception("Bad codec");
     }
     return true;
 }
@@ -387,6 +476,11 @@ void DataFileReaderBase::readHeader()
     it = metadata_.find(AVRO_CODEC_KEY);
     if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) {
         codec_ = DEFLATE_CODEC;
+#ifdef SNAPPY_CODEC_AVAILABLE
+    } else if (it != metadata_.end()
+            && toString(it->second) == AVRO_SNAPPY_CODEC) {
+        codec_ = SNAPPY_CODEC;
+#endif
     } else {
         codec_ = NULL_CODEC;
         if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {

http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/lang/c++/test/DataFileTests.cc
----------------------------------------------------------------------
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index 95e80b1..f1ce789 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -434,11 +434,42 @@ public:
         }
     }
 
+    void testSnappy() {
+        // Add enough objects to span multiple blocks
+        const size_t number_of_objects = 1000000;
+        // first create a large file
+        ValidSchema dschema = avro::compileJsonSchemaFromString(sch);
+        {
+            avro::DataFileWriter<ComplexInteger> writer(
+              filename, dschema, 16 * 1024, avro::SNAPPY_CODEC);
+
+            for (size_t i = 0; i < number_of_objects; ++i) {
+                ComplexInteger d;
+                d.re = i;
+                d.im = 2 * i;
+                writer.write(d);
+            }
+        }
+        {
+            avro::DataFileReader<ComplexInteger> reader(filename, dschema);
+            sleep(1);
+            std::vector<int64_t> found;
+            ComplexInteger record;
+            while (reader.read(record)) {
+                found.push_back(record.re);
+            }
+            BOOST_CHECK_EQUAL(found.size(), number_of_objects);
+            for (unsigned int i = 0; i < found.size(); ++i) {
+                BOOST_CHECK_EQUAL(found[i], i);
+            }
+        }
+    }
+
     void testSchemaReadWrite() {
     uint32_t a=42;
     {
             avro::DataFileWriter<uint32_t> df(filename, writerSchema);
-        df.write(a);    
+        df.write(a);
         }
 
         {
@@ -492,6 +523,9 @@ init_unit_test_suite( int argc, char* argv[] )
 
     shared_ptr<DataFileTest> t6(new DataFileTest("test6.df", dsch, dblsch));
     ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testZip, t6));
+    shared_ptr<DataFileTest> t8(new DataFileTest("test8.df", dsch, dblsch));
+    ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testSnappy, t8));
+
 
     shared_ptr<DataFileTest> t7(new DataFileTest("test7.df",fsch,fsch));
     ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testSchemaReadWrite,t7));


Mime
View raw message