accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [1/6] accumulo git commit: ACCUMULO-3509: Make cleanup stateful to minimize blocking
Date Tue, 12 Jan 2016 05:24:50 GMT
Repository: accumulo
Updated Branches:
  refs/heads/1.6 567f52fb2 -> 46ad8368e
  refs/heads/1.7 642add8cc -> 656282825
  refs/heads/master 21123cca7 -> 18725dd6c


ACCUMULO-3509: Make cleanup stateful to minimize blocking

By having the cleanup method return its state (true/false), the change avoids blocking
on a scan session being swept. Without this change, if the session cleanup blocks because
a ScanSession is still being read, we may block until the ScanBatch returns for that
ScanSession.

The change uses a simple semaphore (purely because I like the word) to attempt acquisition.
If that fails, we return false from the cleanup and put that Session back into the queue
so that its cleanup is retried later.

Closes apache/accumulo#62


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/46ad8368
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/46ad8368
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/46ad8368

Branch: refs/heads/1.6
Commit: 46ad8368e160c56c03571b467f8ae603c50992f6
Parents: 567f52f
Author: phrocker <marc.parisi@gmail.com>
Authored: Mon Jan 4 10:59:28 2016 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Mon Jan 11 21:24:05 2016 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   6 +-
 .../org/apache/accumulo/tserver/Tablet.java     |  67 +++++--
 .../apache/accumulo/tserver/TabletServer.java   |  81 +++++++--
 .../test/functional/ScanSessionTimeOutIT.java   |  15 +-
 .../test/functional/SessionBlockVerifyIT.java   | 176 +++++++++++++++++++
 5 files changed, 305 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 632bb59..9243494 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -225,7 +225,11 @@ public enum Property {
           + " tserver.walog.max.size >= this property."),
   TSERV_MEM_MGMT("tserver.memory.manager", "org.apache.accumulo.server.tabletserver.LargestFirstMemoryManager",
PropertyType.CLASSNAME,
       "An implementation of MemoryManger that accumulo will use."),
-  TSERV_SESSION_MAXIDLE("tserver.session.idle.max", "1m", PropertyType.TIMEDURATION, "maximum
idle time for a session"),
+  TSERV_SESSION_MAXIDLE("tserver.session.idle.max", "1m", PropertyType.TIMEDURATION, "When
a tablet server's SimpleTimer thread triggers to check "
+      + "idle sessions, this configurable option will be used to evaluate scan sessions to
determine if they can be closed due to inactivity"),
+  TSERV_UPDATE_SESSION_MAXIDLE("tserver.session.update.idle.max", "1m", PropertyType.TIMEDURATION,
+      "When a tablet server's SimpleTimer thread triggers to check "
+          + "idle sessions, this configurable option will be used to evaluate update sessions
to determine if they can be closed due to inactivity"),
   TSERV_READ_AHEAD_MAXCONCURRENT("tserver.readahead.concurrent.max", "16", PropertyType.COUNT,
       "The maximum number of concurrent read ahead that will execute. This effectively"
           + " limits the number of long running scans that can run concurrently per tserver."),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index efed665..3f00c0b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -39,6 +39,8 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -1761,33 +1763,47 @@ public class Tablet {
     private ScanDataSource isolatedDataSource;
     private boolean sawException = false;
     private boolean scanClosed = false;
+    /**
+     * A fair semaphore of one is used since explicitly know the access pattern will be one
thread to read and another to call close if the session becomes
+     * idle. Since we're explicitly preventing re-entrance, we're currently using a Sempahore.
If at any point we decide read needs to be re-entrant, we can
+     * switch to a Reentrant lock.
+     */
+    private Semaphore scannerSemaphore;
 
     Scanner(Range range, ScanOptions options) {
       this.range = range;
       this.options = options;
+      scannerSemaphore = new Semaphore(1, true);
     }
 
-    synchronized ScanBatch read() throws IOException, TabletClosedException {
+    ScanBatch read() throws IOException, TabletClosedException {
 
-      if (sawException)
-        throw new IllegalStateException("Tried to use scanner after exception occurred.");
-
-      if (scanClosed)
-        throw new IllegalStateException("Tried to use scanner after it was closed.");
+      ScanDataSource dataSource = null;
 
       Batch results = null;
 
-      ScanDataSource dataSource;
+      try {
 
-      if (options.isolated) {
-        if (isolatedDataSource == null)
-          isolatedDataSource = new ScanDataSource(options);
-        dataSource = isolatedDataSource;
-      } else {
-        dataSource = new ScanDataSource(options);
-      }
+        try {
+          scannerSemaphore.acquire();
+        } catch (InterruptedException e) {
+          sawException = true;
+        }
 
-      try {
+        // sawException may have occurred within close, so we cannot assume that an interrupted
exception was its cause
+        if (sawException)
+          throw new IllegalStateException("Tried to use scanner after exception occurred.");
+
+        if (scanClosed)
+          throw new IllegalStateException("Tried to use scanner after it was closed.");
+
+        if (options.isolated) {
+          if (isolatedDataSource == null)
+            isolatedDataSource = new ScanDataSource(options);
+          dataSource = isolatedDataSource;
+        } else {
+          dataSource = new ScanDataSource(options);
+        }
 
         SortedKeyValueIterator<Key,Value> iter;
 
@@ -1834,9 +1850,9 @@ public class Tablet {
       } finally {
         // code in finally block because always want
         // to return mapfiles, even when exception is thrown
-        if (!options.isolated)
+        if (null != dataSource && !options.isolated)
           dataSource.close(false);
-        else if (dataSource.fileManager != null)
+        else if (null != dataSource && dataSource.fileManager != null)
           dataSource.fileManager.detach();
 
         synchronized (Tablet.this) {
@@ -1846,19 +1862,32 @@ public class Tablet {
             queryBytes += results.numBytes;
           }
         }
+
+        scannerSemaphore.release();
       }
     }
 
     // close and read are synchronized because can not call close on the data source while
it is in use
     // this cloud lead to the case where file iterators that are in use by a thread are returned
     // to the pool... this would be bad
-    void close() {
+    boolean close() {
       options.interruptFlag.set(true);
-      synchronized (this) {
+      boolean obtainedLock = false;
+      try {
+        obtainedLock = scannerSemaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
+        if (!obtainedLock)
+          return false;
+
         scanClosed = true;
         if (isolatedDataSource != null)
           isolatedDataSource.close(false);
+      } catch (InterruptedException e) {
+        return false;
+      } finally {
+        if (obtainedLock)
+          scannerSemaphore.release();
       }
+      return true;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 29cf0d3..b7aaf06 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -237,6 +237,8 @@ import org.apache.thrift.server.TServer;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 import com.google.common.net.HostAndPort;
 
 enum ScanRunState {
@@ -386,25 +388,30 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     String client = TServerUtils.clientAddress.get();
     public boolean reserved;
 
-    public void cleanup() {}
+    public boolean cleanup() {
+      return true;
+    }
   }
 
   private static class SessionManager {
 
     SecureRandom random;
     Map<Long,Session> sessions;
-    long maxIdle;
+    private long maxIdle;
+    private long maxUpdateIdle;
+    private List<Session> idleSessions = new ArrayList<Session>();
+    private final Long expiredSessionMarker = new Long(-1);
 
     SessionManager(AccumuloConfiguration conf) {
       random = new SecureRandom();
       sessions = new HashMap<Long,Session>();
-
+      maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
       maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 
       Runnable r = new Runnable() {
         @Override
         public void run() {
-          sweep(maxIdle);
+          sweep(maxIdle, maxUpdateIdle);
         }
       };
 
@@ -506,14 +513,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       return session;
     }
 
-    private void sweep(long maxIdle) {
+    private void sweep(final long maxIdle, final long maxUpdateIdle) {
       ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
       synchronized (this) {
         Iterator<Session> iter = sessions.values().iterator();
         while (iter.hasNext()) {
           Session session = iter.next();
+          long configuredIdle = maxIdle;
+          if (session instanceof UpdateSession) {
+            configuredIdle = maxUpdateIdle;
+          }
           long idleTime = System.currentTimeMillis() - session.lastAccessTime;
-          if (idleTime > maxIdle && !session.reserved) {
+          if (idleTime > configuredIdle && !session.reserved) {
             log.info("Closing idle session from user=" + session.user + ", client=" + session.client
+ ", idle=" + idleTime + "ms");
             iter.remove();
             sessionsToCleanup.add(session);
@@ -521,10 +532,21 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         }
       }
 
-      // do clean up outside of lock
-      for (Session session : sessionsToCleanup) {
-        session.cleanup();
+      // do clean up outside of lock for TabletServer in a synchronized block for simplicity
vice a synchronized list
+
+      synchronized (idleSessions) {
+
+        sessionsToCleanup.addAll(idleSessions);
+
+        idleSessions.clear();
+
+        // perform cleanup for all of the sessions
+        for (Session session : sessionsToCleanup) {
+          if (!session.cleanup())
+            idleSessions.add(session);
+        }
       }
+
     }
 
     synchronized void removeIfNotAccessed(final long sessionId, final long delay) {
@@ -556,7 +578,18 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
     public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable()
{
       Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
-      for (Entry<Long,Session> entry : sessions.entrySet()) {
+      Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>();
+
+      synchronized (idleSessions) {
+        /**
+         * Add sessions so that get the list returned in the active scans call
+         */
+        for (Session session : idleSessions) {
+          copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
+        }
+      }
+
+      for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions))
{
 
         Session session = entry.getValue();
         @SuppressWarnings("rawtypes")
@@ -595,11 +628,20 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
     public synchronized List<ActiveScan> getActiveScans() {
 
-      ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+      final List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+      final long ct = System.currentTimeMillis();
+      final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>();
 
-      long ct = System.currentTimeMillis();
+      synchronized (idleSessions) {
+        /**
+         * Add sessions so that get the list returned in the active scans call
+         */
+        for (Session session : idleSessions) {
+          copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
+        }
+      }
 
-      for (Entry<Long,Session> entry : sessions.entrySet()) {
+      for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions))
{
         Session session = entry.getValue();
         if (session instanceof ScanSession) {
           ScanSession ss = (ScanSession) session;
@@ -841,8 +883,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     public AtomicBoolean interruptFlag;
 
     @Override
-    public void cleanup() {
+    public boolean cleanup() {
       interruptFlag.set(true);
+      return true;
     }
   }
 
@@ -879,13 +922,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
 
     @Override
-    public void cleanup() {
+    public boolean cleanup() {
       try {
         if (nextBatchTask != null)
           nextBatchTask.cancel(true);
       } finally {
         if (scanner != null)
-          scanner.close();
+          return scanner.close();
+        else
+          return true;
       }
     }
 
@@ -908,9 +953,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     public KeyExtent threadPoolExtent;
 
     @Override
-    public void cleanup() {
+    public boolean cleanup() {
       if (lookupTask != null)
         lookupTask.cancel(true);
+      // the cancellation should provide us the safety to return true here
+      return true;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
b/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
index 91fc9eb..6009462 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
@@ -49,7 +49,7 @@ public class ScanSessionTimeOutIT extends AccumuloClusterIT {
 
   @Override
   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite)
{
-    cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_SESSION_MAXIDLE.getKey(), "3"));
+    cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString()));
   }
 
   @Override
@@ -63,12 +63,21 @@ public class ScanSessionTimeOutIT extends AccumuloClusterIT {
   public void reduceSessionIdle() throws Exception {
     InstanceOperations ops = getConnector().instanceOperations();
     sessionIdle = ops.getSystemConfiguration().get(Property.TSERV_SESSION_MAXIDLE.getKey());
-    ops.setProperty(Property.TSERV_SESSION_MAXIDLE.getKey(), "3");
+    ops.setProperty(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString());
     log.info("Waiting for existing session idle time to expire");
     Thread.sleep(AccumuloConfiguration.getTimeInMillis(sessionIdle));
     log.info("Finished waiting");
   }
 
+  /**
+   * Returns the max idle time as a string.
+   *
+   * @return new max idle time
+   */
+  protected String getMaxIdleTimeString() {
+    return "3";
+  }
+
   @After
   public void resetSessionIdle() throws Exception {
     if (null != sessionIdle) {
@@ -108,7 +117,7 @@ public class ScanSessionTimeOutIT extends AccumuloClusterIT {
 
   }
 
-  private void verify(Iterator<Entry<Key,Value>> iter, int start, int stop) throws
Exception {
+  protected void verify(Iterator<Entry<Key,Value>> iter, int start, int stop)
throws Exception {
     for (int i = start; i < stop; i++) {
 
       Text er = new Text(String.format("%08d", i));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/46ad8368/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java
b/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java
new file mode 100644
index 0000000..05f304b
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Verify that we have resolved blocking issue by ensuring that we have not lost scan sessions
which we know to currently be running
+ */
+public class SessionBlockVerifyIT extends ScanSessionTimeOutIT {
+  private static final Logger log = LoggerFactory.getLogger(SessionBlockVerifyIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite)
{
+    Map<String,String> siteConfig = cfg.getSiteConfig();
+    cfg.setNumTservers(1);
+    siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString());
+    siteConfig.put(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey(), "11");
+    cfg.setSiteConfig(siteConfig);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Override
+  protected String getMaxIdleTimeString() {
+    return "1s";
+  }
+
+  ExecutorService service = Executors.newFixedThreadPool(10);
+
+  @Test
+  public void run() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+
+    for (int i = 0; i < 1000; i++) {
+      Mutation m = new Mutation(new Text(String.format("%08d", i)));
+      for (int j = 0; j < 3; j++)
+        m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8)));
+
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    Scanner scanner = c.createScanner(tableName, new Authorizations());
+    scanner.setReadaheadThreshold(20000);
+    scanner.setRange(new Range(String.format("%08d", 0), String.format("%08d", 1000)));
+
+    // test by making a slow iterator and then a couple of fast ones.
+    // when then checking we shouldn't have any running except the slow iterator
+    IteratorSetting setting = new IteratorSetting(21, SlowIterator.class);
+    SlowIterator.setSeekSleepTime(setting, Long.MAX_VALUE);
+    SlowIterator.setSleepTime(setting, Long.MAX_VALUE);
+    scanner.addScanIterator(setting);
+
+    final Iterator<Entry<Key,Value>> slow = scanner.iterator();
+
+    final List<Future<Boolean>> callables = new ArrayList<Future<Boolean>>();
+    final CountDownLatch latch = new CountDownLatch(10);
+    for (int i = 0; i < 10; i++) {
+      Future<Boolean> callable = service.submit(new Callable<Boolean>() {
+        public Boolean call() {
+          latch.countDown();
+          while (slow.hasNext()) {
+
+            slow.next();
+          }
+          return slow.hasNext();
+        }
+      });
+      callables.add(callable);
+    }
+
+    latch.await();
+
+    log.info("Starting SessionBlockVerifyIT");
+
+    // let's add more for good measure.
+    for (int i = 0; i < 2; i++) {
+      Scanner scanner2 = c.createScanner(tableName, new Authorizations());
+
+      scanner2.setRange(new Range(String.format("%08d", 0), String.format("%08d", 1000)));
+
+      scanner2.setBatchSize(1);
+      Iterator<Entry<Key,Value>> iter = scanner2.iterator();
+      // call super's verify mechanism
+      verify(iter, 0, 1000);
+
+    }
+
+    int sessionsFound = 0;
+    // we have configured 1 tserver, so we can grab the one and only
+    String tserver = Iterables.getOnlyElement(c.instanceOperations().getTabletServers());
+
+    final List<ActiveScan> scans = c.instanceOperations().getActiveScans(tserver);
+
+    for (ActiveScan scan : scans) {
+      // only here to minimize chance of seeing meta extent scans
+
+      if (tableName.equals(scan.getTable()) && scan.getSsiList().size() > 0) {
+        assertEquals("Not the expected iterator", 1, scan.getSsiList().size());
+        assertTrue("Not the expected iterator", scan.getSsiList().iterator().next().contains("SlowIterator"));
+        sessionsFound++;
+      }
+
+    }
+
+    /**
+     * The message below indicates the problem that we experience within ACCUMULO-3509. The
issue manifests as a blockage in the Scanner synchronization that
+     * prevent us from making the close call against it. Since the close blocks until a read
is finished, we ultimately have a block within the sweep of
+     * SessionManager. As a result never reap subsequent idle sessions AND we will orphan
the sessionsToCleanup in the sweep, leading to an inaccurate count
+     * within sessionsFound.
+     */
+    assertEquals("Must have ten sessions. Failure indicates a synchronization block within
the sweep mechanism", 10, sessionsFound);
+    for (Future<Boolean> callable : callables) {
+      callable.cancel(true);
+    }
+    service.shutdown();
+  }
+
+}


Mime
View raw message