accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [41/59] [abbrv] ACCUMULO-658 consistent package names to avoid overlapped sealed jars
Date Sat, 07 Sep 2013 03:28:44 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
new file mode 100644
index 0000000..5e1b460
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -0,0 +1,3797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TimerTask;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.CompressedIterators;
+import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
+import org.apache.accumulo.core.client.impl.TabletType;
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.constraints.Constraint.Environment;
+import org.apache.accumulo.core.constraints.Violations;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ConstraintViolationSummary;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.InitialMultiScan;
+import org.apache.accumulo.core.data.thrift.InitialScan;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.data.thrift.MapFileInfo;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TCMResult;
+import org.apache.accumulo.core.data.thrift.TCMStatus;
+import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
+import org.apache.accumulo.core.data.thrift.TConditionalSession;
+import org.apache.accumulo.core.data.thrift.TKey;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.data.thrift.TKeyValue;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.accumulo.core.data.thrift.TRange;
+import org.apache.accumulo.core.data.thrift.UpdateErrors;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.master.thrift.Compacting;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletLoadState;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.ScanState;
+import org.apache.accumulo.core.tabletserver.thrift.ScanType;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.util.Stat;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.client.ClientServiceHandler;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.master.state.TabletStateStore;
+import org.apache.accumulo.server.master.state.ZooTabletStateStore;
+import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.util.FileSystemMonitor;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.MapCounter;
+import org.apache.accumulo.server.util.MasterMetadataUtil;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.util.MetadataTableUtil.LogEntry;
+import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
+import org.apache.accumulo.server.util.time.RelativeTime;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.accumulo.start.classloader.vfs.ContextManager;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
+import org.apache.accumulo.trace.thrift.TInfo;
+import org.apache.accumulo.tserver.Compactor.CompactionInfo;
+import org.apache.accumulo.tserver.RowLocks.RowLock;
+import org.apache.accumulo.tserver.Tablet.CommitSession;
+import org.apache.accumulo.tserver.Tablet.KVEntry;
+import org.apache.accumulo.tserver.Tablet.LookupResult;
+import org.apache.accumulo.tserver.Tablet.MajorCompactionReason;
+import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.tserver.Tablet.ScanBatch;
+import org.apache.accumulo.tserver.Tablet.Scanner;
+import org.apache.accumulo.tserver.Tablet.SplitInfo;
+import org.apache.accumulo.tserver.Tablet.TConstraintViolationException;
+import org.apache.accumulo.tserver.Tablet.TabletClosedException;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.tserver.data.ServerConditionalMutation;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.accumulo.tserver.log.LogSorter;
+import org.apache.accumulo.tserver.log.MutationReceiver;
+import org.apache.accumulo.tserver.log.TabletServerLogger;
+import org.apache.accumulo.tserver.mastermessage.MasterMessage;
+import org.apache.accumulo.tserver.mastermessage.SplitReportMessage;
+import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
+import org.apache.accumulo.tserver.metrics.TabletServerMBean;
+import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.server.TServer;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+/**
+ * Run state of a scan task. A new task starts out {@link #QUEUED}; a task whose state is {@link #FINISHED} reports itself as done.
+ */
+enum ScanRunState {
+  /** Not yet executing. */
+  QUEUED,
+  /** Executing. */
+  RUNNING,
+  /** Execution has completed. */
+  FINISHED
+}
+
+public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.tserver.metrics.TabletServerMBean {
+  private static final Logger log = Logger.getLogger(TabletServer.class);
+  
+  // Cumulative collection time (ms) last seen for each garbage collector, keyed by collector name; maintained by logGCInfo.
+  private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
+  // Free heap in bytes (Runtime.freeMemory()) observed on the previous logGCInfo call.
+  private static long lastMemorySize = 0;
+  // Number of consecutive logGCInfo calls in which some collector's time increased; reset when no increase is seen or after a low-memory warning.
+  private static long gcTimeIncreasedCount;
+  
+  // NOTE(review): not referenced in this excerpt; presumably the longest a scan call waits for a batch result — confirm at use sites.
+  private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
+  // NOTE(review): not referenced in this excerpt; presumably how long (60 s) a tablet counts as recently split — confirm at use sites.
+  private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
+  
+  private TabletServerLogger logger;
+  
+  protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
+  
+  // Set in the constructor; getSystemConfiguration() delegates to it.
+  private ServerConfiguration serverConfig;
+  // Created in the constructor from the instance, the volume manager and the system configuration.
+  private LogSorter logSorter = null;
+  
+  public TabletServer(ServerConfiguration conf, VolumeManager fs) {
+    super();
+    this.serverConfig = conf;
+    this.instance = conf.getInstance();
+    this.fs = fs;
+    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
+    SimpleTimer.getInstance().schedule(new Runnable() {
+      @Override
+      public void run() {
+        synchronized (onlineTablets) {
+          long now = System.currentTimeMillis();
+          for (Tablet tablet : onlineTablets.values())
+            try {
+              tablet.updateRates(now);
+            } catch (Exception ex) {
+              log.error(ex, ex);
+            }
+        }
+      }
+    }, 5000, 5000);
+  }
+  
+  /**
+   * Reports garbage-collection activity and free memory, and halts the server if GC pauses could have cost it its lock.
+   * 
+   * <p>
+   * For each collector, the method records its cumulative collection time and the increase since the previous call. It also records free and total heap.
+   * The line is logged at debug level only when some collector's time changed or free memory grew.
+   * 
+   * <p>
+   * A low-memory warning is logged when collection time has increased on more than 3 consecutive calls and free memory is below 5% of the maximum heap.
+   * 
+   * <p>
+   * If any single collector's increase exceeds {@link Property#INSTANCE_ZK_TIMEOUT}, {@link Halt#halt} is called.
+   * 
+   * @param conf
+   *          configuration supplying the ZooKeeper timeout
+   */
+  private synchronized static void logGCInfo(AccumuloConfiguration conf) {
+    final Runtime runtime = Runtime.getRuntime();
+    final StringBuilder message = new StringBuilder("gc");
+    boolean changed = false;
+    long largestDelta = 0;
+    
+    for (GarbageCollectorMXBean collector : ManagementFactory.getGarbageCollectorMXBeans()) {
+      final String name = collector.getName();
+      final Long previous = prevGcTime.get(name);
+      final long current = collector.getCollectionTime();
+      final long delta = current - (previous == null ? 0L : previous);
+      
+      changed |= delta != 0;
+      message.append(String.format(" %s=%,.2f(+%,.2f) secs", name, current / 1000.0, delta / 1000.0));
+      largestDelta = Math.max(delta, largestDelta);
+      prevGcTime.put(name, current);
+    }
+    
+    final long freeMem = runtime.freeMemory();
+    if (largestDelta == 0) {
+      gcTimeIncreasedCount = 0;
+    } else if (++gcTimeIncreasedCount > 3 && freeMem < runtime.maxMemory() * 0.05) {
+      log.warn("Running low on memory");
+      gcTimeIncreasedCount = 0;
+    }
+    
+    final long memDelta = freeMem - lastMemorySize;
+    if (memDelta > 0) {
+      changed = true;
+    }
+    final String sign = memDelta > 0 ? "+" : "";
+    message.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", freeMem, sign, memDelta, runtime.totalMemory()));
+    
+    if (changed) {
+      log.debug(message.toString());
+    }
+    
+    if (largestDelta > conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)) {
+      Halt.halt("Garbage collection may be interfering with lock keep-alive.  Halting.", -1);
+    }
+    
+    lastMemorySize = freeMem;
+  }
+  
+  // NOTE(review): not read or written in this excerpt; presumably collects per-operation tablet statistics — confirm at use sites.
+  private TabletStatsKeeper statsKeeper;
+  
+  private static class Session {
+    long lastAccessTime;
+    long startTime;
+    String user;
+    String client = TServerUtils.clientAddress.get();
+    public boolean reserved;
+    
+    public void cleanup() {}
+  }
+  
+  private static class SessionManager {
+    
+    SecureRandom random;
+    Map<Long,Session> sessions;
+    long maxIdle;
+    
+    SessionManager(AccumuloConfiguration conf) {
+      random = new SecureRandom();
+      sessions = new HashMap<Long,Session>();
+      
+      maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+      
+      Runnable r = new Runnable() {
+        @Override
+        public void run() {
+          sweep(maxIdle);
+        }
+      };
+      
+      SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
+    }
+    
+    synchronized long createSession(Session session, boolean reserve) {
+      long sid = random.nextLong();
+      
+      while (sessions.containsKey(sid)) {
+        sid = random.nextLong();
+      }
+      
+      sessions.put(sid, session);
+      
+      session.reserved = reserve;
+      
+      session.startTime = session.lastAccessTime = System.currentTimeMillis();
+      
+      return sid;
+    }
+    
+    long getMaxIdleTime() {
+      return maxIdle;
+    }
+    
+    /**
+     * while a session is reserved, it cannot be canceled or removed
+     * 
+     * @param sessionId
+     */
+    
+    synchronized Session reserveSession(long sessionId) {
+      Session session = sessions.get(sessionId);
+      if (session != null) {
+        if (session.reserved)
+          throw new IllegalStateException();
+        session.reserved = true;
+      }
+      
+      return session;
+      
+    }
+    
+    synchronized Session reserveSession(long sessionId, boolean wait) {
+      Session session = sessions.get(sessionId);
+      if (session != null) {
+        while (wait && session.reserved) {
+          try {
+            wait(1000);
+          } catch (InterruptedException e) {
+            throw new RuntimeException();
+          }
+        }
+        
+        if (session.reserved)
+          throw new IllegalStateException();
+        session.reserved = true;
+      }
+      
+      return session;
+      
+    }
+    
+    synchronized void unreserveSession(Session session) {
+      if (!session.reserved)
+        throw new IllegalStateException();
+      notifyAll();
+      session.reserved = false;
+      session.lastAccessTime = System.currentTimeMillis();
+    }
+    
+    synchronized void unreserveSession(long sessionId) {
+      Session session = getSession(sessionId);
+      if (session != null)
+        unreserveSession(session);
+    }
+    
+    synchronized Session getSession(long sessionId) {
+      Session session = sessions.get(sessionId);
+      if (session != null)
+        session.lastAccessTime = System.currentTimeMillis();
+      return session;
+    }
+    
+    Session removeSession(long sessionId) {
+      return removeSession(sessionId, false);
+    }
+    
+    Session removeSession(long sessionId, boolean unreserve) {
+      Session session = null;
+      synchronized (this) {
+        session = sessions.remove(sessionId);
+        if (unreserve && session != null)
+          unreserveSession(session);
+      }
+      
+      // do clean up out side of lock..
+      if (session != null)
+        session.cleanup();
+      
+      return session;
+    }
+    
+    private void sweep(long maxIdle) {
+      ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
+      synchronized (this) {
+        Iterator<Session> iter = sessions.values().iterator();
+        while (iter.hasNext()) {
+          Session session = iter.next();
+          long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+          if (idleTime > maxIdle && !session.reserved) {
+            iter.remove();
+            sessionsToCleanup.add(session);
+          }
+        }
+      }
+      
+      // do clean up outside of lock
+      for (Session session : sessionsToCleanup) {
+        session.cleanup();
+      }
+    }
+    
+    synchronized void removeIfNotAccessed(final long sessionId, long delay) {
+      Session session = sessions.get(sessionId);
+      if (session != null) {
+        final long removeTime = session.lastAccessTime;
+        TimerTask r = new TimerTask() {
+          @Override
+          public void run() {
+            Session sessionToCleanup = null;
+            synchronized (SessionManager.this) {
+              Session session2 = sessions.get(sessionId);
+              if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
+                sessions.remove(sessionId);
+                sessionToCleanup = session2;
+              }
+            }
+            
+            // call clean up outside of lock
+            if (sessionToCleanup != null)
+              sessionToCleanup.cleanup();
+          }
+        };
+        
+        SimpleTimer.getInstance().schedule(r, delay);
+      }
+    }
+    
+    public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
+      Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
+      for (Entry<Long,Session> entry : sessions.entrySet()) {
+        
+        Session session = entry.getValue();
+        @SuppressWarnings("rawtypes")
+        ScanTask nbt = null;
+        String tableID = null;
+        
+        if (session instanceof ScanSession) {
+          ScanSession ss = (ScanSession) session;
+          nbt = ss.nextBatchTask;
+          tableID = ss.extent.getTableId().toString();
+        } else if (session instanceof MultiScanSession) {
+          MultiScanSession mss = (MultiScanSession) session;
+          nbt = mss.lookupTask;
+          tableID = mss.threadPoolExtent.getTableId().toString();
+        }
+        
+        if (nbt == null)
+          continue;
+        
+        ScanRunState srs = nbt.getScanRunState();
+        
+        if (nbt == null || srs == ScanRunState.FINISHED)
+          continue;
+        
+        MapCounter<ScanRunState> stateCounts = counts.get(tableID);
+        if (stateCounts == null) {
+          stateCounts = new MapCounter<ScanRunState>();
+          counts.put(tableID, stateCounts);
+        }
+        
+        stateCounts.increment(srs, 1);
+      }
+      
+      return counts;
+    }
+    
+    public synchronized List<ActiveScan> getActiveScans() {
+      
+      ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
+      
+      long ct = System.currentTimeMillis();
+      
+      for (Entry<Long,Session> entry : sessions.entrySet()) {
+        Session session = entry.getValue();
+        if (session instanceof ScanSession) {
+          ScanSession ss = (ScanSession) session;
+          
+          ScanState state = ScanState.RUNNING;
+          
+          ScanTask<ScanBatch> nbt = ss.nextBatchTask;
+          if (nbt == null) {
+            state = ScanState.IDLE;
+          } else {
+            switch (nbt.getScanRunState()) {
+              case QUEUED:
+                state = ScanState.QUEUED;
+                break;
+              case FINISHED:
+                state = ScanState.IDLE;
+                break;
+              case RUNNING:
+              default:
+                /* do nothing */
+                break;
+            }
+          }
+          
+          activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
+              state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translator.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
+          
+        } else if (session instanceof MultiScanSession) {
+          MultiScanSession mss = (MultiScanSession) session;
+          
+          ScanState state = ScanState.RUNNING;
+          
+          ScanTask<MultiScanResult> nbt = mss.lookupTask;
+          if (nbt == null) {
+            state = ScanState.IDLE;
+          } else {
+            switch (nbt.getScanRunState()) {
+              case QUEUED:
+                state = ScanState.QUEUED;
+                break;
+              case FINISHED:
+                state = ScanState.IDLE;
+                break;
+              case RUNNING:
+              default:
+                /* do nothing */
+                break;
+            }
+          }
+          
+          activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
+              ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translator.CT), mss.ssiList, mss.ssio, mss.auths
+                  .getAuthorizationsBB()));
+        }
+      }
+      
+      return activeScans;
+    }
+  }
+  
+  /**
+   * Constraint {@link Environment} for a write on this tablet server. It exposes the extent being written, the writing principal, and that principal's
+   * authorizations. The authorizations are fetched from the {@link SecurityOperation} on first use and cached once non-null.
+   */
+  static class TservConstraintEnv implements Environment {
+    
+    private TCredentials credentials;
+    private SecurityOperation security;
+    private Authorizations auths;
+    private KeyExtent ke;
+    
+    TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
+      this.credentials = credentials;
+      this.security = secOp;
+    }
+    
+    /** Sets the extent reported by {@link #getExtent()}. */
+    void setExtent(KeyExtent ke) {
+      this.ke = ke;
+    }
+    
+    @Override
+    public KeyExtent getExtent() {
+      return ke;
+    }
+    
+    @Override
+    public String getUser() {
+      return credentials.getPrincipal();
+    }
+    
+    /**
+     * @return the writing user's authorizations, looked up lazily
+     * @throws RuntimeException
+     *           wrapping the {@link ThriftSecurityException} if the lookup fails
+     */
+    @Override
+    public Authorizations getAuthorizations() {
+      if (auths != null) {
+        return auths;
+      }
+      try {
+        auths = security.getUserAuthorizations(credentials);
+      } catch (ThriftSecurityException e) {
+        throw new RuntimeException(e);
+      }
+      return auths;
+    }
+    
+  }
+  
+  /**
+   * Base class for scan work whose single result (a value or a Throwable) is handed from the thread that runs the task to a caller of
+   * {@link #get(long, TimeUnit)}. Subclasses provide {@code run()}.
+   * 
+   * <p>
+   * {@link #state} starts at INITIAL. Via compareAndSet it moves to exactly one of two states:
+   * <ul>
+   * <li>ADDED, when {@link #addResult(Object)} stored a result, or</li>
+   * <li>CANCELED, when {@link #cancel(boolean)} won.</li>
+   * </ul>
+   * 
+   * <p>
+   * {@link #runState} is tracked separately; it is what {@link #isDone()} and {@link #getScanRunState()} report. This class only initializes it to QUEUED.
+   * NOTE(review): presumably subclasses advance it to RUNNING/FINISHED from run() — confirm.
+   * 
+   * @param <T>
+   *          type of the result returned by {@link #get(long, TimeUnit)}
+   */
+  private abstract class ScanTask<T> implements RunnableFuture<T> {
+    
+    // set to true by cancel(); NOTE(review): presumably polled by the running scan so it can stop early — confirm in subclasses
+    protected AtomicBoolean interruptFlag;
+    // capacity-1 hand-off for the result; set to null by cancel() and after get() returns a result
+    protected ArrayBlockingQueue<Object> resultQueue;
+    // one of INITIAL, ADDED, CANCELED
+    protected AtomicInteger state;
+    protected AtomicReference<ScanRunState> runState;
+    
+    private static final int INITIAL = 1;
+    private static final int ADDED = 2;
+    private static final int CANCELED = 3;
+    
+    ScanTask() {
+      interruptFlag = new AtomicBoolean(false);
+      runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
+      state = new AtomicInteger(INITIAL);
+      resultQueue = new ArrayBlockingQueue<Object>(1);
+    }
+    
+    /**
+     * Publishes the task's result. Only the first call made while INITIAL stores it; after a cancel the result is silently dropped.
+     * 
+     * @throws IllegalStateException
+     *           if a result was already added
+     */
+    protected void addResult(Object o) {
+      if (state.compareAndSet(INITIAL, ADDED))
+        resultQueue.add(o);
+      else if (state.get() == ADDED)
+        throw new IllegalStateException("Tried to add more than one result");
+    }
+    
+    /**
+     * Cancels the task if no result has been added yet. Cancelling sets {@link #interruptFlag} and drops the result queue.
+     * 
+     * @param mayInterruptIfRunning
+     *          must be true
+     * @return true if the task is now, or already was, canceled; false if a result had already been added
+     * @throws IllegalArgumentException
+     *           if {@code mayInterruptIfRunning} is false
+     */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      if (!mayInterruptIfRunning)
+        throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
+      
+      if (state.get() == CANCELED)
+        return true;
+      
+      if (state.compareAndSet(INITIAL, CANCELED)) {
+        interruptFlag.set(true);
+        resultQueue = null;
+        return true;
+      }
+      
+      return false;
+    }
+    
+    /** Not supported; callers must use the timed {@link #get(long, TimeUnit)}. */
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+      throw new UnsupportedOperationException();
+    }
+    
+    /**
+     * Waits up to the given time for the result and consumes it; after a successful call, later calls fail.
+     * 
+     * @throws CancellationException
+     *           if the task was canceled before or while waiting
+     * @throws TimeoutException
+     *           if no result arrived in time
+     * @throws ExecutionException
+     *           if the added result was a Throwable (it becomes the cause)
+     * @throws IllegalStateException
+     *           if the result was already returned by an earlier call
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      
+      // snapshot the field: cancel() and a previous successful get() set it to null
+      ArrayBlockingQueue<Object> localRQ = resultQueue;
+      
+      if (state.get() == CANCELED)
+        throw new CancellationException();
+      
+      if (localRQ == null && state.get() == ADDED)
+        throw new IllegalStateException("Tried to get result twice");
+      
+      Object r = localRQ.poll(timeout, unit);
+      
+      // could have been canceled while waiting
+      if (state.get() == CANCELED) {
+        if (r != null)
+          throw new IllegalStateException("Nothing should have been added when in canceled state");
+        
+        throw new CancellationException();
+      }
+      
+      if (r == null)
+        throw new TimeoutException();
+      
+      // make this method stop working now that something is being
+      // returned
+      resultQueue = null;
+      
+      if (r instanceof Throwable)
+        throw new ExecutionException((Throwable) r);
+      
+      return (T) r;
+    }
+    
+    @Override
+    public boolean isCancelled() {
+      return state.get() == CANCELED;
+    }
+    
+    /** @return true once {@link #runState} is FINISHED (independent of whether a result was taken) */
+    @Override
+    public boolean isDone() {
+      return runState.get().equals(ScanRunState.FINISHED);
+    }
+    
+    public ScanRunState getScanRunState() {
+      return runState.get();
+    }
+    
+  }
+  
+  /** Session state for a conditional-update client; {@link #cleanup()} raises {@link #interruptFlag}. */
+  private static class ConditionalSession extends Session {
+    public String tableId;
+    public TCredentials credentials;
+    public Authorizations auths;
+    // NOTE(review): assigned outside this excerpt; cleanup() assumes it is non-null by then — confirm
+    public AtomicBoolean interruptFlag;
+    
+    @Override
+    public void cleanup() {
+      final AtomicBoolean flag = interruptFlag;
+      flag.set(true);
+    }
+  }
+  
+  /** Per-client state of an update (write) session. Fields are grouped by apparent purpose. */
+  private static class UpdateSession extends Session {
+    // caller identity and constraint environment
+    public TCredentials credentials;
+    TservConstraintEnv cenv = null;
+    
+    // mutations in flight
+    public Tablet currentTablet;
+    public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>();
+    public long queuedMutationSize = 0;
+    
+    // outcomes
+    public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
+    Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
+    HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>();
+    public Violations violations;
+    
+    // timing and counters
+    public long totalUpdates = 0;
+    public long flushTime = 0;
+    Stat prepareTimes = new Stat();
+    Stat walogTimes = new Stat();
+    Stat commitTimes = new Stat();
+    Stat authTimes = new Stat();
+  }
+  
+  /** State of a single-range scan session. */
+  private static class ScanSession extends Session {
+    public KeyExtent extent;
+    public HashSet<Column> columnSet;
+    public List<IterInfo> ssiList;
+    public Map<String,Map<String,String>> ssio;
+    public Authorizations auths;
+    public long entriesReturned = 0;
+    public Stat nbTimes = new Stat();
+    public long batchCount = 0;
+    public volatile ScanTask<ScanBatch> nextBatchTask;
+    public AtomicBoolean interruptFlag;
+    public Scanner scanner;
+    
+    /** Cancels the pending batch task, if any, and then closes the scanner, if any, even if cancelling throws. */
+    @Override
+    public void cleanup() {
+      try {
+        // read the volatile field once: re-reading it after the null check could observe a concurrent reset to null
+        ScanTask<ScanBatch> task = nextBatchTask;
+        if (task != null)
+          task.cancel(true);
+      } finally {
+        if (scanner != null)
+          scanner.close();
+      }
+    }
+    
+  }
+  
+  /** State of a batch (multi-range, possibly multi-tablet) scan session. */
+  private static class MultiScanSession extends Session {
+    HashSet<Column> columnSet;
+    Map<KeyExtent,List<Range>> queries;
+    public List<IterInfo> ssiList;
+    public Map<String,Map<String,String>> ssio;
+    public Authorizations auths;
+    
+    // stats
+    int numRanges;
+    int numTablets;
+    int numEntries;
+    long totalLookupTime;
+    
+    public volatile ScanTask<MultiScanResult> lookupTask;
+    // extent whose table id and thrift form are reported for this session in active-scan listings
+    public KeyExtent threadPoolExtent;
+    
+    /** Cancels the pending lookup task, if any. */
+    @Override
+    public void cleanup() {
+      // read the volatile field once so the null check and the cancel see the same task
+      ScanTask<MultiScanResult> task = lookupTask;
+      if (task != null)
+        task.cancel(true);
+    }
+  }
+  
+  /**
+   * Keeps track of writes in progress and lets readers wait for every write that started before the read.
+   * 
+   * <p>
+   * Operation ids come from a single counter shared by all instances, so they increase monotonically. In-progress writes are tracked separately per
+   * {@link TabletType}.
+   */
+  static class WriteTracker {
+    private static final AtomicLong operationCounter = new AtomicLong(1);
+    private final Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
+    
+    WriteTracker() {
+      for (TabletType ttype : TabletType.values()) {
+        inProgressWrites.put(ttype, new TreeSet<Long>());
+      }
+    }
+    
+    /**
+     * Records the start of a write against tablets of the given type.
+     * 
+     * @return the operation id to pass to {@link #finishWrite(long)}
+     */
+    synchronized long startWrite(TabletType ttype) {
+      long operationId = operationCounter.getAndIncrement();
+      inProgressWrites.get(ttype).add(operationId);
+      return operationId;
+    }
+    
+    /**
+     * Records the end of a write and wakes waiting readers. An id of -1 (returned for an empty tablet set) is ignored.
+     * 
+     * @throws IllegalArgumentException
+     *           if the id does not belong to a write in progress
+     */
+    synchronized void finishWrite(long operationId) {
+      if (operationId == -1)
+        return;
+      
+      boolean removed = false;
+      
+      for (TabletType ttype : TabletType.values()) {
+        removed = inProgressWrites.get(ttype).remove(operationId);
+        if (removed)
+          break;
+      }
+      
+      if (!removed) {
+        throw new IllegalArgumentException("Attempted to finish write not in progress,  operationId " + operationId);
+      }
+      
+      this.notifyAll();
+    }
+    
+    /**
+     * Blocks until every write of the given type that started before this call has finished.
+     * 
+     * <p>
+     * Interrupts do not end the wait. Each interrupt is logged, and the thread's interrupt status is restored once the wait completes.
+     */
+    synchronized void waitForWrites(TabletType ttype) {
+      long operationId = operationCounter.getAndIncrement();
+      boolean interrupted = false;
+      try {
+        while (inProgressWrites.get(ttype).floor(operationId) != null) {
+          try {
+            this.wait();
+          } catch (InterruptedException e) {
+            log.error(e, e);
+            // re-interrupting here would make wait() throw immediately and spin, so defer it
+            interrupted = true;
+          }
+        }
+      } finally {
+        if (interrupted)
+          Thread.currentThread().interrupt();
+      }
+    }
+    
+    /**
+     * Records the start of a write that touches the given tablets.
+     * 
+     * @return the operation id to pass to {@link #finishWrite(long)}, or -1 if {@code keySet} is empty
+     */
+    public long startWrite(Set<Tablet> keySet) {
+      if (keySet.isEmpty())
+        return -1;
+      
+      ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
+      
+      for (Tablet tablet : keySet)
+        extents.add(tablet.getExtent());
+      
+      return startWrite(TabletType.type(extents));
+    }
+  }
+  
+  /**
+   * @return the system-wide configuration obtained from this server's {@code serverConfig}
+   */
+  public AccumuloConfiguration getSystemConfiguration() {
+    final AccumuloConfiguration systemConfig = serverConfig.getConfiguration();
+    return systemConfig;
+  }
+  
+  // TransactionWatcher passed to the ClientServiceHandler superclass when the Thrift client handler is constructed.
+  TransactionWatcher watcher = new TransactionWatcher();
+  
+  private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
+    
+    // Tracks scan, multi-scan and update sessions; assigned in the constructor.
+    SessionManager sessionManager;
+    
+    // System configuration captured when this handler is constructed (read e.g. for TSERV_CLIENT_TIMEOUT).
+    AccumuloConfiguration acuConf = getSystemConfiguration();
+    
+    // Update-path metrics (permission errors, constraint violations, commit timings, ...).
+    TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
+    
+    // Scan-path metrics (scan time and result size).
+    TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
+    
+    // Lets scans that request it wait for writes that were already in flight when the scan started.
+    WriteTracker writeTracker = new WriteTracker();
+    
+    // NOTE(review): presumably used to serialize conflicting row updates; its use is outside this excerpt — confirm.
+    private RowLocks rowLocks = new RowLocks();
+    
+    /**
+     * Creates the handler, its session manager, and registers the update/scan metrics MBeans. A registration failure is logged and does not prevent
+     * construction.
+     */
+    ThriftClientHandler() {
+      super(instance, watcher);
+      log.debug(ThriftClientHandler.class.getName() + " created");
+      sessionManager = new SessionManager(getSystemConfiguration());
+      registerMetricsMBeans();
+    }
+    
+    /** Registers the update and scan metrics MBeans, logging (never propagating) any failure. */
+    private void registerMetricsMBeans() {
+      try {
+        updateMetrics.register();
+        scanMetrics.register();
+      } catch (Exception e) {
+        log.error("Exception registering MBean with MBean Server", e);
+      }
+    }
+    
+    /**
+     * Imports already-written map files into the online tablets they target. Requires system permission.
+     * 
+     * File paths are qualified against the filesystem that owns them before import. A tablet that is not online here, or whose import throws an
+     * IOException, is reported back as a failure rather than aborting the whole request.
+     * 
+     * @return the extents whose files could not be imported
+     */
+    @Override
+    public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
+        throws ThriftSecurityException {
+      
+      if (!security.canPerformSystemActions(credentials))
+        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      
+      List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
+      
+      for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
+        TKeyExtent tke = entry.getKey();
+        // convert once; used both for the tablet lookup and for logging
+        KeyExtent extent = new KeyExtent(tke);
+        Map<String,MapFileInfo> fileMap = entry.getValue();
+        Map<FileRef,MapFileInfo> fileRefMap = new HashMap<FileRef,MapFileInfo>();
+        for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
+          Path path = new Path(mapping.getKey());
+          FileSystem ns = fs.getFileSystemByPath(path);
+          path = ns.makeQualified(path);
+          fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
+        }
+        
+        Tablet importTablet = onlineTablets.get(extent);
+        
+        if (importTablet == null) {
+          failures.add(tke);
+        } else {
+          try {
+            importTablet.importMapFiles(tid, fileRefMap, setTime);
+          } catch (IOException ioe) {
+            // pass the exception so its stack trace/cause chain is not lost from the log
+            log.info("files " + fileMap.keySet() + " not imported to " + extent + ": " + ioe.getMessage(), ioe);
+            failures.add(tke);
+          }
+        }
+      }
+      return failures;
+    }
+    
+    /**
+     * Read-ahead task that reads the next batch for a single-tablet scan session and publishes it (or the failure) through addResult().
+     */
+    private class NextBatchTask extends ScanTask<ScanBatch> {
+      
+      // id of the ScanSession this task reads for
+      private final long scanID;
+      
+      NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
+        this.scanID = scanID;
+        this.interruptFlag = interruptFlag;
+        
+        // the session may already have been interrupted before this task was created
+        if (interruptFlag.get())
+          cancel(true);
+      }
+      
+      @Override
+      public void run() {
+        final ScanSession session = (ScanSession) sessionManager.getSession(scanID);
+        final Thread current = Thread.currentThread();
+        final String savedName = current.getName();
+        
+        try {
+          if (isCancelled() || session == null)
+            return;
+          
+          runState.set(ScanRunState.RUNNING);
+          
+          current.setName("User: " + session.user + " Start: " + session.startTime + " Client: " + session.client + " Tablet: " + session.extent);
+          
+          if (onlineTablets.get(session.extent) == null) {
+            addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(session.extent.toThrift()));
+            return;
+          }
+          
+          long readStart = System.currentTimeMillis();
+          ScanBatch batch = session.scanner.read();
+          session.nbTimes.addStat(System.currentTimeMillis() - readStart);
+          
+          // at most one result is ever queued at a time, so add() rather than put() is fine;
+          // a capacity failure here would indicate a code problem somewhere
+          addResult(batch);
+        } catch (TabletClosedException e) {
+          addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(session.extent.toThrift()));
+        } catch (IterationInterruptedException iie) {
+          if (!isCancelled()) {
+            log.warn("Iteration interrupted, when scan not cancelled", iie);
+            addResult(iie);
+          }
+        } catch (TooManyFilesException tmfe) {
+          addResult(tmfe);
+        } catch (Throwable e) {
+          log.warn("exception while scanning tablet " + (session == null ? "(unknown)" : session.extent), e);
+          addResult(e);
+        } finally {
+          runState.set(ScanRunState.FINISHED);
+          current.setName(savedName);
+        }
+      }
+    }
+    
+    /**
+     * Read-ahead task for a batch scan session: performs lookups over the session's queued (extent, ranges) entries and publishes one MultiScanResult.
+     */
+    private class LookupTask extends ScanTask<MultiScanResult> {
+      
+      // id of the MultiScanSession whose queued queries this task services
+      private final long scanID;
+      
+      LookupTask(long scanID) {
+        this.scanID = scanID;
+      }
+      
+      /**
+       * Looks up queued entries until the queue is drained, TABLE_SCAN_MAXMEM bytes have been added, or the time budget is spent, then publishes a single
+       * MultiScanResult through addResult(). Extents not online (or closed mid-lookup) are reported as failures; an extent that was only partly read is put
+       * back on the session's queue and reported as the partial scan.
+       */
+      @Override
+      public void run() {
+        MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
+        String oldThreadName = Thread.currentThread().getName();
+        
+        try {
+          if (isCancelled() || session == null)
+            return;
+          
+          String tableId = session.threadPoolExtent.getTableId().toString();
+          TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, tableId);
+          long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
+          
+          runState.set(ScanRunState.RUNNING);
+          // include the table id so thread dumps identify which table the multi-scan is reading
+          Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: " + tableId);
+          
+          long bytesAdded = 0;
+          // time budget in ms for one task run
+          long maxScanTime = 4000;
+          
+          long startTime = System.currentTimeMillis();
+          
+          ArrayList<KVEntry> results = new ArrayList<KVEntry>();
+          Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
+          ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
+          KeyExtent partScan = null;
+          Key partNextKey = null;
+          boolean partNextKeyInclusive = false;
+          
+          Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
+          
+          // check the time so that the read ahead thread is not monopolized
+          while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
+            Entry<KeyExtent,List<Range>> entry = iter.next();
+            
+            iter.remove();
+            
+            // check that tablet server is serving requested tablet
+            Tablet tablet = onlineTablets.get(entry.getKey());
+            if (tablet == null) {
+              failures.put(entry.getKey(), entry.getValue());
+              continue;
+            }
+            Thread.currentThread().setName(
+                "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
+            
+            LookupResult lookupResult;
+            try {
+              
+              // do the following check to avoid a race condition
+              // between setting false below and the task being
+              // canceled
+              if (isCancelled())
+                interruptFlag.set(true);
+              
+              lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
+                  session.ssio, interruptFlag);
+              
+              // if the tablet was closed it is possible that the
+              // interrupt flag was set.... do not want it set for
+              // the next lookup
+              interruptFlag.set(false);
+              
+            } catch (IOException e) {
+              log.warn("lookup failed for tablet " + entry.getKey(), e);
+              throw new RuntimeException(e);
+            }
+            
+            bytesAdded += lookupResult.bytesAdded;
+            
+            if (lookupResult.unfinishedRanges.size() > 0) {
+              if (lookupResult.closed) {
+                failures.put(entry.getKey(), lookupResult.unfinishedRanges);
+              } else {
+                session.queries.put(entry.getKey(), lookupResult.unfinishedRanges);
+                partScan = entry.getKey();
+                partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
+                partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
+                // Stop here: the put above structurally modified the map being iterated (a further iter.next() could fail fast),
+                // and a result can only describe one partial scan. Remaining queries stay queued for the next task.
+                break;
+              }
+            } else {
+              fullScans.add(entry.getKey());
+            }
+          }
+          
+          long finishTime = System.currentTimeMillis();
+          session.totalLookupTime += (finishTime - startTime);
+          session.numEntries += results.size();
+          
+          // convert everything to thrift before adding result
+          List<TKeyValue> retResults = new ArrayList<TKeyValue>();
+          for (KVEntry entry : results)
+            retResults.add(new TKeyValue(entry.key.toThrift(), ByteBuffer.wrap(entry.value)));
+          Map<TKeyExtent,List<TRange>> retFailures = Translator.translate(failures, Translator.KET, new Translator.ListTranslator<Range,TRange>(Translator.RT));
+          List<TKeyExtent> retFullScans = Translator.translate(fullScans, Translator.KET);
+          TKeyExtent retPartScan = null;
+          TKey retPartNextKey = null;
+          if (partScan != null) {
+            retPartScan = partScan.toThrift();
+            retPartNextKey = partNextKey.toThrift();
+          }
+          // add results to queue; "more" is true while any queries remain queued
+          addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0));
+        } catch (IterationInterruptedException iie) {
+          if (!isCancelled()) {
+            log.warn("Iteration interrupted, when scan not cancelled", iie);
+            addResult(iie);
+          }
+        } catch (Throwable e) {
+          log.warn("exception while doing multi-scan ", e);
+          addResult(e);
+        } finally {
+          Thread.currentThread().setName(oldThreadName);
+          runState.set(ScanRunState.FINISHED);
+        }
+      }
+    }
+    
+    /**
+     * Starts a single-tablet scan.
+     * 
+     * Verifies scan permission and that every requested authorization belongs to the user, optionally waits for in-flight writes, creates a ScanSession,
+     * and returns its id together with the first batch of results.
+     * 
+     * @throws ThriftSecurityException
+     *           PERMISSION_DENIED if the scan is not allowed, BAD_AUTHORIZATIONS if an authorization is not held by the user
+     * @throws NotServingTabletException
+     *           if the tablet is not online on this server
+     */
+    @Override
+    public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
+        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated)
+        throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+      
+      if (!security.canScan(credentials, new String(textent.getTable()), range, columns, ssiList, ssio, authorizations))
+        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      
+      Authorizations userauths = security.getUserAuthorizations(credentials);
+      for (ByteBuffer auth : authorizations)
+        if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
+          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+      
+      KeyExtent extent = new KeyExtent(textent);
+      
+      // wait for any writes that are in flight.. this done to ensure
+      // consistency across client restarts... assume a client writes
+      // to accumulo and dies while waiting for a confirmation from
+      // accumulo... the client process restarts and tries to read
+      // data from accumulo making the assumption that it will get
+      // any writes previously made... however if the server side thread
+      // processing the write from the dead client is still in progress,
+      // the restarted client may not see the write unless we wait here.
+      // this behavior is very important when the client is reading the
+      // !METADATA table
+      if (waitForWrites)
+        writeTracker.waitForWrites(TabletType.type(extent));
+      
+      Tablet tablet = onlineTablets.get(extent);
+      if (tablet == null)
+        throw new NotServingTabletException(textent);
+      
+      ScanSession scanSession = new ScanSession();
+      scanSession.user = credentials.getPrincipal();
+      scanSession.extent = new KeyExtent(extent);
+      scanSession.columnSet = new HashSet<Column>();
+      scanSession.ssiList = ssiList;
+      scanSession.ssio = ssio;
+      scanSession.auths = new Authorizations(authorizations);
+      scanSession.interruptFlag = new AtomicBoolean();
+      
+      for (TColumn tcolumn : columns) {
+        scanSession.columnSet.add(new Column(tcolumn));
+      }
+      
+      scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
+          scanSession.interruptFlag);
+      
+      long sid = sessionManager.createSession(scanSession, true);
+      
+      ScanResult scanResult;
+      try {
+        scanResult = continueScan(tinfo, sid, scanSession);
+      } catch (NoSuchScanIDException e) {
+        log.error("The impossible happened", e);
+        // keep the cause so the failure is diagnosable by whoever catches this
+        throw new RuntimeException("The impossible happened", e);
+      } finally {
+        sessionManager.unreserveSession(sid);
+      }
+      
+      return new InitialScan(sid, scanResult);
+    }
+    
+    /**
+     * Returns the next batch for an existing scan session, reserving the session for the duration of the call.
+     * 
+     * @throws NoSuchScanIDException
+     *           if no session with this id can be reserved
+     */
+    @Override
+    public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException,
+        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+      ScanSession reserved = (ScanSession) sessionManager.reserveSession(scanID);
+      if (reserved == null)
+        throw new NoSuchScanIDException();
+      
+      try {
+        return continueScan(tinfo, scanID, reserved);
+      } finally {
+        // release the reservation whether or not the scan step succeeded
+        sessionManager.unreserveSession(reserved);
+      }
+    }
+    
+    /**
+     * Waits (up to MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS) for the session's read-ahead batch, converts it to a ScanResult, and schedules the next read
+     * ahead or closes the scan when no more data remains.
+     * 
+     * On timeout an empty result with {@code more == true} is returned so the client polls again. Other failures remove the session.
+     */
+    private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException,
+        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+      
+      if (scanSession.nextBatchTask == null)
+        scheduleNextBatch(scanID, scanSession);
+      
+      ScanBatch batch;
+      try {
+        batch = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+        scanSession.nextBatchTask = null;
+      } catch (ExecutionException e) {
+        sessionManager.removeSession(scanID);
+        Throwable cause = e.getCause();
+        if (cause instanceof NotServingTabletException)
+          throw (NotServingTabletException) cause;
+        if (cause instanceof TooManyFilesException)
+          throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(scanSession.extent.toThrift());
+        throw new RuntimeException(e);
+      } catch (CancellationException ce) {
+        sessionManager.removeSession(scanID);
+        Tablet tablet = onlineTablets.get(scanSession.extent);
+        boolean notServing = tablet == null || tablet.isClosed();
+        if (notServing)
+          throw new NotServingTabletException(scanSession.extent.toThrift());
+        throw new NoSuchScanIDException();
+      } catch (TimeoutException e) {
+        // batch not ready yet: drop the session only if the client has gone quiet, otherwise tell it to poll again
+        long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+        sessionManager.removeIfNotAccessed(scanID, timeout);
+        List<TKeyValue> noResults = Collections.emptyList();
+        return new ScanResult(noResults, true);
+      } catch (Throwable t) {
+        sessionManager.removeSession(scanID);
+        log.warn("Failed to get next batch", t);
+        throw new RuntimeException(t);
+      }
+      
+      ScanResult scanResult = new ScanResult(Key.compress(batch.results), batch.more);
+      scanSession.entriesReturned += scanResult.results.size();
+      scanSession.batchCount++;
+      
+      if (scanResult.more) {
+        // once the scan has gone a few batches, start reading the next batch
+        // while the current one is transmitted to the client
+        if (scanSession.batchCount > 3)
+          scheduleNextBatch(scanID, scanSession);
+      } else {
+        closeScan(tinfo, scanID);
+      }
+      
+      return scanResult;
+    }
+    
+    /** Creates a NextBatchTask for the session and submits it to the read-ahead pool chosen by the session's extent. */
+    private void scheduleNextBatch(long scanID, ScanSession scanSession) {
+      scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
+      resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
+    }
+    
+    /**
+     * Removes a scan session (if it still exists), logs a summary of it, and records scan metrics when they are enabled. Unknown ids are ignored.
+     */
+    @Override
+    public void closeScan(TInfo tinfo, long scanID) {
+      ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
+      if (ss == null)
+        return;
+      
+      long elapsed = System.currentTimeMillis() - ss.startTime;
+      String tableId = ss.extent.getTableId().toString();
+      log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), tableId,
+          ss.entriesReturned, elapsed / 1000.0, ss.nbTimes.toString()));
+      
+      if (!scanMetrics.isEnabled())
+        return;
+      scanMetrics.add(TabletServerScanMetrics.scan, elapsed);
+      scanMetrics.add(TabletServerScanMetrics.resultSize, ss.entriesReturned);
+    }
+    
+    /**
+     * Starts a batch scan over several tablets of a single table.
+     * 
+     * Verifies scan permission and that every requested authorization belongs to the user, optionally waits for in-flight writes, creates a
+     * MultiScanSession, and returns its id together with the first result.
+     * 
+     * @throws IllegalArgumentException
+     *           if the batch is empty or spans more than one table
+     * @throws ThriftSecurityException
+     *           PERMISSION_DENIED if the scan is not allowed, BAD_AUTHORIZATIONS if an authorization is not held by the user
+     */
+    @Override
+    public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
+        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
+      // find all of the tables that need to be scanned
+      HashSet<String> tables = new HashSet<String>();
+      for (TKeyExtent keyExtent : tbatch.keySet()) {
+        tables.add(new String(keyExtent.getTable()));
+      }
+      
+      // an empty batch names no table at all; say so instead of claiming "multiple tables"
+      if (tables.isEmpty())
+        throw new IllegalArgumentException("Cannot batch scan without any tablets");
+      if (tables.size() != 1)
+        throw new IllegalArgumentException("Cannot batch scan over multiple tables");
+      
+      // check if user has permission to the (single) table
+      String table = tables.iterator().next();
+      if (!security.canScan(credentials, table, tbatch, tcolumns, ssiList, ssio, authorizations))
+        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      
+      Authorizations userauths = security.getUserAuthorizations(credentials);
+      for (ByteBuffer auth : authorizations)
+        if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
+          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+      
+      Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, Translator.TKET, new Translator.ListTranslator<TRange,Range>(Translator.TRT));
+      
+      // This is used to determine which thread pool to use
+      KeyExtent threadPoolExtent = batch.keySet().iterator().next();
+      
+      if (waitForWrites)
+        writeTracker.waitForWrites(TabletType.type(batch.keySet()));
+      
+      MultiScanSession mss = new MultiScanSession();
+      mss.user = credentials.getPrincipal();
+      mss.queries = batch;
+      mss.columnSet = new HashSet<Column>(tcolumns.size());
+      mss.ssiList = ssiList;
+      mss.ssio = ssio;
+      mss.auths = new Authorizations(authorizations);
+      
+      mss.numTablets = batch.size();
+      for (List<Range> ranges : batch.values()) {
+        mss.numRanges += ranges.size();
+      }
+      
+      for (TColumn tcolumn : tcolumns)
+        mss.columnSet.add(new Column(tcolumn));
+      
+      mss.threadPoolExtent = threadPoolExtent;
+      
+      long sid = sessionManager.createSession(mss, true);
+      
+      MultiScanResult result;
+      try {
+        result = continueMultiScan(tinfo, sid, mss);
+      } catch (NoSuchScanIDException e) {
+        log.error("the impossible happened", e);
+        throw new RuntimeException("the impossible happened", e);
+      } finally {
+        sessionManager.unreserveSession(sid);
+      }
+      
+      return new InitialMultiScan(sid, result);
+    }
+    
+    /**
+     * Returns the next result for an existing batch scan session, reserving the session for the duration of the call.
+     * 
+     * @throws NoSuchScanIDException
+     *           if no session with this id can be reserved
+     */
+    @Override
+    public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
+      MultiScanSession reserved = (MultiScanSession) sessionManager.reserveSession(scanID);
+      if (reserved == null)
+        throw new NoSuchScanIDException();
+      
+      try {
+        return continueMultiScan(tinfo, scanID, reserved);
+      } finally {
+        // release the reservation whether or not the lookup succeeded
+        sessionManager.unreserveSession(reserved);
+      }
+    }
+    
+    /**
+     * Waits (up to MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS) for the session's lookup task, starting one if none is pending.
+     * 
+     * On timeout an empty result with {@code more == true} is returned so the client polls again. Any other failure removes the session and is rethrown
+     * wrapped in a RuntimeException.
+     */
+    private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {
+      
+      if (session.lookupTask == null) {
+        session.lookupTask = new LookupTask(scanID);
+        resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
+      }
+      
+      MultiScanResult scanResult;
+      try {
+        scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e1) {
+        // result not ready yet: drop the session only if the client has gone quiet
+        long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+        sessionManager.removeIfNotAccessed(scanID, timeout);
+        return new MultiScanResult(Collections.<TKeyValue> emptyList(), Collections.<TKeyExtent,List<TRange>> emptyMap(),
+            Collections.<TKeyExtent> emptyList(), null, null, false, true);
+      } catch (Throwable t) {
+        sessionManager.removeSession(scanID);
+        log.warn("Failed to get multiscan result", t);
+        throw new RuntimeException(t);
+      }
+      
+      session.lookupTask = null;
+      return scanResult;
+    }
+    
+    /**
+     * Removes a batch scan session and logs a summary of it.
+     * 
+     * @throws NoSuchScanIDException
+     *           if no such session exists
+     */
+    @Override
+    public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
+      MultiScanSession session = (MultiScanSession) sessionManager.removeSession(scanID);
+      if (session == null)
+        throw new NoSuchScanIDException();
+      
+      double elapsedSecs = (System.currentTimeMillis() - session.startTime) / 1000.0;
+      double lookupSecs = session.totalLookupTime / 1000.0;
+      log.debug(String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ", TServerUtils.clientAddress.get(),
+          session.numEntries, elapsedSecs, lookupSecs, session.numTablets, session.numRanges));
+    }
+    
+    @Override
+    public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
+      // Make sure user is real
+      
+      security.authenticateUser(credentials, credentials);
+      if (updateMetrics.isEnabled())
+        updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
+      
+      UpdateSession us = new UpdateSession();
+      us.violations = new Violations();
+      us.credentials = credentials;
+      us.cenv = new TservConstraintEnv(security, us.credentials);
+      
+      long sid = sessionManager.createSession(us, false);
+      
+      return sid;
+    }
+    
+    /**
+     * Makes {@code keyExtent} the session's current tablet for incoming mutations.
+     * 
+     * Leaves {@code us.currentTablet} null when the extent already failed earlier, when the tablet is not online (recorded in {@code us.failures}), or when
+     * the user may not write to the table (recorded in {@code us.authFailures}). The time spent on the permission check is added to
+     * {@code us.authTimes}.
+     */
+    private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
+      long t1 = System.currentTimeMillis();
+      if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
+        return;
+      if (us.currentTablet == null && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
+        // if there were previous failures, then do not accept additional writes
+        return;
+      }
+      
+      try {
+        // if user has no permission to write to this table, add it to
+        // the failures list; a tablet of the same table as the current
+        // one was already authorized, so skip the check
+        boolean sameTable = us.currentTablet != null && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
+        if (sameTable || security.canWrite(us.credentials, keyExtent.getTableId().toString())) {
+          long t2 = System.currentTimeMillis();
+          us.authTimes.addStat(t2 - t1);
+          us.currentTablet = onlineTablets.get(keyExtent);
+          if (us.currentTablet != null) {
+            us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
+          } else {
+            // not serving tablet, so report all mutations as
+            // failures
+            us.failures.put(keyExtent, 0L);
+            if (updateMetrics.isEnabled())
+              updateMetrics.add(TabletServerUpdateMetrics.unknownTabletErrors, 0);
+          }
+        } else {
+          log.warn("Denying access to table " + keyExtent.getTableId() + " for user " + us.credentials.getPrincipal());
+          recordAuthFailure(us, keyExtent, SecurityErrorCode.PERMISSION_DENIED, t1);
+        }
+      } catch (ThriftSecurityException e) {
+        log.error("Denying permission to check user " + us.credentials.getPrincipal() + " with user " + e.getUser(), e);
+        recordAuthFailure(us, keyExtent, e.getCode(), t1);
+      }
+    }
+    
+    /**
+     * Shared bookkeeping for a denied write: records the auth-check time, clears the current tablet, remembers the failure code for the extent, and
+     * counts a permission error when update metrics are enabled.
+     */
+    private void recordAuthFailure(UpdateSession us, KeyExtent keyExtent, SecurityErrorCode code, long authStartMillis) {
+      long t2 = System.currentTimeMillis();
+      us.authTimes.addStat(t2 - authStartMillis);
+      us.currentTablet = null;
+      us.authFailures.put(keyExtent, code);
+      if (updateMetrics.isEnabled())
+        updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
+    }
+    
+    @Override
+    public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List<TMutation> tmutations) {
+      UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
+      if (us == null) {
+        throw new RuntimeException("No Such SessionID");
+      }
+      
+      try {
+        KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+        setUpdateTablet(us, keyExtent);
+        
+        if (us.currentTablet != null) {
+          List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
+          for (TMutation tmutation : tmutations) {
+            Mutation mutation = new ServerMutation(tmutation);
+            mutations.add(mutation);
+            us.queuedMutationSize += mutation.numBytes();
+          }
+          if (us.queuedMutationSize > getSystemConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX))
+            flush(us);
+        }
+      } finally {
+        sessionManager.unreserveSession(us);
+      }
+    }
+    
+    private void flush(UpdateSession us) {
+      
+      int mutationCount = 0;
+      Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+      Throwable error = null;
+      
+      long pt1 = System.currentTimeMillis();
+      
+      boolean containsMetadataTablet = false;
+      for (Tablet tablet : us.queuedMutations.keySet())
+        if (tablet.getExtent().isMeta())
+          containsMetadataTablet = true;
+      
+      if (!containsMetadataTablet && us.queuedMutations.size() > 0)
+        TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
+      
+      Span prep = Trace.start("prep");
+      try {
+        for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
+          
+          Tablet tablet = entry.getKey();
+          List<Mutation> mutations = entry.getValue();
+          if (mutations.size() > 0) {
+            try {
+              if (updateMetrics.isEnabled())
+                updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
+              
+              CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
+              if (commitSession == null) {
+                if (us.currentTablet == tablet) {
+                  us.currentTablet = null;
+                }
+                us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
+              } else {
+                sendables.put(commitSession, mutations);
+                mutationCount += mutations.size();
+              }
+              
+            } catch (TConstraintViolationException e) {
+              us.violations.add(e.getViolations());
+              if (updateMetrics.isEnabled())
+                updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
+              
+              if (e.getNonViolators().size() > 0) {
+                // only log and commit mutations if there were some
+                // that did not
+                // violate constraints... this is what
+                // prepareMutationsForCommit()
+                // expects
+                sendables.put(e.getCommitSession(), e.getNonViolators());
+              }
+              
+              mutationCount += mutations.size();
+              
+            } catch (HoldTimeoutException t) {
+              error = t;
+              log.debug("Giving up on mutations due to a long memory hold time");
+              break;
+            } catch (Throwable t) {
+              error = t;
+              log.error("Unexpected error preparing for commit", error);
+              break;
+            }
+          }
+        }
+      } finally {
+        prep.stop();
+      }
+      
+      long pt2 = System.currentTimeMillis();
+      long avgPrepareTime = (long) ((pt2 - pt1) / (double) us.queuedMutations.size());
+      us.prepareTimes.addStat(pt2 - pt1);
+      if (updateMetrics.isEnabled())
+        updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (avgPrepareTime));
+      
+      if (error != null) {
+        for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
+          e.getKey().abortCommit(e.getValue());
+        }
+        throw new RuntimeException(error);
+      }
+      try {
+        Span wal = Trace.start("wal");
+        try {
+          while (true) {
+            try {
+              long t1 = System.currentTimeMillis();
+              
+              logger.logManyTablets(sendables);
+              
+              long t2 = System.currentTimeMillis();
+              us.walogTimes.addStat(t2 - t1);
+              if (updateMetrics.isEnabled())
+                updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1));
+              
+              break;
+            } catch (IOException ex) {
+              log.warn("logging mutations failed, retrying");
+            } catch (FSError ex) { // happens when DFS is localFS
+              log.warn("logging mutations failed, retrying");
+            } catch (Throwable t) {
+              log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
+              throw new RuntimeException(t);
+            }
+          }
+        } finally {
+          wal.stop();
+        }
+        
+        Span commit = Trace.start("commit");
+        try {
+          long t1 = System.currentTimeMillis();
+          for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+            CommitSession commitSession = entry.getKey();
+            List<Mutation> mutations = entry.getValue();
+            
+            commitSession.commit(mutations);
+            
+            Tablet tablet = commitSession.getTablet();
+            
+            if (tablet == us.currentTablet) {
+              // because constraint violations may filter out some
+              // mutations, for proper
+              // accounting with the client code, need to increment
+              // the count based
+              // on the original number of mutations from the client
+              // NOT the filtered number
+              us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size());
+            }
+          }
+          long t2 = System.currentTimeMillis();
+          
+          long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size());
+          
+          us.flushTime += (t2 - pt1);
+          us.commitTimes.addStat(t2 - t1);
+          
+          if (updateMetrics.isEnabled())
+            updateMetrics.add(TabletServerUpdateMetrics.commitTime, avgCommitTime);
+        } finally {
+          commit.stop();
+        }
+      } finally {
+        us.queuedMutations.clear();
+        if (us.currentTablet != null) {
+          us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
+        }
+        us.queuedMutationSize = 0;
+      }
+      us.totalUpdates += mutationCount;
+    }
+    
+    /**
+     * Closes an update session. Mutations still queued in the session are flushed while the affected tablets are registered with the write
+     * tracker, a summary of the session is logged at debug level, and the per-tablet failures, constraint violations and authorization failures
+     * collected during the session are returned to the client.
+     * 
+     * @throws NoSuchScanIDException
+     *           if no session exists for {@code updateID}
+     */
+    @Override
+    public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
+      UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID);
+      if (us == null) {
+        throw new NoSuchScanIDException();
+      }
+      
+      // clients may or may not see data from an update session while
+      // it is in progress, however when the update session is closed
+      // want to ensure that reads wait for the write to finish
+      long opid = writeTracker.startWrite(us.queuedMutations.keySet());
+      
+      try {
+        flush(us);
+      } finally {
+        writeTracker.finishWrite(opid);
+      }
+      
+      // materialize the violation list once: it is needed both for the debug log and for the result
+      List<ConstraintViolationSummary> violations = us.violations.asList();
+      
+      // only build the (fairly expensive) summary strings when they will actually be logged
+      if (log.isDebugEnabled()) {
+        log.debug(String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(), us.totalUpdates,
+            (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(), us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
+            us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
+        if (us.failures.size() > 0) {
+          Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
+          log.debug(String.format("Failures: %d, first extent %s successful commits: %d", us.failures.size(), first.getKey().toString(), first.getValue()));
+        }
+        if (violations.size() > 0) {
+          ConstraintViolationSummary first = violations.get(0);
+          log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(), first.violationDescription, first.numberOfViolatingMutations));
+        }
+        if (us.authFailures.size() > 0) {
+          KeyExtent first = us.authFailures.keySet().iterator().next();
+          log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), first.toString()));
+        }
+      }
+      
+      return new UpdateErrors(Translator.translate(us.failures, Translator.KET), Translator.translate(violations, Translator.CVST), Translator.translate(
+          us.authFailures, Translator.KET));
+    }
+    
+    /**
+     * Applies a single mutation to one tablet. It checks write permission and prepares the mutation for commit, which runs the constraint
+     * checks. It then writes the mutation to the write-ahead log, retrying on I/O failures, and finally commits it to the tablet.
+     * 
+     * @throws NotServingTabletException
+     *           if the tablet is not online on this server or could not prepare the commit
+     * @throws ConstraintViolationException
+     *           if the mutation violates a constraint
+     * @throws ThriftSecurityException
+     *           if the user may not write to the table
+     */
+    @Override
+    public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException,
+        ConstraintViolationException, ThriftSecurityException {
+      
+      if (!security.canWrite(credentials, new String(tkeyExtent.getTable())))
+        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+      Tablet tablet = onlineTablets.get(keyExtent);
+      if (tablet == null) {
+        throw new NotServingTabletException(tkeyExtent);
+      }
+      
+      if (!keyExtent.isMeta())
+        TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
+      
+      long opid = writeTracker.startWrite(TabletType.type(keyExtent));
+      
+      try {
+        Mutation mutation = new ServerMutation(tmutation);
+        List<Mutation> mutations = Collections.singletonList(mutation);
+        
+        Span prep = Trace.start("prep");
+        CommitSession cs;
+        try {
+          cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);
+        } finally {
+          prep.stop();
+        }
+        if (cs == null) {
+          throw new NotServingTabletException(tkeyExtent);
+        }
+        
+        // the mutation is prepared, so keep retrying the log write; letting a retryable
+        // failure escape here would leave the prepared commit session neither committed nor aborted
+        while (true) {
+          try {
+            Span wal = Trace.start("wal");
+            try {
+              logger.log(cs, cs.getWALogSeq(), mutation);
+            } finally {
+              wal.stop();
+            }
+            break;
+          } catch (IOException ex) {
+            log.warn(ex, ex);
+          } catch (FSError ex) { // happens when DFS is localFS; retried like an IOException, as flush() does
+            log.warn(ex, ex);
+          }
+        }
+        
+        Span commit = Trace.start("commit");
+        try {
+          cs.commit(mutations);
+        } finally {
+          commit.stop();
+        }
+      } catch (TConstraintViolationException e) {
+        throw new ConstraintViolationException(Translator.translate(e.getViolations().asList(), Translator.CVST));
+      } finally {
+        writeTracker.finishWrite(opid);
+      }
+    }
+    
+    /**
+     * Evaluates the conditions of every pending conditional mutation, tablet by tablet, and narrows {@code updates} in place.
+     * 
+     * For a tablet that is no longer online (or is closed), every mutation gets an IGNORED result and the tablet's entry is removed. Otherwise
+     * the entry's list is replaced by only those mutations whose conditions all held; checkCondition records results for the others.
+     */
+    private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession cs,
+        List<String> symbols) throws IOException {
+      CompressedIterators compressedIters = new CompressedIterators(symbols);
+      
+      for (Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> it = updates.entrySet().iterator(); it.hasNext();) {
+        Entry<KeyExtent,List<ServerConditionalMutation>> tabletUpdates = it.next();
+        List<ServerConditionalMutation> candidates = tabletUpdates.getValue();
+        Tablet tablet = onlineTablets.get(tabletUpdates.getKey());
+        
+        if (tablet != null && !tablet.isClosed()) {
+          List<ServerConditionalMutation> passed = new ArrayList<ServerConditionalMutation>(candidates.size());
+          for (ServerConditionalMutation candidate : candidates) {
+            if (checkCondition(results, cs, compressedIters, tablet, candidate)) {
+              passed.add(candidate);
+            }
+          }
+          tabletUpdates.setValue(passed);
+          continue;
+        }
+        
+        // tablet is gone or closed: nothing for it can be evaluated here
+        for (ServerConditionalMutation candidate : candidates) {
+          results.add(new TCMResult(candidate.getID(), TCMStatus.IGNORED));
+        }
+        it.remove();
+      }
+    }
+    
+    /**
+     * Evaluates every condition of one conditional mutation against the given tablet.
+     * 
+     * Each condition is checked by scanning the exact row/family/qualifier/visibility it names, plus the timestamp when the condition has one.
+     * The first value found is compared with the expected value. An absent value only matches an absent expectation.
+     * 
+     * @return true if all conditions hold. Otherwise returns false after adding one result for the mutation: REJECTED if a condition did not
+     *         match, or IGNORED if the tablet was closed, the scan was interrupted, or a TooManyFilesException occurred.
+     */
+    boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters, Tablet tablet,
+        ServerConditionalMutation scm) throws IOException {
+      Set<Column> noColumns = Collections.emptySet();
+      
+      for (TCondition condition : scm.getConditions()) {
+        Text row = new Text(scm.getRow());
+        Text family = new Text(condition.getCf());
+        Text qualifier = new Text(condition.getCq());
+        Text visibility = new Text(condition.getCv());
+        Range exact = condition.hasTimestamp ? Range.exact(row, family, qualifier, visibility, condition.getTs()) : Range.exact(row, family,
+            qualifier, visibility);
+        
+        IterConfig iterConfig = compressedIters.decompress(condition.iterators);
+        
+        Scanner scanner = tablet.createScanner(exact, 1, noColumns, cs.auths, iterConfig.ssiList, iterConfig.ssio, false, cs.interruptFlag);
+        
+        // stays null while the condition holds; otherwise the status to report for this mutation
+        TCMStatus outcome = null;
+        try {
+          ScanBatch batch = scanner.read();
+          
+          Value found = null;
+          for (KVEntry kv : batch.results) {
+            // only the first entry is compared
+            found = kv.getValue();
+            break;
+          }
+          
+          boolean presenceMismatch = (found == null) != (condition.getVal() == null);
+          if (presenceMismatch || (found != null && !Arrays.equals(condition.getVal(), found.get()))) {
+            outcome = TCMStatus.REJECTED;
+          }
+        } catch (TabletClosedException e) {
+          outcome = TCMStatus.IGNORED;
+        } catch (IterationInterruptedException iie) {
+          outcome = TCMStatus.IGNORED;
+        } catch (TooManyFilesException tmfe) {
+          outcome = TCMStatus.IGNORED;
+        }
+        
+        if (outcome != null) {
+          results.add(new TCMResult(scm.getID(), outcome));
+          return false;
+        }
+      }
+      return true;
+    }
+    
+    /**
+     * Prepares, write-ahead logs and commits the conditional mutations that passed their condition checks, adding results as it goes:
+     * <ul>
+     * <li>IGNORED when the tablet is missing or closed, the session was canceled, or the tablet could not prepare a commit</li>
+     * <li>ACCEPTED for mutations handed off for logging and commit</li>
+     * <li>VIOLATED for mutations rejected by a constraint</li>
+     * </ul>
+     * If preparation fails unexpectedly for some tablet, the commit sessions already prepared for other tablets are aborted before the failure
+     * propagates.
+     */
+    private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
+      Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
+      
+      Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+      
+      boolean sessionCanceled = sess.interruptFlag.get();
+      
+      // If preparing some tablet throws unexpectedly, the commit sessions already prepared for other tablets
+      // must be aborted (as flush() does on error) instead of being left outstanding.
+      boolean allPrepared = false;
+      try {
+        for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
+          Tablet tablet = onlineTablets.get(entry.getKey());
+          if (tablet == null || tablet.isClosed() || sessionCanceled) {
+            for (ServerConditionalMutation scm : entry.getValue())
+              results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          } else {
+            try {
+              
+              @SuppressWarnings("unchecked")
+              List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
+              if (mutations.size() > 0) {
+                
+                CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
+                
+                if (cs == null) {
+                  for (ServerConditionalMutation scm : entry.getValue())
+                    results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+                } else {
+                  for (ServerConditionalMutation scm : entry.getValue())
+                    results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
+                  sendables.put(cs, mutations);
+                }
+              }
+            } catch (TConstraintViolationException e) {
+              if (e.getNonViolators().size() > 0) {
+                sendables.put(e.getCommitSession(), e.getNonViolators());
+                for (Mutation m : e.getNonViolators())
+                  results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
+              }
+              
+              for (Mutation m : e.getViolators())
+                results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
+            }
+          }
+        }
+        allPrepared = true;
+      } finally {
+        if (!allPrepared) {
+          for (Entry<CommitSession,List<Mutation>> prepared : sendables.entrySet()) {
+            prepared.getKey().abortCommit(prepared.getValue());
+          }
+        }
+      }
+      
+      // sendables is not modified inside this loop, so it retries until logging succeeds or fails fatally
+      while (!sendables.isEmpty()) {
+        try {
+          logger.logManyTablets(sendables);
+          break;
+        } catch (IOException ex) {
+          log.warn("logging mutations failed, retrying");
+        } catch (FSError ex) { // happens when DFS is localFS
+          log.warn("logging mutations failed, retrying");
+        } catch (Throwable t) {
+          log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
+          throw new RuntimeException(t);
+        }
+      }
+      
+      for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+        CommitSession commitSession = entry.getKey();
+        List<Mutation> mutations = entry.getValue();
+        
+        commitSession.commit(mutations);
+      }
+      
+    }
+    
+    /**
+     * Runs one round of conditional updates. It sorts each tablet's mutations, then postpones two kinds of mutation: those that share a row
+     * with another mutation in this batch, and those whose row lock cannot be taken without blocking. The remaining mutations are checked and
+     * written while their row locks are held.
+     * 
+     * @return the mutations postponed to a later round, keyed by extent
+     */
+    private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs, Map<KeyExtent,List<ServerConditionalMutation>> updates,
+        ArrayList<TCMResult> results, List<String> symbols) throws IOException {
+      // sorting keeps row-lock acquisition ordered (avoiding deadlock), makes seeks happen in order, and lets duplicate rows be detected
+      ConditionalMutationSet.sortConditionalMutations(updates);
+      
+      Map<KeyExtent,List<ServerConditionalMutation>> postponed = new HashMap<KeyExtent,List<ServerConditionalMutation>>();
+      
+      // two mutations to the same row cannot go in one round: one would not observe what the other writes
+      ConditionalMutationSet.deferDuplicatesRows(updates, postponed);
+      
+      // take whichever row locks are available without blocking; rows that are already locked get postponed as well
+      List<RowLock> heldLocks = rowLocks.acquireRowlocks(updates, postponed);
+      try {
+        checkConditions(updates, results, cs, symbols);
+        writeConditionalMutations(updates, results, cs);
+      } finally {
+        rowLocks.releaseRowLocks(heldLocks);
+      }
+      return postponed;
+    }
+    
+    /**
+     * Opens a conditional update session for {@code tableID}. The caller must be allowed to conditionally update the table, and every
+     * requested authorization must be one the user holds.
+     * 
+     * @return the new session id, this server's lock id and the session idle timeout
+     * @throws ThriftSecurityException
+     *           with PERMISSION_DENIED or BAD_AUTHORIZATIONS when those checks fail
+     */
+    @Override
+    public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID)
+        throws ThriftSecurityException, TException {
+      
+      if (!security.canConditionallyUpdate(credentials, tableID, authorizations)) {
+        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      }
+      
+      Authorizations granted = security.getUserAuthorizations(credentials);
+      for (ByteBuffer requested : authorizations) {
+        if (!granted.contains(ByteBufferUtil.toBytes(requested))) {
+          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
+        }
+      }
+      
+      ConditionalSession session = new ConditionalSession();
+      session.auths = new Authorizations(authorizations);
+      session.credentials = credentials;
+      session.tableId = tableID;
+      session.interruptFlag = new AtomicBoolean();
+      
+      long sessionId = sessionManager.createSession(session, false);
+      return new TConditionalSession(sessionId, lockID, sessionManager.getMaxIdleTime());
+    }
+    
+    /**
+     * Processes a batch of conditional mutations for an open conditional session, all of which must target the session's table. Mutations are
+     * handled in rounds: anything deferred in one round (duplicate rows or unavailable row locks) is retried in the next until nothing remains.
+     * 
+     * @return the per-mutation results accumulated over all rounds
+     * @throws NoSuchScanIDException
+     *           if the session could not be reserved or has been invalidated
+     */
+    @Override
+    public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
+        throws NoSuchScanIDException, TException {
+      
+      ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
+      
+      if (cs == null)
+        throw new NoSuchScanIDException();
+      
+      if (cs.interruptFlag.get()) {
+        // The session was reserved above, so release it before failing. Otherwise it would stay reserved, and
+        // invalidateConditionalUpdate(), which reserves the session with waiting enabled before removing it, could block on it indefinitely.
+        sessionManager.unreserveSession(sessID);
+        throw new NoSuchScanIDException();
+      }
+      
+      Text tid = new Text(cs.tableId);
+      long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
+      
+      try {
+        Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
+            new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
+        
+        for (KeyExtent ke : updates.keySet())
+          if (!ke.getTableId().equals(tid))
+            throw new IllegalArgumentException("Unexpected table id " + tid + " != " + ke.getTableId());
+        
+        ArrayList<TCMResult> results = new ArrayList<TCMResult>();
+        
+        Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs, updates, results, symbols);
+        
+        while (deferred.size() > 0) {
+          deferred = conditionalUpdate(cs, deferred, results, symbols);
+        }
+        
+        return results;
+      } catch (IOException ioe) {
+        throw new TException(ioe);
+      } finally {
+        writeTracker.finishWrite(opid);
+        sessionManager.unreserveSession(sessID);
+      }
+    }
+    
+    /**
+     * Invalidates a conditional update session. The intent is that this waits for any running conditional update on the session to
+     * complete, and that no new conditional update can start once it returns.
+     */
+    @Override
+    public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+      // first flag the session as interrupted so in-progress and future updates see it
+      ConditionalSession existing = (ConditionalSession) sessionManager.getSession(sessID);
+      if (existing != null) {
+        existing.interruptFlag.set(true);
+      }
+      
+      // then reserve it (the 'true' argument presumably waits for a running update to release it) and remove it
+      ConditionalSession reserved = (ConditionalSession) sessionManager.reserveSession(sessID, true);
+      if (reserved != null) {
+        sessionManager.removeSession(sessID, true);
+      }
+    }
+    
+    /**
+     * Closes a conditional update session by removing it from the session manager.
+     */
+    @Override
+    public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {
+      // 'false' is passed here, unlike invalidateConditionalUpdate which passes 'true'
+      sessionManager.removeSession(sessID, false);
+    }
+    
+    /**
+     * Splits an online tablet at {@code splitPoint}. It does nothing when the tablet's end row already equals the split point.
+     * 
+     * @throws NotServingTabletException
+     *           if the tablet is not online here or the split did not produce a result
+     * @throws ThriftSecurityException
+     *           if the caller may not split tablets of this table
+     */
+    @Override
+    public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
+        ThriftSecurityException {
+      
+      String tableId = new String(ByteBufferUtil.toBytes(tkeyExtent.table));
+      if (!security.canSplitTablet(credentials, tableId)) {
+        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+      }
+      
+      KeyExtent keyExtent = new KeyExtent(tkeyExtent);
+      Tablet tablet = onlineTablets.get(keyExtent);
+      if (tablet == null) {
+        throw new NotServingTabletException(tkeyExtent);
+      }
+      
+      // splitting at the tablet's own end row would not change anything
+      Text endRow = keyExtent.getEndRow();
+      if (endRow != null && endRow.equals(ByteBufferUtil.toText(splitPoint))) {
+        return;
+      }
+      
+      try {
+        if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
+          throw new NotServingTabletException(tkeyExtent);
+        }
+      } catch (IOException e) {
+        log.warn("Failed to split " + keyExtent, e);
+        throw new RuntimeException(e);
+      }
+    }
+    
+    /**
+     * Reports this tablet server's status. It is built by getStats from the session manager's active scans per table.
+     */
+    @Override
+    public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
+      return getStats(sessionManager.getActiveScansPerTable());
+    }
+    
+    /**
+     * Returns statistics for every tablet of {@code tableId} that is currently online on this server.
+     */
+    @Override
+    public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
+      // copy the online tablets under the map's lock, then gather stats from the copy
+      TreeMap<KeyExtent,Tablet> onlineTabletsCopy;
+      synchronized (onlineTablets) {
+        onlineTabletsCopy = new TreeMap<KeyExtent,Tablet>(onlineTablets);
+      }
+      List<TabletStats> result = new ArrayList<TabletStats>();
+      Text text = new Text(tableId);
+      // an extent of this table with an empty end row serves as the lower bound for the table's extents
+      KeyExtent start = new KeyExtent(text, new Text(), null);
+      for (Entry<KeyExtent,Tablet> entry : onlineTabletsCopy.tailMap(start).entrySet()) {
+        KeyExtent ke = entry.getKey();
+        if (ke.getTableId().compareTo(text) != 0) {
+          // KeyExtents sort by table id first (the tailMap(start) lookup above relies on this), so once the table id
+          // differs no later extent can belong to this table; stop instead of walking the rest of the map
+          break;
+        }
+        Tablet tablet = entry.getValue();
+        TabletStats stats = tablet.timer.getTabletStats();
+        stats.extent = ke.toThrift();
+        stats.ingestRate = tablet.ingestRate();
+        stats.queryRate = tablet.queryRate();
+        stats.splitCreationTime = tablet.getSplitCreationTime();
+        stats.numEntries = tablet.getNumEntries();
+        result.add(stats);
+      }
+      return result;
+    }
+    
+    private ZooCache masterLockCache = new ZooCache();
+    
+    /**
+     * Validates a system-level request. The caller must be permitted to perform system actions and this tablet server must hold its lock.
+     * When {@code lock} is non-null, it must name the lock currently held by the master.
+     * 
+     * The server halts if a permission check fails for credentials whose token class matches the system token class, which points to a
+     * configuration mismatch. It also halts if the server acquired its lock earlier but no longer holds it.
+     * 
+     * @throws ThriftSecurityException
+     *           if the caller may not perform system actions
+     * @throws RuntimeException
+     *           if this server has not acquired its lock, or the master lock check fails
+     */
+    private void checkPermission(TCredentials credentials, String lock, final String request) throws ThriftSecurityException {
+      boolean fatal = false;
+      try {
+        log.debug("Got " + request + " message from user: " + credentials.getPrincipal());
+        if (!security.canPerformSystemActions(credentials)) {
+          log.warn("Got " + request + " message from user: " + credentials.getPrincipal());
+          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
+        }
+      } catch (ThriftSecurityException e) {
+        log.warn("Got " + request + " message from unauthenticatable user: " + e.getUser());
+        if (SystemCredentials.get().getToken().getClass().getName().equals(credentials.getTokenClassName())) {
+          log.fatal("Got message from a service with a mismatched configuration. Please ensure a compatible configuration.", e);
+          fatal = true;
+        }
+        throw e;
+      } finally {
+        if (fatal) {
+          Halt.halt(1, new Runnable() {
+            @Override
+            public void run() {
+              logGCInfo(getSystemConfiguration());
+            }
+          });
+        }
+      }
+      
+      if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
+        log.warn("Got " + request + " message from master before lock acquired, ignoring...");
+        throw new RuntimeException("Lock not acquired");
+      }
+      
+      if (tabletServerLock != null && tabletServerLock.wasLockAcquired() && !tabletServerLock.isLocked()) {
+        Halt.halt(1, new Runnable() {
+          @Override
+          public void run() {
+            log.info("Tablet server no longer holds lock during checkPermission() : " + request + ", exiting");
+            logGCInfo(getSystemConfiguration());
+          }
+        });
+      }
+      
+      if (lock != null) {
+        ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK, lock);
+        
+        boolean held;
+        try {
+          held = ZooLock.isLockHeld(masterLockCache, lid);
+          if (!held) {
+            // maybe the cache is out of date and a new master holds the
+            // lock?
+            masterLockCache.clear();
+            held = ZooLock.isLockHeld(masterLockCache, lid);
+          }
+        } catch (Exception e) {
+          throw new RuntimeException("bad master lock", e);
+        }
+        
+        // Checked outside the try so this exception is not caught and re-wrapped by the handler above.
+        // Previously that produced a "bad master lock" exception whose cause was another "bad master lock".
+        if (!held) {
+          log.warn("Got " + request + " message from a master that does not hold the current lock " + lock);
+          throw new RuntimeException("bad master lock");
+        }
+      }
+    }
+    
+    @Override
+    public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) {
+      
+      try {
+        checkPermission(credentials, lock, "loadTablet");
+      } catch (ThriftSecurityException e) {
+        log.error(e, e);
+        throw new RuntimeException(e);
+      }
+      
+      final KeyExtent extent = new KeyExtent(textent);
+      
+      synchronized (unopenedTablets) {
+        synchronized (openingTablets) {
+          synchronized (onlineTablets) {
+            
+            // checking if this exact tablet is in any of the sets
+            // below is not a strong enough check
+            // when splits and fix splits occurring
+            
+            Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
+            Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
+            Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
+            
+            // ignore any tablets that have recently split
+            Iterator<KeyExtent> each = onlineOverlapping.iterator();
+            while (each.hasNext()) {
+              Tablet tablet = onlineTablets.get(each.next());
+              if (System.currentTimeMillis() - tablet.getSplitCreationTime() < RECENTLY_SPLIT_MILLIES) {
+                each.remove();
+              }
+            }
+            
+            Set<KeyExtent> all = new HashSet<KeyExtent>();
+            all.addAll(unopenedOverlapping);
+            all.addAll(openingOverlapping);
+            all.addAll(onlineOverlapping);
+            
+            if (!all.isEmpty()) {
+              if (all.size() != 1 || !all.contains(extent)) {
+                log.error("Tablet " + extent + " overlaps previously assigned " + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping);
+              }
+              return;
+            }
+            
+            unopenedTablets.add(extent);
+          }
+        }
+      }
+      
+      // add the assignment job to the appropriate queue
+      log.info("Loading tablet " + extent);
+      
+      final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
+      // Root tablet assignment must take place immediately
+      if (extent.isRootTablet()) {
+        new Daemon("Root Tablet Assignment") {
+          @Override
+          public void run() {
+            ah.run();
+            if (onlineTablets.containsKey(extent)) {
+              log.info("Root tablet loaded: " + extent);
+            } else {
+              log.info("Root tablet failed to 

<TRUNCATED>

Mime
View raw message