flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject flume git commit: FLUME-2443: org.apache.hadoop.fs.FSDataOutputStream.sync() is deprecated in hadoop 2.4
Date Tue, 03 Feb 2015 21:33:45 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.6 555d8b88d -> d47ee46b0


FLUME-2443: org.apache.hadoop.fs.FSDataOutputStream.sync() is deprecated in hadoop 2.4

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d47ee46b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d47ee46b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d47ee46b

Branch: refs/heads/flume-1.6
Commit: d47ee46b0f76d61f15e153f159e19387da46cc85
Parents: 555d8b8
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Tue Feb 3 13:32:28 2015 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Tue Feb 3 13:33:33 2015 -0800

----------------------------------------------------------------------
 .../flume/sink/hdfs/AbstractHDFSWriter.java     | 51 ++++++++++++++++++++
 .../sink/hdfs/HDFSCompressedDataStream.java     |  4 +-
 .../apache/flume/sink/hdfs/HDFSDataStream.java  |  4 +-
 .../flume/sink/hdfs/HDFSSequenceFile.java       |  2 +-
 4 files changed, 56 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d47ee46b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
index 043ca6c..e367e12 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
@@ -19,6 +19,7 @@ package org.apache.flume.sink.hdfs;
 
 import com.google.common.base.Preconditions;
 import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -27,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -43,6 +45,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
   private Path destPath;
   private Method refGetNumCurrentReplicas = null;
   private Method refGetDefaultReplication = null;
+  private Method refHflushOrSync = null;
   private Integer configuredMinReplicas = null;
   private Integer numberOfCloseRetries = null;
   private long timeBetweenCloseRetries = Long.MAX_VALUE;
@@ -110,6 +113,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
     this.destPath = destPath;
     this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream);
     this.refGetDefaultReplication = reflectGetDefaultReplication(fs);
+    this.refHflushOrSync = reflectHflushOrSync(outputStream);
 
   }
 
@@ -225,4 +229,51 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
     }
     return m;
   }
+
+  private Method reflectHflushOrSync(FSDataOutputStream os) {
+    Method m = null;
+    if(os != null) {
+      Class<?> fsDataOutputStreamClass = os.getClass();
+      try {
+        m = fsDataOutputStreamClass.getMethod("hflush");
+      } catch (NoSuchMethodException ex) {
+        logger.debug("HFlush not found. Will use sync() instead");
+        try {
+          m = fsDataOutputStreamClass.getMethod("sync");
+        } catch (Exception ex1) {
+          String msg = "Neither hflush not sync were found. That seems to be " +
+            "a problem!";
+          logger.error(msg);
+          throw new FlumeException(msg, ex1);
+        }
+      }
+    }
+    return m;
+  }
+
+  /**
+   * If hflush is available in this version of HDFS, then this method calls
+   * hflush, else it calls sync.
+   * @param os - The stream to flush/sync
+   * @throws IOException
+   */
+  protected void hflushOrSync(FSDataOutputStream os) throws IOException {
+    try {
+      // At this point the refHflushOrSync cannot be null,
+      // since register method would have thrown if it was.
+      this.refHflushOrSync.invoke(os);
+    } catch (InvocationTargetException e) {
+      String msg = "Error while trying to hflushOrSync!";
+      logger.error(msg);
+      Throwable cause = e.getCause();
+      if(cause != null && cause instanceof IOException) {
+        throw (IOException)cause;
+      }
+      throw new FlumeException(msg, e);
+    } catch (Exception e) {
+      String msg = "Error while trying to hflushOrSync!";
+      logger.error(msg);
+      throw new FlumeException(msg, e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/d47ee46b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index dc93e4f..f128795 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -139,7 +139,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
       isFinished = true;
     }
     fsOut.flush();
-    fsOut.sync();
+    hflushOrSync(this.fsOut);
   }
 
   @Override
@@ -151,7 +151,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
       isFinished = true;
     }
     fsOut.flush();
-    fsOut.sync();
+    hflushOrSync(fsOut);
     cmpOut.close();
     if (compressor != null) {
       CodecPool.returnCompressor(compressor);

http://git-wip-us.apache.org/repos/asf/flume/blob/d47ee46b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
index 6fa12eb..7054bfc 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
@@ -128,7 +128,7 @@ public class HDFSDataStream extends AbstractHDFSWriter {
   public void sync() throws IOException {
     serializer.flush();
     outStream.flush();
-    outStream.sync();
+    hflushOrSync(outStream);
   }
 
   @Override
@@ -136,7 +136,7 @@ public class HDFSDataStream extends AbstractHDFSWriter {
     serializer.flush();
     serializer.beforeClose();
     outStream.flush();
-    outStream.sync();
+    hflushOrSync(outStream);
     outStream.close();
 
     unregisterCurrentStream();

http://git-wip-us.apache.org/repos/asf/flume/blob/d47ee46b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index 2608987..a261cce 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -110,7 +110,7 @@ public class HDFSSequenceFile extends AbstractHDFSWriter {
 
   @Override
   public void sync() throws IOException {
-    writer.syncFs();
+    hflushOrSync(outStream);
   }
 
   @Override


Mime
View raw message