Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 67EBC200C22 for ; Tue, 21 Feb 2017 09:41:33 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 6655F160B68; Tue, 21 Feb 2017 08:41:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6401D160B3E for ; Tue, 21 Feb 2017 09:41:32 +0100 (CET) Received: (qmail 34619 invoked by uid 500); 21 Feb 2017 08:41:31 -0000 Mailing-List: contact commits-help@avro.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@avro.apache.org Delivered-To: mailing list commits@avro.apache.org Received: (qmail 34610 invoked by uid 99); 21 Feb 2017 08:41:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Feb 2017 08:41:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 55376DFC1C; Tue, 21 Feb 2017 08:41:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: thiru@apache.org To: commits@avro.apache.org Message-Id: <6bd1a1635d124f7f9a861ff6fc2b02af@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: avro git commit: Added Snappy compression for the DataFile. Date: Tue, 21 Feb 2017 08:41:31 +0000 (UTC) archived-at: Tue, 21 Feb 2017 08:41:33 -0000 Repository: avro Updated Branches: refs/heads/master 0e24e2ab4 -> 5ec94b9af Added Snappy compression for the DataFile. Also added additional unit test for the Snappy Compression to the DataFileTests. Project: http://git-wip-us.apache.org/repos/asf/avro/repo Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/5ec94b9a Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/5ec94b9a Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/5ec94b9a Branch: refs/heads/master Commit: 5ec94b9aff523cd0b847af823243f6fb11cf23c4 Parents: 0e24e2a Author: J Langley Authored: Tue Oct 13 22:12:20 2015 -0500 Committer: Thiruvalluvan M G Committed: Tue Feb 21 14:09:52 2017 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + lang/c++/CMakeLists.txt | 20 +++++-- lang/c++/FindSnappy.cmake | 54 +++++++++++++++++ lang/c++/api/DataFile.hh | 9 ++- lang/c++/impl/DataFile.cc | 116 ++++++++++++++++++++++++++++++++---- lang/c++/test/DataFileTests.cc | 36 ++++++++++- 6 files changed, 219 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 93ea0d7..3204b05 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -46,6 +46,8 @@ Trunk (not yet released) AVRO-1960: Add log4j properties for avro-tools + AVRO-1748. Add Snappy Compression to C++ DataFile (J. Langley via thiru) + BUG FIXES AVRO-1741: Python3: Fix error when codec is not in the header. http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/lang/c++/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/lang/c++/CMakeLists.txt b/lang/c++/CMakeLists.txt index cf18243..be39215 100644 --- a/lang/c++/CMakeLists.txt +++ b/lang/c++/CMakeLists.txt @@ -36,6 +36,7 @@ set (AVRO_VERSION_MAJOR ${AVRO_VERSION}) set (AVRO_VERSION_MINOR "0") project (Avro-cpp) +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_SOURCE_DIR}) if (WIN32 AND NOT CYGWIN AND NOT MSYS) add_definitions (/EHa) @@ -61,6 +62,17 @@ endif () find_package (Boost 1.38 REQUIRED COMPONENTS filesystem system program_options iostreams) +find_package(Snappy) +if (SNAPPY_FOUND) + set(SNAPPY_PKG libsnappy) + add_definitions(-DSNAPPY_CODEC_AVAILABLE) + message("Enabled snappy codec") +else (SNAPPY_FOUND) + set(SNAPPY_PKG "") + set(SNAPPY_LIBRARIES "") + message("Disabled snappy codec. libsnappy not found.") +endif (SNAPPY_FOUND) + add_definitions (${Boost_LIB_DIAGNOSTIC_DEFINITIONS}) include_directories (api ${CMAKE_CURRENT_BINARY_DIR} ${Boost_INCLUDE_DIRS}) @@ -98,11 +110,11 @@ set_target_properties (avrocpp PROPERTIES set_target_properties (avrocpp_s PROPERTIES VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}) -target_link_libraries (avrocpp ${Boost_LIBRARIES}) +target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES}) add_executable (precompile test/precompile.cc) -target_link_libraries (precompile avrocpp_s ${Boost_LIBRARIES}) +target_link_libraries (precompile avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES}) macro (gen file ns) add_custom_command (OUTPUT ${file}.hh @@ -131,13 +143,13 @@ gen (crossref cr) gen (primitivetypes pt) add_executable (avrogencpp impl/avrogencpp.cc) -target_link_libraries (avrogencpp avrocpp_s ${Boost_LIBRARIES}) +target_link_libraries (avrogencpp avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES}) enable_testing() macro (unittest name) add_executable (${name} test/${name}.cc) - target_link_libraries (${name} avrocpp ${Boost_LIBRARIES}) + target_link_libraries (${name} avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES}) add_test (NAME ${name} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${name}) endmacro (unittest) http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/lang/c++/FindSnappy.cmake ---------------------------------------------------------------------- diff --git a/lang/c++/FindSnappy.cmake b/lang/c++/FindSnappy.cmake new file mode 100644 index 0000000..e9053af --- /dev/null +++ b/lang/c++/FindSnappy.cmake @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Tries to find Snappy headers and libraries. +# +# Usage of this module as follows: +# +# find_package(Snappy) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# SNAPPY_ROOT_DIR Set this variable to the root installation of +# Snappy if the module has problems finding +# the proper installation path. +# +# Variables defined by this module: +# +# SNAPPY_FOUND System has Snappy libs/headers +# SNAPPY_LIBRARIES The Snappy libraries +# SNAPPY_INCLUDE_DIR The location of Snappy headers + +find_path(SNAPPY_INCLUDE_DIR + NAMES snappy.h + HINTS ${SNAPPY_ROOT_DIR}/include) + +find_library(SNAPPY_LIBRARIES + NAMES snappy + HINTS ${SNAPPY_ROOT_DIR}/lib) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(Snappy DEFAULT_MSG + SNAPPY_LIBRARIES + SNAPPY_INCLUDE_DIR) + +mark_as_advanced( + SNAPPY_ROOT_DIR + SNAPPY_LIBRARIES + SNAPPY_INCLUDE_DIR) http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/lang/c++/api/DataFile.hh ---------------------------------------------------------------------- diff --git a/lang/c++/api/DataFile.hh b/lang/c++/api/DataFile.hh index 98779b6..bff3097 100644 --- a/lang/c++/api/DataFile.hh +++ b/lang/c++/api/DataFile.hh @@ -40,7 +40,12 @@ namespace avro { /** Specify type of compression to use when writing data files. */ enum Codec { NULL_CODEC, - DEFLATE_CODEC + DEFLATE_CODEC, + +#ifdef SNAPPY_CODEC_AVAILABLE + SNAPPY_CODEC +#endif + }; /** @@ -185,7 +190,7 @@ class AVRO_DECL DataFileReaderBase : boost::noncopyable { // for compressed buffer boost::scoped_ptr os_; std::vector compressed_; - + std::string uncompressed; void readHeader(); bool readDataBlock(); http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/lang/c++/impl/DataFile.cc ---------------------------------------------------------------------- diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc index 035dd27..ee8f62c 100644 --- a/lang/c++/impl/DataFile.cc +++ b/lang/c++/impl/DataFile.cc @@ -26,6 +26,11 @@ #include #include #include +#include // for boost::crc_32_type + +#ifdef SNAPPY_CODEC_AVAILABLE +#include +#endif namespace avro { using std::auto_ptr; @@ -43,6 +48,10 @@ const string AVRO_CODEC_KEY("avro.codec"); const string AVRO_NULL_CODEC("null"); const string AVRO_DEFLATE_CODEC("deflate"); +#ifdef SNAPPY_CODEC_AVAILABLE +const string AVRO_SNAPPY_CODEC = "snappy"; +#endif + const size_t minSyncInterval = 32; const size_t maxSyncInterval = 1u << 30; const size_t defaultSyncInterval = 64 * 1024; @@ -83,8 +92,12 @@ DataFileWriterBase::DataFileWriterBase(const char* filename, setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC); } else if (codec_ == DEFLATE_CODEC) { setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC); +#ifdef SNAPPY_CODEC_AVAILABLE + } else if (codec_ == SNAPPY_CODEC) { + setMetadata(AVRO_CODEC_KEY, AVRO_SNAPPY_CODEC); +#endif } else { - throw Exception("Unknown codec codec"); + throw Exception(boost::format("Unknown codec: %1%") % codec); } setMetadata(AVRO_SCHEMA_KEY, toString(schema)); @@ -117,13 +130,11 @@ void DataFileWriterBase::sync() encoderPtr_->flush(); std::auto_ptr in = memoryInputStream(*buffer_); copy(*in, *stream_); - } else { + } else if (codec_ == DEFLATE_CODEC) { std::vector buf; { boost::iostreams::filtering_ostream os; - if (codec_ == DEFLATE_CODEC) { - os.push(boost::iostreams::zlib_compressor(get_zlib_params())); - } + os.push(boost::iostreams::zlib_compressor(get_zlib_params())); os.push(boost::iostreams::back_inserter(buf)); const uint8_t* data; size_t len; @@ -139,6 +150,49 @@ void DataFileWriterBase::sync() avro::encode(*encoderPtr_, byteCount); encoderPtr_->flush(); copy(*in, *stream_); +#ifdef SNAPPY_CODEC_AVAILABLE + } else if (codec_ == SNAPPY_CODEC) { + std::vector temp; + std::string compressed; + boost::crc_32_type crc; + { + boost::iostreams::filtering_ostream os; + os.push(boost::iostreams::back_inserter(temp)); + const uint8_t* data; + size_t len; + + std::auto_ptr input = memoryInputStream(*buffer_); + while (input->next(&data, &len)) { + boost::iostreams::write(os, reinterpret_cast(data), + len); + } + } // make sure all is flushed + + crc.process_bytes(reinterpret_cast(&temp[0]), temp.size()); + // For Snappy, add the CRC32 checksum + int32_t checksum = crc(); + + // Now compress + size_t compressed_size = snappy::Compress( + reinterpret_cast(&temp[0]), temp.size(), + &compressed); + temp.clear(); + { + boost::iostreams::filtering_ostream os; + os.push(boost::iostreams::back_inserter(temp)); + boost::iostreams::write(os, compressed.c_str(), compressed_size); + } + temp.push_back((checksum >> 24) & 0xFF); + temp.push_back((checksum >> 16) & 0xFF); + temp.push_back((checksum >> 8) & 0xFF); + temp.push_back(checksum & 0xFF); + std::auto_ptr in = memoryInputStream( + reinterpret_cast(&temp[0]), temp.size()); + int64_t byteCount = temp.size(); + avro::encode(*encoderPtr_, byteCount); + encoderPtr_->flush(); + copy(*in, *stream_); +#endif } encoderPtr_->init(*stream_); @@ -320,7 +374,7 @@ bool DataFileReaderBase::readDataBlock() if (codec_ == NULL_CODEC) { dataDecoder_->init(*st); dataStream_ = st; - } else { + } else if (codec_ == DEFLATE_CODEC) { compressed_.clear(); const uint8_t* data; size_t len; @@ -329,17 +383,52 @@ bool DataFileReaderBase::readDataBlock() } // boost::iostreams::write(os, reinterpret_cast(data), len); os_.reset(new boost::iostreams::filtering_istream()); - if (codec_ == DEFLATE_CODEC) { - os_->push(boost::iostreams::zlib_decompressor(get_zlib_params())); - } else { - throw Exception("Bad codec"); - } + os_->push(boost::iostreams::zlib_decompressor(get_zlib_params())); os_->push(boost::iostreams::basic_array_source( &compressed_[0], compressed_.size())); std::auto_ptr in = istreamInputStream(*os_); dataDecoder_->init(*in); dataStream_ = in; +#ifdef SNAPPY_CODEC_AVAILABLE + } else if (codec_ == SNAPPY_CODEC) { + boost::crc_32_type crc; + uint32_t checksum = 0; + compressed_.clear(); + uncompressed.clear(); + const uint8_t* data; + size_t len; + while (st->next(&data, &len)) { + compressed_.insert(compressed_.end(), data, data + len); + } + len = compressed_.size(); + int b1 = compressed_[len - 4] & 0xFF; + int b2 = compressed_[len - 3] & 0xFF; + int b3 = compressed_[len - 2] & 0xFF; + int b4 = compressed_[len - 1] & 0xFF; + + checksum = (b1 << 24) + (b2 << 16) + (b3 << 8) + (b4); + if (!snappy::Uncompress(reinterpret_cast(&compressed_[0]), + len - 4, &uncompressed)) { + throw Exception( + "Snappy Compression reported an error when decompressing"); + } + crc.process_bytes(uncompressed.c_str(), uncompressed.size()); + uint32_t c = crc(); + if (checksum != c) { + throw Exception(boost::format("Checksum did not match for Snappy compression: Expected: %1%, computed: %2%") % checksum % c); + } + os_.reset(new boost::iostreams::filtering_istream()); + os_->push( + boost::iostreams::basic_array_source(uncompressed.c_str(), + uncompressed.size())); + std::auto_ptr in = istreamInputStream(*os_); + + dataDecoder_->init(*in); + dataStream_ = in; +#endif + } else { + throw Exception("Bad codec"); } return true; } @@ -387,6 +476,11 @@ void DataFileReaderBase::readHeader() it = metadata_.find(AVRO_CODEC_KEY); if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) { codec_ = DEFLATE_CODEC; +#ifdef SNAPPY_CODEC_AVAILABLE + } else if (it != metadata_.end() + && toString(it->second) == AVRO_SNAPPY_CODEC) { + codec_ = SNAPPY_CODEC; +#endif } else { codec_ = NULL_CODEC; if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) { http://git-wip-us.apache.org/repos/asf/avro/blob/5ec94b9a/lang/c++/test/DataFileTests.cc ---------------------------------------------------------------------- diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc index 95e80b1..f1ce789 100644 --- a/lang/c++/test/DataFileTests.cc +++ b/lang/c++/test/DataFileTests.cc @@ -434,11 +434,42 @@ public: } } + void testSnappy() { + // Add enough objects to span multiple blocks + const size_t number_of_objects = 1000000; + // first create a large file + ValidSchema dschema = avro::compileJsonSchemaFromString(sch); + { + avro::DataFileWriter writer( + filename, dschema, 16 * 1024, avro::SNAPPY_CODEC); + + for (size_t i = 0; i < number_of_objects; ++i) { + ComplexInteger d; + d.re = i; + d.im = 2 * i; + writer.write(d); + } + } + { + avro::DataFileReader reader(filename, dschema); + sleep(1); + std::vector found; + ComplexInteger record; + while (reader.read(record)) { + found.push_back(record.re); + } + BOOST_CHECK_EQUAL(found.size(), number_of_objects); + for (unsigned int i = 0; i < found.size(); ++i) { + BOOST_CHECK_EQUAL(found[i], i); + } + } + } + void testSchemaReadWrite() { uint32_t a=42; { avro::DataFileWriter df(filename, writerSchema); - df.write(a); + df.write(a); } { @@ -492,6 +523,9 @@ init_unit_test_suite( int argc, char* argv[] ) shared_ptr t6(new DataFileTest("test6.df", dsch, dblsch)); ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testZip, t6)); + shared_ptr t8(new DataFileTest("test8.df", dsch, dblsch)); + ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testSnappy, t8)); + shared_ptr t7(new DataFileTest("test7.df",fsch,fsch)); ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testSchemaReadWrite,t7));