accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject accumulo git commit: ACCUMULO-3976 print hdfs pipeline when the walog flush is slow
Date Fri, 28 Aug 2015 19:42:48 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master efbb394b7 -> e123c26c7


ACCUMULO-3976 print hdfs pipeline when the walog flush is slow


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e123c26c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e123c26c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e123c26c

Branch: refs/heads/master
Commit: e123c26c7425ec04ac4cbf3c62daae9e1994f4e6
Parents: efbb394
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Fri Aug 28 15:42:33 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Fri Aug 28 15:42:33 2015 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  2 +
 .../apache/accumulo/tserver/log/DfsLogger.java  | 60 ++++++++++++++++++++
 2 files changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e123c26c/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index ef4d877..5bd5c8a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -332,6 +332,8 @@ public enum Property {
       "Memory to provide to batchwriter to replay mutations for replication"),
   TSERV_ASSIGNMENT_MAXCONCURRENT("tserver.assignment.concurrent.max", "2", PropertyType.COUNT,
       "The number of threads available to load tablets. Recoveries are still performed serially."),
+  TSERV_SLOW_FLUSH_MILLIS("tserver.slow.flush.time", "100ms", PropertyType.TIMEDURATION,
+      "If a flush to the write-ahead log takes longer than this period of time, debugging information will be written, and may result in a log rollover."),
 
   // properties that are specific to logger server behavior
  LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e123c26c/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 397eeff..bdc7364 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -62,6 +62,7 @@ import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,6 +78,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
   public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
 
   private static final Logger log = LoggerFactory.getLogger(DfsLogger.class);
+  private static final Object[] NO_ARGS = new Object[] {};
 
   public static class LogClosedException extends IOException {
     private static final long serialVersionUID = 1L;
@@ -176,6 +178,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
           }
         }
 
+        long start = System.currentTimeMillis();
         try {
           if (durabilityMethod != null) {
             durabilityMethod.invoke(logFile);
@@ -191,6 +194,12 @@ public class DfsLogger implements Comparable<DfsLogger> {
             logWork.exception = ex;
           }
         }
+        long duration = System.currentTimeMillis() - start;
+        if (duration > slowFlushMillis) {
+          String msg = new StringBuilder().append("Slow sync cost: ").append(duration).append(" ms, current pipeline: ").append(Arrays.toString(getPipeLine()))
+              .toString();
+          log.info(msg);
+        }
 
         for (DfsLogger.LogWork logWork : work)
           if (logWork == CLOSED_MARKER)
@@ -265,11 +274,14 @@ public class DfsLogger implements Comparable<DfsLogger> {
   private String metaReference;
   private AtomicLong syncCounter;
   private AtomicLong flushCounter;
+  private final long slowFlushMillis;
+  private Method getPipeLine;
 
   public DfsLogger(ServerResources conf, AtomicLong syncCounter, AtomicLong flushCounter) throws IOException {
     this.conf = conf;
     this.syncCounter = syncCounter;
     this.flushCounter = flushCounter;
+    this.slowFlushMillis = conf.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FLUSH_MILLIS);
   }
 
   /**
@@ -282,6 +294,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
     this.conf = conf;
     this.logPath = filename;
     metaReference = meta;
+    this.slowFlushMillis = conf.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FLUSH_MILLIS);
   }
 
   public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException {
@@ -404,6 +417,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
 
       sync = logFile.getClass().getMethod("hsync");
       flush = logFile.getClass().getMethod("hflush");
+      getPipeLine = this.getGetPipeline(logFile);
 
       // Initialize the crypto operations.
       org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf
@@ -629,4 +643,50 @@ public class DfsLogger implements Comparable<DfsLogger> {
     return getFileName().compareTo(o.getFileName());
   }
 
+  /*
+   * The following two methods were shamelessly lifted from HBASE-11240. Thanks HBase!
+   */
+
+  /**
+   * Find the 'getPipeline' on the passed <code>os</code> stream.
+   *
+   * @return Method or null.
+   */
+  private Method getGetPipeline(final FSDataOutputStream os) {
+    Method m = null;
+    if (os != null) {
+      Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
+      try {
+        m = wrappedStreamClass.getDeclaredMethod("getPipeline", new Class<?>[] {});
+        m.setAccessible(true);
+      } catch (NoSuchMethodException e) {
+        log.info("FileSystem's output stream doesn't support getPipeline; not available; fsOut=" + wrappedStreamClass.getName());
+      } catch (SecurityException e) {
+        log.info("Doesn't have access to getPipeline on FileSystems's output stream ; fsOut=" + wrappedStreamClass.getName(), e);
+        m = null; // could happen on setAccessible()
+      }
+    }
+    return m;
+  }
+
+  /**
+   * This method gets the pipeline for the current walog.
+   *
+   * @return non-null array of DatanodeInfo
+   */
+  DatanodeInfo[] getPipeLine() {
+    if (this.getPipeLine != null) {
+      Object repl;
+      try {
+        repl = this.getPipeLine.invoke(this.logFile, NO_ARGS);
+        if (repl instanceof DatanodeInfo[]) {
+          return ((DatanodeInfo[]) repl);
+        }
+      } catch (Exception e) {
+        log.info("Get pipeline failed", e);
+      }
+    }
+    return new DatanodeInfo[0];
+  }
+
 }


Mime
View raw message