Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7802417492 for ; Thu, 9 Oct 2014 22:47:19 +0000 (UTC) Received: (qmail 31926 invoked by uid 500); 9 Oct 2014 22:47:19 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 31816 invoked by uid 500); 9 Oct 2014 22:47:19 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 31571 invoked by uid 99); 9 Oct 2014 22:47:19 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Oct 2014 22:47:19 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id EC107DF47; Thu, 9 Oct 2014 22:47:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Thu, 09 Oct 2014 22:47:24 -0000 Message-Id: <445bddf4f9b34198a21245aa091d49ae@git.apache.org> In-Reply-To: <4e1fd50ee8e0474e8733a2202f890df8@git.apache.org> References: <4e1fd50ee8e0474e8733a2202f890df8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/10] Merge branch '1.5' into 1.6 http://git-wip-us.apache.org/repos/asf/accumulo/blob/3d7d762a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 2551bd0,0000000..3e89ca8 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -1,3926 -1,0 +1,3926 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver; + +import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TimerTask; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.CompressedIterators; +import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig; +import org.apache.accumulo.core.client.impl.ScannerImpl; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.TabletType; +import org.apache.accumulo.core.client.impl.Translator; +import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator; +import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator; +import org.apache.accumulo.core.client.impl.Translators; +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.constraints.Constraint.Environment; +import org.apache.accumulo.core.constraints.Violations; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.ConstraintViolationSummary; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.thrift.InitialMultiScan; +import org.apache.accumulo.core.data.thrift.InitialScan; +import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.data.thrift.MapFileInfo; +import org.apache.accumulo.core.data.thrift.MultiScanResult; +import org.apache.accumulo.core.data.thrift.ScanResult; +import org.apache.accumulo.core.data.thrift.TCMResult; +import org.apache.accumulo.core.data.thrift.TCMStatus; +import org.apache.accumulo.core.data.thrift.TColumn; +import org.apache.accumulo.core.data.thrift.TCondition; +import org.apache.accumulo.core.data.thrift.TConditionalMutation; +import org.apache.accumulo.core.data.thrift.TConditionalSession; +import org.apache.accumulo.core.data.thrift.TKey; +import org.apache.accumulo.core.data.thrift.TKeyExtent; +import org.apache.accumulo.core.data.thrift.TKeyValue; +import org.apache.accumulo.core.data.thrift.TMutation; +import org.apache.accumulo.core.data.thrift.TRange; +import org.apache.accumulo.core.data.thrift.UpdateErrors; +import org.apache.accumulo.core.iterators.IterationInterruptedException; +import org.apache.accumulo.core.master.thrift.Compacting; +import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.master.thrift.TabletLoadState; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.security.AuthorizationContainer; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.SecurityUtil; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; +import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; +import org.apache.accumulo.core.tabletserver.thrift.ScanState; +import org.apache.accumulo.core.tabletserver.thrift.ScanType; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor; +import org.apache.accumulo.core.tabletserver.thrift.TabletStats; +import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.MapCounter; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ServerServices.Service; +import org.apache.accumulo.core.util.SimpleThreadPool; +import org.apache.accumulo.core.util.Stat; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.client.ClientServiceHandler; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.data.ServerMutation; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManager.FileType; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.fs.VolumeUtil; +import org.apache.accumulo.server.log.SortedLogState; +import org.apache.accumulo.server.master.recovery.RecoveryPath; +import org.apache.accumulo.server.master.state.Assignment; +import org.apache.accumulo.server.master.state.DistributedStoreException; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; +import org.apache.accumulo.server.master.state.TabletStateStore; +import org.apache.accumulo.server.master.state.ZooTabletStateStore; +import org.apache.accumulo.server.metrics.AbstractMetricsImpl; +import org.apache.accumulo.server.problems.ProblemReport; +import org.apache.accumulo.server.problems.ProblemReports; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.util.FileSystemMonitor; +import org.apache.accumulo.server.util.Halt; +import org.apache.accumulo.server.util.MasterMetadataUtil; +import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.accumulo.server.util.RpcWrapper; +import org.apache.accumulo.server.util.TServerUtils; +import org.apache.accumulo.server.util.TServerUtils.ServerAddress; +import org.apache.accumulo.server.util.time.RelativeTime; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; +import org.apache.accumulo.server.zookeeper.TransactionWatcher; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; +import org.apache.accumulo.start.classloader.vfs.ContextManager; +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.accumulo.trace.thrift.TInfo; +import org.apache.accumulo.tserver.Compactor.CompactionInfo; +import org.apache.accumulo.tserver.RowLocks.RowLock; +import org.apache.accumulo.tserver.Tablet.CommitSession; +import org.apache.accumulo.tserver.Tablet.KVEntry; +import org.apache.accumulo.tserver.Tablet.LookupResult; +import org.apache.accumulo.tserver.Tablet.MinorCompactionReason; +import org.apache.accumulo.tserver.Tablet.ScanBatch; +import org.apache.accumulo.tserver.Tablet.Scanner; +import org.apache.accumulo.tserver.Tablet.SplitInfo; +import org.apache.accumulo.tserver.Tablet.TConstraintViolationException; +import org.apache.accumulo.tserver.Tablet.TabletClosedException; +import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager; +import org.apache.accumulo.tserver.TabletStatsKeeper.Operation; +import org.apache.accumulo.tserver.compaction.MajorCompactionReason; +import org.apache.accumulo.tserver.data.ServerConditionalMutation; +import org.apache.accumulo.tserver.log.DfsLogger; +import org.apache.accumulo.tserver.log.LogSorter; +import org.apache.accumulo.tserver.log.MutationReceiver; +import org.apache.accumulo.tserver.log.TabletServerLogger; +import org.apache.accumulo.tserver.mastermessage.MasterMessage; +import org.apache.accumulo.tserver.mastermessage.SplitReportMessage; +import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage; +import org.apache.accumulo.tserver.metrics.TabletServerMBean; +import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics; +import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; +import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics; +import org.apache.commons.collections.map.LRUMap; +import org.apache.hadoop.fs.FSError; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.TServiceClient; +import org.apache.thrift.server.TServer; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; + +import com.google.common.net.HostAndPort; + +enum ScanRunState { + QUEUED, RUNNING, FINISHED +} + +public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.tserver.metrics.TabletServerMBean { + private static final Logger log = Logger.getLogger(TabletServer.class); + + private static HashMap prevGcTime = new HashMap(); + private static long lastMemorySize = 0; + private static long lastMemoryCheckTime = 0; + private static long gcTimeIncreasedCount; + + private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000; + private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000; + private static final long TIME_BETWEEN_GC_CHECKS = 5000; + + private TabletServerLogger logger; + + protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics(); + + private ServerConfiguration serverConfig; + private LogSorter logSorter = null; + + public TabletServer(ServerConfiguration conf, VolumeManager fs) { + super(); + this.serverConfig = conf; + this.instance = conf.getInstance(); + this.fs = fs; + this.logSorter = new LogSorter(instance, fs, getSystemConfiguration()); + SimpleTimer.getInstance().schedule(new Runnable() { + @Override + public void run() { + synchronized (onlineTablets) { + long now = System.currentTimeMillis(); + for (Tablet tablet : onlineTablets.values()) + try { + tablet.updateRates(now); + } catch (Exception ex) { + log.error(ex, ex); + } + } + } + }, TIME_BETWEEN_GC_CHECKS, TIME_BETWEEN_GC_CHECKS); + } + + private synchronized static void logGCInfo(AccumuloConfiguration conf) { + long now = System.currentTimeMillis(); + + List gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); + Runtime rt = Runtime.getRuntime(); + + StringBuilder sb = new StringBuilder("gc"); + + boolean sawChange = false; + + long maxIncreaseInCollectionTime = 0; + + for (GarbageCollectorMXBean gcBean : gcmBeans) { + Long prevTime = prevGcTime.get(gcBean.getName()); + long pt = 0; + if (prevTime != null) { + pt = prevTime; + } + + long time = gcBean.getCollectionTime(); + + if (time - pt != 0) { + sawChange = true; + } + + long increaseInCollectionTime = time - pt; + sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0)); + maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime); + prevGcTime.put(gcBean.getName(), time); + } + + long mem = rt.freeMemory(); + if (maxIncreaseInCollectionTime == 0) { + gcTimeIncreasedCount = 0; + } else { + gcTimeIncreasedCount++; + if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) { + log.warn("Running low on memory"); + gcTimeIncreasedCount = 0; + } + } + + if (mem > lastMemorySize) { + sawChange = true; + } + + String sign = "+"; + if (mem - lastMemorySize <= 0) { + sign = ""; + } + + sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory())); + + if (sawChange) { + log.debug(sb.toString()); + } + + final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); + if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) { + long diff = now - lastMemoryCheckTime; + if (diff > keepAliveTimeout) { + log.warn(String.format("GC pause checker not called in a timely fashion. Expected every %.1f seconds but was %.1f seconds since last check", + TIME_BETWEEN_GC_CHECKS / 1000., diff / 1000.)); + } + lastMemoryCheckTime = now; + return; + } + + if (maxIncreaseInCollectionTime > keepAliveTimeout) { + Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1); + } + + lastMemorySize = mem; + lastMemoryCheckTime = now; + } + + private TabletStatsKeeper statsKeeper; + + private static class Session { + long lastAccessTime; + long startTime; + String user; + String client = TServerUtils.clientAddress.get(); + public boolean reserved; + + public void cleanup() {} + } + + private static class SessionManager { + + SecureRandom random; + Map sessions; + long maxIdle; + + SessionManager(AccumuloConfiguration conf) { + random = new SecureRandom(); + sessions = new HashMap(); + + maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); + + Runnable r = new Runnable() { + @Override + public void run() { + sweep(maxIdle); + } + }; + + SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000)); + } + + synchronized long createSession(Session session, boolean reserve) { + long sid = random.nextLong(); + + while (sessions.containsKey(sid)) { + sid = random.nextLong(); + } + + sessions.put(sid, session); + + session.reserved = reserve; + + session.startTime = session.lastAccessTime = System.currentTimeMillis(); + + return sid; + } + + long getMaxIdleTime() { + return maxIdle; + } + + /** + * while a session is reserved, it cannot be canceled or removed + */ + synchronized Session reserveSession(long sessionId) { + Session session = sessions.get(sessionId); + if (session != null) { + if (session.reserved) + throw new IllegalStateException(); + session.reserved = true; + } + + return session; + + } + + synchronized Session reserveSession(long sessionId, boolean wait) { + Session session = sessions.get(sessionId); + if (session != null) { + while (wait && session.reserved) { + try { + wait(1000); + } catch (InterruptedException e) { + throw new RuntimeException(); + } + } + + if (session.reserved) + throw new IllegalStateException(); + session.reserved = true; + } + + return session; + + } + + synchronized void unreserveSession(Session session) { + if (!session.reserved) + throw new IllegalStateException(); + notifyAll(); + session.reserved = false; + session.lastAccessTime = System.currentTimeMillis(); + } + + synchronized void unreserveSession(long sessionId) { + Session session = getSession(sessionId); + if (session != null) + unreserveSession(session); + } + + synchronized Session getSession(long sessionId) { + Session session = sessions.get(sessionId); + if (session != null) + session.lastAccessTime = System.currentTimeMillis(); + return session; + } + + Session removeSession(long sessionId) { + return removeSession(sessionId, false); + } + + Session removeSession(long sessionId, boolean unreserve) { + Session session = null; + synchronized (this) { + session = sessions.remove(sessionId); + if (unreserve && session != null) + unreserveSession(session); + } + + // do clean up out side of lock.. + if (session != null) + session.cleanup(); + + return session; + } + + private void sweep(long maxIdle) { + ArrayList sessionsToCleanup = new ArrayList(); + synchronized (this) { + Iterator iter = sessions.values().iterator(); + while (iter.hasNext()) { + Session session = iter.next(); + long idleTime = System.currentTimeMillis() - session.lastAccessTime; + if (idleTime > maxIdle && !session.reserved) { + iter.remove(); + sessionsToCleanup.add(session); + } + } + } + + // do clean up outside of lock + for (Session session : sessionsToCleanup) { + session.cleanup(); + } + } + + synchronized void removeIfNotAccessed(final long sessionId, long delay) { + Session session = sessions.get(sessionId); + if (session != null) { + final long removeTime = session.lastAccessTime; + TimerTask r = new TimerTask() { + @Override + public void run() { + Session sessionToCleanup = null; + synchronized (SessionManager.this) { + Session session2 = sessions.get(sessionId); + if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) { + sessions.remove(sessionId); + sessionToCleanup = session2; + } + } + + // call clean up outside of lock + if (sessionToCleanup != null) + sessionToCleanup.cleanup(); + } + }; + + SimpleTimer.getInstance().schedule(r, delay); + } + } + + public synchronized Map> getActiveScansPerTable() { + Map> counts = new HashMap>(); + for (Entry entry : sessions.entrySet()) { + + Session session = entry.getValue(); + @SuppressWarnings("rawtypes") + ScanTask nbt = null; + String tableID = null; + + if (session instanceof ScanSession) { + ScanSession ss = (ScanSession) session; + nbt = ss.nextBatchTask; + tableID = ss.extent.getTableId().toString(); + } else if (session instanceof MultiScanSession) { + MultiScanSession mss = (MultiScanSession) session; + nbt = mss.lookupTask; + tableID = mss.threadPoolExtent.getTableId().toString(); + } + + if (nbt == null) + continue; + + ScanRunState srs = nbt.getScanRunState(); + + if (srs == ScanRunState.FINISHED) + continue; + + MapCounter stateCounts = counts.get(tableID); + if (stateCounts == null) { + stateCounts = new MapCounter(); + counts.put(tableID, stateCounts); + } + + stateCounts.increment(srs, 1); + } + + return counts; + } + + public synchronized List getActiveScans() { + + ArrayList activeScans = new ArrayList(); + + long ct = System.currentTimeMillis(); + + for (Entry entry : sessions.entrySet()) { + Session session = entry.getValue(); + if (session instanceof ScanSession) { + ScanSession ss = (ScanSession) session; + + ScanState state = ScanState.RUNNING; + + ScanTask nbt = ss.nextBatchTask; + if (nbt == null) { + state = ScanState.IDLE; + } else { + switch (nbt.getScanRunState()) { + case QUEUED: + state = ScanState.QUEUED; + break; + case FINISHED: + state = ScanState.IDLE; + break; + case RUNNING: + default: + /* do nothing */ + break; + } + } + + activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE, + state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB())); + + } else if (session instanceof MultiScanSession) { + MultiScanSession mss = (MultiScanSession) session; + + ScanState state = ScanState.RUNNING; + + ScanTask nbt = mss.lookupTask; + if (nbt == null) { + state = ScanState.IDLE; + } else { + switch (nbt.getScanRunState()) { + case QUEUED: + state = ScanState.QUEUED; + break; + case FINISHED: + state = ScanState.IDLE; + break; + case RUNNING: + default: + /* do nothing */ + break; + } + } + + activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime, + ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths + .getAuthorizationsBB())); + } + } + + return activeScans; + } + } + + static class TservConstraintEnv implements Environment { + + private TCredentials credentials; + private SecurityOperation security; + private Authorizations auths; + private KeyExtent ke; + + TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) { + this.security = secOp; + this.credentials = credentials; + } + + void setExtent(KeyExtent ke) { + this.ke = ke; + } + + @Override + public KeyExtent getExtent() { + return ke; + } + + @Override + public String getUser() { + return credentials.getPrincipal(); + } + + @Override + @Deprecated + public Authorizations getAuthorizations() { + if (auths == null) + try { + this.auths = security.getUserAuthorizations(credentials); + } catch (ThriftSecurityException e) { + throw new RuntimeException(e); + } + return auths; + } + + @Override + public AuthorizationContainer getAuthorizationsContainer() { + return new AuthorizationContainer() { + @Override + public boolean contains(ByteSequence auth) { + try { + return security.userHasAuthorizations(credentials, + Collections. singletonList(ByteBuffer.wrap(auth.getBackingArray(), auth.offset(), auth.length()))); + } catch (ThriftSecurityException e) { + throw new RuntimeException(e); + } + } + }; + } + } + + private abstract class ScanTask implements RunnableFuture { + + protected AtomicBoolean interruptFlag; + protected ArrayBlockingQueue resultQueue; + protected AtomicInteger state; + protected AtomicReference runState; + + private static final int INITIAL = 1; + private static final int ADDED = 2; + private static final int CANCELED = 3; + + ScanTask() { + interruptFlag = new AtomicBoolean(false); + runState = new AtomicReference(ScanRunState.QUEUED); + state = new AtomicInteger(INITIAL); + resultQueue = new ArrayBlockingQueue(1); + } + + protected void addResult(Object o) { + if (state.compareAndSet(INITIAL, ADDED)) + resultQueue.add(o); + else if (state.get() == ADDED) + throw new IllegalStateException("Tried to add more than one result"); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (!mayInterruptIfRunning) + throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task"); + + if (state.get() == CANCELED) + return true; + + if (state.compareAndSet(INITIAL, CANCELED)) { + interruptFlag.set(true); + resultQueue = null; + return true; + } + + return false; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(); + } + + @SuppressWarnings("unchecked") + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + + ArrayBlockingQueue localRQ = resultQueue; + + if (state.get() == CANCELED) + throw new CancellationException(); + + if (localRQ == null && state.get() == ADDED) + throw new IllegalStateException("Tried to get result twice"); + + Object r = localRQ.poll(timeout, unit); + + // could have been canceled while waiting + if (state.get() == CANCELED) { + if (r != null) + throw new IllegalStateException("Nothing should have been added when in canceled state"); + + throw new CancellationException(); + } + + if (r == null) + throw new TimeoutException(); + + // make this method stop working now that something is being + // returned + resultQueue = null; + + if (r instanceof Throwable) + throw new ExecutionException((Throwable) r); + + return (T) r; + } + + @Override + public boolean isCancelled() { + return state.get() == CANCELED; + } + + @Override + public boolean isDone() { + return runState.get().equals(ScanRunState.FINISHED); + } + + public ScanRunState getScanRunState() { + return runState.get(); + } + + } + + private static class ConditionalSession extends Session { + public TCredentials credentials; + public Authorizations auths; + public String tableId; + public AtomicBoolean interruptFlag; + + @Override + public void cleanup() { + interruptFlag.set(true); + } + } + + private static class UpdateSession extends Session { + public Tablet currentTablet; + public MapCounter successfulCommits = new MapCounter(); + Map failures = new HashMap(); + HashMap authFailures = new HashMap(); + public Violations violations; + public TCredentials credentials; + public long totalUpdates = 0; + public long flushTime = 0; + Stat prepareTimes = new Stat(); + Stat walogTimes = new Stat(); + Stat commitTimes = new Stat(); + Stat authTimes = new Stat(); + public Map> queuedMutations = new HashMap>(); + public long queuedMutationSize = 0; + TservConstraintEnv cenv = null; + } + + private static class ScanSession extends Session { + public KeyExtent extent; + public HashSet columnSet; + public List ssiList; + public Map> ssio; + public Authorizations auths; + public long entriesReturned = 0; + public Stat nbTimes = new Stat(); + public long batchCount = 0; + public volatile ScanTask nextBatchTask; + public AtomicBoolean interruptFlag; + public Scanner scanner; + public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD; + + @Override + public void cleanup() { + try { + if (nextBatchTask != null) + nextBatchTask.cancel(true); + } finally { + if (scanner != null) + scanner.close(); + } + } + + } + + private static class MultiScanSession extends Session { + HashSet columnSet; + Map> queries; + public List ssiList; + public Map> ssio; + public Authorizations auths; + + // stats + int numRanges; + int numTablets; + int numEntries; + long totalLookupTime; + + public volatile ScanTask lookupTask; + public KeyExtent threadPoolExtent; + + @Override + public void cleanup() { + if (lookupTask != null) + lookupTask.cancel(true); + } + } + + /** + * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids + * are monotonically increasing. + * + */ + static class WriteTracker { + private static AtomicLong operationCounter = new AtomicLong(1); + private Map> inProgressWrites = new EnumMap>(TabletType.class); + + WriteTracker() { + for (TabletType ttype : TabletType.values()) { + inProgressWrites.put(ttype, new TreeSet()); + } + } + + synchronized long startWrite(TabletType ttype) { + long operationId = operationCounter.getAndIncrement(); + inProgressWrites.get(ttype).add(operationId); + return operationId; + } + + synchronized void finishWrite(long operationId) { + if (operationId == -1) + return; + + boolean removed = false; + + for (TabletType ttype : TabletType.values()) { + removed = inProgressWrites.get(ttype).remove(operationId); + if (removed) + break; + } + + if (!removed) { + throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId); + } + + this.notifyAll(); + } + + synchronized void waitForWrites(TabletType ttype) { + long operationId = operationCounter.getAndIncrement(); + while (inProgressWrites.get(ttype).floor(operationId) != null) { + try { + this.wait(); + } catch (InterruptedException e) { + log.error(e, e); + } + } + } + + public long startWrite(Set keySet) { + if (keySet.size() == 0) + return -1; + + ArrayList extents = new ArrayList(keySet.size()); + + for (Tablet tablet : keySet) + extents.add(tablet.getExtent()); + + return startWrite(TabletType.type(extents)); + } + } + + public AccumuloConfiguration getSystemConfiguration() { + return serverConfig.getConfiguration(); + } + + TransactionWatcher watcher = new TransactionWatcher(); + + private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface { + + SessionManager sessionManager; + + AccumuloConfiguration acuConf = getSystemConfiguration(); + + TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics(); + + TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics(); + + WriteTracker writeTracker = new WriteTracker(); + + private RowLocks rowLocks = new RowLocks(); + + ThriftClientHandler() { + super(instance, watcher, fs); + log.debug(ThriftClientHandler.class.getName() + " created"); + sessionManager = new SessionManager(getSystemConfiguration()); + // Register the metrics MBean + try { + updateMetrics.register(); + scanMetrics.register(); + } catch (Exception e) { + log.error("Exception registering MBean with MBean Server", e); + } + } + + @Override + public List bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map> files, boolean setTime) + throws ThriftSecurityException { + + if (!security.canPerformSystemActions(credentials)) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + + List failures = new ArrayList(); + + for (Entry> entry : files.entrySet()) { + TKeyExtent tke = entry.getKey(); + Map fileMap = entry.getValue(); + Map fileRefMap = new HashMap(); + for (Entry mapping : fileMap.entrySet()) { + Path path = new Path(mapping.getKey()); + FileSystem ns = fs.getVolumeByPath(path).getFileSystem(); + path = ns.makeQualified(path); + fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue()); + } + + Tablet importTablet = onlineTablets.get(new KeyExtent(tke)); + + if (importTablet == null) { + failures.add(tke); + } else { + try { + importTablet.importMapFiles(tid, fileRefMap, setTime); + } catch (IOException ioe) { + log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage()); + failures.add(tke); + } + } + } + return failures; + } + + private class NextBatchTask extends ScanTask { + + private long scanID; + + NextBatchTask(long scanID, AtomicBoolean interruptFlag) { + this.scanID = scanID; + this.interruptFlag = interruptFlag; + + if (interruptFlag.get()) + cancel(true); + } + + @Override + public void run() { + + final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID); + String oldThreadName = Thread.currentThread().getName(); + + try { + if (isCancelled() || scanSession == null) + return; + + runState.set(ScanRunState.RUNNING); + + Thread.currentThread().setName( + "User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent); + + Tablet tablet = onlineTablets.get(scanSession.extent); + + if (tablet == null) { + addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift())); + return; + } + + long t1 = System.currentTimeMillis(); + ScanBatch batch = scanSession.scanner.read(); + long t2 = System.currentTimeMillis(); + scanSession.nbTimes.addStat(t2 - t1); + + // there should only be one thing on the queue at a time, so + // it should be ok to call add() + // instead of put()... if add() fails because queue is at + // capacity it means there is code + // problem somewhere + addResult(batch); + } catch (TabletClosedException e) { + addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift())); + } catch (IterationInterruptedException iie) { + if (!isCancelled()) { + log.warn("Iteration interrupted, when scan not cancelled", iie); + addResult(iie); + } + } catch (TooManyFilesException tmfe) { + addResult(tmfe); + } catch (Throwable e) { + log.warn("exception while scanning tablet " + (scanSession == null ? "(unknown)" : scanSession.extent), e); + addResult(e); + } finally { + runState.set(ScanRunState.FINISHED); + Thread.currentThread().setName(oldThreadName); + } + + } + } + + private class LookupTask extends ScanTask { + + private long scanID; + + LookupTask(long scanID) { + this.scanID = scanID; + } + + @Override + public void run() { + MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID); + String oldThreadName = Thread.currentThread().getName(); + + try { + if (isCancelled() || session == null) + return; + + TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString()); + long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM); + + runState.set(ScanRunState.RUNNING); + Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: "); + + long bytesAdded = 0; + long maxScanTime = 4000; + + long startTime = System.currentTimeMillis(); + + ArrayList results = new ArrayList(); + Map> failures = new HashMap>(); + ArrayList fullScans = new ArrayList(); + KeyExtent partScan = null; + Key partNextKey = null; + boolean partNextKeyInclusive = false; + + Iterator>> iter = session.queries.entrySet().iterator(); + + // check the time so that the read ahead thread is not monopolized + while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) { + Entry> entry = iter.next(); + + iter.remove(); + + // check that tablet server is serving requested tablet + Tablet tablet = onlineTablets.get(entry.getKey()); + if (tablet == null) { + failures.put(entry.getKey(), entry.getValue()); + continue; + } + Thread.currentThread().setName( + "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString()); + + LookupResult lookupResult; + try { + + // do the following check to avoid a race condition + // between setting false below and the task being + // canceled + if (isCancelled()) + interruptFlag.set(true); + + lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList, + session.ssio, interruptFlag); + + // if the tablet was closed it it possible that the + // interrupt flag was set.... do not want it set for + // the next + // lookup + interruptFlag.set(false); + + } catch (IOException e) { + log.warn("lookup failed for tablet " + entry.getKey(), e); + throw new RuntimeException(e); + } + + bytesAdded += lookupResult.bytesAdded; + + if (lookupResult.unfinishedRanges.size() > 0) { + if (lookupResult.closed) { + failures.put(entry.getKey(), lookupResult.unfinishedRanges); + } else { + session.queries.put(entry.getKey(), lookupResult.unfinishedRanges); + partScan = entry.getKey(); + partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey(); + partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive(); + } + } else { + fullScans.add(entry.getKey()); + } + } + + long finishTime = System.currentTimeMillis(); + session.totalLookupTime += (finishTime - startTime); + session.numEntries += results.size(); + + // convert everything to thrift before adding result + List retResults = new ArrayList(); + for (KVEntry entry : results) - retResults.add(new TKeyValue(entry.key.toThrift(), ByteBuffer.wrap(entry.value))); ++ retResults.add(new TKeyValue(entry.getKey().toThrift(), ByteBuffer.wrap(entry.getValue().get()))); + Map> retFailures = Translator.translate(failures, Translators.KET, new Translator.ListTranslator(Translators.RT)); + List retFullScans = Translator.translate(fullScans, Translators.KET); + TKeyExtent retPartScan = null; + TKey retPartNextKey = null; + if (partScan != null) { + retPartScan = partScan.toThrift(); + retPartNextKey = partNextKey.toThrift(); + } + // add results to queue + addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0)); + } catch (IterationInterruptedException iie) { + if (!isCancelled()) { + log.warn("Iteration interrupted, when scan not cancelled", iie); + addResult(iie); + } + } catch (Throwable e) { + log.warn("exception while doing multi-scan ", e); + addResult(e); + } finally { + Thread.currentThread().setName(oldThreadName); + runState.set(ScanRunState.FINISHED); + } + } + } + + @Override + public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List columns, int batchSize, + List ssiList, Map> ssio, List authorizations, boolean waitForWrites, boolean isolated, + long readaheadThreshold) throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { + + String tableId = new String(textent.getTable(), Constants.UTF8); + if (!security.canScan(credentials, tableId, Tables.getNamespaceId(instance, tableId), range, columns, ssiList, ssio, authorizations)) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + + if (!security.userHasAuthorizations(credentials, authorizations)) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS); + + KeyExtent extent = new KeyExtent(textent); + + // wait for any writes that are in flight.. this done to ensure + // consistency across client restarts... assume a client writes + // to accumulo and dies while waiting for a confirmation from + // accumulo... the client process restarts and tries to read + // data from accumulo making the assumption that it will get + // any writes previously made... however if the server side thread + // processing the write from the dead client is still in progress, + // the restarted client may not see the write unless we wait here. + // this behavior is very important when the client is reading the + // metadata + if (waitForWrites) + writeTracker.waitForWrites(TabletType.type(extent)); + + Tablet tablet = onlineTablets.get(extent); + if (tablet == null) + throw new NotServingTabletException(textent); + + ScanSession scanSession = new ScanSession(); + scanSession.user = credentials.getPrincipal(); + scanSession.extent = new KeyExtent(extent); + scanSession.columnSet = new HashSet(); + scanSession.ssiList = ssiList; + scanSession.ssio = ssio; + scanSession.auths = new Authorizations(authorizations); + scanSession.interruptFlag = new AtomicBoolean(); + scanSession.readaheadThreshold = readaheadThreshold; + + for (TColumn tcolumn : columns) { + scanSession.columnSet.add(new Column(tcolumn)); + } + + scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated, + scanSession.interruptFlag); + + long sid = sessionManager.createSession(scanSession, true); + + ScanResult scanResult; + try { + scanResult = continueScan(tinfo, sid, scanSession); + } catch (NoSuchScanIDException e) { + log.error("The impossible happened", e); + throw new RuntimeException(); + } finally { + sessionManager.unreserveSession(sid); + } + + return new InitialScan(sid, scanResult); + } + + @Override + public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException, + org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { + ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID); + if (scanSession == null) { + throw new NoSuchScanIDException(); + } + + try { + return continueScan(tinfo, scanID, scanSession); + } finally { + sessionManager.unreserveSession(scanSession); + } + } + + private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException, + org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { + + if (scanSession.nextBatchTask == null) { + scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag); + resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask); + } + + ScanBatch bresult; + try { + bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); + scanSession.nextBatchTask = null; + } catch (ExecutionException e) { + sessionManager.removeSession(scanID); + if (e.getCause() instanceof NotServingTabletException) + throw (NotServingTabletException) e.getCause(); + else if (e.getCause() instanceof TooManyFilesException) + throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(scanSession.extent.toThrift()); + else + throw new RuntimeException(e); + } catch (CancellationException ce) { + sessionManager.removeSession(scanID); + Tablet tablet = onlineTablets.get(scanSession.extent); + if (tablet == null || tablet.isClosed()) + throw new NotServingTabletException(scanSession.extent.toThrift()); + else + throw new NoSuchScanIDException(); + } catch (TimeoutException e) { + List param = Collections.emptyList(); + long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT); + sessionManager.removeIfNotAccessed(scanID, timeout); + return new ScanResult(param, true); + } catch (Throwable t) { + sessionManager.removeSession(scanID); + log.warn("Failed to get next batch", t); + throw new RuntimeException(t); + } + + ScanResult scanResult = new ScanResult(Key.compress(bresult.results), bresult.more); + + scanSession.entriesReturned += scanResult.results.size(); + + scanSession.batchCount++; + + if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) { + // start reading next batch while current batch is transmitted + // to client + scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag); + resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask); + } + + if (!scanResult.more) + closeScan(tinfo, scanID); + + return scanResult; + } + + @Override + public void closeScan(TInfo tinfo, long scanID) { + ScanSession ss = (ScanSession) sessionManager.removeSession(scanID); + if (ss != null) { + long t2 = System.currentTimeMillis(); + + log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), ss.extent.getTableId() + .toString(), ss.entriesReturned, (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString())); + if (scanMetrics.isEnabled()) { + scanMetrics.add(TabletServerScanMetrics.scan, t2 - ss.startTime); + scanMetrics.add(TabletServerScanMetrics.resultSize, ss.entriesReturned); + } + } + } + + @Override + public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map> tbatch, List tcolumns, + List ssiList, Map> ssio, List authorizations, boolean waitForWrites) throws ThriftSecurityException { + // find all of the tables that need to be scanned + HashSet tables = new HashSet(); + for (TKeyExtent keyExtent : tbatch.keySet()) { + tables.add(new String(keyExtent.getTable(), Constants.UTF8)); + } + + if (tables.size() != 1) + throw new IllegalArgumentException("Cannot batch scan over multiple tables"); + + // check if user has permission to the tables + for (String tableId : tables) + if (!security.canScan(credentials, tableId, Tables.getNamespaceId(instance, tableId), tbatch, tcolumns, ssiList, ssio, authorizations)) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + + try { + if (!security.userHasAuthorizations(credentials, authorizations)) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS); + } catch (ThriftSecurityException tse) { + log.error(tse, tse); + throw tse; + } + Map> batch = Translator.translate(tbatch, new TKeyExtentTranslator(), new Translator.ListTranslator( + new TRangeTranslator())); + + // This is used to determine which thread pool to use + KeyExtent threadPoolExtent = batch.keySet().iterator().next(); + + if (waitForWrites) + writeTracker.waitForWrites(TabletType.type(batch.keySet())); + + MultiScanSession mss = new MultiScanSession(); + mss.user = credentials.getPrincipal(); + mss.queries = batch; + mss.columnSet = new HashSet(tcolumns.size()); + mss.ssiList = ssiList; + mss.ssio = ssio; + mss.auths = new Authorizations(authorizations); + + mss.numTablets = batch.size(); + for (List ranges : batch.values()) { + mss.numRanges += ranges.size(); + } + + for (TColumn tcolumn : tcolumns) + mss.columnSet.add(new Column(tcolumn)); + + mss.threadPoolExtent = threadPoolExtent; + + long sid = sessionManager.createSession(mss, true); + + MultiScanResult result; + try { + result = continueMultiScan(tinfo, sid, mss); + } catch (NoSuchScanIDException e) { + log.error("the impossible happened", e); + throw new RuntimeException("the impossible happened", e); + } finally { + sessionManager.unreserveSession(sid); + } + + return new InitialMultiScan(sid, result); + } + + @Override + public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException { + + MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID); + + if (session == null) { + throw new NoSuchScanIDException(); + } + + try { + return continueMultiScan(tinfo, scanID, session); + } finally { + sessionManager.unreserveSession(session); + } + } + + private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException { + + if (session.lookupTask == null) { + session.lookupTask = new LookupTask(scanID); + resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask); + } + + try { + MultiScanResult scanResult = session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); + session.lookupTask = null; + return scanResult; + } catch (TimeoutException e1) { + long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT); + sessionManager.removeIfNotAccessed(scanID, timeout); + List results = Collections.emptyList(); + Map> failures = Collections.emptyMap(); + List fullScans = Collections.emptyList(); + return new MultiScanResult(results, failures, fullScans, null, null, false, true); + } catch (Throwable t) { + sessionManager.removeSession(scanID); + log.warn("Failed to get multiscan result", t); + throw new RuntimeException(t); + } + } + + @Override + public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException { + MultiScanSession session = (MultiScanSession) sessionManager.removeSession(scanID); + if (session == null) { + throw new NoSuchScanIDException(); + } + + long t2 = System.currentTimeMillis(); + log.debug(String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ", TServerUtils.clientAddress.get(), + session.numEntries, (t2 - session.startTime) / 1000.0, session.totalLookupTime / 1000.0, session.numTablets, session.numRanges)); + } + + @Override + public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException { + // Make sure user is real + + security.authenticateUser(credentials, credentials); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0); + + UpdateSession us = new UpdateSession(); + us.violations = new Violations(); + us.credentials = credentials; + us.cenv = new TservConstraintEnv(security, us.credentials); + + long sid = sessionManager.createSession(us, false); + + return sid; + } + + private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) { + long t1 = System.currentTimeMillis(); + if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) + return; + if (us.currentTablet == null && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) { + // if there were previous failures, then do not accept additional writes + return; + } + + try { + // if user has no permission to write to this table, add it to + // the failures list + boolean sameTable = us.currentTablet != null && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId())); + String tableId = keyExtent.getTableId().toString(); + if (sameTable || security.canWrite(us.credentials, tableId, Tables.getNamespaceId(instance, tableId))) { + long t2 = System.currentTimeMillis(); + us.authTimes.addStat(t2 - t1); + us.currentTablet = onlineTablets.get(keyExtent); + if (us.currentTablet != null) { + us.queuedMutations.put(us.currentTablet, new ArrayList()); + } else { + // not serving tablet, so report all mutations as + // failures + us.failures.put(keyExtent, 0l); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.unknownTabletErrors, 0); + } + } else { + log.warn("Denying access to table " + keyExtent.getTableId() + " for user " + us.credentials.getPrincipal()); + long t2 = System.currentTimeMillis(); + us.authTimes.addStat(t2 - t1); + us.currentTablet = null; + us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0); + return; + } + } catch (ThriftSecurityException e) { + log.error("Denying permission to check user " + us.credentials.getPrincipal() + " with user " + e.getUser(), e); + long t2 = System.currentTimeMillis(); + us.authTimes.addStat(t2 - t1); + us.currentTablet = null; + us.authFailures.put(keyExtent, e.getCode()); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0); + return; + } + } + + @Override + public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List tmutations) { + UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID); + if (us == null) { + throw new RuntimeException("No Such SessionID"); + } + + try { + KeyExtent keyExtent = new KeyExtent(tkeyExtent); + setUpdateTablet(us, keyExtent); + + if (us.currentTablet != null) { + List mutations = us.queuedMutations.get(us.currentTablet); + for (TMutation tmutation : tmutations) { + Mutation mutation = new ServerMutation(tmutation); + mutations.add(mutation); + us.queuedMutationSize += mutation.numBytes(); + } + if (us.queuedMutationSize > getSystemConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX)) + flush(us); + } + } finally { + sessionManager.unreserveSession(us); + } + } + + private void flush(UpdateSession us) { + + int mutationCount = 0; + Map> sendables = new HashMap>(); + Throwable error = null; + + long pt1 = System.currentTimeMillis(); + + boolean containsMetadataTablet = false; + for (Tablet tablet : us.queuedMutations.keySet()) + if (tablet.getExtent().isMeta()) + containsMetadataTablet = true; + + if (!containsMetadataTablet && us.queuedMutations.size() > 0) + TabletServer.this.resourceManager.waitUntilCommitsAreEnabled(); + + Span prep = Trace.start("prep"); + try { + for (Entry> entry : us.queuedMutations.entrySet()) { + + Tablet tablet = entry.getKey(); + List mutations = entry.getValue(); + if (mutations.size() > 0) { + try { + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size()); + + CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations); + if (commitSession == null) { + if (us.currentTablet == tablet) { + us.currentTablet = null; + } + us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet)); + } else { + sendables.put(commitSession, mutations); + mutationCount += mutations.size(); + } + + } catch (TConstraintViolationException e) { + us.violations.add(e.getViolations()); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0); + + if (e.getNonViolators().size() > 0) { + // only log and commit mutations if there were some + // that did not + // violate constraints... this is what + // prepareMutationsForCommit() + // expects + sendables.put(e.getCommitSession(), e.getNonViolators()); + } + + mutationCount += mutations.size(); + + } catch (HoldTimeoutException t) { + error = t; + log.debug("Giving up on mutations due to a long memory hold time"); + break; + } catch (Throwable t) { + error = t; + log.error("Unexpected error preparing for commit", error); + break; + } + } + } + } finally { + prep.stop(); + } + + long pt2 = System.currentTimeMillis(); + us.prepareTimes.addStat(pt2 - pt1); + updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size()); + + if (error != null) { + for (Entry> e : sendables.entrySet()) { + e.getKey().abortCommit(e.getValue()); + } + throw new RuntimeException(error); + } + try { + Span wal = Trace.start("wal"); + try { + while (true) { + try { + long t1 = System.currentTimeMillis(); + + logger.logManyTablets(sendables); + + long t2 = System.currentTimeMillis(); + us.walogTimes.addStat(t2 - t1); + updateWalogWriteTime((t2 - t1)); + break; + } catch (IOException ex) { + log.warn("logging mutations failed, retrying"); + } catch (FSError ex) { // happens when DFS is localFS + log.warn("logging mutations failed, retrying"); + } catch (Throwable t) { + log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t); + throw new RuntimeException(t); + } + } + } finally { + wal.stop(); + } + + Span commit = Trace.start("commit"); + try { + long t1 = System.currentTimeMillis(); + for (Entry> entry : sendables.entrySet()) { + CommitSession commitSession = entry.getKey(); + List mutations = entry.getValue(); + + commitSession.commit(mutations); + + Tablet tablet = commitSession.getTablet(); + + if (tablet == us.currentTablet) { + // because constraint violations may filter out some + // mutations, for proper + // accounting with the client code, need to increment + // the count based + // on the original number of mutations from the client + // NOT the filtered number + us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size()); + } + } + long t2 = System.currentTimeMillis(); + + us.flushTime += (t2 - pt1); + us.commitTimes.addStat(t2 - t1); + + updateAvgCommitTime(t2 - t1, sendables.size()); + } finally { + commit.stop(); + } + } finally { + us.queuedMutations.clear(); + if (us.currentTablet != null) { + us.queuedMutations.put(us.currentTablet, new ArrayList()); + } + us.queuedMutationSize = 0; + } + us.totalUpdates += mutationCount; + } + + private void updateWalogWriteTime(long time) { + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, time); + } + + private void updateAvgCommitTime(long time, int size) { + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.commitTime, (long) ((time) / (double) size)); + } + + private void updateAvgPrepTime(long time, int size) { + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (long) ((time) / (double) size)); + } + + @Override + public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException { + UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID); + if (us == null) { + throw new NoSuchScanIDException(); + } + + // clients may or may not see data from an update session while + // it is in progress, however when the update session is closed + // want to ensure that reads wait for the write to finish + long opid = writeTracker.startWrite(us.queuedMutations.keySet()); + + try { + flush(us); + } finally { + writeTracker.finishWrite(opid); + } + + log.debug(String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(), us.totalUpdates, + (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(), us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0, + us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0)); + if (us.failures.size() > 0) { + Entry first = us.failures.entrySet().iterator().next(); + log.debug(String.format("Failures: %d, first extent %s successful commits: %d", us.failures.size(), first.getKey().toString(), first.getValue())); + } + List violations = us.violations.asList(); + if (violations.size() > 0) { + ConstraintViolationSummary first = us.violations.asList().iterator().next(); + log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(), first.violationDescription, first.numberOfViolatingMutations)); + } + if (us.authFailures.size() > 0) { + KeyExtent first = us.authFailures.keySet().iterator().next(); + log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), first.toString())); + } + + return new UpdateErrors(Translator.translate(us.failures, Translators.KET), Translator.translate(violations, Translators.CVST), Translator.translate( + us.authFailures, Translators.KET)); + } + + @Override + public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException, + ConstraintViolationException, ThriftSecurityException { + + String tableId = new String(tkeyExtent.getTable(), Constants.UTF8); + if (!security.canWrite(credentials, tableId, Tables.getNamespaceId(instance, tableId))) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + KeyExtent keyExtent = new KeyExtent(tkeyExtent); + Tablet tablet = onlineTablets.get(new KeyExtent(keyExtent)); + if (tablet == null) { + throw new NotServingTabletException(tkeyExtent); + } + + if (!keyExtent.isMeta()) + TabletServer.this.resourceManager.waitUntilCommitsAreEnabled(); + + long opid = writeTracker.startWrite(TabletType.type(keyExtent)); + + try { + Mutation mutation = new ServerMutation(tmutation); + List mutations = Collections.singletonList(mutation); + + Span prep = Trace.start("prep"); + CommitSession cs; + try { + cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations); + } finally { + prep.stop(); + } + if (cs == null) { + throw new NotServingTabletException(tkeyExtent); + } + + while (true) { + try { + Span wal = Trace.start("wal"); + try { + logger.log(cs, cs.getWALogSeq(), mutation); + } finally { + wal.stop(); + } + break; + } catch (IOException ex) { + log.warn(ex, ex); + } + } + + Span commit = Trace.start("commit"); + try { + cs.commit(mutations); + } finally { + commit.stop(); + } + } catch (TConstraintViolationException e) { + throw new ConstraintViolationException(Translator.translate(e.getViolations().asList(), Translators.CVST)); + } finally { + writeTracker.finishWrite(opid); + } + } + + private void checkConditions(Map> updates, ArrayList results, ConditionalSession cs, + List symbols) throws IOException { + Iterator>> iter = updates.entrySet().iterator(); + + CompressedIterators compressedIters = new CompressedIterators(symbols); + + while (iter.hasNext()) { + Entry> entry = iter.next(); + Tablet tablet = onlineTablets.get(entry.getKey()); + + if (tablet == null || tablet.isClosed()) { + for (ServerConditionalMutation scm : entry.getValue()) + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + iter.remove(); + } else { + List okMutations = new ArrayList(entry.getValue().size()); + + for (ServerConditionalMutation scm : entry.getValue()) { + if (checkCondition(results, cs, compressedIters, tablet, scm)) + okMutations.add(scm); + } + + entry.setValue(okMutations); + } + + } + } + + boolean checkCondition(ArrayList results, ConditionalSession cs, CompressedIterators compressedIters, Tablet tablet, + ServerConditionalMutation scm) throws IOException { + boolean add = true; + + Set emptyCols = Collections.emptySet(); + + for (TCondition tc : scm.getConditions()) { + + Range range; + if (tc.hasTimestamp) + range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs()); + else + range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv())); + + IterConfig ic = compressedIters.decompress(tc.iterators); + + Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag); + + try { + ScanBatch batch = scanner.read(); + + Value val = null; + + for (KVEntry entry2 : batch.results) { + val = entry2.getValue(); + break; + } + + if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) { + results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED)); + add = false; + break; + } + + } catch (TabletClosedException e) { + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + add = false; + break; + } catch (IterationInterruptedException iie) { + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + add = false; + break; + } catch (TooManyFilesException tmfe) { + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + add = false; + break; + } + } + return add; + } + + private void writeConditionalMutations(Map> updates, ArrayList results, ConditionalSession sess) { + Set>> es = updates.entrySet(); + + Map> sendables = new HashMap>(); + + boolean sessionCanceled = sess.interruptFlag.get(); + + Span prepSpan = Trace.start("prep"); + try { + long t1 = System.currentTimeMillis(); + for (Entry> entry : es) { + Tablet tablet = onlineTablets.get(entry.getKey()); + if (tablet == null || tablet.isClosed() || sessionCanceled) { + for (ServerConditionalMutation scm : entry.getValue()) + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + } else { + try { + + @SuppressWarnings("unchecked") + List mutations = (List) (List) entry.getValue(); + if (mutations.size() > 0) { + + CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations); + + if (cs == null) { + for (ServerConditionalMutation scm : entry.getValue()) + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + } else { + for (ServerConditionalMutation scm : entry.getValue()) + results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED)); + sendables.put(cs, mutations); + } + } + } catch (TConstraintViolationException e) { + if (e.getNonViolators().size() > 0) { + sendables.put(e.getCommitSession(), e.getNonViolators()); + for (Mutation m : e.getNonViolators()) + results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED)); + } + + for (Mutation m : e.getViolators()) + results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED)); + } + } + } + + long t2 = System.currentTimeMillis(); + updateAvgPrepTime(t2 - t1, es.size()); + } finally { + prepSpan.stop(); + } + + Span walSpan = Trace.start("wal"); + try { + while (true && sendables.size() > 0) { + try { + long t1 = System.currentTimeMillis(); + logger.logManyTablets(sendables); + long t2 = System.currentTimeMillis(); + updateWalogWriteTime(t2 - t1); + break; + } catch (IOException ex) { + log.warn("logging mutations failed, retrying"); + } catch (FSError ex) { // happens when DFS is localFS + log.warn("logging mutations failed, retrying"); + } catch (Throwable t) { + log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t); + throw new RuntimeException(t); + } + } + } finally { + walSpan.stop(); + } + + Span commitSpan = Trace.start("commit"); + try { + long t1 = System.currentTimeMillis(); + for (Entry> entry : sendables.entrySet()) { + CommitSession commitSession = entry.getKey(); + List mutations = entry.getValue(); + + commitSession.commit(mutations); + } + long t2 = System.currentTimeMillis(); + updateAvgCommitTime(t2 - t1, sendables.size()); + } finally { + commitSpan.stop(); + } + + } + + private Map> conditionalUpdate(ConditionalSession cs, Map> updates, + ArrayList results, List symbols) throws IOException { + // sort each list of mutations, this is done to avoid deadlock and doing seeks in order is more efficient and detect duplicate rows. + ConditionalMutationSet.sortConditionalMutations(updates); + + Map> deferred = new HashMap>(); + + // can not process two mutations for the same row, because one will not see what the other writes + ConditionalMutationSet.deferDuplicatesRows(updates, deferred); + + // get as many locks as possible w/o blocking... defer any rows that are locked + List locks = rowLocks.acquireRowlocks(updates, deferred); + try { + Span checkSpan = Trace.start("Check conditions"); + try { + checkConditions(updates, results, cs, symbols); + } finally { + checkSpan.stop(); + } + + Span updateSpan = Trace.start("apply conditional mutations"); + try { + writeConditionalMutations(updates, results, cs); + } finally { + updateSpan.stop(); + } + } finally { + rowLocks.releaseRowLocks(locks); + } + return deferred; + } + + @Override + public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List authorizations, String tableId) + throws ThriftSecurityException, TException { + + Authorizations userauths = null; + if (!security.canConditionallyUpdate(credentials, tableId, Tables.getNamespaceId(instance, tableId), authorizations)) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + + userauths = security.getUserAuthorizations(credentials); + for (ByteBuffer auth : authorizations) + if (!userauths.contains(ByteBufferUtil.toBytes(auth))) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS); + + ConditionalSession cs = new ConditionalSession(); + cs.auths = new Authorizations(authorizations); + cs.credentials = credentials; + cs.tableId = tableId; + cs.interruptFlag = new AtomicBoolean(); + + long sid = sessionManager.createSession(cs, false); + return new TConditionalSession(sid, lockID, sessionManager.getMaxIdleTime()); + } + + @Override + public List conditionalUpdate(TInfo tinfo, long sessID, Map> mutations, List symbols) + throws NoSuchScanIDException, TException { + + ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID); + + if (cs == null || cs.interruptFlag.get()) + throw new NoSuchScanIDException(); + + if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) + TabletServer.this.resourceManager.waitUntilCommitsAreEnabled(); + + Text tid = new Text(cs.tableId); + long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null))); + + try { + Map> updates = Translator.translate(mutations, Translators.TKET, + new Translator.ListTranslator(ServerConditionalMutation.TCMT)); + + for (KeyExtent ke : updates.keySet()) + if (!ke.getTableId().equals(tid)) + throw new IllegalArgumentException("Unexpected table id " + tid + " != " + ke.getTableId()); + + ArrayList results = new ArrayList(); + + Map> deferred = conditionalUpdate(cs, updates, results, symbols); + + while (deferred.size() > 0) { + deferred = conditionalUpdate(cs, deferred, results, symbols); + } + + return results; + } catch (IOException ioe) { + throw new TException(ioe); + } finally { + writeTracker.finishWrite(opid); + sessionManager.unreserveSession(sessID); + } + } + + @Override + public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException { + // this method should wait for any running conditional update to complete + // after this method returns a conditional update should not be able to start + + ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID); + if (cs != null) + cs.interruptFlag.set(true); + + cs = (ConditionalSession) sessionManager.reserveSession(sessID, true); + if (cs != null) + sessionManager.removeSession(sessID, true); + } + + @Override + public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException { + sessionManager.removeSession(sessID, false); + } + + @Override + public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException, + ThriftSecurityException { + + String tableId = new String(ByteBufferUtil.toBytes(tkeyExtent.table)); + String namespaceId; + try { + namespaceId = Tables.getNamespaceId(instance, tableId); + } catch (IllegalArgumentException ex) { + // table does not exist, try to educate the client + throw new NotServingTabletException(tkeyExtent); + } + + if (!security.canSplitTablet(credentials, tableId, namespaceId)) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + + KeyExtent keyExtent = new KeyExtent(tkeyExtent); + + Tablet tablet = onlineTablets.get(keyExtent); + if (tablet == null) { + throw new NotServingTabletException(tkeyExtent); + } + + if (keyExtent.getEndRow() == null || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) { + try { + if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) { + throw new NotServingTabletException(tkeyExtent); + } + } catch (IOException e) { + log.warn("Failed to split " + keyExtent, e); + throw new RuntimeException(e); + } + } + } + + @Override + public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { + return getStats(sessionManager.getActiveScansPerTable()); + } + + @Override + public List getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException { + TreeMap onlineTabletsCopy; + synchronized (onlineTablets) { + onlineTabletsCopy = new TreeMap(onlineTablets); + } + List result = new ArrayList(); + Text text = new Text(tableId); + KeyExtent start = new KeyExtent(text, new Text(), null); + for (Entry entry : onlineTabletsCopy.tailMap(start).entrySet()) { + KeyExtent ke = entry.getKey(); + if (ke.getTableId().compareTo(text) == 0) { + Tablet tablet = entry.getValue(); + TabletStats stats = tablet.timer.getTabletStats(); + stats.extent = ke.toThrift(); + stats.ingestRate = tablet.ingestRate(); + stats.queryRate = tablet.queryRate(); + stats.splitCreationTime = tablet.getSplitCreationTime(); + stats.numEntries = tablet.getNumEntries(); + result.add(stats); + } + } + return result; + } + + private ZooCache masterLockCache = new ZooCache(); + + private void checkPermission(TCredentials credentials, String lock, final String request) throws ThriftSecurityException { + boolean fatal = false; + try { + log.debug("Got " + request + " message from user: " + credentials.getPrincipal()); + if (!security.canPerformSystemActions(credentials)) { + log.warn("Got " + request + " message from user: " + credentials.getPrincipal()); + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + } + } catch (ThriftSecurityException e) { + log.warn("Got " + request + " message from unauthenticatable user: " + e.getUser()); + if (SystemCredentials.get().getToken().getClass().getName().equals(credentials.getTokenClassName())) { + log.fatal("Got message from a service with a mismatched configuration. Please ensure a compatible configuration.", e); + fatal = true; + } + throw e; + } finally { + if (fatal) { + Halt.halt(1, new Runnable(