accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject git commit: ACCUMULO-2259 moved ScanTask classes to scan package, moved JMX impl to metrics package
Date Tue, 17 Jun 2014 13:47:03 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 3ed4b2796 -> f2aeb14c7


ACCUMULO-2259 moved ScanTask classes to scan package, moved JMX impl to metrics package


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f2aeb14c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f2aeb14c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f2aeb14c

Branch: refs/heads/master
Commit: f2aeb14c77fc960f7cf18073120d6e77f3f68a46
Parents: 3ed4b27
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Tue Jun 17 09:46:44 2014 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Tue Jun 17 09:47:06 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/SessionManager.java | 314 -----------
 .../apache/accumulo/tserver/TabletServer.java   | 545 ++-----------------
 .../apache/accumulo/tserver/log/DfsLogger.java  |   2 +-
 .../tserver/metrics/TabletServerMBeanImpl.java  | 206 +++++++
 .../accumulo/tserver/scan/LookupTask.java       | 173 ++++++
 .../accumulo/tserver/scan/NextBatchTask.java    |  96 ++++
 .../accumulo/tserver/scan/ScanRunState.java     |  21 +
 .../apache/accumulo/tserver/scan/ScanTask.java  | 129 +++++
 .../tserver/session/MultiScanSession.java       |   2 +-
 .../accumulo/tserver/session/ScanSession.java   |   2 +-
 .../tserver/session/SessionManager.java         | 311 +++++++++++
 11 files changed, 993 insertions(+), 808 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f2aeb14c/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java
deleted file mode 100644
index b200c9e..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/SessionManager.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver;
-
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TimerTask;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.impl.Translator;
-import org.apache.accumulo.core.client.impl.Translators;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.thrift.MultiScanResult;
-import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
-import org.apache.accumulo.core.tabletserver.thrift.ScanState;
-import org.apache.accumulo.core.tabletserver.thrift.ScanType;
-import org.apache.accumulo.core.util.MapCounter;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.tserver.TabletServer.ScanRunState;
-import org.apache.accumulo.tserver.TabletServer.ScanTask;
-import org.apache.accumulo.tserver.session.MultiScanSession;
-import org.apache.accumulo.tserver.session.ScanSession;
-import org.apache.accumulo.tserver.session.Session;
-import org.apache.accumulo.tserver.tablet.ScanBatch;
-
-public class SessionManager {
-
-  private final SecureRandom random = new SecureRandom();
-  private final Map<Long,Session> sessions = new HashMap<Long,Session>();
-  private final long maxIdle;
-  private final AccumuloConfiguration aconf;
-
-  SessionManager(AccumuloConfiguration conf) {
-    aconf = conf;
-    maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
-
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        sweep(maxIdle);
-      }
-    };
-
-    SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
-  }
-
-  synchronized long createSession(Session session, boolean reserve) {
-    long sid = random.nextLong();
-
-    while (sessions.containsKey(sid)) {
-      sid = random.nextLong();
-    }
-
-    sessions.put(sid, session);
-
-    session.reserved = reserve;
-
-    session.startTime = session.lastAccessTime = System.currentTimeMillis();
-
-    return sid;
-  }
-
-  long getMaxIdleTime() {
-    return maxIdle;
-  }
-
-  /**
-   * while a session is reserved, it cannot be canceled or removed
-   *
-   * @param sessionId
-   */
-
-  synchronized Session reserveSession(long sessionId) {
-    Session session = sessions.get(sessionId);
-    if (session != null) {
-      if (session.reserved)
-        throw new IllegalStateException();
-      session.reserved = true;
-    }
-
-    return session;
-
-  }
-
-  synchronized Session reserveSession(long sessionId, boolean wait) {
-    Session session = sessions.get(sessionId);
-    if (session != null) {
-      while (wait && session.reserved) {
-        try {
-          wait(1000);
-        } catch (InterruptedException e) {
-          throw new RuntimeException();
-        }
-      }
-
-      if (session.reserved)
-        throw new IllegalStateException();
-      session.reserved = true;
-    }
-
-    return session;
-
-  }
-
-  synchronized void unreserveSession(Session session) {
-    if (!session.reserved)
-      throw new IllegalStateException();
-    notifyAll();
-    session.reserved = false;
-    session.lastAccessTime = System.currentTimeMillis();
-  }
-
-  synchronized void unreserveSession(long sessionId) {
-    Session session = getSession(sessionId);
-    if (session != null)
-      unreserveSession(session);
-  }
-
-  synchronized Session getSession(long sessionId) {
-    Session session = sessions.get(sessionId);
-    if (session != null)
-      session.lastAccessTime = System.currentTimeMillis();
-    return session;
-  }
-
-  Session removeSession(long sessionId) {
-    return removeSession(sessionId, false);
-  }
-
-  Session removeSession(long sessionId, boolean unreserve) {
-    Session session = null;
-    synchronized (this) {
-      session = sessions.remove(sessionId);
-      if (unreserve && session != null)
-        unreserveSession(session);
-    }
-
-    // do clean up out side of lock..
-    if (session != null)
-      session.cleanup();
-
-    return session;
-  }
-
-  private void sweep(long maxIdle) {
-    List<Session> sessionsToCleanup = new ArrayList<Session>();
-    synchronized (this) {
-      Iterator<Session> iter = sessions.values().iterator();
-      while (iter.hasNext()) {
-        Session session = iter.next();
-        long idleTime = System.currentTimeMillis() - session.lastAccessTime;
-        if (idleTime > maxIdle && !session.reserved) {
-          iter.remove();
-          sessionsToCleanup.add(session);
-        }
-      }
-    }
-
-    // do clean up outside of lock
-    for (Session session : sessionsToCleanup) {
-      session.cleanup();
-    }
-  }
-
-  synchronized void removeIfNotAccessed(final long sessionId, long delay) {
-    Session session = sessions.get(sessionId);
-    if (session != null) {
-      final long removeTime = session.lastAccessTime;
-      TimerTask r = new TimerTask() {
-        @Override
-        public void run() {
-          Session sessionToCleanup = null;
-          synchronized (SessionManager.this) {
-            Session session2 = sessions.get(sessionId);
-            if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
-              sessions.remove(sessionId);
-              sessionToCleanup = session2;
-            }
-          }
-
-          // call clean up outside of lock
-          if (sessionToCleanup != null)
-            sessionToCleanup.cleanup();
-        }
-      };
-
-      SimpleTimer.getInstance(aconf).schedule(r, delay);
-    }
-  }
-
-  public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
-    Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
-    for (Entry<Long,Session> entry : sessions.entrySet()) {
-
-      Session session = entry.getValue();
-      @SuppressWarnings("rawtypes")
-      ScanTask nbt = null;
-      String tableID = null;
-
-      if (session instanceof ScanSession) {
-        ScanSession ss = (ScanSession) session;
-        nbt = ss.nextBatchTask;
-        tableID = ss.extent.getTableId().toString();
-      } else if (session instanceof MultiScanSession) {
-        MultiScanSession mss = (MultiScanSession) session;
-        nbt = mss.lookupTask;
-        tableID = mss.threadPoolExtent.getTableId().toString();
-      }
-
-      if (nbt == null)
-        continue;
-
-      ScanRunState srs = nbt.getScanRunState();
-
-      if (srs == ScanRunState.FINISHED)
-        continue;
-
-      MapCounter<ScanRunState> stateCounts = counts.get(tableID);
-      if (stateCounts == null) {
-        stateCounts = new MapCounter<ScanRunState>();
-        counts.put(tableID, stateCounts);
-      }
-
-      stateCounts.increment(srs, 1);
-    }
-
-    return counts;
-  }
-
-  public synchronized List<ActiveScan> getActiveScans() {
-
-    List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
-
-    long ct = System.currentTimeMillis();
-
-    for (Entry<Long,Session> entry : sessions.entrySet()) {
-      Session session = entry.getValue();
-      if (session instanceof ScanSession) {
-        ScanSession ss = (ScanSession) session;
-
-        ScanState state = ScanState.RUNNING;
-
-        ScanTask<ScanBatch> nbt = ss.nextBatchTask;
-        if (nbt == null) {
-          state = ScanState.IDLE;
-        } else {
-          switch (nbt.getScanRunState()) {
-            case QUEUED:
-              state = ScanState.QUEUED;
-              break;
-            case FINISHED:
-              state = ScanState.IDLE;
-              break;
-            case RUNNING:
-            default:
-              /* do nothing */
-              break;
-          }
-        }
-
-        activeScans.add(new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
-            state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
-
-      } else if (session instanceof MultiScanSession) {
-        MultiScanSession mss = (MultiScanSession) session;
-
-        ScanState state = ScanState.RUNNING;
-
-        ScanTask<MultiScanResult> nbt = mss.lookupTask;
-        if (nbt == null) {
-          state = ScanState.IDLE;
-        } else {
-          switch (nbt.getScanRunState()) {
-            case QUEUED:
-              state = ScanState.QUEUED;
-              break;
-            case FINISHED:
-              state = ScanState.IDLE;
-              break;
-            case RUNNING:
-            default:
-              /* do nothing */
-              break;
-          }
-        }
-
-        activeScans.add(new ActiveScan(mss.client, mss.getUser(), mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
-            ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths
-                .getAuthorizationsBB()));
-      }
-    }
-
-    return activeScans;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f2aeb14c/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index a9ad8ed..f67c51f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -40,21 +41,16 @@ import java.util.SortedSet;
 import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
-import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
 import org.apache.accumulo.core.Constants;
@@ -94,7 +90,6 @@ import org.apache.accumulo.core.data.thrift.TColumn;
 import org.apache.accumulo.core.data.thrift.TCondition;
 import org.apache.accumulo.core.data.thrift.TConditionalMutation;
 import org.apache.accumulo.core.data.thrift.TConditionalSession;
-import org.apache.accumulo.core.data.thrift.TKey;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.data.thrift.TKeyValue;
 import org.apache.accumulo.core.data.thrift.TMutation;
@@ -163,7 +158,6 @@ import org.apache.accumulo.server.master.state.TabletLocationState;
 import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
 import org.apache.accumulo.server.master.state.TabletStateStore;
 import org.apache.accumulo.server.master.state.ZooTabletStateStore;
-import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -201,14 +195,20 @@ import org.apache.accumulo.tserver.mastermessage.MasterMessage;
 import org.apache.accumulo.tserver.mastermessage.SplitReportMessage;
 import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
 import org.apache.accumulo.tserver.metrics.TabletServerMBean;
+import org.apache.accumulo.tserver.metrics.TabletServerMBeanImpl;
 import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
 import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
 import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
 import org.apache.accumulo.tserver.replication.ReplicationServicerHandler;
 import org.apache.accumulo.tserver.replication.ReplicationWorker;
+import org.apache.accumulo.tserver.scan.LookupTask;
+import org.apache.accumulo.tserver.scan.NextBatchTask;
+import org.apache.accumulo.tserver.scan.ScanRunState;
 import org.apache.accumulo.tserver.session.ConditionalSession;
 import org.apache.accumulo.tserver.session.MultiScanSession;
 import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.Session;
+import org.apache.accumulo.tserver.session.SessionManager;
 import org.apache.accumulo.tserver.session.UpdateSession;
 import org.apache.accumulo.tserver.tablet.CommitSession;
 import org.apache.accumulo.tserver.tablet.CompactionInfo;
@@ -219,7 +219,6 @@ import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Scanner;
 import org.apache.accumulo.tserver.tablet.SplitInfo;
 import org.apache.accumulo.tserver.tablet.Tablet;
-import org.apache.accumulo.tserver.tablet.Tablet.LookupResult;
 import org.apache.accumulo.tserver.tablet.TabletClosedException;
 import org.apache.commons.collections.map.LRUMap;
 import org.apache.hadoop.fs.FSError;
@@ -236,13 +235,8 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
 
 import com.google.common.net.HostAndPort;
 
-public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.tserver.metrics.TabletServerMBean {
-  static enum ScanRunState {
-    QUEUED, RUNNING, FINISHED
-  }
-
+public class TabletServer implements Runnable {
   private static final Logger log = Logger.getLogger(TabletServer.class);
-  private static final String METRICS_PREFIX = "tserver";
   private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
   private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
   private static final Set<Column> EMPTY_COLUMNS = Collections.emptySet();
@@ -298,8 +292,6 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
   private String lockID;
 
-  private static ObjectName OBJECT_NAME = null;
-
   public static final AtomicLong seekCount = new AtomicLong(0);
   
   private final AtomicLong totalMinorCompactions = new AtomicLong(0);
@@ -310,6 +302,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     this.fs = fs;
     AccumuloConfiguration aconf = getSystemConfiguration();
     Instance instance = getInstance();
+    this.sessionManager = new SessionManager(aconf);
     this.logSorter = new LogSorter(instance, fs, aconf);
     this.replWorker = new ReplicationWorker(instance, fs, aconf);
     this.statsKeeper = new TabletStatsKeeper();
@@ -339,121 +332,21 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     this.resourceManager = new TabletServerResourceManager(getInstance(), fs);
   }
 
-  public static abstract class ScanTask<T> implements RunnableFuture<T> {
-
-    protected AtomicBoolean interruptFlag;
-    protected ArrayBlockingQueue<Object> resultQueue;
-    protected AtomicInteger state;
-    protected AtomicReference<ScanRunState> runState;
-
-    private static final int INITIAL = 1;
-    private static final int ADDED = 2;
-    private static final int CANCELED = 3;
-
-    ScanTask() {
-      interruptFlag = new AtomicBoolean(false);
-      runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
-      state = new AtomicInteger(INITIAL);
-      resultQueue = new ArrayBlockingQueue<Object>(1);
-    }
-
-    protected void addResult(Object o) {
-      if (state.compareAndSet(INITIAL, ADDED))
-        resultQueue.add(o);
-      else if (state.get() == ADDED)
-        throw new IllegalStateException("Tried to add more than one result");
-    }
-
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-      if (!mayInterruptIfRunning)
-        throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
-
-      if (state.get() == CANCELED)
-        return true;
-
-      if (state.compareAndSet(INITIAL, CANCELED)) {
-        interruptFlag.set(true);
-        resultQueue = null;
-        return true;
-      }
-
-      return false;
-    }
-
-    @Override
-    public T get() throws InterruptedException, ExecutionException {
-      throw new UnsupportedOperationException();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-
-      ArrayBlockingQueue<Object> localRQ = resultQueue;
-
-      if (state.get() == CANCELED)
-        throw new CancellationException();
-
-      if (localRQ == null && state.get() == ADDED)
-        throw new IllegalStateException("Tried to get result twice");
-
-      Object r = localRQ.poll(timeout, unit);
-
-      // could have been canceled while waiting
-      if (state.get() == CANCELED) {
-        if (r != null)
-          throw new IllegalStateException("Nothing should have been added when in canceled state");
-
-        throw new CancellationException();
-      }
-
-      if (r == null)
-        throw new TimeoutException();
-
-      // make this method stop working now that something is being
-      // returned
-      resultQueue = null;
-
-      if (r instanceof Throwable)
-        throw new ExecutionException((Throwable) r);
-
-      return (T) r;
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return state.get() == CANCELED;
-    }
-
-    @Override
-    public boolean isDone() {
-      return runState.get().equals(ScanRunState.FINISHED);
-    }
-
-    public ScanRunState getScanRunState() {
-      return runState.get();
-    }
-
-  }
-
   public AccumuloConfiguration getSystemConfiguration() {
     return serverConfig.getConfiguration();
   }
 
-  private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
-
-    private final SessionManager sessionManager = new SessionManager(getSystemConfiguration());
+  private final SessionManager sessionManager;
 
-    private final AccumuloConfiguration acuConf = getSystemConfiguration();
+  private final TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
 
-    private final TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
+  private final TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
 
-    private final TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
+  private final WriteTracker writeTracker = new WriteTracker();
 
-    private final WriteTracker writeTracker = new WriteTracker();
+  private final RowLocks rowLocks = new RowLocks();
 
-    private final RowLocks rowLocks = new RowLocks();
+  private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
 
     ThriftClientHandler() {
       super(getInstance(), watcher, fs);
@@ -503,195 +396,6 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       return failures;
     }
 
-    private class NextBatchTask extends ScanTask<ScanBatch> {
-
-      private long scanID;
-
-      NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
-        this.scanID = scanID;
-        this.interruptFlag = interruptFlag;
-
-        if (interruptFlag.get())
-          cancel(true);
-      }
-
-      @Override
-      public void run() {
-
-        final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
-        String oldThreadName = Thread.currentThread().getName();
-
-        try {
-          if (isCancelled() || scanSession == null)
-            return;
-
-          runState.set(ScanRunState.RUNNING);
-
-          Thread.currentThread().setName(
-              "User: " + scanSession.getUser() + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
-
-          Tablet tablet = onlineTablets.get(scanSession.extent);
-
-          if (tablet == null) {
-            addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
-            return;
-          }
-
-          long t1 = System.currentTimeMillis();
-          ScanBatch batch = scanSession.scanner.read();
-          long t2 = System.currentTimeMillis();
-          scanSession.nbTimes.addStat(t2 - t1);
-
-          // there should only be one thing on the queue at a time, so
-          // it should be ok to call add()
-          // instead of put()... if add() fails because queue is at
-          // capacity it means there is code
-          // problem somewhere
-          addResult(batch);
-        } catch (TabletClosedException e) {
-          addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
-        } catch (IterationInterruptedException iie) {
-          if (!isCancelled()) {
-            log.warn("Iteration interrupted, when scan not cancelled", iie);
-            addResult(iie);
-          }
-        } catch (TooManyFilesException tmfe) {
-          addResult(tmfe);
-        } catch (Throwable e) {
-          log.warn("exception while scanning tablet " + (scanSession == null ? "(unknown)" : scanSession.extent), e);
-          addResult(e);
-        } finally {
-          runState.set(ScanRunState.FINISHED);
-          Thread.currentThread().setName(oldThreadName);
-        }
-
-      }
-    }
-
-    private class LookupTask extends ScanTask<MultiScanResult> {
-
-      private final long scanID;
-
-      LookupTask(long scanID) {
-        this.scanID = scanID;
-      }
-
-      @Override
-      public void run() {
-        MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
-        String oldThreadName = Thread.currentThread().getName();
-
-        try {
-          if (isCancelled() || session == null)
-            return;
-
-          TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(getInstance(), session.threadPoolExtent.getTableId().toString());
-          long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
-
-          runState.set(ScanRunState.RUNNING);
-          Thread.currentThread().setName("Client: " + session.client + " User: " + session.getUser() + " Start: " + session.startTime + " Table: ");
-
-          long bytesAdded = 0;
-          long maxScanTime = 4000;
-
-          long startTime = System.currentTimeMillis();
-
-          List<KVEntry> results = new ArrayList<KVEntry>();
-          Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
-          List<KeyExtent> fullScans = new ArrayList<KeyExtent>();
-          KeyExtent partScan = null;
-          Key partNextKey = null;
-          boolean partNextKeyInclusive = false;
-
-          Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
-
-          // check the time so that the read ahead thread is not monopolized
-          while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
-            Entry<KeyExtent,List<Range>> entry = iter.next();
-
-            iter.remove();
-
-            // check that tablet server is serving requested tablet
-            Tablet tablet = onlineTablets.get(entry.getKey());
-            if (tablet == null) {
-              failures.put(entry.getKey(), entry.getValue());
-              continue;
-            }
-            Thread.currentThread().setName(
-                "Client: " + session.client + " User: " + session.getUser() + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
-
-            LookupResult lookupResult;
-            try {
-
-              // do the following check to avoid a race condition
-              // between setting false below and the task being
-              // canceled
-              if (isCancelled())
-                interruptFlag.set(true);
-
-              lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
-                  session.ssio, interruptFlag);
-
-              // if the tablet was closed it it possible that the
-              // interrupt flag was set.... do not want it set for
-              // the next
-              // lookup
-              interruptFlag.set(false);
-
-            } catch (IOException e) {
-              log.warn("lookup failed for tablet " + entry.getKey(), e);
-              throw new RuntimeException(e);
-            }
-
-            bytesAdded += lookupResult.bytesAdded;
-
-            if (lookupResult.unfinishedRanges.size() > 0) {
-              if (lookupResult.closed) {
-                failures.put(entry.getKey(), lookupResult.unfinishedRanges);
-              } else {
-                session.queries.put(entry.getKey(), lookupResult.unfinishedRanges);
-                partScan = entry.getKey();
-                partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
-                partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
-              }
-            } else {
-              fullScans.add(entry.getKey());
-            }
-          }
-
-          long finishTime = System.currentTimeMillis();
-          session.totalLookupTime += (finishTime - startTime);
-          session.numEntries += results.size();
-
-          // convert everything to thrift before adding result
-          List<TKeyValue> retResults = new ArrayList<TKeyValue>();
-          for (KVEntry entry : results)
-            retResults.add(new TKeyValue(entry.getKey().toThrift(), ByteBuffer.wrap(entry.getValue().get())));
-          Map<TKeyExtent,List<TRange>> retFailures = Translator.translate(failures, Translators.KET, new Translator.ListTranslator<Range,TRange>(Translators.RT));
-          List<TKeyExtent> retFullScans = Translator.translate(fullScans, Translators.KET);
-          TKeyExtent retPartScan = null;
-          TKey retPartNextKey = null;
-          if (partScan != null) {
-            retPartScan = partScan.toThrift();
-            retPartNextKey = partNextKey.toThrift();
-          }
-          // add results to queue
-          addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0));
-        } catch (IterationInterruptedException iie) {
-          if (!isCancelled()) {
-            log.warn("Iteration interrupted, when scan not cancelled", iie);
-            addResult(iie);
-          }
-        } catch (Throwable e) {
-          log.warn("exception while doing multi-scan ", e);
-          addResult(e);
-        } finally {
-          Thread.currentThread().setName(oldThreadName);
-          runState.set(ScanRunState.FINISHED);
-        }
-      }
-    }
-
     @Override
     public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
         List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated,
@@ -765,7 +469,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
 
       if (scanSession.nextBatchTask == null) {
-        scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
+        scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag);
         resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
       }
 
@@ -790,7 +494,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
           throw new NoSuchScanIDException();
       } catch (TimeoutException e) {
         List<TKeyValue> param = Collections.emptyList();
-        long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+        long timeout = getSystemConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
         sessionManager.removeIfNotAccessed(scanID, timeout);
         return new ScanResult(param, true);
       } catch (Throwable t) {
@@ -808,7 +512,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) {
         // start reading next batch while current batch is transmitted
         // to client
-        scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
+        scanSession.nextBatchTask = new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag);
         resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
       }
 
@@ -910,7 +614,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {
 
       if (session.lookupTask == null) {
-        session.lookupTask = new LookupTask(scanID);
+        session.lookupTask = new LookupTask(TabletServer.this, scanID);
         resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
       }
 
@@ -919,7 +623,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         session.lookupTask = null;
         return scanResult;
       } catch (TimeoutException e1) {
-        long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+        long timeout = getSystemConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
         sessionManager.removeIfNotAccessed(scanID, timeout);
         List<TKeyValue> results = Collections.emptyList();
         Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
@@ -1988,7 +1692,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
         try {
           Path source = new Path(filename);
-          if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
+          if (getSystemConfiguration().getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
             Path walogArchive = fs.matchingFileSystem(source, ServerConstants.getWalogArchives());
             fs.mkdirs(walogArchive);
             Path dest = new Path(walogArchive, source.getName());
@@ -1998,7 +1702,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
           } else {
             log.info("Deleting walog " + filename);
             Path sourcePath = new Path(filename);
-            if (!(!acuConf.getBoolean(Property.GC_TRASH_IGNORE) && fs.moveToTrash(sourcePath)) && !fs.deleteRecursively(sourcePath))
+            if (!(!getSystemConfiguration().getBoolean(Property.GC_TRASH_IGNORE) && fs.moveToTrash(sourcePath)) && !fs.deleteRecursively(sourcePath))
               log.warn("Failed to delete walog " + source);
             for (String recovery : ServerConstants.getRecoveryDirs()) {
               Path recoveryPath = new Path(recovery, source.getName());
@@ -2059,6 +1763,14 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     return majorCompactorDisabled;
   }
 
+  public Tablet getOnlineTablet(KeyExtent extent) {
+    return onlineTablets.get(extent);
+  }
+
+  public Session getSession(long sessionId) {
+    return sessionManager.getSession(sessionId);
+  }
+
   public void executeSplit(Tablet tablet) {
     resourceManager.executeSplit(tablet.getExtent(), new LoggingRunnable(log, new SplitRunner(tablet)));
   }
@@ -2678,10 +2390,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000);
 
     try {
-      OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
       // Do this because interface not in same package.
-      StandardMBean mbean = new StandardMBean(this, TabletServerMBean.class, false);
-      this.register(mbean);
+      TabletServerMBeanImpl beanImpl = new TabletServerMBeanImpl(this);
+      StandardMBean mbean = new StandardMBean(beanImpl, TabletServerMBean.class, false);
+      beanImpl.register(mbean);
       mincMetrics.register();
     } catch (Exception e) {
       log.error("Error registering with JMX", e);
@@ -3196,176 +2908,6 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     return -1;
   }
 
-  // JMX methods
-
-  @Override
-  public long getEntries() {
-    if (this.isEnabled()) {
-      long result = 0;
-      for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        result += tablet.getNumEntries();
-      }
-      return result;
-    }
-    return 0;
-  }
-
-  @Override
-  public long getEntriesInMemory() {
-    if (this.isEnabled()) {
-      long result = 0;
-      for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        result += tablet.getNumEntriesInMemory();
-      }
-      return result;
-    }
-    return 0;
-  }
-
-  @Override
-  public long getIngest() {
-    if (this.isEnabled()) {
-      long result = 0;
-      for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        result += tablet.getNumEntriesInMemory();
-      }
-      return result;
-    }
-    return 0;
-  }
-
-  @Override
-  public int getMajorCompactions() {
-    if (this.isEnabled()) {
-      int result = 0;
-      for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        if (tablet.isMajorCompactionRunning())
-          result++;
-      }
-      return result;
-    }
-    return 0;
-  }
-
-  @Override
-  public int getMajorCompactionsQueued() {
-    if (this.isEnabled()) {
-      int result = 0;
-      for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        if (tablet.isMajorCompactionQueued())
-          result++;
-      }
-      return result;
-    }
-    return 0;
-  }
-
-  @Override
-  public int getMinorCompactions() {
-    if (this.isEnabled()) {
-      int result = 0;
-      for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        if (tablet.isMinorCompactionRunning())
-          result++;
-      }
-      return result;
-    }
-    return 0;
-  }
-
-  @Override
-  public int getMinorCompactionsQueued() {
-    if (this.isEnabled()) {
-      int result = 0;
-      for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        if (tablet.isMinorCompactionQueued())
-          result++;
-      }
-      return result;
-    }
-    return 0;
-  }
-
-  @Override
-  public int getOnlineCount() {
-    if (this.isEnabled())
-      return onlineTablets.size();
-    return 0;
-  }
-
-  @Override
-  public int getOpeningCount() {
-    if (this.isEnabled())
-      return openingTablets.size();
-    return 0;
-  }
-
-  @Override
-  public long getQueries() {
-    if (this.isEnabled()) {
-      long result = 0;
-      for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        result += tablet.totalQueries();
-      }
-      return result;
-    }
-    return 0;
-  }
-
-  @Override
-  public int getUnopenedCount() {
-    if (this.isEnabled())
-      return unopenedTablets.size();
-    return 0;
-  }
-
-  @Override
-  public String getName() {
-    if (this.isEnabled())
-      return getClientAddressString();
-    return "";
-  }
-
-  @Override
-  public long getTotalMinorCompactions() {
-    if (this.isEnabled())
-      return totalMinorCompactions.get();
-    return 0;
-  }
-
-  @Override
-  public double getHoldTime() {
-    if (this.isEnabled())
-      return this.resourceManager.holdTime() / 1000.;
-    return 0;
-  }
-
-  @Override
-  public double getAverageFilesPerTablet() {
-    if (this.isEnabled()) {
-      int count = 0;
-      long result = 0;
-      for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        result += tablet.getDatafiles().size();
-        count++;
-      }
-      if (count == 0)
-        return 0;
-      return result / (double) count;
-    }
-    return 0;
-  }
-
-  @Override
-  protected ObjectName getObjectName() {
-    return OBJECT_NAME;
-  }
-
-  @Override
-  protected String getMetricsPrefix() {
-    return METRICS_PREFIX;
-  }
-
   public TableConfiguration getTableConfiguration(KeyExtent extent) {
     return ServerConfiguration.getTableConfiguration(getInstance(), extent.getTableId().toString());
   }
@@ -3390,8 +2932,29 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     };
   }
 
+
+  public Collection<Tablet> getOnlineTablets() {
+    return Collections.unmodifiableCollection(onlineTablets.values());
+  }
+
   public VolumeManager getFileSystem() {
     return fs;
   }
 
+  public int getOpeningCount() {
+    return openingTablets.size();
+  }
+
+  public int getUnopenedCount() {
+    return unopenedTablets.size();
+  }
+
+  public long getTotalMinorCompactions() {
+    return totalMinorCompactions.get();
+  }
+
+  public double getHoldTimeMillis() {
+    return resourceManager.holdTime();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f2aeb14c/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index dfe6a8a..c01e54a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -229,7 +229,7 @@ public class DfsLogger {
   }
 
   /**
-   * Refernce a pre-existing log file.
+   * Reference a pre-existing log file.
    * @param meta the cq for the "log" entry in +r/!0
    */
   public DfsLogger(ServerResources conf, String filename, String meta) throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f2aeb14c/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBeanImpl.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBeanImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBeanImpl.java
new file mode 100644
index 0000000..3970379
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBeanImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.metrics;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.tablet.Tablet;
+
+public class TabletServerMBeanImpl extends AbstractMetricsImpl implements TabletServerMBean {
+
+  private static final String METRICS_PREFIX = "tserver";
+  private static ObjectName OBJECT_NAME = null;
+
+  final TabletServer server;
+  
+  public TabletServerMBeanImpl(TabletServer server) throws MalformedObjectNameException {
+    this.server = server;
+    OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
+  }
+  
+  @Override
+  public long getEntries() {
+    if (isEnabled()) {
+      long result = 0;
+      for (Tablet tablet : server.getOnlineTablets()) {
+        result += tablet.getNumEntries();
+      }
+      return result;
+    }
+    return 0;
+  }
+
+  @Override
+  public long getEntriesInMemory() {
+    if (isEnabled()) {
+      long result = 0;
+      for (Tablet tablet : server.getOnlineTablets()) {
+        result += tablet.getNumEntriesInMemory();
+      }
+      return result;
+    }
+    return 0;
+  }
+
+  @Override
+  public long getIngest() {
+    if (isEnabled()) {
+      long result = 0;
+      for (Tablet tablet : server.getOnlineTablets()) {
+        result += tablet.getNumEntriesInMemory();
+      }
+      return result;
+    }
+    return 0;
+  }
+
+  @Override
+  public int getMajorCompactions() {
+    if (isEnabled()) {
+      int result = 0;
+      for (Tablet tablet : server.getOnlineTablets()) {
+        if (tablet.isMajorCompactionRunning())
+          result++;
+      }
+      return result;
+    }
+    return 0;
+  }
+
+  @Override
+  public int getMajorCompactionsQueued() {
+    if (isEnabled()) {
+      int result = 0;
+      for (Tablet tablet : server.getOnlineTablets()) {
+        if (tablet.isMajorCompactionQueued())
+          result++;
+      }
+      return result;
+    }
+    return 0;
+  }
+
+  @Override
+  public int getMinorCompactions() {
+    if (isEnabled()) {
+      int result = 0;
+      for (Tablet tablet : server.getOnlineTablets()) {
+        if (tablet.isMinorCompactionRunning())
+          result++;
+      }
+      return result;
+    }
+    return 0;
+  }
+
+  @Override
+  public int getMinorCompactionsQueued() {
+    if (isEnabled()) {
+      int result = 0;
+      for (Tablet tablet : server.getOnlineTablets()) {
+        if (tablet.isMinorCompactionQueued())
+          result++;
+      }
+      return result;
+    }
+    return 0;
+  }
+
+  @Override
+  public int getOnlineCount() {
+    if (isEnabled())
+      return server.getOnlineTablets().size();
+    return 0;
+  }
+
+  @Override
+  public int getOpeningCount() {
+    if (isEnabled())
+      return server.getOpeningCount();
+    return 0;
+  }
+
+  @Override
+  public long getQueries() {
+    if (isEnabled()) {
+      long result = 0;
+      for (Tablet tablet : server.getOnlineTablets()) {
+        result += tablet.totalQueries();
+      }
+      return result;
+    }
+    return 0;
+  }
+
+  @Override
+  public int getUnopenedCount() {
+    if (isEnabled())
+      return server.getUnopenedCount();
+    return 0;
+  }
+
+  @Override
+  public String getName() {
+    if (isEnabled())
+      return server.getClientAddressString();
+    return "";
+  }
+
+  @Override
+  public long getTotalMinorCompactions() {
+    if (isEnabled())
+      return server.getTotalMinorCompactions();
+    return 0;
+  }
+
+  @Override
+  public double getHoldTime() {
+    if (isEnabled())
+      return server.getHoldTimeMillis() / 1000.;
+    return 0;
+  }
+
+  @Override
+  public double getAverageFilesPerTablet() {
+    if (isEnabled()) {
+      int count = 0;
+      long result = 0;
+      for (Tablet tablet : server.getOnlineTablets()) {
+        result += tablet.getDatafiles().size();
+        count++;
+      }
+      if (count == 0)
+        return 0;
+      return result / (double) count;
+    }
+    return 0;
+  }
+
+  @Override
+  protected ObjectName getObjectName() {
+    return OBJECT_NAME;
+  }
+
+  @Override
+  protected String getMetricsPrefix() {
+    return METRICS_PREFIX;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f2aeb14c/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
new file mode 100644
index 0000000..184dc4e
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.scan;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translators;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.data.thrift.TKey;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.data.thrift.TKeyValue;
+import org.apache.accumulo.core.data.thrift.TRange;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.tablet.KVEntry;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.Tablet.LookupResult;
+import org.apache.log4j.Logger;
+
+public class LookupTask extends ScanTask<MultiScanResult> {
+
+  private static final Logger log = Logger.getLogger(LookupTask.class);
+  
+  private final long scanID;
+
+  public LookupTask(TabletServer server, long scanID) {
+    super(server);
+    this.scanID = scanID;
+  }
+
+  @Override
+  public void run() {
+    MultiScanSession session = (MultiScanSession) server.getSession(scanID);
+    String oldThreadName = Thread.currentThread().getName();
+
+    try {
+      if (isCancelled() || session == null)
+        return;
+
+      TableConfiguration acuTableConf = server.getTableConfiguration(session.threadPoolExtent);
+      long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
+
+      runState.set(ScanRunState.RUNNING);
+      Thread.currentThread().setName("Client: " + session.client + " User: " + session.getUser() + " Start: " + session.startTime + " Table: ");
+
+      long bytesAdded = 0;
+      long maxScanTime = 4000;
+
+      long startTime = System.currentTimeMillis();
+
+      List<KVEntry> results = new ArrayList<KVEntry>();
+      Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
+      List<KeyExtent> fullScans = new ArrayList<KeyExtent>();
+      KeyExtent partScan = null;
+      Key partNextKey = null;
+      boolean partNextKeyInclusive = false;
+
+      Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
+
+      // check the time so that the read ahead thread is not monopolized
+      while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
+        Entry<KeyExtent,List<Range>> entry = iter.next();
+
+        iter.remove();
+
+        // check that tablet server is serving requested tablet
+        Tablet tablet = server.getOnlineTablet(entry.getKey());
+        if (tablet == null) {
+          failures.put(entry.getKey(), entry.getValue());
+          continue;
+        }
+        Thread.currentThread().setName(
+            "Client: " + session.client + " User: " + session.getUser() + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
+
+        LookupResult lookupResult;
+        try {
+
+          // do the following check to avoid a race condition
+          // between setting false below and the task being
+          // canceled
+          if (isCancelled())
+            interruptFlag.set(true);
+
+          lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
+              session.ssio, interruptFlag);
+
+          // if the tablet was closed it it possible that the
+          // interrupt flag was set.... do not want it set for
+          // the next
+          // lookup
+          interruptFlag.set(false);
+
+        } catch (IOException e) {
+          log.warn("lookup failed for tablet " + entry.getKey(), e);
+          throw new RuntimeException(e);
+        }
+
+        bytesAdded += lookupResult.bytesAdded;
+
+        if (lookupResult.unfinishedRanges.size() > 0) {
+          if (lookupResult.closed) {
+            failures.put(entry.getKey(), lookupResult.unfinishedRanges);
+          } else {
+            session.queries.put(entry.getKey(), lookupResult.unfinishedRanges);
+            partScan = entry.getKey();
+            partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
+            partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
+          }
+        } else {
+          fullScans.add(entry.getKey());
+        }
+      }
+
+      long finishTime = System.currentTimeMillis();
+      session.totalLookupTime += (finishTime - startTime);
+      session.numEntries += results.size();
+
+      // convert everything to thrift before adding result
+      List<TKeyValue> retResults = new ArrayList<TKeyValue>();
+      for (KVEntry entry : results)
+        retResults.add(new TKeyValue(entry.getKey().toThrift(), ByteBuffer.wrap(entry.getValue().get())));
+      Map<TKeyExtent,List<TRange>> retFailures = Translator.translate(failures, Translators.KET, new Translator.ListTranslator<Range,TRange>(Translators.RT));
+      List<TKeyExtent> retFullScans = Translator.translate(fullScans, Translators.KET);
+      TKeyExtent retPartScan = null;
+      TKey retPartNextKey = null;
+      if (partScan != null) {
+        retPartScan = partScan.toThrift();
+        retPartNextKey = partNextKey.toThrift();
+      }
+      // add results to queue
+      addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0));
+    } catch (IterationInterruptedException iie) {
+      if (!isCancelled()) {
+        log.warn("Iteration interrupted, when scan not cancelled", iie);
+        addResult(iie);
+      }
+    } catch (Throwable e) {
+      log.warn("exception while doing multi-scan ", e);
+      addResult(e);
+    } finally {
+      Thread.currentThread().setName(oldThreadName);
+      runState.set(ScanRunState.FINISHED);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f2aeb14c/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
new file mode 100644
index 0000000..5248ecc
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.scan;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.TooManyFilesException;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletClosedException;
+import org.apache.log4j.Logger;
+
+public class NextBatchTask extends ScanTask<ScanBatch> {
+
+  static final Logger log = Logger.getLogger(TabletServer.class);
+  
+  private final long scanID;
+
+  public NextBatchTask(TabletServer server, long scanID, AtomicBoolean interruptFlag) {
+    super(server);
+    this.scanID = scanID;
+    this.interruptFlag = interruptFlag;
+
+    if (interruptFlag.get())
+      cancel(true);
+  }
+
+  @Override
+  public void run() {
+
+    final ScanSession scanSession = (ScanSession) server.getSession(scanID);
+    String oldThreadName = Thread.currentThread().getName();
+
+    try {
+      if (isCancelled() || scanSession == null)
+        return;
+
+      runState.set(ScanRunState.RUNNING);
+
+      Thread.currentThread().setName(
+          "User: " + scanSession.getUser() + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
+
+      Tablet tablet = server.getOnlineTablet(scanSession.extent);
+
+      if (tablet == null) {
+        addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
+        return;
+      }
+
+      long t1 = System.currentTimeMillis();
+      ScanBatch batch = scanSession.scanner.read();
+      long t2 = System.currentTimeMillis();
+      scanSession.nbTimes.addStat(t2 - t1);
+
+      // there should only be one thing on the queue at a time, so
+      // it should be ok to call add()
+      // instead of put()... if add() fails because queue is at
+      // capacity it means there is code
+      // problem somewhere
+      addResult(batch);
+    } catch (TabletClosedException e) {
+      addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
+    } catch (IterationInterruptedException iie) {
+      if (!isCancelled()) {
+        log.warn("Iteration interrupted, when scan not cancelled", iie);
+        addResult(iie);
+      }
+    } catch (TooManyFilesException tmfe) {
+      addResult(tmfe);
+    } catch (Throwable e) {
+      log.warn("exception while scanning tablet " + (scanSession == null ? "(unknown)" : scanSession.extent), e);
+      addResult(e);
+    } finally {
+      runState.set(ScanRunState.FINISHED);
+      Thread.currentThread().setName(oldThreadName);
+    }
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f2aeb14c/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanRunState.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanRunState.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanRunState.java
new file mode 100644
index 0000000..f2b804c
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanRunState.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.scan;
+
+public enum ScanRunState {
+  QUEUED, RUNNING, FINISHED
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f2aeb14c/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
new file mode 100644
index 0000000..a647c84
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.scan;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.tserver.TabletServer;
+
+public abstract class ScanTask<T> implements RunnableFuture<T> {
+
+  protected final TabletServer server;
+  protected AtomicBoolean interruptFlag;
+  protected ArrayBlockingQueue<Object> resultQueue;
+  protected AtomicInteger state;
+  protected AtomicReference<ScanRunState> runState;
+
+  private static final int INITIAL = 1;
+  private static final int ADDED = 2;
+  private static final int CANCELED = 3;
+
+  ScanTask(TabletServer server) {
+    this.server = server;
+    interruptFlag = new AtomicBoolean(false);
+    runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
+    state = new AtomicInteger(INITIAL);
+    resultQueue = new ArrayBlockingQueue<Object>(1);
+  }
+
+  protected void addResult(Object o) {
+    if (state.compareAndSet(INITIAL, ADDED))
+      resultQueue.add(o);
+    else if (state.get() == ADDED)
+      throw new IllegalStateException("Tried to add more than one result");
+  }
+
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    if (!mayInterruptIfRunning)
+      throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
+
+    if (state.get() == CANCELED)
+      return true;
+
+    if (state.compareAndSet(INITIAL, CANCELED)) {
+      interruptFlag.set(true);
+      resultQueue = null;
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override
+  public T get() throws InterruptedException, ExecutionException {
+    throw new UnsupportedOperationException();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+
+    ArrayBlockingQueue<Object> localRQ = resultQueue;
+
+    if (state.get() == CANCELED)
+      throw new CancellationException();
+
+    if (localRQ == null && state.get() == ADDED)
+      throw new IllegalStateException("Tried to get result twice");
+
+    Object r = localRQ.poll(timeout, unit);
+
+    // could have been canceled while waiting
+    if (state.get() == CANCELED) {
+      if (r != null)
+        throw new IllegalStateException("Nothing should have been added when in canceled state");
+
+      throw new CancellationException();
+    }
+
+    if (r == null)
+      throw new TimeoutException();
+
+    // make this method stop working now that something is being
+    // returned
+    resultQueue = null;
+
+    if (r instanceof Throwable)
+      throw new ExecutionException((Throwable) r);
+
+    return (T) r;
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return state.get() == CANCELED;
+  }
+
+  @Override
+  public boolean isDone() {
+    return runState.get().equals(ScanRunState.FINISHED);
+  }
+
+  public ScanRunState getScanRunState() {
+    return runState.get();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f2aeb14c/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index 5d7c1fb..110fcac 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -27,7 +27,7 @@ import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.data.thrift.MultiScanResult;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.tserver.TabletServer.ScanTask;
+import org.apache.accumulo.tserver.scan.ScanTask;
 
 public class MultiScanSession extends Session {
   public final KeyExtent threadPoolExtent;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f2aeb14c/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index 72d8b67..3f9b7af 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@ -27,7 +27,7 @@ import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.Stat;
-import org.apache.accumulo.tserver.TabletServer.ScanTask;
+import org.apache.accumulo.tserver.scan.ScanTask;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Scanner;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f2aeb14c/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
new file mode 100644
index 0000000..ab8bbe3
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.session;
+
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.Translators;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ScanState;
+import org.apache.accumulo.core.tabletserver.thrift.ScanType;
+import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.tserver.scan.ScanRunState;
+import org.apache.accumulo.tserver.scan.ScanTask;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+
+public class SessionManager {
+
+  private final SecureRandom random = new SecureRandom();
+  private final Map<Long,Session> sessions = new HashMap<Long,Session>();
+  private final long maxIdle;
+  private final AccumuloConfiguration aconf;
+
+  public SessionManager(AccumuloConfiguration conf) {
+    aconf = conf;
+    maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        sweep(maxIdle);
+      }
+    };
+
+    SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
+  }
+
+  public synchronized long createSession(Session session, boolean reserve) {
+    long sid = random.nextLong();
+
+    while (sessions.containsKey(sid)) {
+      sid = random.nextLong();
+    }
+
+    sessions.put(sid, session);
+
+    session.reserved = reserve;
+
+    session.startTime = session.lastAccessTime = System.currentTimeMillis();
+
+    return sid;
+  }
+
+  public long getMaxIdleTime() {
+    return maxIdle;
+  }
+
+  /**
+   * while a session is reserved, it cannot be canceled or removed
+   *
+   * @param sessionId
+   */
+
+  public synchronized Session reserveSession(long sessionId) {
+    Session session = sessions.get(sessionId);
+    if (session != null) {
+      if (session.reserved)
+        throw new IllegalStateException();
+      session.reserved = true;
+    }
+
+    return session;
+
+  }
+
+  public synchronized Session reserveSession(long sessionId, boolean wait) {
+    Session session = sessions.get(sessionId);
+    if (session != null) {
+      while (wait && session.reserved) {
+        try {
+          wait(1000);
+        } catch (InterruptedException e) {
+          throw new RuntimeException();
+        }
+      }
+
+      if (session.reserved)
+        throw new IllegalStateException();
+      session.reserved = true;
+    }
+
+    return session;
+
+  }
+
+  public synchronized void unreserveSession(Session session) {
+    if (!session.reserved)
+      throw new IllegalStateException();
+    notifyAll();
+    session.reserved = false;
+    session.lastAccessTime = System.currentTimeMillis();
+  }
+
+  public synchronized void unreserveSession(long sessionId) {
+    Session session = getSession(sessionId);
+    if (session != null)
+      unreserveSession(session);
+  }
+
+  public synchronized Session getSession(long sessionId) {
+    Session session = sessions.get(sessionId);
+    if (session != null)
+      session.lastAccessTime = System.currentTimeMillis();
+    return session;
+  }
+
+  public Session removeSession(long sessionId) {
+    return removeSession(sessionId, false);
+  }
+
+  public Session removeSession(long sessionId, boolean unreserve) {
+    Session session = null;
+    synchronized (this) {
+      session = sessions.remove(sessionId);
+      if (unreserve && session != null)
+        unreserveSession(session);
+    }
+
+    // do clean up out side of lock..
+    if (session != null)
+      session.cleanup();
+
+    return session;
+  }
+
+  private void sweep(long maxIdle) {
+    List<Session> sessionsToCleanup = new ArrayList<Session>();
+    synchronized (this) {
+      Iterator<Session> iter = sessions.values().iterator();
+      while (iter.hasNext()) {
+        Session session = iter.next();
+        long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+        if (idleTime > maxIdle && !session.reserved) {
+          iter.remove();
+          sessionsToCleanup.add(session);
+        }
+      }
+    }
+
+    // do clean up outside of lock
+    for (Session session : sessionsToCleanup) {
+      session.cleanup();
+    }
+  }
+
+  public synchronized void removeIfNotAccessed(final long sessionId, long delay) {
+    Session session = sessions.get(sessionId);
+    if (session != null) {
+      final long removeTime = session.lastAccessTime;
+      TimerTask r = new TimerTask() {
+        @Override
+        public void run() {
+          Session sessionToCleanup = null;
+          synchronized (SessionManager.this) {
+            Session session2 = sessions.get(sessionId);
+            if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
+              sessions.remove(sessionId);
+              sessionToCleanup = session2;
+            }
+          }
+
+          // call clean up outside of lock
+          if (sessionToCleanup != null)
+            sessionToCleanup.cleanup();
+        }
+      };
+
+      SimpleTimer.getInstance(aconf).schedule(r, delay);
+    }
+  }
+
+  public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
+    Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
+    for (Entry<Long,Session> entry : sessions.entrySet()) {
+
+      Session session = entry.getValue();
+      @SuppressWarnings("rawtypes")
+      ScanTask nbt = null;
+      String tableID = null;
+
+      if (session instanceof ScanSession) {
+        ScanSession ss = (ScanSession) session;
+        nbt = ss.nextBatchTask;
+        tableID = ss.extent.getTableId().toString();
+      } else if (session instanceof MultiScanSession) {
+        MultiScanSession mss = (MultiScanSession) session;
+        nbt = mss.lookupTask;
+        tableID = mss.threadPoolExtent.getTableId().toString();
+      }
+
+      if (nbt == null)
+        continue;
+
+      ScanRunState srs = nbt.getScanRunState();
+
+      if (srs == ScanRunState.FINISHED)
+        continue;
+
+      MapCounter<ScanRunState> stateCounts = counts.get(tableID);
+      if (stateCounts == null) {
+        stateCounts = new MapCounter<ScanRunState>();
+        counts.put(tableID, stateCounts);
+      }
+
+      stateCounts.increment(srs, 1);
+    }
+
+    return counts;
+  }
+
+  public synchronized List<ActiveScan> getActiveScans() {
+
+    List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+
+    long ct = System.currentTimeMillis();
+
+    for (Entry<Long,Session> entry : sessions.entrySet()) {
+      Session session = entry.getValue();
+      if (session instanceof ScanSession) {
+        ScanSession ss = (ScanSession) session;
+
+        ScanState state = ScanState.RUNNING;
+
+        ScanTask<ScanBatch> nbt = ss.nextBatchTask;
+        if (nbt == null) {
+          state = ScanState.IDLE;
+        } else {
+          switch (nbt.getScanRunState()) {
+            case QUEUED:
+              state = ScanState.QUEUED;
+              break;
+            case FINISHED:
+              state = ScanState.IDLE;
+              break;
+            case RUNNING:
+            default:
+              /* do nothing */
+              break;
+          }
+        }
+
+        activeScans.add(new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
+            state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
+
+      } else if (session instanceof MultiScanSession) {
+        MultiScanSession mss = (MultiScanSession) session;
+
+        ScanState state = ScanState.RUNNING;
+
+        ScanTask<MultiScanResult> nbt = mss.lookupTask;
+        if (nbt == null) {
+          state = ScanState.IDLE;
+        } else {
+          switch (nbt.getScanRunState()) {
+            case QUEUED:
+              state = ScanState.QUEUED;
+              break;
+            case FINISHED:
+              state = ScanState.IDLE;
+              break;
+            case RUNNING:
+            default:
+              /* do nothing */
+              break;
+          }
+        }
+
+        activeScans.add(new ActiveScan(mss.client, mss.getUser(), mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
+            ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths
+                .getAuthorizationsBB()));
+      }
+    }
+
+    return activeScans;
+  }
+}
\ No newline at end of file


Mime
View raw message