accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject accumulo git commit: ACCUMULO-3980 watch the replication and reset the walog if it drops below the initial value
Date Mon, 31 Aug 2015 15:44:58 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master fec719f5d -> 9d25381b7


ACCUMULO-3980 watch the replication and reset the walog if it drops below the initial value


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9d25381b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9d25381b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9d25381b

Branch: refs/heads/master
Commit: 9d25381b7a0fa1f728eda13158db0888cb4b6b7d
Parents: fec719f
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Mon Aug 31 11:44:37 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Mon Aug 31 11:44:37 2015 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/log/DfsLogger.java  | 27 +++++++++++++++++---
 1 file changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d25381b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 47fd57d..131c61d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -190,16 +190,24 @@ public class DfsLogger implements Comparable<DfsLogger> {
             }
           }
         } catch (Exception ex) {
-          log.warn("Exception syncing " + ex);
-          for (DfsLogger.LogWork logWork : work) {
-            logWork.exception = ex;
-          }
+          fail(work, ex, "synching");
         }
         long duration = System.currentTimeMillis() - start;
         if (duration > slowFlushMillis) {
           String msg = new StringBuilder(128).append("Slow sync cost: ").append(duration).append("
ms, current pipeline: ")
               .append(Arrays.toString(getPipeLine())).toString();
           log.info(msg);
+          if (expectedReplication > 0) {
+            int current = expectedReplication;
+            try {
+              current = ((DFSOutputStream) logFile.getWrappedStream()).getCurrentBlockReplication();
+            } catch (IOException e) {
+              fail(work, e, "getting replication level");
+            }
+            if (current < expectedReplication) {
+              fail(work, new IOException("replication of " + current + " is less than " +
expectedReplication), "replication check");
+            }
+          }
         }
 
         for (DfsLogger.LogWork logWork : work)
@@ -209,6 +217,13 @@ public class DfsLogger implements Comparable<DfsLogger> {
             logWork.latch.countDown();
       }
     }
+
+    private void fail(ArrayList<DfsLogger.LogWork> work, Exception ex, String why)
{
+      log.warn("Exception " + why + " " + ex);
+      for (DfsLogger.LogWork logWork : work) {
+        logWork.exception = ex;
+      }
+    }
   }
 
   private static class LogWork {
@@ -276,6 +291,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
   private AtomicLong syncCounter;
   private AtomicLong flushCounter;
   private final long slowFlushMillis;
+  private int expectedReplication = 0;
 
   private DfsLogger(ServerResources conf) {
     this.conf = conf;
@@ -417,6 +433,9 @@ public class DfsLogger implements Comparable<DfsLogger> {
         logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize);
       else
         logFile = fs.create(new Path(logPath), true, 0, replication, blockSize);
+      if (logFile.getWrappedStream() instanceof DFSOutputStream) {
+        expectedReplication = ((DFSOutputStream) logFile.getWrappedStream()).getCurrentBlockReplication();
+      }
 
       sync = logFile.getClass().getMethod("hsync");
       flush = logFile.getClass().getMethod("hflush");


Mime
View raw message