accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [42/53] [abbrv] ACCUMULO-658 consistent package names to avoid overlapped sealed jars
Date Fri, 06 Sep 2013 18:23:10 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
deleted file mode 100644
index dba6ac5..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ /dev/null
@@ -1,3792 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TimerTask;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.impl.CompressedIterators;
-import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
-import org.apache.accumulo.core.client.impl.ScannerImpl;
-import org.apache.accumulo.core.client.impl.TabletType;
-import org.apache.accumulo.core.client.impl.Translator;
-import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.constraints.Constraint.Environment;
-import org.apache.accumulo.core.constraints.Violations;
-import org.apache.accumulo.core.data.Column;
-import org.apache.accumulo.core.data.ConstraintViolationSummary;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.thrift.InitialMultiScan;
-import org.apache.accumulo.core.data.thrift.InitialScan;
-import org.apache.accumulo.core.data.thrift.IterInfo;
-import org.apache.accumulo.core.data.thrift.MapFileInfo;
-import org.apache.accumulo.core.data.thrift.MultiScanResult;
-import org.apache.accumulo.core.data.thrift.ScanResult;
-import org.apache.accumulo.core.data.thrift.TCMResult;
-import org.apache.accumulo.core.data.thrift.TCMStatus;
-import org.apache.accumulo.core.data.thrift.TColumn;
-import org.apache.accumulo.core.data.thrift.TCondition;
-import org.apache.accumulo.core.data.thrift.TConditionalMutation;
-import org.apache.accumulo.core.data.thrift.TConditionalSession;
-import org.apache.accumulo.core.data.thrift.TKey;
-import org.apache.accumulo.core.data.thrift.TKeyExtent;
-import org.apache.accumulo.core.data.thrift.TKeyValue;
-import org.apache.accumulo.core.data.thrift.TMutation;
-import org.apache.accumulo.core.data.thrift.TRange;
-import org.apache.accumulo.core.data.thrift.UpdateErrors;
-import org.apache.accumulo.core.iterators.IterationInterruptedException;
-import org.apache.accumulo.core.master.thrift.Compacting;
-import org.apache.accumulo.core.master.thrift.MasterClientService;
-import org.apache.accumulo.core.master.thrift.TableInfo;
-import org.apache.accumulo.core.master.thrift.TabletLoadState;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
-import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
-import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
-import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
-import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
-import org.apache.accumulo.core.tabletserver.thrift.ScanState;
-import org.apache.accumulo.core.tabletserver.thrift.ScanType;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
-import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
-import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.ColumnFQ;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.ServerServices;
-import org.apache.accumulo.core.util.ServerServices.Service;
-import org.apache.accumulo.core.util.SimpleThreadPool;
-import org.apache.accumulo.core.util.Stat;
-import org.apache.accumulo.core.util.ThriftUtil;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.master.state.Assignment;
-import org.apache.accumulo.master.state.DistributedStoreException;
-import org.apache.accumulo.master.state.TServerInstance;
-import org.apache.accumulo.master.state.TabletLocationState;
-import org.apache.accumulo.master.state.TabletStateStore;
-import org.apache.accumulo.master.state.ZooTabletStateStore;
-import org.apache.accumulo.master.state.TabletLocationState.BadLocationStateException;
-import org.apache.accumulo.server.Accumulo;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.ServerOpts;
-import org.apache.accumulo.server.client.ClientServiceHandler;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.data.ServerMutation;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
-import org.apache.accumulo.server.problems.ProblemReport;
-import org.apache.accumulo.server.problems.ProblemReports;
-import org.apache.accumulo.server.security.AuditedSecurityOperation;
-import org.apache.accumulo.server.security.SecurityOperation;
-import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
-import org.apache.accumulo.server.tabletserver.RowLocks.RowLock;
-import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
-import org.apache.accumulo.server.tabletserver.Tablet.KVEntry;
-import org.apache.accumulo.server.tabletserver.Tablet.LookupResult;
-import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
-import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
-import org.apache.accumulo.server.tabletserver.Tablet.ScanBatch;
-import org.apache.accumulo.server.tabletserver.Tablet.Scanner;
-import org.apache.accumulo.server.tabletserver.Tablet.SplitInfo;
-import org.apache.accumulo.server.tabletserver.Tablet.TConstraintViolationException;
-import org.apache.accumulo.server.tabletserver.Tablet.TabletClosedException;
-import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
-import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
-import org.apache.accumulo.server.tabletserver.log.DfsLogger;
-import org.apache.accumulo.server.tabletserver.log.LogSorter;
-import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
-import org.apache.accumulo.server.tabletserver.log.TabletServerLogger;
-import org.apache.accumulo.server.tabletserver.mastermessage.MasterMessage;
-import org.apache.accumulo.server.tabletserver.mastermessage.SplitReportMessage;
-import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage;
-import org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean;
-import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics;
-import org.apache.accumulo.server.tabletserver.metrics.TabletServerScanMetrics;
-import org.apache.accumulo.server.tabletserver.metrics.TabletServerUpdateMetrics;
-import org.apache.accumulo.server.util.FileSystemMonitor;
-import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.MapCounter;
-import org.apache.accumulo.server.util.MasterMetadataUtil;
-import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.accumulo.server.util.MetadataTableUtil.LogEntry;
-import org.apache.accumulo.server.util.TServerUtils;
-import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
-import org.apache.accumulo.server.util.time.RelativeTime;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-import org.apache.accumulo.server.zookeeper.TransactionWatcher;
-import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
-import org.apache.accumulo.start.classloader.vfs.ContextManager;
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
-import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
-import org.apache.accumulo.trace.thrift.TInfo;
-import org.apache.accumulo.tserver.data.ServerConditionalMutation;
-import org.apache.commons.collections.map.LRUMap;
-import org.apache.hadoop.fs.FSError;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.TServiceClient;
-import org.apache.thrift.server.TServer;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-
-enum ScanRunState {
-  QUEUED, RUNNING, FINISHED
-}
-
-public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean {
-  private static final Logger log = Logger.getLogger(TabletServer.class);
-  
-  private static HashMap<String,Long> prevGcTime = new HashMap<String,Long>();
-  private static long lastMemorySize = 0;
-  private static long gcTimeIncreasedCount;
-  
-  private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
-  private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
-  
-  private TabletServerLogger logger;
-  
-  protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
-  
-  private ServerConfiguration serverConfig;
-  private LogSorter logSorter = null;
-  
-  public TabletServer(ServerConfiguration conf, VolumeManager fs) {
-    super();
-    this.serverConfig = conf;
-    this.instance = conf.getInstance();
-    this.fs = fs;
-    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
-    SimpleTimer.getInstance().schedule(new Runnable() {
-      @Override
-      public void run() {
-        synchronized (onlineTablets) {
-          long now = System.currentTimeMillis();
-          for (Tablet tablet : onlineTablets.values())
-            try {
-              tablet.updateRates(now);
-            } catch (Exception ex) {
-              log.error(ex, ex);
-            }
-        }
-      }
-    }, 5000, 5000);
-  }
-  
-  private synchronized static void logGCInfo(AccumuloConfiguration conf) {
-    List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
-    Runtime rt = Runtime.getRuntime();
-    
-    StringBuilder sb = new StringBuilder("gc");
-    
-    boolean sawChange = false;
-    
-    long maxIncreaseInCollectionTime = 0;
-    
-    for (GarbageCollectorMXBean gcBean : gcmBeans) {
-      Long prevTime = prevGcTime.get(gcBean.getName());
-      long pt = 0;
-      if (prevTime != null) {
-        pt = prevTime;
-      }
-      
-      long time = gcBean.getCollectionTime();
-      
-      if (time - pt != 0) {
-        sawChange = true;
-      }
-      
-      long increaseInCollectionTime = time - pt;
-      sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0));
-      maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime);
-      prevGcTime.put(gcBean.getName(), time);
-    }
-    
-    long mem = rt.freeMemory();
-    if (maxIncreaseInCollectionTime == 0) {
-      gcTimeIncreasedCount = 0;
-    } else {
-      gcTimeIncreasedCount++;
-      if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) {
-        log.warn("Running low on memory");
-        gcTimeIncreasedCount = 0;
-      }
-    }
-    
-    if (mem > lastMemorySize) {
-      sawChange = true;
-    }
-    
-    String sign = "+";
-    if (mem - lastMemorySize <= 0) {
-      sign = "";
-    }
-    
-    sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory()));
-    
-    if (sawChange) {
-      log.debug(sb.toString());
-    }
-    
-    final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
-    if (maxIncreaseInCollectionTime > keepAliveTimeout) {
-      Halt.halt("Garbage collection may be interfering with lock keep-alive.  Halting.", -1);
-    }
-    
-    lastMemorySize = mem;
-  }
-  
-  private TabletStatsKeeper statsKeeper;
-  
-  private static class Session {
-    long lastAccessTime;
-    long startTime;
-    String user;
-    String client = TServerUtils.clientAddress.get();
-    public boolean reserved;
-    
-    public void cleanup() {}
-  }
-  
-  private static class SessionManager {
-    
-    SecureRandom random;
-    Map<Long,Session> sessions;
-    long maxIdle;
-    
-    SessionManager(AccumuloConfiguration conf) {
-      random = new SecureRandom();
-      sessions = new HashMap<Long,Session>();
-      
-      maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
-      
-      Runnable r = new Runnable() {
-        @Override
-        public void run() {
-          sweep(maxIdle);
-        }
-      };
-      
-      SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
-    }
-    
-    synchronized long createSession(Session session, boolean reserve) {
-      long sid = random.nextLong();
-      
-      while (sessions.containsKey(sid)) {
-        sid = random.nextLong();
-      }
-      
-      sessions.put(sid, session);
-      
-      session.reserved = reserve;
-      
-      session.startTime = session.lastAccessTime = System.currentTimeMillis();
-      
-      return sid;
-    }
-    
-    long getMaxIdleTime() {
-      return maxIdle;
-    }
-    
-    /**
-     * while a session is reserved, it cannot be canceled or removed
-     * 
-     * @param sessionId
-     */
-    
-    synchronized Session reserveSession(long sessionId) {
-      Session session = sessions.get(sessionId);
-      if (session != null) {
-        if (session.reserved)
-          throw new IllegalStateException();
-        session.reserved = true;
-      }
-      
-      return session;
-      
-    }
-    
-    synchronized Session reserveSession(long sessionId, boolean wait) {
-      Session session = sessions.get(sessionId);
-      if (session != null) {
-        while (wait && session.reserved) {
-          try {
-            wait(1000);
-          } catch (InterruptedException e) {
-            throw new RuntimeException();
-          }
-        }
-        
-        if (session.reserved)
-          throw new IllegalStateException();
-        session.reserved = true;
-      }
-      
-      return session;
-      
-    }
-    
-    synchronized void unreserveSession(Session session) {
-      if (!session.reserved)
-        throw new IllegalStateException();
-      notifyAll();
-      session.reserved = false;
-      session.lastAccessTime = System.currentTimeMillis();
-    }
-    
-    synchronized void unreserveSession(long sessionId) {
-      Session session = getSession(sessionId);
-      if (session != null)
-        unreserveSession(session);
-    }
-    
-    synchronized Session getSession(long sessionId) {
-      Session session = sessions.get(sessionId);
-      if (session != null)
-        session.lastAccessTime = System.currentTimeMillis();
-      return session;
-    }
-    
-    Session removeSession(long sessionId) {
-      return removeSession(sessionId, false);
-    }
-    
-    Session removeSession(long sessionId, boolean unreserve) {
-      Session session = null;
-      synchronized (this) {
-        session = sessions.remove(sessionId);
-        if (unreserve && session != null)
-          unreserveSession(session);
-      }
-      
-      // do clean up out side of lock..
-      if (session != null)
-        session.cleanup();
-      
-      return session;
-    }
-    
-    private void sweep(long maxIdle) {
-      ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
-      synchronized (this) {
-        Iterator<Session> iter = sessions.values().iterator();
-        while (iter.hasNext()) {
-          Session session = iter.next();
-          long idleTime = System.currentTimeMillis() - session.lastAccessTime;
-          if (idleTime > maxIdle && !session.reserved) {
-            iter.remove();
-            sessionsToCleanup.add(session);
-          }
-        }
-      }
-      
-      // do clean up outside of lock
-      for (Session session : sessionsToCleanup) {
-        session.cleanup();
-      }
-    }
-    
-    synchronized void removeIfNotAccessed(final long sessionId, long delay) {
-      Session session = sessions.get(sessionId);
-      if (session != null) {
-        final long removeTime = session.lastAccessTime;
-        TimerTask r = new TimerTask() {
-          @Override
-          public void run() {
-            Session sessionToCleanup = null;
-            synchronized (SessionManager.this) {
-              Session session2 = sessions.get(sessionId);
-              if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) {
-                sessions.remove(sessionId);
-                sessionToCleanup = session2;
-              }
-            }
-            
-            // call clean up outside of lock
-            if (sessionToCleanup != null)
-              sessionToCleanup.cleanup();
-          }
-        };
-        
-        SimpleTimer.getInstance().schedule(r, delay);
-      }
-    }
-    
-    public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
-      Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>();
-      for (Entry<Long,Session> entry : sessions.entrySet()) {
-        
-        Session session = entry.getValue();
-        @SuppressWarnings("rawtypes")
-        ScanTask nbt = null;
-        String tableID = null;
-        
-        if (session instanceof ScanSession) {
-          ScanSession ss = (ScanSession) session;
-          nbt = ss.nextBatchTask;
-          tableID = ss.extent.getTableId().toString();
-        } else if (session instanceof MultiScanSession) {
-          MultiScanSession mss = (MultiScanSession) session;
-          nbt = mss.lookupTask;
-          tableID = mss.threadPoolExtent.getTableId().toString();
-        }
-        
-        if (nbt == null)
-          continue;
-        
-        ScanRunState srs = nbt.getScanRunState();
-        
-        if (nbt == null || srs == ScanRunState.FINISHED)
-          continue;
-        
-        MapCounter<ScanRunState> stateCounts = counts.get(tableID);
-        if (stateCounts == null) {
-          stateCounts = new MapCounter<ScanRunState>();
-          counts.put(tableID, stateCounts);
-        }
-        
-        stateCounts.increment(srs, 1);
-      }
-      
-      return counts;
-    }
-    
-    public synchronized List<ActiveScan> getActiveScans() {
-      
-      ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
-      
-      long ct = System.currentTimeMillis();
-      
-      for (Entry<Long,Session> entry : sessions.entrySet()) {
-        Session session = entry.getValue();
-        if (session instanceof ScanSession) {
-          ScanSession ss = (ScanSession) session;
-          
-          ScanState state = ScanState.RUNNING;
-          
-          ScanTask<ScanBatch> nbt = ss.nextBatchTask;
-          if (nbt == null) {
-            state = ScanState.IDLE;
-          } else {
-            switch (nbt.getScanRunState()) {
-              case QUEUED:
-                state = ScanState.QUEUED;
-                break;
-              case FINISHED:
-                state = ScanState.IDLE;
-                break;
-              case RUNNING:
-              default:
-                /* do nothing */
-                break;
-            }
-          }
-          
-          activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE,
-              state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translator.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()));
-          
-        } else if (session instanceof MultiScanSession) {
-          MultiScanSession mss = (MultiScanSession) session;
-          
-          ScanState state = ScanState.RUNNING;
-          
-          ScanTask<MultiScanResult> nbt = mss.lookupTask;
-          if (nbt == null) {
-            state = ScanState.IDLE;
-          } else {
-            switch (nbt.getScanRunState()) {
-              case QUEUED:
-                state = ScanState.QUEUED;
-                break;
-              case FINISHED:
-                state = ScanState.IDLE;
-                break;
-              case RUNNING:
-              default:
-                /* do nothing */
-                break;
-            }
-          }
-          
-          activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime,
-              ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translator.CT), mss.ssiList, mss.ssio, mss.auths
-                  .getAuthorizationsBB()));
-        }
-      }
-      
-      return activeScans;
-    }
-  }
-  
-  static class TservConstraintEnv implements Environment {
-    
-    private TCredentials credentials;
-    private SecurityOperation security;
-    private Authorizations auths;
-    private KeyExtent ke;
-    
-    TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
-      this.security = secOp;
-      this.credentials = credentials;
-    }
-    
-    void setExtent(KeyExtent ke) {
-      this.ke = ke;
-    }
-    
-    @Override
-    public KeyExtent getExtent() {
-      return ke;
-    }
-    
-    @Override
-    public String getUser() {
-      return credentials.getPrincipal();
-    }
-    
-    @Override
-    public Authorizations getAuthorizations() {
-      if (auths == null)
-        try {
-          this.auths = security.getUserAuthorizations(credentials);
-        } catch (ThriftSecurityException e) {
-          throw new RuntimeException(e);
-        }
-      return auths;
-    }
-    
-  }
-  
-  private abstract class ScanTask<T> implements RunnableFuture<T> {
-    
-    protected AtomicBoolean interruptFlag;
-    protected ArrayBlockingQueue<Object> resultQueue;
-    protected AtomicInteger state;
-    protected AtomicReference<ScanRunState> runState;
-    
-    private static final int INITIAL = 1;
-    private static final int ADDED = 2;
-    private static final int CANCELED = 3;
-    
-    ScanTask() {
-      interruptFlag = new AtomicBoolean(false);
-      runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
-      state = new AtomicInteger(INITIAL);
-      resultQueue = new ArrayBlockingQueue<Object>(1);
-    }
-    
-    protected void addResult(Object o) {
-      if (state.compareAndSet(INITIAL, ADDED))
-        resultQueue.add(o);
-      else if (state.get() == ADDED)
-        throw new IllegalStateException("Tried to add more than one result");
-    }
-    
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-      if (!mayInterruptIfRunning)
-        throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task");
-      
-      if (state.get() == CANCELED)
-        return true;
-      
-      if (state.compareAndSet(INITIAL, CANCELED)) {
-        interruptFlag.set(true);
-        resultQueue = null;
-        return true;
-      }
-      
-      return false;
-    }
-    
-    @Override
-    public T get() throws InterruptedException, ExecutionException {
-      throw new UnsupportedOperationException();
-    }
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-      
-      ArrayBlockingQueue<Object> localRQ = resultQueue;
-      
-      if (state.get() == CANCELED)
-        throw new CancellationException();
-      
-      if (localRQ == null && state.get() == ADDED)
-        throw new IllegalStateException("Tried to get result twice");
-      
-      Object r = localRQ.poll(timeout, unit);
-      
-      // could have been canceled while waiting
-      if (state.get() == CANCELED) {
-        if (r != null)
-          throw new IllegalStateException("Nothing should have been added when in canceled state");
-        
-        throw new CancellationException();
-      }
-      
-      if (r == null)
-        throw new TimeoutException();
-      
-      // make this method stop working now that something is being
-      // returned
-      resultQueue = null;
-      
-      if (r instanceof Throwable)
-        throw new ExecutionException((Throwable) r);
-      
-      return (T) r;
-    }
-    
-    @Override
-    public boolean isCancelled() {
-      return state.get() == CANCELED;
-    }
-    
-    @Override
-    public boolean isDone() {
-      return runState.get().equals(ScanRunState.FINISHED);
-    }
-    
-    public ScanRunState getScanRunState() {
-      return runState.get();
-    }
-    
-  }
-  
-  private static class ConditionalSession extends Session {
-    public TCredentials credentials;
-    public Authorizations auths;
-    public String tableId;
-    public AtomicBoolean interruptFlag;
-    
-    @Override
-    public void cleanup() {
-      interruptFlag.set(true);
-    }
-  }
-  
-  private static class UpdateSession extends Session {
-    public Tablet currentTablet;
-    public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
-    Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
-    HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>();
-    public Violations violations;
-    public TCredentials credentials;
-    public long totalUpdates = 0;
-    public long flushTime = 0;
-    Stat prepareTimes = new Stat();
-    Stat walogTimes = new Stat();
-    Stat commitTimes = new Stat();
-    Stat authTimes = new Stat();
-    public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>();
-    public long queuedMutationSize = 0;
-    TservConstraintEnv cenv = null;
-  }
-  
-  private static class ScanSession extends Session {
-    public KeyExtent extent;
-    public HashSet<Column> columnSet;
-    public List<IterInfo> ssiList;
-    public Map<String,Map<String,String>> ssio;
-    public Authorizations auths;
-    public long entriesReturned = 0;
-    public Stat nbTimes = new Stat();
-    public long batchCount = 0;
-    public volatile ScanTask<ScanBatch> nextBatchTask;
-    public AtomicBoolean interruptFlag;
-    public Scanner scanner;
-    
-    @Override
-    public void cleanup() {
-      try {
-        if (nextBatchTask != null)
-          nextBatchTask.cancel(true);
-      } finally {
-        if (scanner != null)
-          scanner.close();
-      }
-    }
-    
-  }
-  
-  private static class MultiScanSession extends Session {
-    HashSet<Column> columnSet;
-    Map<KeyExtent,List<Range>> queries;
-    public List<IterInfo> ssiList;
-    public Map<String,Map<String,String>> ssio;
-    public Authorizations auths;
-    
-    // stats
-    int numRanges;
-    int numTablets;
-    int numEntries;
-    long totalLookupTime;
-    
-    public volatile ScanTask<MultiScanResult> lookupTask;
-    public KeyExtent threadPoolExtent;
-    
-    @Override
-    public void cleanup() {
-      if (lookupTask != null)
-        lookupTask.cancel(true);
-    }
-  }
-  
-  /**
-   * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
-   * are monotonically increasing.
-   * 
-   */
-  static class WriteTracker {
-    private static AtomicLong operationCounter = new AtomicLong(1);
-    private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
-    
-    WriteTracker() {
-      for (TabletType ttype : TabletType.values()) {
-        inProgressWrites.put(ttype, new TreeSet<Long>());
-      }
-    }
-    
-    synchronized long startWrite(TabletType ttype) {
-      long operationId = operationCounter.getAndIncrement();
-      inProgressWrites.get(ttype).add(operationId);
-      return operationId;
-    }
-    
-    synchronized void finishWrite(long operationId) {
-      if (operationId == -1)
-        return;
-      
-      boolean removed = false;
-      
-      for (TabletType ttype : TabletType.values()) {
-        removed = inProgressWrites.get(ttype).remove(operationId);
-        if (removed)
-          break;
-      }
-      
-      if (!removed) {
-        throw new IllegalArgumentException("Attempted to finish write not in progress,  operationId " + operationId);
-      }
-      
-      this.notifyAll();
-    }
-    
-    synchronized void waitForWrites(TabletType ttype) {
-      long operationId = operationCounter.getAndIncrement();
-      while (inProgressWrites.get(ttype).floor(operationId) != null) {
-        try {
-          this.wait();
-        } catch (InterruptedException e) {
-          log.error(e, e);
-        }
-      }
-    }
-    
-    public long startWrite(Set<Tablet> keySet) {
-      if (keySet.size() == 0)
-        return -1;
-      
-      ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
-      
-      for (Tablet tablet : keySet)
-        extents.add(tablet.getExtent());
-      
-      return startWrite(TabletType.type(extents));
-    }
-  }
-  
-  public AccumuloConfiguration getSystemConfiguration() {
-    return serverConfig.getConfiguration();
-  }
-  
-  TransactionWatcher watcher = new TransactionWatcher();
-  
-  private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
-    
-    SessionManager sessionManager;
-    
-    AccumuloConfiguration acuConf = getSystemConfiguration();
-    
-    TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics();
-    
-    TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics();
-    
-    WriteTracker writeTracker = new WriteTracker();
-    
-    private RowLocks rowLocks = new RowLocks();
-    
-    ThriftClientHandler() {
-      super(instance, watcher);
-      log.debug(ThriftClientHandler.class.getName() + " created");
-      sessionManager = new SessionManager(getSystemConfiguration());
-      // Register the metrics MBean
-      try {
-        updateMetrics.register();
-        scanMetrics.register();
-      } catch (Exception e) {
-        log.error("Exception registering MBean with MBean Server", e);
-      }
-    }
-    
-    @Override
-    public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime)
-        throws ThriftSecurityException {
-      
-      if (!security.canPerformSystemActions(credentials))
-        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-      
-      List<TKeyExtent> failures = new ArrayList<TKeyExtent>();
-      
-      for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
-        TKeyExtent tke = entry.getKey();
-        Map<String,MapFileInfo> fileMap = entry.getValue();
-        Map<FileRef,MapFileInfo> fileRefMap = new HashMap<FileRef,MapFileInfo>();
-        for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
-          Path path = new Path(mapping.getKey());
-          FileSystem ns = fs.getFileSystemByPath(path);
-          path = ns.makeQualified(path);
-          fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
-        }
-        
-        Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
-        
-        if (importTablet == null) {
-          failures.add(tke);
-        } else {
-          try {
-            importTablet.importMapFiles(tid, fileRefMap, setTime);
-          } catch (IOException ioe) {
-            log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
-            failures.add(tke);
-          }
-        }
-      }
-      return failures;
-    }
-    
-    private class NextBatchTask extends ScanTask<ScanBatch> {
-      
-      private long scanID;
-      
-      NextBatchTask(long scanID, AtomicBoolean interruptFlag) {
-        this.scanID = scanID;
-        this.interruptFlag = interruptFlag;
-        
-        if (interruptFlag.get())
-          cancel(true);
-      }
-      
-      @Override
-      public void run() {
-        
-        final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID);
-        String oldThreadName = Thread.currentThread().getName();
-        
-        try {
-          if (isCancelled() || scanSession == null)
-            return;
-          
-          runState.set(ScanRunState.RUNNING);
-          
-          Thread.currentThread().setName(
-              "User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent);
-          
-          Tablet tablet = onlineTablets.get(scanSession.extent);
-          
-          if (tablet == null) {
-            addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
-            return;
-          }
-          
-          long t1 = System.currentTimeMillis();
-          ScanBatch batch = scanSession.scanner.read();
-          long t2 = System.currentTimeMillis();
-          scanSession.nbTimes.addStat(t2 - t1);
-          
-          // there should only be one thing on the queue at a time, so
-          // it should be ok to call add()
-          // instead of put()... if add() fails because queue is at
-          // capacity it means there is code
-          // problem somewhere
-          addResult(batch);
-        } catch (TabletClosedException e) {
-          addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift()));
-        } catch (IterationInterruptedException iie) {
-          if (!isCancelled()) {
-            log.warn("Iteration interrupted, when scan not cancelled", iie);
-            addResult(iie);
-          }
-        } catch (TooManyFilesException tmfe) {
-          addResult(tmfe);
-        } catch (Throwable e) {
-          log.warn("exception while scanning tablet " + (scanSession == null ? "(unknown)" : scanSession.extent), e);
-          addResult(e);
-        } finally {
-          runState.set(ScanRunState.FINISHED);
-          Thread.currentThread().setName(oldThreadName);
-        }
-        
-      }
-    }
-    
-    private class LookupTask extends ScanTask<MultiScanResult> {
-      
-      private long scanID;
-      
-      LookupTask(long scanID) {
-        this.scanID = scanID;
-      }
-      
-      @Override
-      public void run() {
-        MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID);
-        String oldThreadName = Thread.currentThread().getName();
-        
-        try {
-          if (isCancelled() || session == null)
-            return;
-          
-          TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString());
-          long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
-          
-          runState.set(ScanRunState.RUNNING);
-          Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: ");
-          
-          long bytesAdded = 0;
-          long maxScanTime = 4000;
-          
-          long startTime = System.currentTimeMillis();
-          
-          ArrayList<KVEntry> results = new ArrayList<KVEntry>();
-          Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
-          ArrayList<KeyExtent> fullScans = new ArrayList<KeyExtent>();
-          KeyExtent partScan = null;
-          Key partNextKey = null;
-          boolean partNextKeyInclusive = false;
-          
-          Iterator<Entry<KeyExtent,List<Range>>> iter = session.queries.entrySet().iterator();
-          
-          // check the time so that the read ahead thread is not monopolized
-          while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) {
-            Entry<KeyExtent,List<Range>> entry = iter.next();
-            
-            iter.remove();
-            
-            // check that tablet server is serving requested tablet
-            Tablet tablet = onlineTablets.get(entry.getKey());
-            if (tablet == null) {
-              failures.put(entry.getKey(), entry.getValue());
-              continue;
-            }
-            Thread.currentThread().setName(
-                "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString());
-            
-            LookupResult lookupResult;
-            try {
-              
-              // do the following check to avoid a race condition
-              // between setting false below and the task being
-              // canceled
-              if (isCancelled())
-                interruptFlag.set(true);
-              
-              lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList,
-                  session.ssio, interruptFlag);
-              
-              // if the tablet was closed it it possible that the
-              // interrupt flag was set.... do not want it set for
-              // the next
-              // lookup
-              interruptFlag.set(false);
-              
-            } catch (IOException e) {
-              log.warn("lookup failed for tablet " + entry.getKey(), e);
-              throw new RuntimeException(e);
-            }
-            
-            bytesAdded += lookupResult.bytesAdded;
-            
-            if (lookupResult.unfinishedRanges.size() > 0) {
-              if (lookupResult.closed) {
-                failures.put(entry.getKey(), lookupResult.unfinishedRanges);
-              } else {
-                session.queries.put(entry.getKey(), lookupResult.unfinishedRanges);
-                partScan = entry.getKey();
-                partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey();
-                partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive();
-              }
-            } else {
-              fullScans.add(entry.getKey());
-            }
-          }
-          
-          long finishTime = System.currentTimeMillis();
-          session.totalLookupTime += (finishTime - startTime);
-          session.numEntries += results.size();
-          
-          // convert everything to thrift before adding result
-          List<TKeyValue> retResults = new ArrayList<TKeyValue>();
-          for (KVEntry entry : results)
-            retResults.add(new TKeyValue(entry.key.toThrift(), ByteBuffer.wrap(entry.value)));
-          Map<TKeyExtent,List<TRange>> retFailures = Translator.translate(failures, Translator.KET, new Translator.ListTranslator<Range,TRange>(Translator.RT));
-          List<TKeyExtent> retFullScans = Translator.translate(fullScans, Translator.KET);
-          TKeyExtent retPartScan = null;
-          TKey retPartNextKey = null;
-          if (partScan != null) {
-            retPartScan = partScan.toThrift();
-            retPartNextKey = partNextKey.toThrift();
-          }
-          // add results to queue
-          addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0));
-        } catch (IterationInterruptedException iie) {
-          if (!isCancelled()) {
-            log.warn("Iteration interrupted, when scan not cancelled", iie);
-            addResult(iie);
-          }
-        } catch (Throwable e) {
-          log.warn("exception while doing multi-scan ", e);
-          addResult(e);
-        } finally {
-          Thread.currentThread().setName(oldThreadName);
-          runState.set(ScanRunState.FINISHED);
-        }
-      }
-    }
-    
-    @Override
-    public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
-        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated)
-        throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
-      
-      Authorizations userauths = null;
-      if (!security.canScan(credentials, new String(textent.getTable()), range, columns, ssiList, ssio, authorizations))
-        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-      
-      userauths = security.getUserAuthorizations(credentials);
-      for (ByteBuffer auth : authorizations)
-        if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
-          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
-      
-      KeyExtent extent = new KeyExtent(textent);
-      
-      // wait for any writes that are in flight.. this done to ensure
-      // consistency across client restarts... assume a client writes
-      // to accumulo and dies while waiting for a confirmation from
-      // accumulo... the client process restarts and tries to read
-      // data from accumulo making the assumption that it will get
-      // any writes previously made... however if the server side thread
-      // processing the write from the dead client is still in progress,
-      // the restarted client may not see the write unless we wait here.
-      // this behavior is very important when the client is reading the
-      // !METADATA table
-      if (waitForWrites)
-        writeTracker.waitForWrites(TabletType.type(extent));
-      
-      Tablet tablet = onlineTablets.get(extent);
-      if (tablet == null)
-        throw new NotServingTabletException(textent);
-      
-      ScanSession scanSession = new ScanSession();
-      scanSession.user = credentials.getPrincipal();
-      scanSession.extent = new KeyExtent(extent);
-      scanSession.columnSet = new HashSet<Column>();
-      scanSession.ssiList = ssiList;
-      scanSession.ssio = ssio;
-      scanSession.auths = new Authorizations(authorizations);
-      scanSession.interruptFlag = new AtomicBoolean();
-      
-      for (TColumn tcolumn : columns) {
-        scanSession.columnSet.add(new Column(tcolumn));
-      }
-      
-      scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
-          scanSession.interruptFlag);
-      
-      long sid = sessionManager.createSession(scanSession, true);
-      
-      ScanResult scanResult;
-      try {
-        scanResult = continueScan(tinfo, sid, scanSession);
-      } catch (NoSuchScanIDException e) {
-        log.error("The impossible happened", e);
-        throw new RuntimeException();
-      } finally {
-        sessionManager.unreserveSession(sid);
-      }
-      
-      return new InitialScan(sid, scanResult);
-    }
-    
-    @Override
-    public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException,
-        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
-      ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID);
-      if (scanSession == null) {
-        throw new NoSuchScanIDException();
-      }
-      
-      try {
-        return continueScan(tinfo, scanID, scanSession);
-      } finally {
-        sessionManager.unreserveSession(scanSession);
-      }
-    }
-    
-    private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException,
-        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
-      
-      if (scanSession.nextBatchTask == null) {
-        scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
-        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
-      }
-      
-      ScanBatch bresult;
-      try {
-        bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
-        scanSession.nextBatchTask = null;
-      } catch (ExecutionException e) {
-        sessionManager.removeSession(scanID);
-        if (e.getCause() instanceof NotServingTabletException)
-          throw (NotServingTabletException) e.getCause();
-        else if (e.getCause() instanceof TooManyFilesException)
-          throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(scanSession.extent.toThrift());
-        else
-          throw new RuntimeException(e);
-      } catch (CancellationException ce) {
-        sessionManager.removeSession(scanID);
-        Tablet tablet = onlineTablets.get(scanSession.extent);
-        if (tablet == null || tablet.isClosed())
-          throw new NotServingTabletException(scanSession.extent.toThrift());
-        else
-          throw new NoSuchScanIDException();
-      } catch (TimeoutException e) {
-        List<TKeyValue> param = Collections.emptyList();
-        long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
-        sessionManager.removeIfNotAccessed(scanID, timeout);
-        return new ScanResult(param, true);
-      } catch (Throwable t) {
-        sessionManager.removeSession(scanID);
-        log.warn("Failed to get next batch", t);
-        throw new RuntimeException(t);
-      }
-      
-      ScanResult scanResult = new ScanResult(Key.compress(bresult.results), bresult.more);
-      
-      scanSession.entriesReturned += scanResult.results.size();
-      
-      scanSession.batchCount++;
-      
-      if (scanResult.more && scanSession.batchCount > 3) {
-        // start reading next batch while current batch is transmitted
-        // to client
-        scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);
-        resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
-      }
-      
-      if (!scanResult.more)
-        closeScan(tinfo, scanID);
-      
-      return scanResult;
-    }
-    
-    @Override
-    public void closeScan(TInfo tinfo, long scanID) {
-      ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
-      if (ss != null) {
-        long t2 = System.currentTimeMillis();
-        
-        log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), ss.extent.getTableId()
-            .toString(), ss.entriesReturned, (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString()));
-        if (scanMetrics.isEnabled()) {
-          scanMetrics.add(TabletServerScanMetrics.scan, t2 - ss.startTime);
-          scanMetrics.add(TabletServerScanMetrics.resultSize, ss.entriesReturned);
-        }
-      }
-    }
-    
-    @Override
-    public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
-        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
-      // find all of the tables that need to be scanned
-      HashSet<String> tables = new HashSet<String>();
-      for (TKeyExtent keyExtent : tbatch.keySet()) {
-        tables.add(new String(keyExtent.getTable()));
-      }
-      
-      if (tables.size() != 1)
-        throw new IllegalArgumentException("Cannot batch scan over multiple tables");
-      
-      // check if user has permission to the tables
-      Authorizations userauths = null;
-      for (String table : tables)
-        if (!security.canScan(credentials, table, tbatch, tcolumns, ssiList, ssio, authorizations))
-          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-      
-      userauths = security.getUserAuthorizations(credentials);
-      for (ByteBuffer auth : authorizations)
-        if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
-          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
-      
-      Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, Translator.TKET, new Translator.ListTranslator<TRange,Range>(Translator.TRT));
-      
-      // This is used to determine which thread pool to use
-      KeyExtent threadPoolExtent = batch.keySet().iterator().next();
-      
-      if (waitForWrites)
-        writeTracker.waitForWrites(TabletType.type(batch.keySet()));
-      
-      MultiScanSession mss = new MultiScanSession();
-      mss.user = credentials.getPrincipal();
-      mss.queries = batch;
-      mss.columnSet = new HashSet<Column>(tcolumns.size());
-      mss.ssiList = ssiList;
-      mss.ssio = ssio;
-      mss.auths = new Authorizations(authorizations);
-      
-      mss.numTablets = batch.size();
-      for (List<Range> ranges : batch.values()) {
-        mss.numRanges += ranges.size();
-      }
-      
-      for (TColumn tcolumn : tcolumns)
-        mss.columnSet.add(new Column(tcolumn));
-      
-      mss.threadPoolExtent = threadPoolExtent;
-      
-      long sid = sessionManager.createSession(mss, true);
-      
-      MultiScanResult result;
-      try {
-        result = continueMultiScan(tinfo, sid, mss);
-      } catch (NoSuchScanIDException e) {
-        log.error("the impossible happened", e);
-        throw new RuntimeException("the impossible happened", e);
-      } finally {
-        sessionManager.unreserveSession(sid);
-      }
-      
-      return new InitialMultiScan(sid, result);
-    }
-    
-    @Override
-    public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
-      
-      MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID);
-      
-      if (session == null) {
-        throw new NoSuchScanIDException();
-      }
-      
-      try {
-        return continueMultiScan(tinfo, scanID, session);
-      } finally {
-        sessionManager.unreserveSession(session);
-      }
-    }
-    
-    private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException {
-      
-      if (session.lookupTask == null) {
-        session.lookupTask = new LookupTask(scanID);
-        resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
-      }
-      
-      try {
-        MultiScanResult scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
-        session.lookupTask = null;
-        return scanResult;
-      } catch (TimeoutException e1) {
-        long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
-        sessionManager.removeIfNotAccessed(scanID, timeout);
-        List<TKeyValue> results = Collections.emptyList();
-        Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
-        List<TKeyExtent> fullScans = Collections.emptyList();
-        return new MultiScanResult(results, failures, fullScans, null, null, false, true);
-      } catch (Throwable t) {
-        sessionManager.removeSession(scanID);
-        log.warn("Failed to get multiscan result", t);
-        throw new RuntimeException(t);
-      }
-    }
-    
-    @Override
-    public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
-      MultiScanSession session = (MultiScanSession) sessionManager.removeSession(scanID);
-      if (session == null) {
-        throw new NoSuchScanIDException();
-      }
-      
-      long t2 = System.currentTimeMillis();
-      log.debug(String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ", TServerUtils.clientAddress.get(),
-          session.numEntries, (t2 - session.startTime) / 1000.0, session.totalLookupTime / 1000.0, session.numTablets, session.numRanges));
-    }
-    
-    @Override
-    public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
-      // Make sure user is real
-      
-      security.authenticateUser(credentials, credentials);
-      if (updateMetrics.isEnabled())
-        updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
-      
-      UpdateSession us = new UpdateSession();
-      us.violations = new Violations();
-      us.credentials = credentials;
-      us.cenv = new TservConstraintEnv(security, us.credentials);
-      
-      long sid = sessionManager.createSession(us, false);
-      
-      return sid;
-    }
-    
-    private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
-      long t1 = System.currentTimeMillis();
-      if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
-        return;
-      if (us.currentTablet == null && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
-        // if there were previous failures, then do not accept additional writes
-        return;
-      }
-      
-      try {
-        // if user has no permission to write to this table, add it to
-        // the failures list
-        boolean sameTable = us.currentTablet != null && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
-        if (sameTable || security.canWrite(us.credentials, keyExtent.getTableId().toString())) {
-          long t2 = System.currentTimeMillis();
-          us.authTimes.addStat(t2 - t1);
-          us.currentTablet = onlineTablets.get(keyExtent);
-          if (us.currentTablet != null) {
-            us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
-          } else {
-            // not serving tablet, so report all mutations as
-            // failures
-            us.failures.put(keyExtent, 0l);
-            if (updateMetrics.isEnabled())
-              updateMetrics.add(TabletServerUpdateMetrics.unknownTabletErrors, 0);
-          }
-        } else {
-          log.warn("Denying access to table " + keyExtent.getTableId() + " for user " + us.credentials.getPrincipal());
-          long t2 = System.currentTimeMillis();
-          us.authTimes.addStat(t2 - t1);
-          us.currentTablet = null;
-          us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
-          if (updateMetrics.isEnabled())
-            updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
-          return;
-        }
-      } catch (ThriftSecurityException e) {
-        log.error("Denying permission to check user " + us.credentials.getPrincipal() + " with user " + e.getUser(), e);
-        long t2 = System.currentTimeMillis();
-        us.authTimes.addStat(t2 - t1);
-        us.currentTablet = null;
-        us.authFailures.put(keyExtent, e.getCode());
-        if (updateMetrics.isEnabled())
-          updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
-        return;
-      }
-    }
-    
-    @Override
-    public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List<TMutation> tmutations) {
-      UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
-      if (us == null) {
-        throw new RuntimeException("No Such SessionID");
-      }
-      
-      try {
-        KeyExtent keyExtent = new KeyExtent(tkeyExtent);
-        setUpdateTablet(us, keyExtent);
-        
-        if (us.currentTablet != null) {
-          List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
-          for (TMutation tmutation : tmutations) {
-            Mutation mutation = new ServerMutation(tmutation);
-            mutations.add(mutation);
-            us.queuedMutationSize += mutation.numBytes();
-          }
-          if (us.queuedMutationSize > getSystemConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX))
-            flush(us);
-        }
-      } finally {
-        sessionManager.unreserveSession(us);
-      }
-    }
-    
-    private void flush(UpdateSession us) {
-      
-      int mutationCount = 0;
-      Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
-      Throwable error = null;
-      
-      long pt1 = System.currentTimeMillis();
-      
-      boolean containsMetadataTablet = false;
-      for (Tablet tablet : us.queuedMutations.keySet())
-        if (tablet.getExtent().isMeta())
-          containsMetadataTablet = true;
-      
-      if (!containsMetadataTablet && us.queuedMutations.size() > 0)
-        TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-      
-      Span prep = Trace.start("prep");
-      try {
-        for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
-          
-          Tablet tablet = entry.getKey();
-          List<Mutation> mutations = entry.getValue();
-          if (mutations.size() > 0) {
-            try {
-              if (updateMetrics.isEnabled())
-                updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size());
-              
-              CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
-              if (commitSession == null) {
-                if (us.currentTablet == tablet) {
-                  us.currentTablet = null;
-                }
-                us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
-              } else {
-                sendables.put(commitSession, mutations);
-                mutationCount += mutations.size();
-              }
-              
-            } catch (TConstraintViolationException e) {
-              us.violations.add(e.getViolations());
-              if (updateMetrics.isEnabled())
-                updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0);
-              
-              if (e.getNonViolators().size() > 0) {
-                // only log and commit mutations if there were some
-                // that did not
-                // violate constraints... this is what
-                // prepareMutationsForCommit()
-                // expects
-                sendables.put(e.getCommitSession(), e.getNonViolators());
-              }
-              
-              mutationCount += mutations.size();
-              
-            } catch (HoldTimeoutException t) {
-              error = t;
-              log.debug("Giving up on mutations due to a long memory hold time");
-              break;
-            } catch (Throwable t) {
-              error = t;
-              log.error("Unexpected error preparing for commit", error);
-              break;
-            }
-          }
-        }
-      } finally {
-        prep.stop();
-      }
-      
-      long pt2 = System.currentTimeMillis();
-      long avgPrepareTime = (long) ((pt2 - pt1) / (double) us.queuedMutations.size());
-      us.prepareTimes.addStat(pt2 - pt1);
-      if (updateMetrics.isEnabled())
-        updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (avgPrepareTime));
-      
-      if (error != null) {
-        for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
-          e.getKey().abortCommit(e.getValue());
-        }
-        throw new RuntimeException(error);
-      }
-      try {
-        Span wal = Trace.start("wal");
-        try {
-          while (true) {
-            try {
-              long t1 = System.currentTimeMillis();
-              
-              logger.logManyTablets(sendables);
-              
-              long t2 = System.currentTimeMillis();
-              us.walogTimes.addStat(t2 - t1);
-              if (updateMetrics.isEnabled())
-                updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1));
-              
-              break;
-            } catch (IOException ex) {
-              log.warn("logging mutations failed, retrying");
-            } catch (FSError ex) { // happens when DFS is localFS
-              log.warn("logging mutations failed, retrying");
-            } catch (Throwable t) {
-              log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
-              throw new RuntimeException(t);
-            }
-          }
-        } finally {
-          wal.stop();
-        }
-        
-        Span commit = Trace.start("commit");
-        try {
-          long t1 = System.currentTimeMillis();
-          for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
-            CommitSession commitSession = entry.getKey();
-            List<Mutation> mutations = entry.getValue();
-            
-            commitSession.commit(mutations);
-            
-            Tablet tablet = commitSession.getTablet();
-            
-            if (tablet == us.currentTablet) {
-              // because constraint violations may filter out some
-              // mutations, for proper
-              // accounting with the client code, need to increment
-              // the count based
-              // on the original number of mutations from the client
-              // NOT the filtered number
-              us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size());
-            }
-          }
-          long t2 = System.currentTimeMillis();
-          
-          long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size());
-          
-          us.flushTime += (t2 - pt1);
-          us.commitTimes.addStat(t2 - t1);
-          
-          if (updateMetrics.isEnabled())
-            updateMetrics.add(TabletServerUpdateMetrics.commitTime, avgCommitTime);
-        } finally {
-          commit.stop();
-        }
-      } finally {
-        us.queuedMutations.clear();
-        if (us.currentTablet != null) {
-          us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
-        }
-        us.queuedMutationSize = 0;
-      }
-      us.totalUpdates += mutationCount;
-    }
-    
-    @Override
-    public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
-      UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID);
-      if (us == null) {
-        throw new NoSuchScanIDException();
-      }
-      
-      // clients may or may not see data from an update session while
-      // it is in progress, however when the update session is closed
-      // want to ensure that reads wait for the write to finish
-      long opid = writeTracker.startWrite(us.queuedMutations.keySet());
-      
-      try {
-        flush(us);
-      } finally {
-        writeTracker.finishWrite(opid);
-      }
-      
-      log.debug(String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(), us.totalUpdates,
-          (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(), us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
-          us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
-      if (us.failures.size() > 0) {
-        Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
-        log.debug(String.format("Failures: %d, first extent %s successful commits: %d", us.failures.size(), first.getKey().toString(), first.getValue()));
-      }
-      List<ConstraintViolationSummary> violations = us.violations.asList();
-      if (violations.size() > 0) {
-        ConstraintViolationSummary first = us.violations.asList().iterator().next();
-        log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(), first.violationDescription, first.numberOfViolatingMutations));
-      }
-      if (us.authFailures.size() > 0) {
-        KeyExtent first = us.authFailures.keySet().iterator().next();
-        log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), first.toString()));
-      }
-      
-      return new UpdateErrors(Translator.translate(us.failures, Translator.KET), Translator.translate(violations, Translator.CVST), Translator.translate(
-          us.authFailures, Translator.KET));
-    }
-    
-    @Override
-    public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException,
-        ConstraintViolationException, ThriftSecurityException {
-      
-      if (!security.canWrite(credentials, new String(tkeyExtent.getTable())))
-        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-      KeyExtent keyExtent = new KeyExtent(tkeyExtent);
-      Tablet tablet = onlineTablets.get(new KeyExtent(keyExtent));
-      if (tablet == null) {
-        throw new NotServingTabletException(tkeyExtent);
-      }
-      
-      if (!keyExtent.isMeta())
-        TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
-      
-      long opid = writeTracker.startWrite(TabletType.type(keyExtent));
-      
-      try {
-        Mutation mutation = new ServerMutation(tmutation);
-        List<Mutation> mutations = Collections.singletonList(mutation);
-        
-        Span prep = Trace.start("prep");
-        CommitSession cs;
-        try {
-          cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);
-        } finally {
-          prep.stop();
-        }
-        if (cs == null) {
-          throw new NotServingTabletException(tkeyExtent);
-        }
-        
-        while (true) {
-          try {
-            Span wal = Trace.start("wal");
-            try {
-              logger.log(cs, cs.getWALogSeq(), mutation);
-            } finally {
-              wal.stop();
-            }
-            break;
-          } catch (IOException ex) {
-            log.warn(ex, ex);
-          }
-        }
-        
-        Span commit = Trace.start("commit");
-        try {
-          cs.commit(mutations);
-        } finally {
-          commit.stop();
-        }
-      } catch (TConstraintViolationException e) {
-        throw new ConstraintViolationException(Translator.translate(e.getViolations().asList(), Translator.CVST));
-      } finally {
-        writeTracker.finishWrite(opid);
-      }
-    }
-    
-    private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession cs,
-        List<String> symbols) throws IOException {
-      Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
-      
-      CompressedIterators compressedIters = new CompressedIterators(symbols);
-      
-      while (iter.hasNext()) {
-        Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
-        Tablet tablet = onlineTablets.get(entry.getKey());
-        
-        if (tablet == null || tablet.isClosed()) {
-          for (ServerConditionalMutation scm : entry.getValue())
-            results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-          iter.remove();
-        } else {
-          List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size());
-          
-          for (ServerConditionalMutation scm : entry.getValue()) {
-            if (checkCondition(results, cs, compressedIters, tablet, scm))
-              okMutations.add(scm);
-          }
-          
-          entry.setValue(okMutations);
-        }
-        
-      }
-    }
-    
-    boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters, Tablet tablet,
-        ServerConditionalMutation scm) throws IOException {
-      boolean add = true;
-      
-      Set<Column> emptyCols = Collections.emptySet();
-      
-      for (TCondition tc : scm.getConditions()) {
-        
-        Range range;
-        if (tc.hasTimestamp)
-          range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
-        else
-          range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
-        
-        IterConfig ic = compressedIters.decompress(tc.iterators);
-        
-        Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
-        
-        try {
-          ScanBatch batch = scanner.read();
-          
-          Value val = null;
-          
-          for (KVEntry entry2 : batch.results) {
-            val = entry2.getValue();
-            break;
-          }
-          
-          if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
-            results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
-            add = false;
-            break;
-          }
-          
-        } catch (TabletClosedException e) {
-          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-          add = false;
-          break;
-        } catch (IterationInterruptedException iie) {
-          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-          add = false;
-          break;
-        } catch (TooManyFilesException tmfe) {
-          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-          add = false;
-          break;
-        }
-      }
-      return add;
-    }
-    
-    private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
-      Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
-      
-      Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
-      
-      boolean sessionCanceled = sess.interruptFlag.get();
-      
-      for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
-        Tablet tablet = onlineTablets.get(entry.getKey());
-        if (tablet == null || tablet.isClosed() || sessionCanceled) {
-          for (ServerConditionalMutation scm : entry.getValue())
-            results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-        } else {
-          try {
-            
-            @SuppressWarnings("unchecked")
-            List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
-            if (mutations.size() > 0) {
-              
-              CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
-              
-              if (cs == null) {
-                for (ServerConditionalMutation scm : entry.getValue())
-                  results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-              } else {
-                for (ServerConditionalMutation scm : entry.getValue())
-                  results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
-                sendables.put(cs, mutations);
-              }
-            }
-          } catch (TConstraintViolationException e) {
-            if (e.getNonViolators().size() > 0) {
-              sendables.put(e.getCommitSession(), e.getNonViolators());
-              for (Mutation m : e.getNonViolators())
-                results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
-            }
-            
-            for (Mutation m : e.getViolators())
-              results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
-          }
-        }
-      }
-      
-      while (true && sendables.size() > 0) {
-        try {
-          logger.logManyTablets(sendables);
-          break;
-        } catch (IOException ex) {
-          log.warn("logging mutations failed, retrying");
-        } catch (FSError ex) { // happens when DFS is localFS
-          log.warn("logging mutations failed, retrying");
-        } catch (Throwable t) {
-          log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
-          throw new RuntimeException(t);
-        }
-      }
-      
-      for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
-        CommitSession commitSession = entry.getKey();
-        List<Mutation> mutations = entry.getValue();
-        
-        commitSession.commit(mutations);
-      }
-      
-    }
-    
-    private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs, Map<KeyExtent,List<ServerConditionalMutation>> updates,
-        ArrayList<TCMResult> results, List<String> symbols) throws IOException {
-      // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows.
-      ConditionalMutationSet.sortConditionalMutations(updates);
-      
-      Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<KeyExtent,List<ServerConditionalMutation>>();
-      
-      // can not process two mutations for the same row, because one will not see what the other writes
-      ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
-      
-      // get as many locks as possible w/o blocking... defer any rows that are locked
-      List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
-      try {
-        checkConditions(updates, results, cs, symbols);
-        writeConditionalMutations(updates, results, cs);
-      } finally {
-        rowLocks.releaseRowLocks(locks);
-      }
-      return deferred;
-    }
-    
-    @Override
-    public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID)
-        throws ThriftSecurityException, TException {
-      
-      Authorizations userauths = null;
-      if (!security.canConditionallyUpdate(credentials, tableID, authorizations))
-        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-      
-      userauths = security.getUserAuthorizations(credentials);
-      for (ByteBuffer auth : authorizations)
-        if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
-          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
-      
-      ConditionalSession cs = new ConditionalSession();
-      cs.auths = new Authorizations(authorizations);
-      cs.credentials = credentials;
-      cs.tableId = tableID;
-      cs.interruptFlag = new AtomicBoolean();
-      
-      long sid = sessionManager.createSession(cs, false);
-      return new TConditionalSession(sid, lockID, sessionManager.getMaxIdleTime());
-    }
-    
-    @Override
-    public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
-        throws NoSuchScanIDException, TException {
-      
-      ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
-      
-      if (cs == null || cs.interruptFlag.get())
-        throw new NoSuchScanIDException();
-      
-      Text tid = new Text(cs.tableId);
-      long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
-      
-      try {
-        Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations, Translator.TKET,
-            new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
-        
-        for (KeyExtent ke : updates.keySet())
-          if (!ke.getTableId().equals(tid))
-            throw new IllegalArgumentException("Unexpected table id " + tid + " != " + ke.getTableId());
-        
-        ArrayList<TCMResult> results = new ArrayList<TCMResult>();
-        
-        Map<KeyExtent,List<ServerConditionalMutation>> deferred = conditionalUpdate(cs, updates, results, symbols);
-        
-        while (deferred.size() > 0) {
-          deferred = conditionalUpdate(cs, deferred, results, symbols);
-        }
-        
-        return results;
-      } catch (IOException ioe) {
-        throw new TException(ioe);
-      } finally {
-        writeTracker.finishWrite(opid);
-        sessionManager.unreserveSession(sessID);
-      }
-    }
-    
-    @Override
-    public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
-      // this method should wait for any running conditional update to complete
-      // after this method returns a conditional update should not be able to start
-      
-      ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID);
-      if (cs != null)
-        cs.interruptFlag.set(true);
-      
-      cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
-      if (cs != null)
-        sessionManager.removeSession(sessID, true);
-    }
-    
-    @Override
-    public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {
-      sessionManager.removeSession(sessID, false);
-    }
-    
-    @Override
-    public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
-        ThriftSecurityException {
-      
-      String tableId = new String(ByteBufferUtil.toBytes(tkeyExtent.table));
-      if (!security.canSplitTablet(credentials, tableId))
-        throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-      
-      KeyExtent keyExtent = new KeyExtent(tkeyExtent);
-      
-      Tablet tablet = onlineTablets.get(keyExtent);
-      if (tablet == null) {
-        throw new NotServingTabletException(tkeyExtent);
-      }
-      
-      if (keyExtent.getEndRow() == null || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
-        try {
-          if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
-            throw new NotServingTabletException(tkeyExtent);
-          }
-        } catch (IOException e) {
-          log.warn("Failed to split " + keyExtent, e);
-          throw new RuntimeException(e);
-        }
-      }
-    }
-    
-    @Override
-    public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
-      return getStats(sessionManager.getActiveScansPerTable());
-    }
-    
-    @Override
-    public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
-      TreeMap<KeyExtent,Tablet> onlineTabletsCopy;
-      synchronized (onlineTablets) {
-        onlineTabletsCopy = new TreeMap<KeyExtent,Tablet>(onlineTablets);
-      }
-      List<TabletStats> result = new ArrayList<TabletStats>();
-      Text text = new Text(tableId);
-      KeyExtent start = new KeyExtent(text, new Text(), null);
-      for (Entry<KeyExtent,Tablet> entry : onlineTabletsCopy.tailMap(start).entrySet()) {
-        KeyExtent ke = entry.getKey();
-        if (ke.getTableId().compareTo(text) == 0) {
-          Tablet tablet = entry.getValue();
-          TabletStats stats = tablet.timer.getTabletStats();
-          stats.extent = ke.toThrift();
-          stats.ingestRate = tablet.ingestRate();
-          stats.queryRate = tablet.queryRate();
-          stats.splitCreationTime = tablet.getSplitCreationTime();
-          stats.numEntries = tablet.getNumEntries();
-          result.add(stats);
-        }
-      }
-      return result;
-    }
-    
-    private ZooCache masterLockCache = new ZooCache();
-    
-    private void checkPermission(TCredentials credentials, String lock, final String request) throws ThriftSecurityException {
-      boolean fatal = false;
-      try {
-        log.debug("Got " + request + " message from user: " + credentials.getPrincipal());
-        if (!security.canPerformSystemActions(credentials)) {
-          log.warn("Got " + request + " message from user: " + credentials.getPrincipal());
-          throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-        }
-      } catch (ThriftSecurityException e) {
-        log.warn("Got " + request + " message from unauthenticatable user: " + e.getUser());
-        if (SystemCredentials.get().getToken().getClass().getName().equals(credentials.getTokenClassName())) {
-          log.fatal("Got message from a service with a mismatched configuration. Please ensure a compatible configuration.", e);
-          fatal = true;
-        }
-        throw e;
-      } finally {
-        if (fatal) {
-          Halt.halt(1, new Runnable() {
-            @Override
-            public void run() {
-              logGCInfo(getSystemConfiguration());
-            }
-          });
-        }
-      }
-      
-      if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
-        log.warn("Got " + request + " message from master before lock acquired, ignoring...");
-        throw new RuntimeException("Lock not acquired");
-      }
-      
-      if (tabletServerLock != null && tabletServerLock.wasLockAcquired() && !tabletServerLock.isLocked()) {
-        Halt.halt(1, new Runnable() {
-          @Override
-          public void run() {
-            log.info("Tablet server no longer holds lock during checkPermission() : " + request + ", exiting");
-            logGCInfo(getSystemConfiguration());
-          }
-        });
-      }
-      
-      if (lock != null) {
-        ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK, lock);
-        
-        try {
-          if (!ZooLock.isLockHeld(masterLockCache, lid)) {
-            // maybe the cache is out of date and a new master holds the
-            // lock?
-            masterLockCache.clear();
-            if (!ZooLock.isLockHeld(masterLockCache, lid)) {
-              log.warn("Got " + request + " message from a master that does not hold the current lock " + lock);
-              throw new RuntimeException("bad master lock");
-            }
-          }
-        } catch (Exception e) {
-          throw new RuntimeException("bad master lock", e);
-        }
-      }
-    }
-    
-    @Override
-    public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) {
-      
-      try {
-        checkPermission(credentials, lock, "loadTablet");
-      } catch (ThriftSecurityException e) {
-        log.error(e, e);
-        throw new RuntimeException(e);
-      }
-      
-      final KeyExtent extent = new KeyExtent(textent);
-      
-      synchronized (unopenedTablets) {
-        synchronized (openingTablets) {
-          synchronized (onlineTablets) {
-            
-            // checking if this exact tablet is in any of the sets
-            // below is not a strong enough check
-            // when splits and fix splits occurring
-            
-            Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
-            Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
-            Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
-            
-            // ignore any tablets that have recently split
-            Iterator<KeyExtent> each = onlineOverlapping.iterator();
-            while (each.hasNext()) {
-              Tablet tablet = onlineTablets.get(each.next());
-              if (System.currentTimeMillis() - tablet.getSplitCreationTime() < RECENTLY_SPLIT_MILLIES) {
-                each.remove();
-              }
-            }
-            
-            Set<KeyExtent> all = new HashSet<KeyExtent>();
-            all.addAll(unopenedOverlapping);
-            all.addAll(openingOverlapping);
-            all.addAll(onlineOverlapping);
-            
-            if (!all.isEmpty()) {
-              if (all.size() != 1 || !all.contains(extent)) {
-                log.error("Tablet " + extent + " overlaps previously assigned " + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping);
-              }
-              return;
-            }
-            
-            unopenedTablets.add(extent);
-          }
-        }
-      }
-      
-      // add the assignment job to the appropriate queue
-      log.info("Loading tablet " + extent);
-      
-      final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent));
-      // Root tablet assignment must take place immediately
-      if (ex

<TRUNCATED>

Mime
View raw message