Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2825310AB6 for ; Tue, 7 May 2013 03:47:13 +0000 (UTC) Received: (qmail 51370 invoked by uid 500); 7 May 2013 03:47:12 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 51185 invoked by uid 500); 7 May 2013 03:47:07 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 51152 invoked by uid 99); 7 May 2013 03:47:05 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 May 2013 03:47:05 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 80A7288822E; Tue, 7 May 2013 03:47:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hshreedharan@apache.org To: commits@flume.apache.org Message-Id: <628aa015dcfe404cb76d7210a944df3b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-2027. Check for default replication fails on federated cluster in hdfs sink Date: Tue, 7 May 2013 03:47:05 +0000 (UTC) Updated Branches: refs/heads/trunk 2e1b7c23c -> fac09aa5f FLUME-2027. Check for default replication fails on federated cluster in hdfs sink (Mike Percy via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/fac09aa5 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/fac09aa5 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/fac09aa5 Branch: refs/heads/trunk Commit: fac09aa5f543f251acbe3aa205b470562b93b344 Parents: 2e1b7c2 Author: Hari Shreedharan Authored: Mon May 6 20:44:57 2013 -0700 Committer: Hari Shreedharan Committed: Mon May 6 20:44:57 2013 -0700 ---------------------------------------------------------------------- .../apache/flume/sink/hdfs/AbstractHDFSWriter.java | 57 ++++++++++++++- .../flume/sink/hdfs/HDFSCompressedDataStream.java | 2 +- .../org/apache/flume/sink/hdfs/HDFSDataStream.java | 2 +- .../apache/flume/sink/hdfs/HDFSSequenceFile.java | 2 +- 4 files changed, 56 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/fac09aa5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java index ff4f223..bc3b383 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java @@ -23,6 +23,7 @@ import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +40,9 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { private FSDataOutputStream outputStream; private FileSystem fs; + private Path destPath; private Method refGetNumCurrentReplicas = null; + private Method refGetDefaultReplication = null; private Integer configuredMinReplicas = null; final static Object [] NO_ARGS = new Object []{}; @@ -84,26 +87,43 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { } protected void registerCurrentStream(FSDataOutputStream outputStream, - FileSystem fs) { + FileSystem fs, Path destPath) { Preconditions.checkNotNull(outputStream, "outputStream must not be null"); Preconditions.checkNotNull(fs, "fs must not be null"); + Preconditions.checkNotNull(destPath, "destPath must not be null"); this.outputStream = outputStream; this.fs = fs; + this.destPath = destPath; this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream); + this.refGetDefaultReplication = reflectGetDefaultReplication(fs); } protected void unregisterCurrentStream() { this.outputStream = null; this.fs = null; + this.destPath = null; this.refGetNumCurrentReplicas = null; + this.refGetDefaultReplication = null; } public int getFsDesiredReplication() { - if (fs != null) { - return fs.getDefaultReplication(); + short replication = 0; + if (fs != null && destPath != null) { + if (refGetDefaultReplication != null) { + try { + replication = (Short) refGetDefaultReplication.invoke(fs, destPath); + } catch (IllegalAccessException e) { + logger.warn("Unexpected error calling getDefaultReplication(Path)", e); + } catch (InvocationTargetException e) { + logger.warn("Unexpected error calling getDefaultReplication(Path)", e); + } + } else { + // will not work on Federated HDFS (see HADOOP-8014) + replication = fs.getDefaultReplication(); + } } - return 0; + return replication; } /** @@ -163,4 +183,33 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { return m; } + /** + * Find the 'getDefaultReplication' method on the passed fs + * FileSystem that takes a Path argument. + * @return Method or null. + */ + private Method reflectGetDefaultReplication(FileSystem fileSystem) { + Method m = null; + if (fileSystem != null) { + Class fsClass = fileSystem.getClass(); + try { + m = fsClass.getMethod("getDefaultReplication", + new Class[] { Path.class }); + } catch (NoSuchMethodException e) { + logger.debug("FileSystem implementation doesn't support" + + " getDefaultReplication(Path); -- HADOOP-8014 not available; " + + "className = " + fsClass.getName() + "; err = " + e); + } catch (SecurityException e) { + logger.debug("No access to getDefaultReplication(Path) on " + + "FileSystem implementation -- HADOOP-8014 not available; " + + "className = " + fsClass.getName() + "; err = " + e); + } + } + if (m != null) { + logger.debug("Using FileSystem.getDefaultReplication(Path) from " + + "HADOOP-8014"); + } + return m; + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/fac09aa5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java index 0c618b5..2c2be6a 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java @@ -102,7 +102,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter { + ") does not support append"); } - registerCurrentStream(fsOut, hdfs); + registerCurrentStream(fsOut, hdfs, dstPath); if (appending) { serializer.afterReopen(); http://git-wip-us.apache.org/repos/asf/flume/blob/fac09aa5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java index c87fafe..b8214be 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java @@ -90,7 +90,7 @@ public class HDFSDataStream extends AbstractHDFSWriter { } // must call superclass to check for replication issues - registerCurrentStream(outStream, hdfs); + registerCurrentStream(outStream, hdfs, dstPath); if (appending) { serializer.afterReopen(); http://git-wip-us.apache.org/repos/asf/flume/blob/fac09aa5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java index 1a401d6..0383744 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java @@ -92,7 +92,7 @@ public class HDFSSequenceFile extends AbstractHDFSWriter { writer = SequenceFile.createWriter(conf, outStream, serializer.getKeyClass(), serializer.getValueClass(), compType, codeC); - registerCurrentStream(outStream, hdfs); + registerCurrentStream(outStream, hdfs, dstPath); } @Override