flume-commits mailing list archives

From: hshreedha...@apache.org
Subject: svn commit: r1352766 - in /incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src: main/java/org/apache/flume/sink/hdfs/ test/java/org/apache/flume/sink/hdfs/
Date: Fri, 22 Jun 2012 03:27:56 GMT
Author: hshreedharan
Date: Fri Jun 22 03:27:55 2012
New Revision: 1352766

URL: http://svn.apache.org/viewvc?rev=1352766&view=rev
Log:
FLUME-1301. HDFSCompressedDataStream can lose data.

(Mike Percy via Hari Shreedharan)


Added:
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java   (with props)
Modified:
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
    incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java?rev=1352766&r1=1352765&r2=1352766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java Fri Jun 22 03:27:55 2012
@@ -20,7 +20,6 @@ package org.apache.flume.sink.hdfs;
 
 import java.io.IOException;
 import org.apache.flume.Context;
-
 import org.apache.flume.Event;
 import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
@@ -31,10 +30,17 @@ import org.apache.hadoop.io.SequenceFile
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HDFSCompressedDataStream implements HDFSWriter {
 
-  private CompressionOutputStream outStream;
+  private static final Logger logger =
+      LoggerFactory.getLogger(HDFSCompressedDataStream.class);
+
+  private FSDataOutputStream fsOut;
+  private CompressionOutputStream cmpOut;
+  private boolean isFinished = false;
 
   @Override
   public void configure(Context context) {
@@ -51,36 +57,49 @@ public class HDFSCompressedDataStream im
   @Override
   public void open(String filePath, CompressionCodec codec,
       CompressionType cType, FlumeFormatter fmt) throws IOException {
-
-    FSDataOutputStream fsOutStream;
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
 
    if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) {
-      fsOutStream = hdfs.append(dstPath);
+      fsOut = hdfs.append(dstPath);
     } else {
-      fsOutStream = hdfs.create(dstPath);
+      fsOut = hdfs.create(dstPath);
     }
-    outStream = codec.createOutputStream(fsOutStream);
+    cmpOut = codec.createOutputStream(fsOut);
+    isFinished = false;
   }
 
   @Override
   public void append(Event e, FlumeFormatter fmt) throws IOException {
+    if (isFinished) {
+      cmpOut.resetState();
+      isFinished = false;
+    }
     byte[] bValue = fmt.getBytes(e);
-    outStream.write(bValue, 0, bValue.length);
+    cmpOut.write(bValue);
   }
 
   @Override
   public void sync() throws IOException {
-    outStream.flush();
+    // We must use finish() and resetState() here -- flush() is apparently not
+    // supported by the compressed output streams (it's a no-op).
+    // Also, since resetState() writes headers, avoid calling it without an
+    // additional write/append operation.
+    // Note: There are bugs in Hadoop & JDK w/ pure-java gzip; see HADOOP-8522.
+    if (!isFinished) {
+      cmpOut.finish();
+      isFinished = true;
+    }
+    fsOut.flush();
+    fsOut.sync();
   }
 
   @Override
   public void close() throws IOException {
-    outStream.flush();
-    outStream.close();
+    sync();
+    cmpOut.close();
   }
 
 }
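
The fix is easiest to see outside the diff. Below is a minimal, self-contained
sketch of the sync pattern the patch adopts. It is illustrative, not part of
the commit; it assumes Hadoop 1.x-era APIs on the classpath, and the output
path and payloads are made up.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.DefaultCodec;

    public class CompressedSyncSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Same trick as the test below: the local FS must be raw to be Syncable.
        conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
        Path path = new Path("target/sketch.deflate");
        FileSystem fs = path.getFileSystem(conf);

        DefaultCodec codec = new DefaultCodec();
        codec.setConf(conf);
        FSDataOutputStream fsOut = fs.create(path);
        CompressionOutputStream cmpOut = codec.createOutputStream(fsOut);

        cmpOut.write("first batch\n".getBytes("UTF-8"));
        // flush() on a CompressionOutputStream is a no-op, so a durable sync
        // must finish() the current compression stream instead ...
        cmpOut.finish();
        fsOut.flush();
        fsOut.sync();
        // ... and resetState() starts a new compression member before the
        // next write; this is what makes the on-disk file multi-member.
        cmpOut.resetState();
        cmpOut.write("second batch\n".getBytes("UTF-8"));
        cmpOut.close();
      }
    }

The key point is the pairing: finish() turns everything written so far into a
complete, decodable compression member that the underlying flush()/sync() can
persist, while resetState() opens a fresh member for subsequent appends.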

Added: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java?rev=1352766&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java (added)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java Fri Jun 22 03:27:55 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import com.google.common.base.Charsets;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHDFSCompressedDataStream {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(TestHDFSCompressedDataStream.class);
+
+  // make sure the data makes it to disk if we sync() the data stream
+  @Test
+  public void testGzipDurability() throws IOException {
+    File file = new File("target/test/data/foo.gz");
+    String fileURI = file.getAbsoluteFile().toURI().toString();
+    logger.info("File URI: {}", fileURI);
+
+    Configuration conf = new Configuration();
+    // local FS must be raw in order to be Syncable
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
+    Path path = new Path(fileURI);
+    FileSystem fs = path.getFileSystem(conf); // get FS with our conf cached
+    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+
+    HDFSCompressedDataStream writer = new HDFSCompressedDataStream();
+    FlumeFormatter fmt = new HDFSTextFormatter();
+    writer.open(fileURI, factory.getCodec(new Path(fileURI)),
+        SequenceFile.CompressionType.BLOCK, fmt);
+    String body = "yarf!";
+    Event evt = EventBuilder.withBody(body, Charsets.UTF_8);
+    writer.append(evt, fmt);
+    writer.sync();
+
+    byte[] buf = new byte[256];
+    GZIPInputStream cmpIn = new GZIPInputStream(new FileInputStream(file));
+    int len = cmpIn.read(buf);
+    String result = new String(buf, 0, len, Charsets.UTF_8);
+    result = result.trim(); // HDFSTextFormatter adds a newline
+
+    Assert.assertEquals("input and output must match", body, result);
+  }
+
+}

Propchange: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
------------------------------------------------------------------------------
    svn:eol-style = native
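
As a usage note, the new test can be run on its own with the usual Maven
Surefire filter; the module path here is inferred from the file locations
above and is not a command from the commit:

    mvn -pl flume-ng-sinks/flume-hdfs-sink test -Dtest=TestHDFSCompressedDataStream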

Modified: incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java?rev=1352766&r1=1352765&r2=1352766&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java (original)
+++ incubator/flume/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java Fri Jun 22 03:27:55 2012
@@ -222,9 +222,20 @@ public class TestHDFSEventSinkOnMiniClus
       String line = reader.readLine();
       logger.info("First line in file {}: {}", filePath, line);
       Assert.assertEquals(EVENT_BODY_1, line);
-      line = reader.readLine();
-      logger.info("Second line in file {}: {}", filePath, line);
-      Assert.assertEquals(EVENT_BODY_2, line);
+
+      // The rest of this test is commented-out (will fail) for 2 reasons:
+      //
+      // (1) At the time of this writing, Hadoop has a bug which causes the
+      // non-native gzip implementation to create invalid gzip files when
+      // finish() and resetState() are called. See HADOOP-8522.
+      //
+      // (2) Even if HADOOP-8522 is fixed, the JDK GZipInputStream is unable
+      // to read multi-member (concatenated) gzip files. See this Sun bug:
+      // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4691425
+      //
+      //line = reader.readLine();
+      //logger.info("Second line in file {}: {}", filePath, line);
+      //Assert.assertEquals(EVENT_BODY_2, line);
     }
 
     if (!KEEP_DATA) {
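
The commented-out assertions above point at a real interoperability gap: once
sync() produces multi-member gzip output, java.util.zip.GZIPInputStream stops
after the first member (the Sun bug cited in the comment). One common
workaround, sketched here as an assumption (Apache Commons Compress is not a
dependency introduced by this commit), is GzipCompressorInputStream with
concatenated-member decoding enabled:

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

    public class MultiMemberGzipReader {
      public static void main(String[] args) throws IOException {
        // The boolean flag tells the stream to keep decompressing across
        // concatenated gzip members instead of stopping after the first.
        GzipCompressorInputStream in = new GzipCompressorInputStream(
            new FileInputStream(args[0]), true);
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(in, "UTF-8"));
        String line;
        while ((line = reader.readLine()) != null) {
          System.out.println(line);
        }
        reader.close();
      }
    }

Even with such a reader, files written by the pure-Java gzip codec remain
suspect until HADOOP-8522 is resolved, which is why the assertions are
commented out rather than switched to a different reader.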


