hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ndimi...@apache.org
Subject hbase git commit: Revert "HBASE-14230 replace reflection in FSHlog with HdfsDataOutputStream#getCurrentBlockReplication()"
Date Fri, 18 Sep 2015 21:32:27 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 cf135c444 -> 7fb12e333


Revert "HBASE-14230 replace reflection in FSHlog with HdfsDataOutputStream#getCurrentBlockReplication()"

This reverts commit 945477e2e625ed650dd08bf1c4e70ebebda33d4c.

Also reintroduces the NO_ARGS instance that was removed in HBASE-14401


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7fb12e33
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7fb12e33
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7fb12e33

Branch: refs/heads/branch-1
Commit: 7fb12e33315504a51578fdf747f9b8050d62bffb
Parents: cf135c4
Author: Nick Dimiduk <ndimiduk@apache.org>
Authored: Fri Sep 18 14:27:45 2015 -0700
Committer: Nick Dimiduk <ndimiduk@apache.org>
Committed: Fri Sep 18 14:27:45 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 97 +++++++++++++++++---
 1 file changed, 85 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7fb12e33/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 5f9e3cd..3122513 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -78,8 +79,6 @@ import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.wal.WALSplitter;
-import org.apache.hadoop.hdfs.DFSOutputStream;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.NullScope;
@@ -277,8 +276,12 @@ public class FSHLog implements WAL {
   // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered
   private final int minTolerableReplication;
 
+  // DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection.
+  private final Method getNumCurrentReplicas;
+  private final Method getPipeLine; // refers to DFSOutputStream.getPipeLine
   private final int slowSyncNs;
 
+  private final static Object [] NO_ARGS = new Object []{};
   // If live datanode count is lower than the default replicas value,
   // RollWriter will be triggered in each sync(So the RollWriter will be
   // triggered one by one in a short time). Using it as a workaround to slow
@@ -525,6 +528,10 @@ public class FSHLog implements WAL {
     this.slowSyncNs =
         1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
           DEFAULT_SLOW_SYNC_TIME_MS);
+    // handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with
+    // HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection.
+    this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
+    this.getPipeLine = getGetPipeline(this.hdfs_out);
 
     // This is the 'writer' -- a single threaded executor.  This single thread 'consumes' what is
     // put on the ring buffer.
@@ -1414,6 +1421,34 @@ public class FSHLog implements WAL {
     return len;
   }
 
+  /**
+   * Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
+   * This is used for getting current replicas of a file being written.
+   * @return Method or null.
+   */
+  private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
+    // TODO: Remove all this and use the now publically available
+    // HdfsDataOutputStream#getCurrentBlockReplication()
+    Method m = null;
+    if (os != null) {
+      Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
+      try {
+        m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class<?>[] {});
+        m.setAccessible(true);
+      } catch (NoSuchMethodException e) {
+        LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " +
+         "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName());
+      } catch (SecurityException e) {
+        LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " +
+          "not available; fsOut=" + wrappedStreamClass.getName(), e);
+        m = null; // could happen on setAccessible()
+      }
+    }
+    if (m != null) {
+      if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas");
+    }
+    return m;
+  }
 
   /**
    * This method gets the datanode replication count for the current WAL.
@@ -1428,12 +1463,16 @@ public class FSHLog implements WAL {
    * @throws Exception
    */
   @VisibleForTesting
-  int getLogReplication() {
-    try {
-      return ((HdfsDataOutputStream)this.hdfs_out).getCurrentBlockReplication();
-    } catch (IOException e) {
-      return 0;
+  int getLogReplication()
+  throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
+    final OutputStream stream = getOutputStream();
+    if (this.getNumCurrentReplicas != null && stream != null) {
+      Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS);
+      if (repl instanceof Integer) {
+        return ((Integer)repl).intValue();
+      }
     }
+    return 0;
   }
 
   @Override
@@ -1966,17 +2005,51 @@ public class FSHLog implements WAL {
       System.exit(-1);
     }
   }
+  
+  /**
+   * Find the 'getPipeline' on the passed <code>os</code> stream.
+   * @return Method or null.
+   */
+  private Method getGetPipeline(final FSDataOutputStream os) {
+    Method m = null;
+    if (os != null) {
+      Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
+          .getClass();
+      try {
+        m = wrappedStreamClass.getDeclaredMethod("getPipeline",
+          new Class<?>[] {});
+        m.setAccessible(true);
+      } catch (NoSuchMethodException e) {
+        LOG.info("FileSystem's output stream doesn't support"
+            + " getPipeline; not available; fsOut="
+            + wrappedStreamClass.getName());
+      } catch (SecurityException e) {
+        LOG.info(
+          "Doesn't have access to getPipeline on "
+              + "FileSystems's output stream ; fsOut="
+              + wrappedStreamClass.getName(), e);
+        m = null; // could happen on setAccessible()
+      }
+    }
+    return m;
+  }
 
   /**
    * This method gets the pipeline for the current WAL.
    */
   @VisibleForTesting
   DatanodeInfo[] getPipeLine() {
-    if (this.hdfs_out != null) {
-      return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline();
-    } else {
-      return new DatanodeInfo[0];
+    if (this.getPipeLine != null && this.hdfs_out != null) {
+      Object repl;
+      try {
+        repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS);
+        if (repl instanceof DatanodeInfo[]) {
+          return ((DatanodeInfo[]) repl);
+        }
+      } catch (Exception e) {
+        LOG.info("Get pipeline failed", e);
+      }
     }
-
+    return new DatanodeInfo[0];
   }
 }
\ No newline at end of file


Mime
View raw message