accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [2/5] accumulo git commit: ACCUMULO-3304 Track assignment execution duration and warn when they take longer than some duration.
Date Sat, 08 Nov 2014 21:51:20 GMT
ACCUMULO-3304 Track assignment execution duration and warn when they take longer than some
duration.

Introduce a new configuration value, defaults to 10min, which controls
the duration an assignment must be running for a warning to be
printed. The period of checking threads is always half of the value.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/995080c4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/995080c4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/995080c4

Branch: refs/heads/master
Commit: 995080c44c2bf296eb58764cab6f536ce5a808a1
Parents: 520d802
Author: Josh Elser <elserj@apache.org>
Authored: Sat Nov 8 12:20:19 2014 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Sat Nov 8 12:20:19 2014 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  2 +
 server/tserver/pom.xml                          |  9 +--
 .../tserver/ActiveAssignmentRunnable.java       | 78 ++++++++++++++++++++
 .../accumulo/tserver/RunnableStartedAt.java     | 51 +++++++++++++
 .../apache/accumulo/tserver/TabletServer.java   | 13 ++--
 .../tserver/TabletServerResourceManager.java    | 75 +++++++++++++++++--
 6 files changed, 210 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index a03b210..aec7af5 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -278,6 +278,8 @@ public enum Property {
       "resiliency in the face of unexpected power outages, at the cost of speed. If method
is not available, the legacy 'sync' method " +
       "will be used to ensure backwards compatibility with older Hadoop versions. A value
of 'hflush' is the alternative to the default value " +
       "of 'hsync' which will result in faster writes, but with less durability"),
+  TSERV_ASSIGNMENT_DURATION_WARNING("tserver.assignment.duration.warning", "10m", PropertyType.TIMEDURATION,
"The amount of time an assignment can run "
+      + " before the server will print a warning along with the current stack trace. Meant
to help debug stuck assignments"),
 
   // properties that are specific to logger server behavior
   LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect
the behavior of the write-ahead logger servers"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/server/tserver/pom.xml
----------------------------------------------------------------------
diff --git a/server/tserver/pom.xml b/server/tserver/pom.xml
index 3ea50ad..1e08504 100644
--- a/server/tserver/pom.xml
+++ b/server/tserver/pom.xml
@@ -88,6 +88,10 @@
       <artifactId>zookeeper</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -99,11 +103,6 @@
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/server/tserver/src/main/java/org/apache/accumulo/tserver/ActiveAssignmentRunnable.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ActiveAssignmentRunnable.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ActiveAssignmentRunnable.java
new file mode 100644
index 0000000..dcbdae7
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ActiveAssignmentRunnable.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import jline.internal.Preconditions;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class ActiveAssignmentRunnable implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(ActiveAssignmentRunnable.class);
+
+  private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;
+  private final KeyExtent extent;
+  private final Runnable delegate;
+
+  // Make sure that the other thread calling getException will see the assignment by the
thread calling run()
+  private volatile Thread executingThread;
+
+  public ActiveAssignmentRunnable(ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments,
KeyExtent extent, Runnable delegate) {
+    Preconditions.checkNotNull(activeAssignments);
+    Preconditions.checkNotNull(extent);
+    Preconditions.checkNotNull(delegate);
+    this.activeAssignments = activeAssignments;
+    this.extent = extent;
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void run() {
+    if (activeAssignments.containsKey(extent)) {
+      throw new IllegalStateException("Active assignment already exists for " + extent);
+    }
+
+    executingThread = Thread.currentThread();
+
+    try {
+      RunnableStartedAt runnableWithStartTime = new RunnableStartedAt(this, System.currentTimeMillis());
+      log.trace("Started assignment for {} at {}", extent, runnableWithStartTime.getStartTime());
+      activeAssignments.put(extent, runnableWithStartTime);
+      delegate.run();
+    } finally {
+      if (log.isTraceEnabled()) {
+        // Avoid the call to currentTimeMillis if we'd just throw it away anyways
+        log.trace("Finished assignment for {} at {}", extent, System.currentTimeMillis());
+      }
+      activeAssignments.remove(extent);
+    }
+  }
+
+  public Exception getException() {
+    final Exception e = new Exception("Assignment of " + extent);
+    if (null != executingThread) {
+      e.setStackTrace(executingThread.getStackTrace());
+    }
+    return e;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/server/tserver/src/main/java/org/apache/accumulo/tserver/RunnableStartedAt.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/RunnableStartedAt.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/RunnableStartedAt.java
new file mode 100644
index 0000000..6513091
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/RunnableStartedAt.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.util.AbstractMap;
+import java.util.Map.Entry;
+
+/**
+ * Encapsulation of a task and the time it began execution.
+ */
+public class RunnableStartedAt extends AbstractMap.SimpleEntry<ActiveAssignmentRunnable,Long>
{
+
+  private static final long serialVersionUID = 1L;
+
+  public RunnableStartedAt(ActiveAssignmentRunnable task, Long startedAtMillis) {
+    super(task, startedAtMillis);
+  }
+
+  public RunnableStartedAt(Entry<? extends ActiveAssignmentRunnable,? extends Long>
entry) {
+    super(entry);
+  }
+
+  /**
+   * @return The task being executed
+   */
+  public ActiveAssignmentRunnable getTask() {
+    return getKey();
+  }
+
+  /**
+   * @return The time, in millis, that the runnable was submitted at
+   */
+  public Long getStartTime() {
+    return getValue();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 94be0bb..8ef44da 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -2282,7 +2282,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       // add the assignment job to the appropriate queue
       log.info("Loading tablet " + extent);
 
-      final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
+      final AssignmentHandler ah = new AssignmentHandler(extent);
+      // final Runnable ah = new LoggingRunnable(log, );
       // Root tablet assignment must take place immediately
       if (extent.isRootTablet()) {
         new Daemon("Root Tablet Assignment") {
@@ -2299,9 +2300,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         }.start();
       } else {
         if (extent.isMeta()) {
-          resourceManager.addMetaDataAssignment(ah);
+          resourceManager.addMetaDataAssignment(extent, log, ah);
         } else {
-          resourceManager.addAssignment(ah);
+          resourceManager.addAssignment(extent, log, ah);
         }
       }
     }
@@ -2824,7 +2825,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     }
   }
 
-  private class AssignmentHandler implements Runnable {
+  protected class AssignmentHandler implements Runnable {
     private KeyExtent extent;
     private int retryAttempt = 0;
 
@@ -2979,10 +2980,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
               if (extent.isRootTablet()) {
                 new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry").start();
               } else {
-                resourceManager.addMetaDataAssignment(handler);
+                resourceManager.addMetaDataAssignment(extent, log, handler);
               }
             } else {
-              resourceManager.addAssignment(handler);
+              resourceManager.addAssignment(extent, log, handler);
             }
           }
         }, reschedule);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/995080c4/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 935ffeb..7c0eedc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -54,6 +55,7 @@ import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.trace.instrument.TraceExecutorService;
 import org.apache.accumulo.tserver.FileManager.ScanFileManager;
 import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.tserver.TabletServer.AssignmentHandler;
 import org.apache.accumulo.tserver.compaction.CompactionStrategy;
 import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
@@ -62,9 +64,9 @@ import org.apache.log4j.Logger;
 
 /**
  * ResourceManager is responsible for managing the resources of all tablets within a tablet
server.
- * 
- * 
- * 
+ *
+ *
+ *
  */
 public class TabletServerResourceManager {
 
@@ -82,6 +84,8 @@ public class TabletServerResourceManager {
   private ExecutorService defaultReadAheadThreadPool;
   private Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>();
 
+  private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;
+
   private HashSet<TabletResourceManager> tabletResources;
 
   private final VolumeManager fs;
@@ -196,6 +200,8 @@ public class TabletServerResourceManager {
 
     assignMetaDataPool = createEs(0, 1, 60, "metadata tablet assignment");
 
+    activeAssignments = new ConcurrentHashMap<KeyExtent,RunnableStartedAt>();
+
     readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read
ahead");
     defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT,
"metadata tablets read ahead");
 
@@ -209,6 +215,61 @@ public class TabletServerResourceManager {
     memoryManager.init(conf);
     memMgmt = new MemoryManagementFramework();
     memMgmt.startThreads();
+
+    SimpleTimer timer = SimpleTimer.getInstance();
+
+    // We can use the same map for both metadata and normal assignments since the keyspace
(extent)
+    // is guaranteed to be unique. Schedule the task once, the task will reschedule itself.
+    timer.schedule(new AssignmentWatcher(acuConf, activeAssignments, timer), 5000);
+  }
+
+  /**
+   * Accepts some map which is tracking active assignment task(s) (running) and monitors
them to ensure that the time the assignment(s) have been running don't
+   * exceed a threshold. If the time is exceeded a warning is printed and a stack trace is
logged for the running assignment.
+   */
+  protected static class AssignmentWatcher implements Runnable {
+    private static final Logger log = Logger.getLogger(AssignmentWatcher.class);
+
+    private final Map<KeyExtent,RunnableStartedAt> activeAssignments;
+    private final AccumuloConfiguration conf;
+    private final SimpleTimer timer;
+
+    public AssignmentWatcher(AccumuloConfiguration conf, Map<KeyExtent,RunnableStartedAt>
activeAssignments, SimpleTimer timer) {
+      this.conf = conf;
+      this.activeAssignments = activeAssignments;
+      this.timer = timer;
+    }
+
+    @Override
+    public void run() {
+      final long millisBeforeWarning = conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING);
+      try {
+        long now = System.currentTimeMillis();
+        KeyExtent extent;
+        RunnableStartedAt runnable;
+        for (Entry<KeyExtent,RunnableStartedAt> entry : activeAssignments.entrySet())
{
+          extent = entry.getKey();
+          runnable = entry.getValue();
+          final long duration = now - runnable.getStartTime();
+
+          // Print a warning if an assignment has been running for over the configured time
length
+          if (duration > millisBeforeWarning) {
+            log.warn("Assignment for " + extent + " has been running for at least " + duration
+ "ms", runnable.getTask().getException());
+          } else if (log.isTraceEnabled()) {
+            log.trace("Assignment for " + extent + " only running for " + duration + "ms");
+          }
+        }
+      } catch (Exception e) {
+        log.warn("Caught exception checking active assignments", e);
+      } finally {
+        // Don't run more often than every 5s
+        long delay = Math.max((long) (millisBeforeWarning * 0.5), 5000l);
+        if (log.isTraceEnabled()) {
+          log.trace("Rescheduling assignment watcher to run in " + delay + "ms");
+        }
+        timer.schedule(this, delay);
+      }
+    }
   }
 
   private static class TabletStateImpl implements TabletState, Cloneable {
@@ -647,12 +708,12 @@ public class TabletServerResourceManager {
     }
   }
 
-  public void addAssignment(Runnable assignmentHandler) {
-    assignmentPool.execute(assignmentHandler);
+  public void addAssignment(KeyExtent extent, Logger log, AssignmentHandler assignmentHandler)
{
+    assignmentPool.execute(new ActiveAssignmentRunnable(activeAssignments, extent, new LoggingRunnable(log,
assignmentHandler)));
   }
 
-  public void addMetaDataAssignment(Runnable assignmentHandler) {
-    assignMetaDataPool.execute(assignmentHandler);
+  public void addMetaDataAssignment(KeyExtent extent, Logger log, AssignmentHandler assignmentHandler)
{
+    assignMetaDataPool.execute(new ActiveAssignmentRunnable(activeAssignments, extent, new
LoggingRunnable(log, assignmentHandler)));
   }
 
   public void addMigration(KeyExtent tablet, Runnable migrationHandler) {


Mime
View raw message