flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2027. Check for default replication fails on federated cluster in hdfs sink
Date Tue, 07 May 2013 03:47:29 GMT
Updated Branches:
  refs/heads/flume-1.4 543a5a196 -> 5f03e1d51


FLUME-2027. Check for default replication fails on federated cluster in hdfs sink

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5f03e1d5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5f03e1d5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5f03e1d5

Branch: refs/heads/flume-1.4
Commit: 5f03e1d513bb3a041d7cd2be02816525db55b1d9
Parents: 543a5a1
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Mon May 6 20:44:57 2013 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Mon May 6 20:47:21 2013 -0700

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/AbstractHDFSWriter.java |   57 ++++++++++++++-
 .../flume/sink/hdfs/HDFSCompressedDataStream.java  |    2 +-
 .../org/apache/flume/sink/hdfs/HDFSDataStream.java |    2 +-
 .../apache/flume/sink/hdfs/HDFSSequenceFile.java   |    2 +-
 4 files changed, 56 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5f03e1d5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
index ff4f223..bc3b383 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java
@@ -23,6 +23,7 @@ import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +40,9 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
 
   private FSDataOutputStream outputStream;
   private FileSystem fs;
+  private Path destPath;
   private Method refGetNumCurrentReplicas = null;
+  private Method refGetDefaultReplication = null;
   private Integer configuredMinReplicas = null;
 
   final static Object [] NO_ARGS = new Object []{};
@@ -84,26 +87,43 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
   }
 
   protected void registerCurrentStream(FSDataOutputStream outputStream,
-                                      FileSystem fs) {
+                                      FileSystem fs, Path destPath) {
     Preconditions.checkNotNull(outputStream, "outputStream must not be null");
     Preconditions.checkNotNull(fs, "fs must not be null");
+    Preconditions.checkNotNull(destPath, "destPath must not be null");
 
     this.outputStream = outputStream;
     this.fs = fs;
+    this.destPath = destPath;
     this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream);
+    this.refGetDefaultReplication = reflectGetDefaultReplication(fs);
   }
 
   protected void unregisterCurrentStream() {
     this.outputStream = null;
     this.fs = null;
+    this.destPath = null;
     this.refGetNumCurrentReplicas = null;
+    this.refGetDefaultReplication = null;
   }
 
   public int getFsDesiredReplication() {
-    if (fs != null) {
-      return fs.getDefaultReplication();
+    short replication = 0; // stays 0 when unregistered or if the reflective call fails
+    if (fs != null && destPath != null) {
+      if (refGetDefaultReplication != null) {
+        try {
+          replication = (Short) refGetDefaultReplication.invoke(fs, destPath);
+        } catch (IllegalAccessException e) {
+          logger.warn("Unexpected error calling getDefaultReplication(Path)", e);
+        } catch (InvocationTargetException e) {
+          logger.warn("Unexpected error calling getDefaultReplication(Path)", e);
+        }
+      } else {
+        // no-arg fallback: will not work on Federated HDFS (see HADOOP-8014)
+        replication = fs.getDefaultReplication();
+      }
     }
-    return 0;
+    return replication;
   }
 
   /**
@@ -163,4 +183,33 @@ public abstract class AbstractHDFSWriter implements HDFSWriter {
     return m;
   }
 
+  /**
+   * Find, via reflection, the 'getDefaultReplication' method that takes a Path
+   * argument on the passed <code>fileSystem</code> (the HADOOP-8014 API).
+   * @return the Method, or null if fileSystem is null or the method is unavailable.
+   */
+  private Method reflectGetDefaultReplication(FileSystem fileSystem) {
+    Method m = null;
+    if (fileSystem != null) {
+      Class<?> fsClass = fileSystem.getClass();
+      try {
+        m = fsClass.getMethod("getDefaultReplication",
+            new Class<?>[] { Path.class });
+      } catch (NoSuchMethodException e) {
+        logger.debug("FileSystem implementation doesn't support"
+            + " getDefaultReplication(Path); -- HADOOP-8014 not available; " +
+            "className = " + fsClass.getName() + "; err = " + e);
+      } catch (SecurityException e) {
+        logger.debug("No access to getDefaultReplication(Path) on "
+            + "FileSystem implementation -- HADOOP-8014 not available; " +
+            "className = " + fsClass.getName() + "; err = " + e);
+      }
+    }
+    if (m != null) {
+      logger.debug("Using FileSystem.getDefaultReplication(Path) from " +
+          "HADOOP-8014");
+    }
+    return m;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/5f03e1d5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index 0c618b5..2c2be6a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -102,7 +102,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
           + ") does not support append");
     }
 
-    registerCurrentStream(fsOut, hdfs);
+    registerCurrentStream(fsOut, hdfs, dstPath);
 
     if (appending) {
       serializer.afterReopen();

http://git-wip-us.apache.org/repos/asf/flume/blob/5f03e1d5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
index c87fafe..b8214be 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
@@ -90,7 +90,7 @@ public class HDFSDataStream extends AbstractHDFSWriter {
     }
 
     // must call superclass to check for replication issues
-    registerCurrentStream(outStream, hdfs);
+    registerCurrentStream(outStream, hdfs, dstPath);
 
     if (appending) {
       serializer.afterReopen();

http://git-wip-us.apache.org/repos/asf/flume/blob/5f03e1d5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index 1a401d6..0383744 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -92,7 +92,7 @@ public class HDFSSequenceFile extends AbstractHDFSWriter {
     writer = SequenceFile.createWriter(conf, outStream,
         serializer.getKeyClass(), serializer.getValueClass(), compType, codeC);
 
-    registerCurrentStream(outStream, hdfs);
+    registerCurrentStream(outStream, hdfs, dstPath);
   }
 
   @Override


Mime
View raw message