accumulo-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] bbux-atg closed pull request #510: ACCUMULO-4074
Date Thu, 05 Jul 2018 16:56:07 GMT
bbux-atg closed pull request #510: ACCUMULO-4074
URL: https://github.com/apache/accumulo/pull/510
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index fc423acca5..42fc87a817 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -436,6 +436,8 @@
   TSERV_READ_AHEAD_MAXCONCURRENT("tserver.readahead.concurrent.max", "16", PropertyType.COUNT,
       "The maximum number of concurrent read ahead that will execute. This effectively"
           + " limits the number of long running scans that can run concurrently per tserver."),
+  TSERV_READ_AHEAD_PREFIX("tserver.readahead.concurrent.table.", null, PropertyType.PREFIX,
+      "Properties in this category allow overriding of table specific read ahead pools"),
   TSERV_METADATA_READ_AHEAD_MAXCONCURRENT("tserver.metadata.readahead.concurrent.max", "8",
       PropertyType.COUNT,
       "The maximum number of concurrent metadata read ahead that will execute."),
@@ -542,6 +544,9 @@
   TSERV_SUMMARY_RETRIEVAL_THREADS("tserver.summary.retrieval.threads", "10", PropertyType.COUNT,
       "The number of threads on each tablet server available to retrieve"
           + " summary data, that is not currently in cache, from RFiles."),
+  TSERV_SESSION_COMPARATOR_CLASS("tserver.summary.comparator.class", "", PropertyType.CLASSNAME,
+      "A customizable Scan session comparator. Note that by default, the value is empty"
+          + " and thus uses no session comparator"),
 
   // accumulo garbage collector properties
   GC_PREFIX("gc.", null, PropertyType.PREFIX,
diff --git a/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
new file mode 100644
index 0000000000..f688010f16
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/AccumuloUncaughtExceptionHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandler {
+
+  private static final Logger log = LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class);
+
+  @Override
+  public void uncaughtException(Thread t, Throwable e) {
+
+    log.error(String.format("Caught an exception in %s.  Shutting down.", t), e);
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
index d8b307c36c..0b4730cc70 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/NamingThreadFactory.java
@@ -26,6 +26,8 @@
 public class NamingThreadFactory implements ThreadFactory {
   private static final Logger log = LoggerFactory.getLogger(NamingThreadFactory.class);
 
+  private static final AccumuloUncaughtExceptionHandler uncaughtHandler = new AccumuloUncaughtExceptionHandler();
+
   private AtomicInteger threadNum = new AtomicInteger(1);
   private String name;
 
@@ -35,7 +37,10 @@ public NamingThreadFactory(String name) {
 
   @Override
   public Thread newThread(Runnable r) {
-    return new Daemon(new LoggingRunnable(log, r), name + " " + threadNum.getAndIncrement());
+    Thread thread = new Daemon(new LoggingRunnable(log, r),
+        name + " " + threadNum.getAndIncrement());
+    thread.setUncaughtExceptionHandler(uncaughtHandler);
+    return thread;
   }
 
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 71cebbe014..9a91e8723d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -32,10 +32,15 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.NamespaceNotFoundException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.impl.KeyExtent;
@@ -62,6 +67,7 @@
 import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
 import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.session.SessionComparator;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.htrace.wrappers.TraceExecutorService;
 import org.slf4j.Logger;
@@ -95,6 +101,8 @@
   private final ExecutorService summaryRemotePool;
   private final Map<String,ExecutorService> threadPools = new TreeMap<>();
 
+  private final Map<String,ExecutorService> tableThreadPools = new TreeMap<>();
+
   private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;
 
   private final FileManager fileManager;
@@ -142,12 +150,34 @@ public void run() {
     return result;
   }
 
-  private ExecutorService createEs(int max, String name) {
-    return addEs(name, Executors.newFixedThreadPool(max, new NamingThreadFactory(name)));
-  }
+  private ExecutorService addEs(final int maxThreads, final Property prefix,
+      final String propertyName, String name, final ThreadPoolExecutor tp) {
+    ExecutorService result = addEs(name, tp);
+    SimpleTimer.getInstance(tserver.getConfiguration()).schedule(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          int max = maxThreads;
+          for (Entry<String,String> entry : conf.getSystemConfiguration()
+              .getAllPropertiesWithPrefix(prefix).entrySet()) {
+            if (entry.getKey().equals(propertyName)) {
+              if (null != entry.getValue() && entry.getValue().length() != 0)
+                max = Integer.parseInt(entry.getValue());
+              break;
+            }
+          }
+          if (tp.getMaximumPoolSize() != max) {
+            log.info("Changing {} to {}", maxThreads, max);
+            tp.setCorePoolSize(max);
+            tp.setMaximumPoolSize(max);
+          }
+        } catch (Throwable t) {
+          log.error("Failed to change thread pool size", t);
+        }
+      }
 
-  private ExecutorService createEs(Property max, String name) {
-    return createEs(max, name, new LinkedBlockingQueue<>());
+    }, 1000, 10 * 1000);
+    return result;
   }
 
   private ExecutorService createIdlingEs(Property max, String name, long timeout,
@@ -160,6 +190,86 @@ private ExecutorService createIdlingEs(Property max, String name, long
timeout,
     return addEs(max, name, tp);
   }
 
+  private ExecutorService createEs(int max, String name) {
+    return addEs(name, Executors.newFixedThreadPool(max, new NamingThreadFactory(name)));
+  }
+
+  private ExecutorService createEs(Property max, String name) {
+    return createEs(max, name, new LinkedBlockingQueue<>());
+  }
+
+  private ExecutorService createEs(int maxThreads, Property prefix, String propertyName,
+      String name, BlockingQueue<Runnable> queue) {
+    ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
+        TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
+    return addEs(maxThreads, prefix, propertyName, name, tp);
+  }
+
+  /**
+   * If we cannot instantiate the comparator we will default to the linked blocking queue
comparator
+   * 
+   * @param max
+   *          max number of threads
+   * @param comparator
+   *          comparator property
+   * @param name
+   *          name passed to the thread factory
+   * @return priority executor
+   */
+  private ExecutorService createPriorityExecutor(Property prefix, String propertyName,
+      final int maxThreads, Property comparator, String name) {
+
+    String comparatorClazz = conf.getSystemConfiguration().get(comparator);
+
+    if (null == comparatorClazz || comparatorClazz.length() == 0) {
+      log.debug("Using no comparator");
+      return createEs(maxThreads, prefix, propertyName, name, new LinkedBlockingQueue<>());
+    } else {
+      SessionComparator comparatorObj = Property.createInstanceFromPropertyName(
+          conf.getSystemConfiguration(), comparator, SessionComparator.class, null);
+      if (null != comparatorObj) {
+        log.debug("Using priority based scheduler {}", comparatorClazz);
+        return createEs(maxThreads, prefix, propertyName, name,
+            new PriorityBlockingQueue<>(maxThreads, comparatorObj));
+      } else {
+        log.debug("Using no comparator");
+        return createEs(maxThreads, prefix, propertyName, name, new LinkedBlockingQueue<>());
+      }
+    }
+  }
+
+  /**
+   * If we cannot instantiate the comparator we will default to the linked blocking queue
comparator
+   * 
+   * @param max
+   *          max number of threads
+   * @param comparator
+   *          comparator property
+   * @param name
+   *          name passed to the thread factory
+   * @return priority executor
+   */
+  private ExecutorService createPriorityExecutor(Property max, Property comparator, String
name) {
+    int maxThreads = conf.getSystemConfiguration().getCount(max);
+
+    String comparatorClazz = conf.getSystemConfiguration().get(comparator);
+
+    if (null == comparatorClazz || comparatorClazz.length() == 0) {
+      log.debug("Using no comparator");
+      return createEs(max, name, new LinkedBlockingQueue<>());
+    } else {
+      SessionComparator comparatorObj = Property.createInstanceFromPropertyName(
+          conf.getSystemConfiguration(), comparator, SessionComparator.class, null);
+      if (null != comparatorObj) {
+        log.debug("Using priority based scheduler {}", comparatorClazz);
+        return createEs(max, name, new PriorityBlockingQueue<>(maxThreads, comparatorObj));
+      } else {
+        log.debug("Using no comparator");
+        return createEs(max, name, new LinkedBlockingQueue<>());
+      }
+    }
+  }
+
   private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable>
queue) {
     int maxThreads = conf.getSystemConfiguration().getCount(max);
     ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
@@ -172,6 +282,35 @@ private ExecutorService createEs(int min, int max, int timeout, String
name) {
         new LinkedBlockingQueue<>(), new NamingThreadFactory(name)));
   }
 
+  /**
+   * Creates table specific thread pool for executing scan threads
+   * 
+   * @param instance
+   *          ZK instance.
+   * @param acuConf
+   *          accumulo configuration.
+   * @throws NamespaceNotFoundException
+   *           Error thrown by tables.getTableId when a name space is not found.
+   * @throws TableNotFoundException
+   *           Error thrown by tables.getTableId when a table is not found.
+   */
+  protected void createTablePools(Instance instance, AccumuloConfiguration acuConf)
+      throws NamespaceNotFoundException, TableNotFoundException {
+    for (Entry<String,String> entry : acuConf
+        .getAllPropertiesWithPrefix(Property.TSERV_READ_AHEAD_PREFIX).entrySet()) {
+      final String tableName = entry.getKey()
+          .substring(Property.TSERV_READ_AHEAD_PREFIX.getKey().length());
+      if (null == entry.getValue() || entry.getValue().length() == 0) {
+        throw new RuntimeException("Read ahead prefix is inproperly configured");
+      }
+      final int maxThreads = Integer.parseInt(entry.getValue());
+      final String tableId = Tables.getTableId(instance, tableName).canonicalID();
+      tableThreadPools.put(tableId,
+          createPriorityExecutor(Property.TSERV_READ_AHEAD_PREFIX, entry.getKey(), maxThreads,
+              Property.TSERV_SESSION_COMPARATOR_CLASS, tableName + " specific read ahead"));
+    }
+  }
+
   public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
     this.tserver = tserver;
     this.conf = tserver.getServerConfigurationFactory();
@@ -251,7 +390,8 @@ public TabletServerResourceManager(TabletServer tserver, VolumeManager
fs) {
 
     activeAssignments = new ConcurrentHashMap<>();
 
-    readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read
ahead");
+    readAheadThreadPool = createPriorityExecutor(Property.TSERV_READ_AHEAD_MAXCONCURRENT,
+        Property.TSERV_SESSION_COMPARATOR_CLASS, "tablet read ahead");
     defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT,
         "metadata tablets read ahead");
 
@@ -262,6 +402,13 @@ public TabletServerResourceManager(TabletServer tserver, VolumeManager
fs) {
     summaryParitionPool = createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS,
         "summary partition", 60, TimeUnit.SECONDS);
 
+    try {
+      createTablePools(tserver.getInstance(), acuConf);
+    } catch (NamespaceNotFoundException e) {
+      throw new RuntimeException(e);
+    } catch (TableNotFoundException e) {
+      throw new RuntimeException(e);
+    }
     int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
 
     Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
@@ -792,7 +939,10 @@ public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask)
{
   }
 
   public void executeReadAhead(KeyExtent tablet, Runnable task) {
-    if (tablet.isRootTablet()) {
+    ExecutorService service = tableThreadPools.get(tablet.getTableId().canonicalID());
+    if (null != service) {
+      service.execute(task);
+    } else if (tablet.isRootTablet()) {
       task.run();
     } else if (tablet.isMeta()) {
       defaultReadAheadThreadPool.execute(task);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
new file mode 100644
index 0000000000..d584242fab
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/DefaultSessionComparator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+public class DefaultSessionComparator extends SessionComparator {
+
+  @Override
+  public int compareSession(Session sessionA, Session sessionB) {
+
+    final long startTimeFirst = sessionA.startTime;
+    final long startTimeSecond = sessionB.startTime;
+
+    // use the lowest max idle time
+    final long maxIdle = sessionA.maxIdleAccessTime < sessionB.maxIdleAccessTime
+        ? sessionA.maxIdleAccessTime
+        : sessionB.maxIdleAccessTime;
+
+    final long currentTime = System.currentTimeMillis();
+
+    /*
+     * Multiply by -1 so that we have a sensical comparison. This means that if comparison
< 0,
+     * sessionA is newer. If comparison > 0, this means that session B is newer
+     */
+    int comparison = -1 * Long.compare(startTimeFirst, startTimeSecond);
+
+    if (!(sessionA.lastExecTime == -1 && sessionB.lastExecTime == -1)) {
+      if (comparison >= 0) {
+        long idleTimeA = currentTime - sessionA.lastExecTime;
+
+        /*
+         * If session B is newer, let's make sure that we haven't reached the max idle time,
where
+         * we have to begin aging A
+         */
+        if (idleTimeA > sessionA.maxIdleAccessTime) {
+          comparison = -1 * Long.valueOf(idleTimeA - maxIdle).intValue();
+        }
+      } else {
+        long idleTimeB = currentTime - sessionB.lastExecTime;
+
+        /*
+         * If session A is newer, let's make sure that B hasn't reached the max idle time,
where we
+         * have to begin aging A
+         */
+        if (idleTimeB > sessionA.maxIdleAccessTime) {
+          comparison = 1 * Long.valueOf(idleTimeB - maxIdle).intValue();
+        }
+      }
+    }
+
+    return comparison;
+  }
+
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index 4285a51446..981832af6a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -71,4 +71,14 @@ public boolean cleanup() {
     // the cancellation should provide us the safety to return true here
     return true;
   }
+
+  /**
+   * Ensure that the runnable actually runs
+   */
+  @Override
+  public void run() {
+    super.run();
+    lookupTask.run();
+  }
+
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index eed45cf073..32f7e34851 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -19,15 +19,17 @@
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.rpc.TServerUtils;
 
-public class Session {
+public abstract class Session implements Runnable {
 
   enum State {
     NEW, UNRESERVED, RESERVED, REMOVED
   }
 
   public final String client;
-  long lastAccessTime;
+  public long lastAccessTime;
+  protected volatile long lastExecTime = -1;
   public long startTime;
+  public long maxIdleAccessTime;
   State state = State.NEW;
   private final TCredentials credentials;
 
@@ -47,4 +49,18 @@ public TCredentials getCredentials() {
   public boolean cleanup() {
     return true;
   }
+
+  @Override
+  public void run() {
+    lastExecTime = System.currentTimeMillis();
+  }
+
+  public void setLastExecutionTime(long lastExecTime) {
+    this.lastExecTime = lastExecTime;
+  }
+
+  public long getLastExecutionTime() {
+    return lastExecTime;
+  }
+
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
new file mode 100644
index 0000000000..dcfa1d4bbd
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionComparator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import java.util.Comparator;
+
+public abstract class SessionComparator implements Comparator<Runnable> {
+
+  @Override
+  public int compare(Runnable sessionA, Runnable sessionB) {
+    if (sessionA instanceof Session && sessionB instanceof Session)
+      return compareSession((Session) sessionA, (Session) sessionB);
+    else
+      return 0;
+  }
+
+  public abstract int compareSession(final Session sessionA, final Session sessionB);
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 724d23c18f..c48569824d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -231,6 +231,9 @@ private void sweep(final long maxIdle, final long maxUpdateIdle) {
             configuredIdle = maxUpdateIdle;
           }
           long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+          if (idleTime > session.maxIdleAccessTime) {
+            session.maxIdleAccessTime = idleTime;
+          }
           if (idleTime > configuredIdle) {
             log.info("Closing idle session from user={}, client={}, idle={}ms", session.getUser(),
                 session.client, idleTime);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
new file mode 100644
index 0000000000..28e6ef22bb
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleRangePriorityComparator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+public class SingleRangePriorityComparator extends DefaultSessionComparator {
+
+  @Override
+  public int compareSession(Session sessionA, Session sessionB) {
+    int priority = super.compareSession(sessionA, sessionB);
+
+    if (sessionA instanceof MultiScanSession && sessionB instanceof ScanSession)
{
+      if (priority < 0) {
+        priority *= -1;
+      }
+    } else if (sessionB instanceof MultiScanSession && sessionA instanceof ScanSession)
{
+      if (priority > 0) {
+        priority *= -1;
+      }
+    }
+    return priority;
+  }
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 9ad246cc3a..083ea77251 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1537,7 +1537,13 @@ public synchronized void initiateMajorCompaction(MajorCompactionReason
reason) {
 
     majorCompactionQueued.add(reason);
 
-    getTabletResources().executeMajorCompaction(getExtent(), new CompactionRunner(this, reason));
+    try {
+      getTabletResources().executeMajorCompaction(getExtent(), new CompactionRunner(this,
reason));
+    } catch (RuntimeException t) {
+      log.debug("removing {} because we encountered an exception enqueing the CompactionRunner",
reason, t);
+      majorCompactionQueued.remove(reason);
+      throw t;
+    }
   }
 
   /**
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
new file mode 100644
index 0000000000..f7cc5cd2aa
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionComparatorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class SessionComparatorTest {
+
+  @Test
+  public void testSingleScanMultiScanNoRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.maxIdleAccessTime = 0;
+    sessionA.startTime = time - 1000;
+
+    MultiScanSession sessionB = emptyMultiScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    ScanSession sessionC = emptyScanSession();
+    sessionC.lastAccessTime = 0;
+    sessionC.maxIdleAccessTime = 1000;
+    sessionC.startTime = time - 800;
+
+    // a has never run, so it should be given priority
+    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
+    assertEquals(-1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertEquals(1, comparator.compareSession(sessionB, sessionA));
+
+    // now let's assume they have been executed
+
+    assertEquals(1, comparator.compareSession(sessionA, sessionC));
+
+    assertEquals(0, comparator.compareSession(sessionC, sessionC));
+
+  }
+
+  @Test
+  public void testSingleScanRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.setLastExecutionTime(time);
+    sessionA.maxIdleAccessTime = 1000;
+    sessionA.startTime = time - 1000;
+
+    ScanSession sessionB = emptyScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.setLastExecutionTime(time - 2000);
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    // b is newer
+    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
+    assertEquals(1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
+
+    sessionB.setLastExecutionTime(time);
+    sessionA.setLastExecutionTime(time - 2000);
+
+    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    int comp = comparator.compareSession(sessionB, sessionA);
+    assertTrue("comparison is " + comp, comp >= 1);
+  }
+
+  @Test
+  public void testSingleScanMultiScanRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.setLastExecutionTime(time);
+    sessionA.maxIdleAccessTime = 1000;
+    sessionA.startTime = time - 1000;
+
+    MultiScanSession sessionB = emptyMultiScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.setLastExecutionTime(time - 2000);
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    // b is newer
+    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
+    assertEquals(-1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertTrue(comparator.compareSession(sessionB, sessionA) > 0);
+
+    sessionB.setLastExecutionTime(time);
+    sessionA.setLastExecutionTime(time - 2000);
+
+    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    int comp = comparator.compareSession(sessionB, sessionA);
+    assertTrue("comparison is " + comp, comp > 0);
+  }
+
+  @Test
+  public void testMultiScanRun() {
+    long time = System.currentTimeMillis();
+    ScanSession sessionA = emptyScanSession();
+    sessionA.lastAccessTime = 0;
+    sessionA.setLastExecutionTime(time);
+    sessionA.maxIdleAccessTime = 1000;
+    sessionA.startTime = time - 1000;
+
+    ScanSession sessionB = emptyScanSession();
+    sessionB.lastAccessTime = 0;
+    sessionB.setLastExecutionTime(time - 2000);
+    sessionB.maxIdleAccessTime = 1000;
+    sessionB.startTime = time - 800;
+
+    // b is newer
+    SingleRangePriorityComparator comparator = new SingleRangePriorityComparator();
+    assertEquals(1, comparator.compareSession(sessionA, sessionB));
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    assertTrue(comparator.compareSession(sessionB, sessionA) < 0);
+
+    sessionB.setLastExecutionTime(time);
+    sessionA.setLastExecutionTime(time - 2000);
+
+    
+    assertTrue(comparator.compareSession(sessionA, sessionB) < 0);
+
+    // b is before a in queue, b has never run, but because a is single
+    // we should be given priority
+    int comp = comparator.compareSession(sessionB, sessionA);
+    assertTrue("comparison is " + comp, comp >= 1);
+  }
+
+  private static ScanSession emptyScanSession() {
+    return new ScanSession(null, null, null, null, null, null, 0, 0, null);
+  }
+
+  private static MultiScanSession emptyMultiScanSession() {
+    return new MultiScanSession(null, null, null, null, null, null, null, 0, null);
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message