flume-commits mailing list archives

From: br...@apache.org
Subject: git commit: FLUME-1709: HDFS CompressedDataStream doesn't support serializer parameter
Date: Fri, 30 Nov 2012 19:27:15 GMT
Updated Branches:
  refs/heads/flume-1.4 523b6b446 -> 5efc2d62f


FLUME-1709: HDFS CompressedDataStream doesn't support serializer parameter

(Cameron Gandevia via Brock Noland)
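With this change the "serializer" parameter, previously ignored by the compressed stream (its configure() was a no-op, as the diff below shows), is honored when the HDFS sink writes a CompressedStream. A minimal agent configuration sketch, for illustration only: the agent/sink names (a1, k1) and the path are made up, and the property names are the standard HDFS sink ones (hdfs.fileType, hdfs.codeC, serializer), so treat it as an example rather than part of this commit:

  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = /flume/events
  a1.sinks.k1.hdfs.fileType = CompressedStream
  a1.sinks.k1.hdfs.codeC = gzip
  # now honored for CompressedStream as well; defaults to TEXT
  a1.sinks.k1.serializer = avro_event
  # sub-properties reach the serializer under the "serializer." prefix
  # (EventSerializer.CTX_PREFIX), e.g. a1.sinks.k1.serializer.<key> = <value>

The new test below exercises the same AVRO_EVENT serializer through the Context API.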


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5efc2d62
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5efc2d62
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5efc2d62

Branch: refs/heads/flume-1.4
Commit: 5efc2d62f49a9aea9a006b74bbb27cdea56219af
Parents: 523b6b4
Author: Brock Noland <brock@apache.org>
Authored: Fri Nov 30 13:26:31 2012 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Fri Nov 30 13:26:46 2012 -0600

----------------------------------------------------------------------
 .../flume/sink/hdfs/HDFSCompressedDataStream.java  |   38 +++++-
 .../sink/hdfs/TestHDFSCompressedDataStream.java    |  101 ++++++++++++--
 2 files changed, 120 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5efc2d62/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index 80341ef..afcd9d6 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -21,6 +21,8 @@ package org.apache.flume.sink.hdfs;
 import java.io.IOException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.serialization.EventSerializer;
+import org.apache.flume.serialization.EventSerializerFactory;
 import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,9 +44,15 @@ public class HDFSCompressedDataStream implements HDFSWriter {
   private CompressionOutputStream cmpOut;
   private boolean isFinished = false;
 
+  private String serializerType;
+  private Context serializerContext;
+  private EventSerializer serializer;
+
   @Override
   public void configure(Context context) {
-    // no-op
+    serializerType = context.getString("serializer", "TEXT");
+    serializerContext = new Context(
+        context.getSubProperties(EventSerializer.CTX_PREFIX));
   }
 
   @Override
@@ -61,13 +69,28 @@ public class HDFSCompressedDataStream implements HDFSWriter {
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
 
+    boolean appending = false;
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
     (dstPath)) {
       fsOut = hdfs.append(dstPath);
+      appending = true;
     } else {
       fsOut = hdfs.create(dstPath);
     }
     cmpOut = codec.createOutputStream(fsOut);
+    serializer = EventSerializerFactory.getInstance(serializerType,
+        serializerContext, cmpOut);
+    if (appending && !serializer.supportsReopen()) {
+      cmpOut.close();
+      serializer = null;
+      throw new IOException("serializer (" + serializerType
+          + ") does not support append");
+    }
+    if (appending) {
+      serializer.afterReopen();
+    } else {
+      serializer.afterCreate();
+    }
     isFinished = false;
   }
 
@@ -77,8 +100,7 @@ public class HDFSCompressedDataStream implements HDFSWriter {
       cmpOut.resetState();
       isFinished = false;
     }
-    byte[] bValue = fmt.getBytes(e);
-    cmpOut.write(bValue);
+    serializer.write(e);
   }
 
   @Override
@@ -88,6 +110,7 @@ public class HDFSCompressedDataStream implements HDFSWriter {
     // Also, since resetState() writes headers, avoid calling it without an
     // additional write/append operation.
     // Note: There are bugs in Hadoop & JDK w/ pure-java gzip; see HADOOP-8522.
+    serializer.flush();
     if (!isFinished) {
       cmpOut.finish();
       isFinished = true;
@@ -98,7 +121,14 @@ public class HDFSCompressedDataStream implements HDFSWriter {
 
   @Override
   public void close() throws IOException {
-    sync();
+    serializer.flush();
+    serializer.beforeClose();
+    if (!isFinished) {
+      cmpOut.finish();
+      isFinished = true;
+    }
+    fsOut.flush();
+    fsOut.sync();
     cmpOut.close();
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/5efc2d62/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
index f537732..cfa946a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
@@ -18,51 +18,74 @@
 
 package org.apache.flume.sink.hdfs;
 
-import com.google.common.base.Charsets;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.util.Arrays;
+import java.util.List;
 import java.util.zip.GZIPInputStream;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
 public class TestHDFSCompressedDataStream {
 
   private static final Logger logger =
       LoggerFactory.getLogger(TestHDFSCompressedDataStream.class);
 
-  // make sure the data makes it to disk if we sync() the data stream
-  @Test
-  public void testGzipDurability() throws IOException {
-    File file = new File("target/test/data/foo.gz");
-    String fileURI = file.getAbsoluteFile().toURI().toString();
+  private File file;
+  private String fileURI;
+  private CompressionCodecFactory factory;
+  private FlumeFormatter fmt;
+
+  @Before
+  public void init() throws Exception {
+    this.file = new File("target/test/data/foo.gz");
+    this.fileURI = file.getAbsoluteFile().toURI().toString();
     logger.info("File URI: {}", fileURI);
 
     Configuration conf = new Configuration();
     // local FS must be raw in order to be Syncable
     conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
     Path path = new Path(fileURI);
-    FileSystem fs = path.getFileSystem(conf); // get FS with our conf cached
-    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+    path.getFileSystem(conf); // get FS with our conf cached
 
+    this.factory = new CompressionCodecFactory(conf);
+    this.fmt = new HDFSTextFormatter();
+  }
+
+  // make sure the data makes it to disk if we sync() the data stream
+  @Test
+  public void testGzipDurability() throws Exception {
+    Context context = new Context();
     HDFSCompressedDataStream writer = new HDFSCompressedDataStream();
-    FlumeFormatter fmt = new HDFSTextFormatter();
+    writer.configure(context);
     writer.open(fileURI, factory.getCodec(new Path(fileURI)),
         SequenceFile.CompressionType.BLOCK, fmt);
-    String body = "yarf!";
-    Event evt = EventBuilder.withBody(body, Charsets.UTF_8);
-    writer.append(evt, fmt);
-    writer.sync();
+
+    String[] bodies = { "yarf!" };
+    writeBodies(writer, bodies);
 
     byte[] buf = new byte[256];
     GZIPInputStream cmpIn = new GZIPInputStream(new FileInputStream(file));
@@ -70,7 +93,55 @@ public class TestHDFSCompressedDataStream {
     String result = new String(buf, 0, len, Charsets.UTF_8);
     result = result.trim(); // HDFSTextFormatter adds a newline
 
-    Assert.assertEquals("input and output must match", body, result);
+    Assert.assertEquals("input and output must match", bodies[0], result);
+  }
+
+  @Test
+  public void testGzipDurabilityWithSerializer() throws Exception {
+    Context context = new Context();
+    context.put("serializer", "AVRO_EVENT");
+
+    HDFSCompressedDataStream writer = new HDFSCompressedDataStream();
+    writer.configure(context);
+
+    FlumeFormatter fmt = new HDFSTextFormatter();
+    writer.open(fileURI, factory.getCodec(new Path(fileURI)),
+        SequenceFile.CompressionType.BLOCK, fmt);
+
+    String[] bodies = { "yarf!", "yarfing!" };
+    writeBodies(writer, bodies);
+
+    int found = 0;
+    int expected = bodies.length;
+    List<String> expectedBodies = Lists.newArrayList(bodies);
+
+    GZIPInputStream cmpIn = new GZIPInputStream(new FileInputStream(file));
+    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
+    DataFileStream<GenericRecord> avroStream =
+        new DataFileStream<GenericRecord>(cmpIn, reader);
+    GenericRecord record = new GenericData.Record(avroStream.getSchema());
+    while (avroStream.hasNext()) {
+      avroStream.next(record);
+      CharsetDecoder decoder = Charsets.UTF_8.newDecoder();
+      String bodyStr = decoder.decode((ByteBuffer) record.get("body"))
+          .toString();
+      expectedBodies.remove(bodyStr);
+      found++;
+    }
+    avroStream.close();
+    cmpIn.close();
+
+    Assert.assertTrue("Found = " + found + ", Expected = " + expected
+        + ", Left = " + expectedBodies.size() + " " + expectedBodies,
+        expectedBodies.size() == 0);
   }
 
+  private void writeBodies(HDFSCompressedDataStream writer, String... bodies)
+      throws Exception {
+    for (String body : bodies) {
+      Event evt = EventBuilder.withBody(body, Charsets.UTF_8);
+      writer.append(evt, fmt);
+    }
+    writer.sync();
+  }
 }
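For reference, the writer now drives the full org.apache.flume.serialization.EventSerializer lifecycle seen in the diff: afterCreate() or afterReopen() on open, write() per event, flush() on sync, beforeClose() on close, and supportsReopen() to gate appends. The sketch below is a hypothetical minimal serializer honoring that contract; the class name, package, and its body-only output are invented for illustration and are not part of this commit:

  package org.example.flume; // hypothetical package, not part of this commit

  import java.io.IOException;
  import java.io.OutputStream;

  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.serialization.EventSerializer;

  /**
   * Hypothetical serializer that writes only the event body, one event per
   * line. It exists to show the lifecycle methods that
   * HDFSCompressedDataStream now delegates to.
   */
  public class BodyOnlyEventSerializer implements EventSerializer {

    private final OutputStream out;

    private BodyOnlyEventSerializer(Context context, OutputStream out) {
      // context would carry the "serializer." sub-properties; unused here
      this.out = out;
    }

    @Override
    public void afterCreate() throws IOException {
      // called once when the writer opens a brand new file
    }

    @Override
    public void afterReopen() throws IOException {
      // called instead of afterCreate() when an existing file is appended to
    }

    @Override
    public void write(Event event) throws IOException {
      out.write(event.getBody());
      out.write('\n');
    }

    @Override
    public void flush() throws IOException {
      // invoked from sync() and close() before the codec stream is finished
      out.flush();
    }

    @Override
    public void beforeClose() throws IOException {
      // last chance to write trailers before the stream is closed
    }

    @Override
    public boolean supportsReopen() {
      // returning false makes the writer reject hdfs.append.support reopens
      return false;
    }

    /** Builder used when a fully-qualified class name is given as the
     *  serializer type instead of a built-in name like TEXT or AVRO_EVENT. */
    public static class Builder implements EventSerializer.Builder {
      @Override
      public EventSerializer build(Context context, OutputStream out) {
        return new BodyOnlyEventSerializer(context, out);
      }
    }
  }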

