accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [1/2] git commit: ACCUMULO-1345 log warning and stack trace when compaction does not make progress
Date Fri, 06 Sep 2013 22:37:27 GMT
Updated Branches:
  refs/heads/master a4ba6d9ce -> de24f8322


ACCUMULO-1345 log warning and stack trace when compaction does not make progress


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dee8bbb9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dee8bbb9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dee8bbb9

Branch: refs/heads/master
Commit: dee8bbb98ba155bec1612d4a4919648159efeb1e
Parents: a4ba6d9
Author: Keith Turner <kturner@apache.org>
Authored: Fri Sep 6 18:17:44 2013 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Fri Sep 6 18:17:44 2013 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   2 +
 .../server/tabletserver/CompactionWatcher.java  | 109 +++++++++++++++++++
 .../accumulo/server/tabletserver/Compactor.java |  32 +++++-
 .../server/tabletserver/MinorCompactor.java     |   5 +-
 .../server/tabletserver/TabletServer.java       |   6 +-
 5 files changed, 150 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/dee8bbb9/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index a28c52b..e58e771 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -177,6 +177,8 @@ public enum Property {
       "The maximum number of concurrent major compactions for a tablet server"),
   TSERV_MINC_MAXCONCURRENT("tserver.compaction.minor.concurrent.max", "4", PropertyType.COUNT,
       "The maximum number of concurrent minor compactions for a tablet server"),
+  TSERV_COMPACTION_WARN_TIME("tserver.compaction.warn.time", "10m", PropertyType.TIMEDURATION,
+      "When a compaction has not made progress for this time period, a warning will be logged"),
   TSERV_BLOOM_LOAD_MAXCONCURRENT("tserver.bloom.load.concurrent.max", "4", PropertyType.COUNT,
       "The number of concurrent threads that will load bloom filters in the background. "
           + "Setting this to zero will make bloom filters load in the foreground."),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dee8bbb9/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java
new file mode 100644
index 0000000..44b26ca
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/CompactionWatcher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+
+/**
+ * Watches running compactions and logs a warning, with a stack trace of the compacting thread, when one has not made progress for longer
+ * than {@link Property#TSERV_COMPACTION_WARN_TIME}.
+ * 
+ * <p>
+ * Each run takes the list from {@link Compactor#getRunningCompactions()} and keys every compaction by (id, entries read, entries
+ * written). A compaction whose key is unchanged since it was first seen has made no progress in that interval. When a compaction that was
+ * warned about finishes or makes progress, an info message is logged saying it is no longer stuck.
+ * 
+ * <p>
+ * The observation state is not synchronized; {@link #run()} is expected to be called from a single thread, such as the timer task
+ * scheduled by {@link #startWatching(AccumuloConfiguration)}.
+ */
+public class CompactionWatcher implements Runnable {
+  private static final Logger log = Logger.getLogger(CompactionWatcher.class);
+
+  // compactions seen on earlier runs, keyed by [id, entries read, entries written]; a compaction that makes progress gets a new key
+  private final Map<List<Long>,ObservedCompactionInfo> observedCompactions = new HashMap<List<Long>,ObservedCompactionInfo>();
+  private final AccumuloConfiguration config;
+  // set once the shared watcher has been scheduled; guarded by the CompactionWatcher class lock (see startWatching)
+  private static boolean watching = false;
+  
+  /** A running compaction together with the time it was first observed in its current, unchanged state. */
+  private static class ObservedCompactionInfo {
+    final CompactionInfo compactionInfo;
+    final long firstSeen;
+    // true once a "has not made progress" warning has been logged for this observation
+    boolean loggedWarning;
+    
+    ObservedCompactionInfo(CompactionInfo ci, long time) {
+      this.compactionInfo = ci;
+      this.firstSeen = time;
+    }
+  }
+
+  /**
+   * @param config
+   *          configuration from which {@link Property#TSERV_COMPACTION_WARN_TIME} is read on every run
+   */
+  public CompactionWatcher(AccumuloConfiguration config) {
+    this.config = config;
+  }
+
+  /**
+   * Records the currently running compactions, logs recovery of previously stuck ones, and warns about compactions that have not made
+   * progress for longer than the configured warn time.
+   */
+  @Override
+  public void run() {
+    List<CompactionInfo> runningCompactions = Compactor.getRunningCompactions();
+    
+    Set<List<Long>> newKeys = new HashSet<List<Long>>();
+    
+    long time = System.currentTimeMillis();
+
+    for (CompactionInfo ci : runningCompactions) {
+      List<Long> compactionKey = Arrays.asList(ci.getID(), ci.getEntriesRead(), ci.getEntriesWritten());
+      newKeys.add(compactionKey);
+      
+      // keep the existing observation (and its firstSeen time) when nothing has changed since the last run
+      if (!observedCompactions.containsKey(compactionKey)) {
+        observedCompactions.put(compactionKey, new ObservedCompactionInfo(ci, time));
+      }
+    }
+    
+    // a previously warned compaction whose key disappeared has either finished or made progress
+    for (Map.Entry<List<Long>,ObservedCompactionInfo> entry : observedCompactions.entrySet()) {
+      ObservedCompactionInfo oci = entry.getValue();
+      if (oci.loggedWarning && !newKeys.contains(entry.getKey())) {
+        log.info("Compaction of " + oci.compactionInfo.getExtent() + " is no longer stuck");
+      }
+    }
+
+    // forget any compaction that completed or made progress
+    observedCompactions.keySet().retainAll(newKeys);
+    
+    // read on every run rather than cached (presumably so updates to the property are picked up)
+    long warnTime = config.getTimeInMillis(Property.TSERV_COMPACTION_WARN_TIME);
+
+    // check for stuck compactions
+    for (ObservedCompactionInfo oci : observedCompactions.values()) {
+      long stalledFor = time - oci.firstSeen;
+      if (oci.loggedWarning || stalledFor <= warnTime) {
+        continue;
+      }
+      Thread compactionThread = oci.compactionInfo.getThread();
+      if (compactionThread == null) {
+        // no thread recorded right now (Compactor clears it just before removing itself from the running list); retry next run
+        continue;
+      }
+      // the thread is sampled now, so the trace is only a possible location of the stall
+      Exception e = new Exception("Possible stack trace of compaction stuck on " + oci.compactionInfo.getExtent());
+      e.setStackTrace(compactionThread.getStackTrace());
+      log.warn("Compaction of " + oci.compactionInfo.getExtent() + " has not made progress for at least " + stalledFor + "ms", e);
+      oci.loggedWarning = true;
+    }
+  }
+
+  /**
+   * Schedules a single shared CompactionWatcher on {@link SimpleTimer} with an initial delay and period of 10000 (presumably
+   * milliseconds). Calls after the first have no effect.
+   * 
+   * @param config
+   *          configuration handed to the watcher; only the one passed on the first call is used
+   */
+  public static synchronized void startWatching(AccumuloConfiguration config) {
+    if (!watching) {
+      SimpleTimer.getInstance().schedule(new CompactionWatcher(config), 10000, 10000);
+      watching = true;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dee8bbb9/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
index 480fbaa..4df2637 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
@@ -144,6 +144,13 @@ public class Compactor implements Callable<CompactionStats> {
   private AtomicLong entriesWritten = new AtomicLong(0);
   private DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
   
+  // source of compactorID values, shared by all Compactor instances
+  private static AtomicLong nextCompactorID = new AtomicLong(0);
+  
+  // a unique id to identify a compactor
+  private long compactorID = nextCompactorID.getAndIncrement();
+
+  // thread currently running this compaction, or null; set when compacting starts and cleared before the compactor is removed
+  // from runningCompactions. volatile because it is read from another thread (CompactionWatcher, via CompactionInfo.getThread())
+  // to capture stack traces.
+  protected volatile Thread thread;
+
   private synchronized void setLocalityGroup(String name) {
     this.currentLocalityGroup = name;
   }
@@ -169,6 +176,26 @@ public class Compactor implements Callable<CompactionStats> {
       this.compactor = compactor;
     }
 
+    /**
+     * @return the unique id of the compactor this info describes (taken from a static counter when the Compactor is constructed)
+     */
+    public long getID() {
+      return compactor.compactorID;
+    }
+    
+    /**
+     * @return the extent being compacted, as reported by the underlying compactor
+     */
+    public KeyExtent getExtent() {
+      return compactor.getExtent();
+    }
+    
+    /**
+     * @return this info's entries-read count; CompactionWatcher treats a change in this value between observations as progress.
+     *         NOTE(review): presumably captured when this CompactionInfo was created — confirm against the constructor.
+     */
+    public long getEntriesRead() {
+      return entriesRead;
+    }
+    
+    /**
+     * @return this info's entries-written count; CompactionWatcher treats a change in this value between observations as progress.
+     *         NOTE(review): presumably captured when this CompactionInfo was created — confirm against the constructor.
+     */
+    public long getEntriesWritten() {
+      return entriesWritten;
+    }
+
+    /**
+     * @return the thread currently running the compaction, or null when none is recorded; the compactor sets it when it starts
+     *         compacting and clears it before removing itself from the running compactions
+     */
+    public Thread getThread() {
+      return compactor.thread;
+    }
+
     public ActiveCompaction toThrift() {
       
       CompactionType type;
@@ -289,6 +316,7 @@ public class Compactor implements Callable<CompactionStats> {
     String oldThreadName = Thread.currentThread().getName();
     String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new
Date()) + " file: " + outputFile;
     Thread.currentThread().setName(newThreadName);
+    thread = Thread.currentThread();
     try {
       FileOperations fileFactory = FileOperations.getInstance();
       FileSystem ns = this.fs.getFileSystemByPath(outputFile.path());
@@ -344,8 +372,10 @@ public class Compactor implements Callable<CompactionStats> {
       throw e;
     } finally {
       Thread.currentThread().setName(oldThreadName);
-      if (remove)
+      if (remove) {
+        thread = null;
         runningCompactions.remove(this);
+      }
 
       try {
         if (mfw != null) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dee8bbb9/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
index 49ffc65..4478f8e 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
@@ -29,13 +29,13 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -138,6 +138,7 @@ public class MinorCompactor extends Compactor {
         
       } while (true);
     } finally {
+      thread = null;
       runningCompactions.remove(this);
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dee8bbb9/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 3216731..be2b7a8 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -2475,6 +2475,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
   
   private class MajorCompactor implements Runnable {
     
+    /**
+     * Creates the major compaction initiator and starts the shared CompactionWatcher (a no-op if it has already been started).
+     * 
+     * @param config
+     *          configuration handed to the CompactionWatcher
+     */
+    public MajorCompactor(AccumuloConfiguration config) {
+      CompactionWatcher.startWatching(config);
+    }
+
     @Override
     public void run() {
       while (!majorCompactorDisabled) {
@@ -3433,7 +3437,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     statsKeeper = new TabletStatsKeeper();
     
     // start major compactor
-    majorCompactorThread = new Daemon(new LoggingRunnable(log, new MajorCompactor()));
+    majorCompactorThread = new Daemon(new LoggingRunnable(log, new MajorCompactor(getSystemConfiguration())));
     majorCompactorThread.setName("Split/MajC initiator");
     majorCompactorThread.start();
   }


Mime
View raw message