avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1556373 - in /avro/trunk: CHANGES.txt lang/c++/CMakeLists.txt lang/c++/api/DataFile.hh lang/c++/impl/DataFile.cc lang/c++/test/DataFileTests.cc
Date Tue, 07 Jan 2014 21:53:54 GMT
Author: cutting
Date: Tue Jan  7 21:53:54 2014
New Revision: 1556373

URL: http://svn.apache.org/r1556373
Log:
AVRO-1414. C++: Add support for deflate-compressed data files.  Contributed by Daniel Russel.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/c++/CMakeLists.txt
    avro/trunk/lang/c++/api/DataFile.hh
    avro/trunk/lang/c++/impl/DataFile.cc
    avro/trunk/lang/c++/test/DataFileTests.cc

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1556373&r1=1556372&r2=1556373&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jan  7 21:53:54 2014
@@ -27,6 +27,9 @@ Trunk (not yet released)
     AVRO-1379. C: avro_file_writer_append_encoded() function.
     (Mark Teodoro via dcreager)
 
+    AVRO-1414. C++: Add support for deflate-compressed data files.
+    (Daniel Russel via cutting)
+
   OPTIMIZATIONS
 
     AVRO-1348. Java: Improve UTF-8 to String conversion performance in

Modified: avro/trunk/lang/c++/CMakeLists.txt
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/CMakeLists.txt?rev=1556373&r1=1556372&r2=1556373&view=diff
==============================================================================
--- avro/trunk/lang/c++/CMakeLists.txt (original)
+++ avro/trunk/lang/c++/CMakeLists.txt Tue Jan  7 21:53:54 2014
@@ -52,7 +52,7 @@ endif ()
 
 
 find_package (Boost 1.38 REQUIRED
-    COMPONENTS filesystem system program_options)
+    COMPONENTS filesystem system program_options iostreams)
 
 add_definitions (${Boost_LIB_DIAGNOSTIC_DEFINITIONS})
 

Modified: avro/trunk/lang/c++/api/DataFile.hh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/api/DataFile.hh?rev=1556373&r1=1556372&r2=1556373&view=diff
==============================================================================
--- avro/trunk/lang/c++/api/DataFile.hh (original)
+++ avro/trunk/lang/c++/api/DataFile.hh Tue Jan  7 21:53:54 2014
@@ -32,9 +32,17 @@
 
 #include "boost/array.hpp"
 #include "boost/utility.hpp"
+#include <boost/iostreams/filtering_stream.hpp>
+#include <boost/scoped_ptr.hpp>
 
 namespace avro {
 
+/** Specify type of compression to use when writing data files. */
+enum Codec {
+  NULL_CODEC,
+  DEFLATE_CODEC
+};
+
 /**
  * The sync value.
  */
@@ -50,6 +58,7 @@ class AVRO_DECL DataFileWriterBase : boo
     const ValidSchema schema_;
     const EncoderPtr encoderPtr_;
     const size_t syncInterval_;
+    Codec codec_;
 
     std::auto_ptr<OutputStream> stream_;
     std::auto_ptr<OutputStream> buffer_;
@@ -76,7 +85,7 @@ public:
      * Returns the current encoder for this writer.
      */
     Encoder& encoder() const { return *encoderPtr_; }
-    
+
     /**
      * Returns true if the buffer has sufficient data for a sync to be
      * inserted.
@@ -93,7 +102,7 @@ public:
      * Constructs a data file writer with the given sync interval and name.
      */
     DataFileWriterBase(const char* filename, const ValidSchema& schema,
-        size_t syncInterval);
+        size_t syncInterval, Codec codec = NULL_CODEC);
 
     ~DataFileWriterBase();
     /**
@@ -124,8 +133,8 @@ public:
      * Constructs a new data file.
      */
     DataFileWriter(const char* filename, const ValidSchema& schema,
-        size_t syncInterval = 64 * 1024) :
-        base_(new DataFileWriterBase(filename, schema, syncInterval)) { }
+        size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) :
+        base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) { }
 
     /**
      * Writes the given piece of data into the file.
@@ -162,6 +171,7 @@ class AVRO_DECL DataFileReaderBase : boo
     const DecoderPtr decoder_;
     int64_t objectCount_;
     bool eof_;
+    Codec codec_;
 
     ValidSchema readerSchema_;
     ValidSchema dataSchema_;
@@ -172,6 +182,10 @@ class AVRO_DECL DataFileReaderBase : boo
     Metadata metadata_;
     DataFileSync sync_;
 
+    // for compressed buffer
+    boost::scoped_ptr<boost::iostreams::filtering_istream> os_;
+    std::vector<char> compressed_;
+
     void readHeader();
 
     bool readDataBlock();
@@ -315,5 +329,3 @@ public:
 
 }   // namespace avro
 #endif
-
-

Modified: avro/trunk/lang/c++/impl/DataFile.cc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/impl/DataFile.cc?rev=1556373&r1=1556372&r2=1556373&view=diff
==============================================================================
--- avro/trunk/lang/c++/impl/DataFile.cc (original)
+++ avro/trunk/lang/c++/impl/DataFile.cc Tue Jan  7 21:53:54 2014
@@ -23,6 +23,9 @@
 #include <sstream>
 
 #include <boost/random/mersenne_twister.hpp>
+#include <boost/iostreams/device/file.hpp>
+#include <boost/iostreams/filter/gzip.hpp>
+#include <boost/iostreams/filter/zlib.hpp>
 
 namespace avro {
 using std::auto_ptr;
@@ -34,14 +37,25 @@ using std::string;
 
 using boost::array;
 
+namespace {
 const string AVRO_SCHEMA_KEY("avro.schema");
 const string AVRO_CODEC_KEY("avro.codec");
 const string AVRO_NULL_CODEC("null");
+const string AVRO_DEFLATE_CODEC("deflate");
 
 const size_t minSyncInterval = 32;
 const size_t maxSyncInterval = 1u << 30;
 const size_t defaultSyncInterval = 64 * 1024;
 
+boost::iostreams::zlib_params get_zlib_params() {
+  boost::iostreams::zlib_params ret;
+  ret.method = boost::iostreams::zlib::deflated;
+  ret.noheader = true;
+  return ret;
+}
+}
+
+
 static string toString(const ValidSchema& schema)
 {
     ostringstream oss;
@@ -50,9 +64,10 @@ static string toString(const ValidSchema
 }
 
 DataFileWriterBase::DataFileWriterBase(const char* filename,
-    const ValidSchema& schema, size_t syncInterval) :
+    const ValidSchema& schema, size_t syncInterval, Codec codec) :
     filename_(filename), schema_(schema), encoderPtr_(binaryEncoder()),
     syncInterval_(syncInterval),
+    codec_(codec),
     stream_(fileOutputStream(filename)),
     buffer_(memoryOutputStream()),
     sync_(makeSync()), objectCount_(0)
@@ -64,6 +79,13 @@ DataFileWriterBase::DataFileWriterBase(c
     }
     setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
 
+    if (codec_ == NULL_CODEC) {
+      setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
+    } else if (codec_ == DEFLATE_CODEC) {
+      setMetadata(AVRO_CODEC_KEY, AVRO_DEFLATE_CODEC);
+    } else {
+      throw Exception("Unknown codec codec");
+    }
     setMetadata(AVRO_SCHEMA_KEY, toString(schema));
 
     writeHeader();
@@ -89,12 +111,35 @@ void DataFileWriterBase::sync()
 
     encoderPtr_->init(*stream_);
     avro::encode(*encoderPtr_, objectCount_);
-    int64_t byteCount = buffer_->byteCount();
-    avro::encode(*encoderPtr_, byteCount);
-    encoderPtr_->flush();
-
-    auto_ptr<InputStream> in = memoryInputStream(*buffer_);
-    copy(*in, *stream_);
+    if (codec_ == NULL_CODEC) {
+        int64_t byteCount = buffer_->byteCount();
+        avro::encode(*encoderPtr_, byteCount);
+        encoderPtr_->flush();
+        std::auto_ptr<InputStream> in = memoryInputStream(*buffer_);
+        copy(*in, *stream_);
+    } else {
+        std::vector<char> buf;
+        {
+            boost::iostreams::filtering_ostream os;
+            if (codec_ == DEFLATE_CODEC) {
+                os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
+            }
+            os.push(boost::iostreams::back_inserter(buf));
+            const uint8_t* data;
+            size_t len;
+
+            std::auto_ptr<InputStream> input = memoryInputStream(*buffer_);
+            while (input->next(&data, &len)) {
+                boost::iostreams::write(os, reinterpret_cast<const char*>(data), len);
+            }
+        } // make sure all is flushed
+        std::auto_ptr<InputStream> in = memoryInputStream(
+           reinterpret_cast<const uint8_t*>(&buf[0]), buf.size());
+        int64_t byteCount = buf.size();
+        avro::encode(*encoderPtr_, byteCount);
+        encoderPtr_->flush();
+        copy(*in, *stream_);
+    }
 
     encoderPtr_->init(*stream_);
     avro::encode(*encoderPtr_, sync_);
@@ -272,8 +317,30 @@ bool DataFileReaderBase::readDataBlock()
     decoder_->init(*stream_);
 
     auto_ptr<InputStream> st = boundedInputStream(*stream_, static_cast<size_t>(byteCount));
-    dataDecoder_->init(*st);
-    dataStream_ = st;
+    if (codec_ == NULL_CODEC) {
+        dataDecoder_->init(*st);
+        dataStream_ = st;
+    } else {
+        compressed_.clear();
+        const uint8_t* data;
+        size_t len;
+        while (st->next(&data, &len)) {
+            compressed_.insert(compressed_.end(), data, data + len);
+        }
+        // boost::iostreams::write(os, reinterpret_cast<const char*>(data), len);
+        os_.reset(new boost::iostreams::filtering_istream());
+        if (codec_ == DEFLATE_CODEC) {
+            os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
+        } else {
+            throw Exception("Bad codec");
+        }
+        os_->push(boost::iostreams::basic_array_source<char>(
+            &compressed_[0], compressed_.size()));
+
+        std::auto_ptr<InputStream> in = istreamInputStream(*os_);
+        dataDecoder_->init(*in);
+        dataStream_ = in;
+    }
     return true;
 }
 
@@ -318,8 +385,13 @@ void DataFileReaderBase::readHeader()
     }
 
     it = metadata_.find(AVRO_CODEC_KEY);
-    if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
-        throw Exception("Unknown codec in data file: " + toString(it->second));
+    if (it != metadata_.end() && toString(it->second) == AVRO_DEFLATE_CODEC) {
+        codec_ = DEFLATE_CODEC;
+    } else {
+        codec_ = NULL_CODEC;
+        if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
+            throw Exception("Unknown codec in data file: " + toString(it->second));
+        }
     }
 
     avro::decode(*decoder_, sync_);

Modified: avro/trunk/lang/c++/test/DataFileTests.cc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/test/DataFileTests.cc?rev=1556373&r1=1556372&r2=1556373&view=diff
==============================================================================
--- avro/trunk/lang/c++/test/DataFileTests.cc (original)
+++ avro/trunk/lang/c++/test/DataFileTests.cc Tue Jan  7 21:53:54 2014
@@ -76,7 +76,7 @@ template <typename T> struct codec_trait
         avro::encode(e, c.re);
         avro::encode(e, c.im);
     }
-    
+
     static void decode(Decoder& d, Complex<T>& c) {
         avro::decode(d, c.re);
         avro::decode(d, c.im);
@@ -147,7 +147,7 @@ public:
     void testCleanup() {
         BOOST_CHECK(boost::filesystem::remove(filename));
     }
-    
+
     void testWrite() {
         avro::DataFileWriter<ComplexInteger> df(filename, writerSchema, 100);
         int64_t re = 3;
@@ -281,7 +281,7 @@ public:
         Pair p(writerSchema, GenericDatum());
         int64_t re = 3;
         int64_t im = 5;
-        
+
         const GenericDatum& ci = p.second;
         while (df.read(p)) {
             BOOST_REQUIRE_EQUAL(ci.type(), avro::AVRO_RECORD);
@@ -388,6 +388,37 @@ public:
         }
         BOOST_CHECK_EQUAL(i, count);
     }
+    /**
+     * Test writing DataFiles into other streams operations.
+     */
+    void testZip() {
+        const size_t number_of_objects = 100;
+        // first create a large file
+        ValidSchema dschema = avro::compileJsonSchemaFromString(sch);
+        {
+            avro::DataFileWriter<ComplexInteger> writer(
+              filename, dschema, 16 * 1024, avro::DEFLATE_CODEC);
+
+            for (size_t i = 0; i < number_of_objects; ++i) {
+                ComplexInteger d;
+                d.re = i;
+                d.im = 2 * i;
+                writer.write(d);
+            }
+        }
+        {
+            avro::DataFileReader<ComplexInteger> reader(filename, dschema);
+            std::vector<int> found;
+            ComplexInteger record;
+            while (reader.read(record)) {
+                found.push_back(record.re);
+            }
+            BOOST_CHECK_EQUAL(found.size(), number_of_objects);
+            for (unsigned int i = 0; i < found.size(); ++i) {
+                BOOST_CHECK_EQUAL(found[i], i);
+            }
+        }
+    }
 };
 
 void addReaderTests(test_suite* ts, const shared_ptr<DataFileTest>& t)
@@ -403,7 +434,7 @@ void addReaderTests(test_suite* ts, cons
 }
 
 test_suite*
-init_unit_test_suite( int argc, char* argv[] ) 
+init_unit_test_suite( int argc, char* argv[] )
 {
     test_suite* ts= BOOST_TEST_SUITE("DataFile tests");
     shared_ptr<DataFileTest> t1(new DataFileTest("test1.df", sch, isch));
@@ -430,5 +461,9 @@ init_unit_test_suite( int argc, char* ar
     ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteGenericByName, t5));
     addReaderTests(ts, t5);
 
+    shared_ptr<DataFileTest> t6(new DataFileTest("test6.df", dsch, dblsch));
+    ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testZip, t6));
+
+
     return ts;
 }



Mime
View raw message