Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D7EF81173D for ; Wed, 9 Apr 2014 17:59:33 +0000 (UTC) Received: (qmail 38659 invoked by uid 500); 9 Apr 2014 17:58:13 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 38107 invoked by uid 500); 9 Apr 2014 17:57:56 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 37256 invoked by uid 99); 9 Apr 2014 17:57:35 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Apr 2014 17:57:35 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7402D94F8BC; Wed, 9 Apr 2014 17:57:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Wed, 09 Apr 2014 17:57:52 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [21/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java ---------------------------------------------------------------------- diff --cc server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java index ad3d615,0000000..b9b68bb mode 100644,000000..100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java @@@ -1,3620 -1,0 +1,3614 @@@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.tabletserver; + +import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD; + +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TimerTask; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CancellationException; +import 
java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.ScannerImpl; +import org.apache.accumulo.core.client.impl.TabletType; +import org.apache.accumulo.core.client.impl.Translator; ++import org.apache.accumulo.core.client.impl.Translators; +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; - import org.apache.accumulo.core.client.impl.Translators; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.constraints.Constraint.Environment; +import org.apache.accumulo.core.constraints.Violations; +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.ConstraintViolationSummary; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.thrift.InitialMultiScan; +import org.apache.accumulo.core.data.thrift.InitialScan; +import org.apache.accumulo.core.data.thrift.IterInfo; +import 
org.apache.accumulo.core.data.thrift.MapFileInfo; +import org.apache.accumulo.core.data.thrift.MultiScanResult; +import org.apache.accumulo.core.data.thrift.ScanResult; +import org.apache.accumulo.core.data.thrift.TColumn; +import org.apache.accumulo.core.data.thrift.TKey; +import org.apache.accumulo.core.data.thrift.TKeyExtent; +import org.apache.accumulo.core.data.thrift.TKeyValue; +import org.apache.accumulo.core.data.thrift.TMutation; +import org.apache.accumulo.core.data.thrift.TRange; +import org.apache.accumulo.core.data.thrift.UpdateErrors; +import org.apache.accumulo.core.file.FileUtil; +import org.apache.accumulo.core.iterators.IterationInterruptedException; +import org.apache.accumulo.core.master.thrift.Compacting; +import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.master.thrift.TabletLoadState; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.SecurityUtil; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; +import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; +import org.apache.accumulo.core.tabletserver.thrift.ScanState; +import org.apache.accumulo.core.tabletserver.thrift.ScanType; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor; +import org.apache.accumulo.core.tabletserver.thrift.TabletStats; +import 
org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ServerServices.Service; +import org.apache.accumulo.core.util.SimpleThreadPool; +import org.apache.accumulo.core.util.Stat; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.client.ClientServiceHandler; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.data.ServerMutation; +import org.apache.accumulo.server.logger.LogFileKey; +import org.apache.accumulo.server.logger.LogFileValue; +import org.apache.accumulo.server.master.state.Assignment; +import org.apache.accumulo.server.master.state.DistributedStoreException; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; +import org.apache.accumulo.server.master.state.TabletStateStore; +import org.apache.accumulo.server.master.state.ZooTabletStateStore; +import 
org.apache.accumulo.server.metrics.AbstractMetricsImpl; +import org.apache.accumulo.server.problems.ProblemReport; +import org.apache.accumulo.server.problems.ProblemReports; +import org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.server.security.SecurityConstants; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo; +import org.apache.accumulo.server.tabletserver.Tablet.CommitSession; +import org.apache.accumulo.server.tabletserver.Tablet.KVEntry; +import org.apache.accumulo.server.tabletserver.Tablet.LookupResult; +import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason; +import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason; +import org.apache.accumulo.server.tabletserver.Tablet.ScanBatch; +import org.apache.accumulo.server.tabletserver.Tablet.Scanner; +import org.apache.accumulo.server.tabletserver.Tablet.SplitInfo; +import org.apache.accumulo.server.tabletserver.Tablet.TConstraintViolationException; +import org.apache.accumulo.server.tabletserver.Tablet.TabletClosedException; +import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager; +import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation; +import org.apache.accumulo.server.tabletserver.log.DfsLogger; +import org.apache.accumulo.server.tabletserver.log.LogSorter; +import org.apache.accumulo.server.tabletserver.log.MutationReceiver; +import org.apache.accumulo.server.tabletserver.log.TabletServerLogger; +import org.apache.accumulo.server.tabletserver.mastermessage.MasterMessage; +import org.apache.accumulo.server.tabletserver.mastermessage.SplitReportMessage; +import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage; +import org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean; +import 
org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics; +import org.apache.accumulo.server.tabletserver.metrics.TabletServerScanMetrics; +import org.apache.accumulo.server.tabletserver.metrics.TabletServerUpdateMetrics; +import org.apache.accumulo.server.trace.TraceFileSystem; +import org.apache.accumulo.server.util.FileSystemMonitor; +import org.apache.accumulo.server.util.Halt; +import org.apache.accumulo.server.util.MapCounter; +import org.apache.accumulo.server.util.MetadataTable; +import org.apache.accumulo.server.util.MetadataTable.LogEntry; +import org.apache.accumulo.server.util.TServerUtils; +import org.apache.accumulo.server.util.TServerUtils.ServerPort; +import org.apache.accumulo.server.util.time.RelativeTime; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; +import org.apache.accumulo.server.zookeeper.TransactionWatcher; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.start.Platform; +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; +import org.apache.accumulo.start.classloader.vfs.ContextManager; +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.accumulo.trace.instrument.thrift.TraceWrap; +import org.apache.accumulo.trace.thrift.TInfo; +import org.apache.commons.collections.map.LRUMap; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Trash; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Reader; +import org.apache.hadoop.io.Text; +import 
org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.TServiceClient; +import org.apache.thrift.server.TServer; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; + +enum ScanRunState { + QUEUED, RUNNING, FINISHED +} + +public class TabletServer extends AbstractMetricsImpl implements org.apache.accumulo.server.tabletserver.metrics.TabletServerMBean { + private static final Logger log = Logger.getLogger(TabletServer.class); + + private static HashMap prevGcTime = new HashMap(); + private static long lastMemorySize = 0; + private static long gcTimeIncreasedCount; + + private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000; + + private TabletServerLogger logger; + + protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics(); + + private ServerConfiguration serverConfig; + private LogSorter logSorter = null; + + public TabletServer(ServerConfiguration conf, FileSystem fs) { + super(); + this.serverConfig = conf; + this.instance = conf.getInstance(); + this.fs = TraceFileSystem.wrap(fs); + this.logSorter = new LogSorter(instance, fs, getSystemConfiguration()); + SimpleTimer.getInstance().schedule(new Runnable() { + @Override + public void run() { + synchronized (onlineTablets) { + long now = System.currentTimeMillis(); + for (Tablet tablet : onlineTablets.values()) + try { + tablet.updateRates(now); + } catch (Exception ex) { + log.error(ex, ex); + } + } + } + }, 5000, 5000); + } + + private synchronized static void logGCInfo(AccumuloConfiguration conf) { + List gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); + Runtime rt = Runtime.getRuntime(); + + StringBuilder sb = new StringBuilder("gc"); + + boolean sawChange = false; + + long maxIncreaseInCollectionTime = 0; + + for (GarbageCollectorMXBean gcBean : gcmBeans) { + Long prevTime = prevGcTime.get(gcBean.getName()); + long pt = 0; + if (prevTime 
!= null) { + pt = prevTime; + } + + long time = gcBean.getCollectionTime(); + + if (time - pt != 0) { + sawChange = true; + } + + long increaseInCollectionTime = time - pt; + sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0)); + maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime); + prevGcTime.put(gcBean.getName(), time); + } + + long mem = rt.freeMemory(); + if (maxIncreaseInCollectionTime == 0) { + gcTimeIncreasedCount = 0; + } else { + gcTimeIncreasedCount++; + if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) { + log.warn("Running low on memory"); + gcTimeIncreasedCount = 0; + } + } + + if (mem > lastMemorySize) { + sawChange = true; + } + + String sign = "+"; + if (mem - lastMemorySize <= 0) { + sign = ""; + } + + sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory())); + + if (sawChange) { + log.debug(sb.toString()); + } + + final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); + if (maxIncreaseInCollectionTime > keepAliveTimeout) { + Halt.halt("Garbage collection may be interfering with lock keep-alive. 
Halting.", -1); + } + + lastMemorySize = mem; + } + + private TabletStatsKeeper statsKeeper; + + private static class Session { + long lastAccessTime; + long startTime; + String user; + String client = TServerUtils.clientAddress.get(); + public boolean reserved; + + public void cleanup() {} + } + + private static class SessionManager { + + SecureRandom random; + Map sessions; + + SessionManager(AccumuloConfiguration conf) { + random = new SecureRandom(); + sessions = new HashMap(); + + final long maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); + + Runnable r = new Runnable() { + @Override + public void run() { + sweep(maxIdle); + } + }; + + SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000)); + } + + synchronized long createSession(Session session, boolean reserve) { + long sid = random.nextLong(); + + while (sessions.containsKey(sid)) { + sid = random.nextLong(); + } + + sessions.put(sid, session); + + session.reserved = reserve; + + session.startTime = session.lastAccessTime = System.currentTimeMillis(); + + return sid; + } + + /** + * while a session is reserved, it cannot be canceled or removed + * + * @param sessionId + */ + + synchronized Session reserveSession(long sessionId) { + Session session = sessions.get(sessionId); + if (session != null) { + if (session.reserved) + throw new IllegalStateException(); + session.reserved = true; + } + + return session; + + } + + synchronized void unreserveSession(Session session) { + if (!session.reserved) + throw new IllegalStateException(); + session.reserved = false; + session.lastAccessTime = System.currentTimeMillis(); + } + + synchronized void unreserveSession(long sessionId) { + Session session = getSession(sessionId); + if (session != null) + unreserveSession(session); + } + + synchronized Session getSession(long sessionId) { + Session session = sessions.get(sessionId); + if (session != null) + session.lastAccessTime = System.currentTimeMillis(); + return session; + } + + Session 
removeSession(long sessionId) { + Session session = null; + synchronized (this) { + session = sessions.remove(sessionId); + } + + // do clean up out side of lock.. + if (session != null) + session.cleanup(); + + return session; + } + + private void sweep(long maxIdle) { + ArrayList sessionsToCleanup = new ArrayList(); + synchronized (this) { + Iterator iter = sessions.values().iterator(); + while (iter.hasNext()) { + Session session = iter.next(); + long idleTime = System.currentTimeMillis() - session.lastAccessTime; + if (idleTime > maxIdle && !session.reserved) { + iter.remove(); + sessionsToCleanup.add(session); + } + } + } + + // do clean up outside of lock + for (Session session : sessionsToCleanup) { + session.cleanup(); + } + } + + synchronized void removeIfNotAccessed(final long sessionId, long delay) { + Session session = sessions.get(sessionId); + if (session != null) { + final long removeTime = session.lastAccessTime; + TimerTask r = new TimerTask() { + @Override + public void run() { + Session sessionToCleanup = null; + synchronized (SessionManager.this) { + Session session2 = sessions.get(sessionId); + if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) { + sessions.remove(sessionId); + sessionToCleanup = session2; + } + } + + // call clean up outside of lock + if (sessionToCleanup != null) + sessionToCleanup.cleanup(); + } + }; + + SimpleTimer.getInstance().schedule(r, delay); + } + } + + public synchronized Map> getActiveScansPerTable() { + Map> counts = new HashMap>(); + for (Entry entry : sessions.entrySet()) { + + Session session = entry.getValue(); + @SuppressWarnings("rawtypes") + ScanTask nbt = null; + String tableID = null; + + if (session instanceof ScanSession) { + ScanSession ss = (ScanSession) session; + nbt = ss.nextBatchTask; + tableID = ss.extent.getTableId().toString(); + } else if (session instanceof MultiScanSession) { + MultiScanSession mss = (MultiScanSession) session; + nbt = mss.lookupTask; + 
tableID = mss.threadPoolExtent.getTableId().toString(); + } + + if (nbt == null) + continue; + + ScanRunState srs = nbt.getScanRunState(); + + if (srs == ScanRunState.FINISHED) + continue; + + MapCounter stateCounts = counts.get(tableID); + if (stateCounts == null) { + stateCounts = new MapCounter(); + counts.put(tableID, stateCounts); + } + + stateCounts.increment(srs, 1); + } + + return counts; + } + + public synchronized List getActiveScans() { + + ArrayList activeScans = new ArrayList(); + + long ct = System.currentTimeMillis(); + + for (Entry entry : sessions.entrySet()) { + Session session = entry.getValue(); + if (session instanceof ScanSession) { + ScanSession ss = (ScanSession) session; + + ScanState state = ScanState.RUNNING; + + ScanTask nbt = ss.nextBatchTask; + if (nbt == null) { + state = ScanState.IDLE; + } else { + switch (nbt.getScanRunState()) { + case QUEUED: + state = ScanState.QUEUED; + break; + case FINISHED: + state = ScanState.IDLE; + break; + case RUNNING: + default: + /* do nothing */ + break; + } + } + + activeScans.add(new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE, + state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB())); + + } else if (session instanceof MultiScanSession) { + MultiScanSession mss = (MultiScanSession) session; + + ScanState state = ScanState.RUNNING; + + ScanTask nbt = mss.lookupTask; + if (nbt == null) { + state = ScanState.IDLE; + } else { + switch (nbt.getScanRunState()) { + case QUEUED: + state = ScanState.QUEUED; + break; + case FINISHED: + state = ScanState.IDLE; + break; + case RUNNING: + default: + /* do nothing */ + break; + } + } + + activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime, + ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), 
Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths + .getAuthorizationsBB())); + } + } + + return activeScans; + } + } + + static class TservConstraintEnv implements Environment { + + private TCredentials credentials; + private SecurityOperation security; + private Authorizations auths; + private KeyExtent ke; + + TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) { + this.security = secOp; + this.credentials = credentials; + } + + void setExtent(KeyExtent ke) { + this.ke = ke; + } + + @Override + public KeyExtent getExtent() { + return ke; + } + + @Override + public String getUser() { + return credentials.getPrincipal(); + } + + @Override + public Authorizations getAuthorizations() { + if (auths == null) + try { + this.auths = security.getUserAuthorizations(credentials); + } catch (ThriftSecurityException e) { + throw new RuntimeException(e); + } + return auths; + } + + } + + private abstract class ScanTask implements RunnableFuture { + + protected AtomicBoolean interruptFlag; + protected ArrayBlockingQueue resultQueue; + protected AtomicInteger state; + protected AtomicReference runState; + + private static final int INITIAL = 1; + private static final int ADDED = 2; + private static final int CANCELED = 3; + + ScanTask() { + interruptFlag = new AtomicBoolean(false); + runState = new AtomicReference(ScanRunState.QUEUED); + state = new AtomicInteger(INITIAL); + resultQueue = new ArrayBlockingQueue(1); + } + + protected void addResult(Object o) { + if (state.compareAndSet(INITIAL, ADDED)) + resultQueue.add(o); + else if (state.get() == ADDED) + throw new IllegalStateException("Tried to add more than one result"); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (!mayInterruptIfRunning) + throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task"); + + if (state.get() == CANCELED) + return true; + + if (state.compareAndSet(INITIAL, CANCELED)) 
{ + interruptFlag.set(true); + resultQueue = null; + return true; + } + + return false; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(); + } + + @SuppressWarnings("unchecked") + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + + ArrayBlockingQueue localRQ = resultQueue; + + if (state.get() == CANCELED) + throw new CancellationException(); + + if (localRQ == null && state.get() == ADDED) + throw new IllegalStateException("Tried to get result twice"); + + Object r = localRQ.poll(timeout, unit); + + // could have been canceled while waiting + if (state.get() == CANCELED) { + if (r != null) + throw new IllegalStateException("Nothing should have been added when in canceled state"); + + throw new CancellationException(); + } + + if (r == null) + throw new TimeoutException(); + + // make this method stop working now that something is being + // returned + resultQueue = null; + + if (r instanceof Throwable) + throw new ExecutionException((Throwable) r); + + return (T) r; + } + + @Override + public boolean isCancelled() { + return state.get() == CANCELED; + } + + @Override + public boolean isDone() { + return runState.get().equals(ScanRunState.FINISHED); + } + + public ScanRunState getScanRunState() { + return runState.get(); + } + + } + + private static class UpdateSession extends Session { + public Tablet currentTablet; + public MapCounter successfulCommits = new MapCounter(); + Map failures = new HashMap(); + HashMap authFailures = new HashMap(); + public Violations violations; + public TCredentials credentials; + public long totalUpdates = 0; + public long flushTime = 0; + Stat prepareTimes = new Stat(); + Stat walogTimes = new Stat(); + Stat commitTimes = new Stat(); + Stat authTimes = new Stat(); + public Map> queuedMutations = new HashMap>(); + public long queuedMutationSize = 0; + TservConstraintEnv cenv = null; 
+ } + + private static class ScanSession extends Session { + public KeyExtent extent; + public HashSet columnSet; + public List ssiList; + public Map> ssio; + public Authorizations auths; + public long entriesReturned = 0; + public Stat nbTimes = new Stat(); + public long batchCount = 0; + public volatile ScanTask nextBatchTask; + public AtomicBoolean interruptFlag; + public Scanner scanner; + + @Override + public void cleanup() { + try { + if (nextBatchTask != null) + nextBatchTask.cancel(true); + } finally { + if (scanner != null) + scanner.close(); + } + } + + } + + private static class MultiScanSession extends Session { + HashSet columnSet; + Map> queries; + public List ssiList; + public Map> ssio; + public Authorizations auths; + + // stats + int numRanges; + int numTablets; + int numEntries; + long totalLookupTime; + + public volatile ScanTask lookupTask; + public KeyExtent threadPoolExtent; + + @Override + public void cleanup() { + if (lookupTask != null) + lookupTask.cancel(true); + } + } + + /** + * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids + * are monotonically increasing. 
+ * + */ + static class WriteTracker { + private static AtomicLong operationCounter = new AtomicLong(1); + private Map> inProgressWrites = new EnumMap>(TabletType.class); + + WriteTracker() { + for (TabletType ttype : TabletType.values()) { + inProgressWrites.put(ttype, new TreeSet()); + } + } + + synchronized long startWrite(TabletType ttype) { + long operationId = operationCounter.getAndIncrement(); + inProgressWrites.get(ttype).add(operationId); + return operationId; + } + + synchronized void finishWrite(long operationId) { + if (operationId == -1) + return; + + boolean removed = false; + + for (TabletType ttype : TabletType.values()) { + removed = inProgressWrites.get(ttype).remove(operationId); + if (removed) + break; + } + + if (!removed) { + throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId); + } + + this.notifyAll(); + } + + synchronized void waitForWrites(TabletType ttype) { + long operationId = operationCounter.getAndIncrement(); + while (inProgressWrites.get(ttype).floor(operationId) != null) { + try { + this.wait(); + } catch (InterruptedException e) { + log.error(e, e); + } + } + } + + public long startWrite(Set keySet) { + if (keySet.size() == 0) + return -1; + + ArrayList extents = new ArrayList(keySet.size()); + + for (Tablet tablet : keySet) + extents.add(tablet.getExtent()); + + return startWrite(TabletType.type(extents)); + } + } + + public AccumuloConfiguration getSystemConfiguration() { + return serverConfig.getConfiguration(); + } + + TransactionWatcher watcher = new TransactionWatcher(); + + private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface { + + SessionManager sessionManager; + + AccumuloConfiguration acuConf = getSystemConfiguration(); + + TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics(); + + TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics(); + + WriteTracker writeTracker = new WriteTracker(); 
+ + ThriftClientHandler() { + super(instance, watcher); + log.debug(ThriftClientHandler.class.getName() + " created"); + sessionManager = new SessionManager(getSystemConfiguration()); + // Register the metrics MBean + try { + updateMetrics.register(); + scanMetrics.register(); + } catch (Exception e) { + log.error("Exception registering MBean with MBean Server", e); + } + } + + @Override + public List bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map> files, boolean setTime) + throws ThriftSecurityException { + + if (!security.canPerformSystemActions(credentials)) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + + List failures = new ArrayList(); + + for (Entry> entry : files.entrySet()) { + TKeyExtent tke = entry.getKey(); + Map fileMap = entry.getValue(); + + Tablet importTablet = onlineTablets.get(new KeyExtent(tke)); + + if (importTablet == null) { + failures.add(tke); + } else { + try { + importTablet.importMapFiles(tid, fileMap, setTime); + } catch (IOException ioe) { + log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage()); + failures.add(tke); + } + } + } + return failures; + } + + private class NextBatchTask extends ScanTask { + + private long scanID; + + NextBatchTask(long scanID, AtomicBoolean interruptFlag) { + this.scanID = scanID; + this.interruptFlag = interruptFlag; + + if (interruptFlag.get()) + cancel(true); + } + + @Override + public void run() { + + final ScanSession scanSession = (ScanSession) sessionManager.getSession(scanID); + String oldThreadName = Thread.currentThread().getName(); + + try { + if (isCancelled() || scanSession == null) + return; + + runState.set(ScanRunState.RUNNING); + + Thread.currentThread().setName( + "User: " + scanSession.user + " Start: " + scanSession.startTime + " Client: " + scanSession.client + " Tablet: " + scanSession.extent); + + Tablet tablet = onlineTablets.get(scanSession.extent); + + if 
(tablet == null) { + addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift())); + return; + } + + long t1 = System.currentTimeMillis(); + ScanBatch batch = scanSession.scanner.read(); + long t2 = System.currentTimeMillis(); + scanSession.nbTimes.addStat(t2 - t1); + + // there should only be one thing on the queue at a time, so + // it should be ok to call add() + // instead of put()... if add() fails because queue is at + // capacity it means there is code + // problem somewhere + addResult(batch); + } catch (TabletClosedException e) { + addResult(new org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException(scanSession.extent.toThrift())); + } catch (IterationInterruptedException iie) { + if (!isCancelled()) { + log.warn("Iteration interrupted, when scan not cancelled", iie); + addResult(iie); + } + } catch (TooManyFilesException tmfe) { + addResult(tmfe); + } catch (Throwable e) { + log.warn("exception while scanning tablet " + (scanSession == null ? 
"(unknown)" : scanSession.extent), e); + addResult(e); + } finally { + runState.set(ScanRunState.FINISHED); + Thread.currentThread().setName(oldThreadName); + } + + } + } + + private class LookupTask extends ScanTask { + + private long scanID; + + LookupTask(long scanID) { + this.scanID = scanID; + } + + @Override + public void run() { + MultiScanSession session = (MultiScanSession) sessionManager.getSession(scanID); + String oldThreadName = Thread.currentThread().getName(); + + try { + if (isCancelled() || session == null) + return; + + TableConfiguration acuTableConf = ServerConfiguration.getTableConfiguration(instance, session.threadPoolExtent.getTableId().toString()); + long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM); + + runState.set(ScanRunState.RUNNING); + Thread.currentThread().setName("Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Table: "); + + long bytesAdded = 0; + long maxScanTime = 4000; + + long startTime = System.currentTimeMillis(); + + ArrayList results = new ArrayList(); + Map> failures = new HashMap>(); + ArrayList fullScans = new ArrayList(); + KeyExtent partScan = null; + Key partNextKey = null; + boolean partNextKeyInclusive = false; + + Iterator>> iter = session.queries.entrySet().iterator(); + + // check the time so that the read ahead thread is not monopolized + while (iter.hasNext() && bytesAdded < maxResultsSize && (System.currentTimeMillis() - startTime) < maxScanTime) { + Entry> entry = iter.next(); + + iter.remove(); + + // check that tablet server is serving requested tablet + Tablet tablet = onlineTablets.get(entry.getKey()); + if (tablet == null) { + failures.put(entry.getKey(), entry.getValue()); + continue; + } + Thread.currentThread().setName( + "Client: " + session.client + " User: " + session.user + " Start: " + session.startTime + " Tablet: " + entry.getKey().toString()); + + LookupResult lookupResult; + try { + + // do the following check to avoid a 
race condition + // between setting false below and the task being + // canceled + if (isCancelled()) + interruptFlag.set(true); + + lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList, + session.ssio, interruptFlag); + + // if the tablet was closed it it possible that the + // interrupt flag was set.... do not want it set for + // the next + // lookup + interruptFlag.set(false); + + } catch (IOException e) { + log.warn("lookup failed for tablet " + entry.getKey(), e); + throw new RuntimeException(e); + } + + bytesAdded += lookupResult.bytesAdded; + + if (lookupResult.unfinishedRanges.size() > 0) { + if (lookupResult.closed) { + failures.put(entry.getKey(), lookupResult.unfinishedRanges); + } else { + session.queries.put(entry.getKey(), lookupResult.unfinishedRanges); + partScan = entry.getKey(); + partNextKey = lookupResult.unfinishedRanges.get(0).getStartKey(); + partNextKeyInclusive = lookupResult.unfinishedRanges.get(0).isStartKeyInclusive(); + } + } else { + fullScans.add(entry.getKey()); + } + } + + long finishTime = System.currentTimeMillis(); + session.totalLookupTime += (finishTime - startTime); + session.numEntries += results.size(); + + // convert everything to thrift before adding result + List retResults = new ArrayList(); + for (KVEntry entry : results) + retResults.add(new TKeyValue(entry.key.toThrift(), ByteBuffer.wrap(entry.value))); + Map> retFailures = Translator.translate(failures, Translators.KET, new Translator.ListTranslator(Translators.RT)); + List retFullScans = Translator.translate(fullScans, Translators.KET); + TKeyExtent retPartScan = null; + TKey retPartNextKey = null; + if (partScan != null) { + retPartScan = partScan.toThrift(); + retPartNextKey = partNextKey.toThrift(); + } + // add results to queue + addResult(new MultiScanResult(retResults, retFailures, retFullScans, retPartScan, retPartNextKey, partNextKeyInclusive, session.queries.size() != 0)); + 
} catch (IterationInterruptedException iie) { + if (!isCancelled()) { + log.warn("Iteration interrupted, when scan not cancelled", iie); + addResult(iie); + } + } catch (Throwable e) { + log.warn("exception while doing multi-scan ", e); + addResult(e); + } finally { + Thread.currentThread().setName(oldThreadName); + runState.set(ScanRunState.FINISHED); + } + } + } + + @Override + public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List columns, int batchSize, + List ssiList, Map> ssio, List authorizations, boolean waitForWrites, boolean isolated) + throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { + + Authorizations userauths = null; + if (!security.canScan(credentials, new String(textent.getTable(), Constants.UTF8))) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + + userauths = security.getUserAuthorizations(credentials); + for (ByteBuffer auth : authorizations) + if (!userauths.contains(ByteBufferUtil.toBytes(auth))) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS); + + KeyExtent extent = new KeyExtent(textent); + + // wait for any writes that are in flight.. this done to ensure + // consistency across client restarts... assume a client writes + // to accumulo and dies while waiting for a confirmation from + // accumulo... the client process restarts and tries to read + // data from accumulo making the assumption that it will get + // any writes previously made... however if the server side thread + // processing the write from the dead client is still in progress, + // the restarted client may not see the write unless we wait here. 
+ // this behavior is very important when the client is reading the + // !METADATA table + if (waitForWrites) + writeTracker.waitForWrites(TabletType.type(extent)); + + Tablet tablet = onlineTablets.get(extent); + if (tablet == null) + throw new NotServingTabletException(textent); + + ScanSession scanSession = new ScanSession(); + scanSession.user = credentials.getPrincipal(); + scanSession.extent = new KeyExtent(extent); + scanSession.columnSet = new HashSet(); + scanSession.ssiList = ssiList; + scanSession.ssio = ssio; + scanSession.auths = new Authorizations(authorizations); + scanSession.interruptFlag = new AtomicBoolean(); + + for (TColumn tcolumn : columns) { + scanSession.columnSet.add(new Column(tcolumn)); + } + + scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated, + scanSession.interruptFlag); + + long sid = sessionManager.createSession(scanSession, true); + + ScanResult scanResult; + try { + scanResult = continueScan(tinfo, sid, scanSession); + } catch (NoSuchScanIDException e) { + log.error("The impossible happened", e); + throw new RuntimeException(); + } finally { + sessionManager.unreserveSession(sid); + } + + return new InitialScan(sid, scanResult); + } + + @Override + public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException, + org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { + ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID); + if (scanSession == null) { + throw new NoSuchScanIDException(); + } + + try { + return continueScan(tinfo, scanID, scanSession); + } finally { + sessionManager.unreserveSession(scanSession); + } + } + + private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession) throws NoSuchScanIDException, NotServingTabletException, + org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException { + + if 
(scanSession.nextBatchTask == null) { + scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag); + resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask); + } + + ScanBatch bresult; + try { + bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); + scanSession.nextBatchTask = null; + } catch (ExecutionException e) { + sessionManager.removeSession(scanID); + if (e.getCause() instanceof NotServingTabletException) + throw (NotServingTabletException) e.getCause(); + else if (e.getCause() instanceof TooManyFilesException) + throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(scanSession.extent.toThrift()); + else + throw new RuntimeException(e); + } catch (CancellationException ce) { + sessionManager.removeSession(scanID); + Tablet tablet = onlineTablets.get(scanSession.extent); + if (tablet == null || tablet.isClosed()) + throw new NotServingTabletException(scanSession.extent.toThrift()); + else + throw new NoSuchScanIDException(); + } catch (TimeoutException e) { + List param = Collections.emptyList(); + long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT); + sessionManager.removeIfNotAccessed(scanID, timeout); + return new ScanResult(param, true); + } catch (Throwable t) { + sessionManager.removeSession(scanID); + log.warn("Failed to get next batch", t); + throw new RuntimeException(t); + } + + ScanResult scanResult = new ScanResult(Key.compress(bresult.results), bresult.more); + + scanSession.entriesReturned += scanResult.results.size(); + + scanSession.batchCount++; + + if (scanResult.more && scanSession.batchCount > 3) { + // start reading next batch while current batch is transmitted + // to client + scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag); + resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask); + } + + if (!scanResult.more) + closeScan(tinfo, scanID); + + 
return scanResult; + } + + @Override + public void closeScan(TInfo tinfo, long scanID) { + ScanSession ss = (ScanSession) sessionManager.removeSession(scanID); + if (ss != null) { + long t2 = System.currentTimeMillis(); + + log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ", TServerUtils.clientAddress.get(), ss.extent.getTableId() + .toString(), ss.entriesReturned, (t2 - ss.startTime) / 1000.0, ss.nbTimes.toString())); + if (scanMetrics.isEnabled()) { + scanMetrics.add(TabletServerScanMetrics.scan, t2 - ss.startTime); + scanMetrics.add(TabletServerScanMetrics.resultSize, ss.entriesReturned); + } + } + } + + @Override + public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map> tbatch, List tcolumns, + List ssiList, Map> ssio, List authorizations, boolean waitForWrites) throws ThriftSecurityException { + // find all of the tables that need to be scanned + HashSet tables = new HashSet(); + for (TKeyExtent keyExtent : tbatch.keySet()) { + tables.add(new String(keyExtent.getTable(), Constants.UTF8)); + } + + // check if user has permission to the tables + Authorizations userauths = null; + for (String table : tables) + if (!security.canScan(credentials, table)) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + + userauths = security.getUserAuthorizations(credentials); + for (ByteBuffer auth : authorizations) + if (!userauths.contains(ByteBufferUtil.toBytes(auth))) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS); + + KeyExtent threadPoolExtent = null; + + Map> batch = Translator.translate(tbatch, Translators.TKET, new Translator.ListTranslator(Translators.TRT)); + + for (KeyExtent keyExtent : batch.keySet()) { + if (threadPoolExtent == null) { + threadPoolExtent = keyExtent; + } else if (keyExtent.isRootTablet()) { + throw new IllegalArgumentException("Cannot batch query root tablet with other tablets " + 
threadPoolExtent + " " + keyExtent); + } else if (keyExtent.isMeta() && !threadPoolExtent.isMeta()) { + throw new IllegalArgumentException("Cannot batch query !METADATA and non !METADATA tablets " + threadPoolExtent + " " + keyExtent); + } + + } + + if (waitForWrites) + writeTracker.waitForWrites(TabletType.type(batch.keySet())); + + MultiScanSession mss = new MultiScanSession(); + mss.user = credentials.getPrincipal(); + mss.queries = batch; + mss.columnSet = new HashSet(tcolumns.size()); + mss.ssiList = ssiList; + mss.ssio = ssio; + mss.auths = new Authorizations(authorizations); + + mss.numTablets = batch.size(); + for (List ranges : batch.values()) { + mss.numRanges += ranges.size(); + } + + for (TColumn tcolumn : tcolumns) + mss.columnSet.add(new Column(tcolumn)); + + mss.threadPoolExtent = threadPoolExtent; + + long sid = sessionManager.createSession(mss, true); + + MultiScanResult result; + try { + result = continueMultiScan(tinfo, sid, mss); + } catch (NoSuchScanIDException e) { + log.error("the impossible happened", e); + throw new RuntimeException("the impossible happened", e); + } finally { + sessionManager.unreserveSession(sid); + } + + return new InitialMultiScan(sid, result); + } + + @Override + public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException { + + MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID); + + if (session == null) { + throw new NoSuchScanIDException(); + } + + try { + return continueMultiScan(tinfo, scanID, session); + } finally { + sessionManager.unreserveSession(session); + } + } + + private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session) throws NoSuchScanIDException { + + if (session.lookupTask == null) { + session.lookupTask = new LookupTask(scanID); + resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask); + } + + try { + MultiScanResult scanResult = 
session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); + session.lookupTask = null; + return scanResult; + } catch (TimeoutException e1) { + long timeout = acuConf.getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT); + sessionManager.removeIfNotAccessed(scanID, timeout); + List results = Collections.emptyList(); + Map> failures = Collections.emptyMap(); + List fullScans = Collections.emptyList(); + return new MultiScanResult(results, failures, fullScans, null, null, false, true); + } catch (Throwable t) { + sessionManager.removeSession(scanID); + log.warn("Failed to get multiscan result", t); + throw new RuntimeException(t); + } + } + + @Override + public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException { + MultiScanSession session = (MultiScanSession) sessionManager.removeSession(scanID); + if (session == null) { + throw new NoSuchScanIDException(); + } + + long t2 = System.currentTimeMillis(); + log.debug(String.format("MultiScanSess %s %,d entries in %.2f secs (lookup_time:%.2f secs tablets:%,d ranges:%,d) ", TServerUtils.clientAddress.get(), + session.numEntries, (t2 - session.startTime) / 1000.0, session.totalLookupTime / 1000.0, session.numTablets, session.numRanges)); + } + + @Override + public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException { + // Make sure user is real + + security.authenticateUser(credentials, credentials); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0); + + UpdateSession us = new UpdateSession(); + us.violations = new Violations(); + us.credentials = credentials; + us.cenv = new TservConstraintEnv(security, us.credentials); + + long sid = sessionManager.createSession(us, false); + + return sid; + } + + private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) { + long t1 = System.currentTimeMillis(); + if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent)) + 
return; + if (us.currentTablet == null && (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) { + // if there were previous failures, then do not accept additional writes + return; + } + + try { + // if user has no permission to write to this table, add it to + // the failures list + boolean sameTable = us.currentTablet != null && (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId())); + if (sameTable || security.canWrite(us.credentials, keyExtent.getTableId().toString())) { + long t2 = System.currentTimeMillis(); + us.authTimes.addStat(t2 - t1); + us.currentTablet = onlineTablets.get(keyExtent); + if (us.currentTablet != null) { + us.queuedMutations.put(us.currentTablet, new ArrayList()); + } else { + // not serving tablet, so report all mutations as + // failures + us.failures.put(keyExtent, 0l); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.unknownTabletErrors, 0); + } + } else { + log.warn("Denying access to table " + keyExtent.getTableId() + " for user " + us.credentials.getPrincipal()); + long t2 = System.currentTimeMillis(); + us.authTimes.addStat(t2 - t1); + us.currentTablet = null; + us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0); + return; + } + } catch (ThriftSecurityException e) { + log.error("Denying permission to check user " + us.credentials.getPrincipal() + " with user " + e.getUser(), e); + long t2 = System.currentTimeMillis(); + us.authTimes.addStat(t2 - t1); + us.currentTablet = null; + us.authFailures.put(keyExtent, e.getCode()); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0); + return; + } + } + + @Override + public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent, List tmutations) { + UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID); + if (us == 
null) { + throw new RuntimeException("No Such SessionID"); + } + + try { + KeyExtent keyExtent = new KeyExtent(tkeyExtent); + setUpdateTablet(us, keyExtent); + + if (us.currentTablet != null) { + List mutations = us.queuedMutations.get(us.currentTablet); + for (TMutation tmutation : tmutations) { + Mutation mutation = new ServerMutation(tmutation); + mutations.add(mutation); + us.queuedMutationSize += mutation.numBytes(); + } + if (us.queuedMutationSize > getSystemConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX)) + flush(us); + } + } finally { + sessionManager.unreserveSession(us); + } + } + + private void flush(UpdateSession us) { + + int mutationCount = 0; + Map> sendables = new HashMap>(); + Throwable error = null; + + long pt1 = System.currentTimeMillis(); + + boolean containsMetadataTablet = false; + for (Tablet tablet : us.queuedMutations.keySet()) + if (tablet.getExtent().isMeta()) + containsMetadataTablet = true; + + if (!containsMetadataTablet && us.queuedMutations.size() > 0) + TabletServer.this.resourceManager.waitUntilCommitsAreEnabled(); + + Span prep = Trace.start("prep"); + for (Entry> entry : us.queuedMutations.entrySet()) { + + Tablet tablet = entry.getKey(); + List mutations = entry.getValue(); + if (mutations.size() > 0) { + try { + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size()); + + CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations); + if (commitSession == null) { + if (us.currentTablet == tablet) { + us.currentTablet = null; + } + us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet)); + } else { + sendables.put(commitSession, mutations); + mutationCount += mutations.size(); + } + + } catch (TConstraintViolationException e) { + us.violations.add(e.getViolations()); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0); + + if (e.getNonViolators().size() > 0) { + // 
only log and commit mutations if there were some + // that did not + // violate constraints... this is what + // prepareMutationsForCommit() + // expects + sendables.put(e.getCommitSession(), e.getNonViolators()); + } + + mutationCount += mutations.size(); + + } catch (HoldTimeoutException t) { + error = t; + log.debug("Giving up on mutations due to a long memory hold time"); + break; + } catch (Throwable t) { + error = t; + log.error("Unexpected error preparing for commit", error); + break; + } + } + } + prep.stop(); + + Span wal = Trace.start("wal"); + long pt2 = System.currentTimeMillis(); + long avgPrepareTime = (long) ((pt2 - pt1) / (double) us.queuedMutations.size()); + us.prepareTimes.addStat(pt2 - pt1); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.commitPrep, (avgPrepareTime)); + + if (error != null) { + for (Entry> e : sendables.entrySet()) { + e.getKey().abortCommit(e.getValue()); + } + throw new RuntimeException(error); + } + try { + while (true) { + try { + long t1 = System.currentTimeMillis(); + + logger.logManyTablets(sendables); + + long t2 = System.currentTimeMillis(); + us.walogTimes.addStat(t2 - t1); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1)); + + break; + } catch (IOException ex) { + log.warn("logging mutations failed, retrying"); + } catch (Throwable t) { + log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t); + throw new RuntimeException(t); + } + } + + wal.stop(); + + Span commit = Trace.start("commit"); + long t1 = System.currentTimeMillis(); + for (Entry> entry : sendables.entrySet()) { + CommitSession commitSession = entry.getKey(); + List mutations = entry.getValue(); + + commitSession.commit(mutations); + + Tablet tablet = commitSession.getTablet(); + + if (tablet == us.currentTablet) { + // because constraint violations may filter out some + // mutations, for proper + // accounting with the 
client code, need to increment + // the count based + // on the original number of mutations from the client + // NOT the filtered number + us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size()); + } + } + long t2 = System.currentTimeMillis(); + + long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size()); + + us.flushTime += (t2 - pt1); + us.commitTimes.addStat(t2 - t1); + + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.commitTime, avgCommitTime); + commit.stop(); + } finally { + us.queuedMutations.clear(); + if (us.currentTablet != null) { + us.queuedMutations.put(us.currentTablet, new ArrayList()); + } + us.queuedMutationSize = 0; + } + us.totalUpdates += mutationCount; + } + + @Override + public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException { + UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID); + if (us == null) { + throw new NoSuchScanIDException(); + } + + // clients may or may not see data from an update session while + // it is in progress, however when the update session is closed + // want to ensure that reads wait for the write to finish + long opid = writeTracker.startWrite(us.queuedMutations.keySet()); + + try { + flush(us); + } finally { + writeTracker.finishWrite(opid); + } + + log.debug(String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)", TServerUtils.clientAddress.get(), us.totalUpdates, + (System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(), us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0, + us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0)); + if (us.failures.size() > 0) { + Entry first = us.failures.entrySet().iterator().next(); + log.debug(String.format("Failures: %d, first extent %s successful commits: %d", us.failures.size(), first.getKey().toString(), first.getValue())); + } + List violations = us.violations.asList(); + if 
(violations.size() > 0) { + ConstraintViolationSummary first = us.violations.asList().iterator().next(); + log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(), first.violationDescription, first.numberOfViolatingMutations)); + } + if (us.authFailures.size() > 0) { + KeyExtent first = us.authFailures.keySet().iterator().next(); + log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(), first.toString())); + } + + return new UpdateErrors(Translator.translate(us.failures, Translators.KET), Translator.translate(violations, Translators.CVST), Translator.translate( + us.authFailures, Translators.KET)); + } + + @Override + public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException, + ConstraintViolationException, ThriftSecurityException { + + if (!security.canWrite(credentials, new String(tkeyExtent.getTable(), Constants.UTF8))) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + KeyExtent keyExtent = new KeyExtent(tkeyExtent); + Tablet tablet = onlineTablets.get(new KeyExtent(keyExtent)); + if (tablet == null) { + throw new NotServingTabletException(tkeyExtent); + } + + if (!keyExtent.isMeta()) + TabletServer.this.resourceManager.waitUntilCommitsAreEnabled(); + + long opid = writeTracker.startWrite(TabletType.type(keyExtent)); + + try { + Mutation mutation = new ServerMutation(tmutation); + List mutations = Collections.singletonList(mutation); + + Span prep = Trace.start("prep"); + CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations); + prep.stop(); + if (cs == null) { + throw new NotServingTabletException(tkeyExtent); + } + + while (true) { + try { + Span wal = Trace.start("wal"); + logger.log(cs, cs.getWALogSeq(), mutation); + wal.stop(); + break; + } catch (IOException ex) { + log.warn(ex, ex); + } + } + + Span commit = 
Trace.start("commit"); + cs.commit(mutations); + commit.stop(); + } catch (TConstraintViolationException e) { + throw new ConstraintViolationException(Translator.translate(e.getViolations().asList(), Translators.CVST)); + } finally { + writeTracker.finishWrite(opid); + } + } + + @Override + public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) + throws NotServingTabletException, ThriftSecurityException { + + String tableId = new String(ByteBufferUtil.toBytes(tkeyExtent.table), Constants.UTF8); + if (!security.canSplitTablet(credentials, tableId)) + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + + KeyExtent keyExtent = new KeyExtent(tkeyExtent); + + Tablet tablet = onlineTablets.get(keyExtent); + if (tablet == null) { + throw new NotServingTabletException(tkeyExtent); + } + + if (keyExtent.getEndRow() == null || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) { + try { + if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) { + throw new NotServingTabletException(tkeyExtent); + } + } catch (IOException e) { + log.warn("Failed to split " + keyExtent, e); + throw new RuntimeException(e); + } + } + } + + @Override + public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { + return getStats(sessionManager.getActiveScansPerTable()); + } + + @Override + public List getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException { + TreeMap onlineTabletsCopy; + synchronized (onlineTablets) { + onlineTabletsCopy = new TreeMap(onlineTablets); + } + List result = new ArrayList(); + Text text = new Text(tableId); + KeyExtent start = new KeyExtent(text, new Text(), null); + for (Entry entry : onlineTabletsCopy.tailMap(start).entrySet()) { + KeyExtent ke = entry.getKey(); + if 
(ke.getTableId().compareTo(text) == 0) { + Tablet tablet = entry.getValue(); + TabletStats stats = tablet.timer.getTabletStats(); + stats.extent = ke.toThrift(); + stats.ingestRate = tablet.ingestRate(); + stats.queryRate = tablet.queryRate(); + stats.splitCreationTime = tablet.getSplitCreationTime(); + stats.numEntries = tablet.getNumEntries(); + result.add(stats); + } + } + return result; + } + + private ZooCache masterLockCache = new ZooCache(); + + private void checkPermission(TCredentials credentials, String lock, boolean requiresSystemPermission, final String request) + throws ThriftSecurityException { + if (requiresSystemPermission) { + boolean fatal = false; + try { + log.debug("Got " + request + " message from user: " + credentials.getPrincipal()); + if (!security.canPerformSystemActions(credentials)) { + log.warn("Got " + request + " message from user: " + credentials.getPrincipal()); + throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + } + } catch (ThriftSecurityException e) { + log.warn("Got " + request + " message from unauthenticatable user: " + e.getUser()); + if (e.getUser().equals(SecurityConstants.SYSTEM_PRINCIPAL)) { + log.fatal("Got message from a service with a mismatched configuration. 
Please ensure a compatible configuration.", e); + fatal = true; + } + throw e; + } finally { + if (fatal) { + Halt.halt(1, new Runnable() { + @Override + public void run() { + logGCInfo(getSystemConfiguration()); + } + }); + } + } + } + + if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) { + log.warn("Got " + request + " message from master before lock acquired, ignoring..."); + throw new RuntimeException("Lock not acquired"); + } + + if (tabletServerLock != null && tabletServerLock.wasLockAcquired() && !tabletServerLock.isLocked()) { + Halt.halt(1, new Runnable() { + @Override + public void run() { + log.info("Tablet server no longer holds lock during checkPermission() : " + request + ", exiting"); + logGCInfo(getSystemConfiguration()); + } + }); + } + + if (lock != null) { + ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK, lock); + + try { + if (!ZooLock.isLockHeld(masterLockCache, lid)) { + // maybe the cache is out of date and a new master holds the + // lock? 
+ masterLockCache.clear(); + if (!ZooLock.isLockHeld(masterLockCache, lid)) { + log.warn("Got " + request + " message from a master that does not hold the current lock " + lock); + throw new RuntimeException("bad master lock"); + } + } + } catch (Exception e) { + throw new RuntimeException("bad master lock", e); + } + } + } + /** Loads the tablet described by textent onto this server. After checkPermission (a ThriftSecurityException is logged and rethrown as a RuntimeException), the call returns without assigning anything if the extent overlaps any tablet in unopenedTablets, openingTablets or onlineTablets; an error is logged unless the only overlap is the identical extent. Otherwise the extent is added to unopenedTablets and an AssignmentHandler (wrapped in a LoggingRunnable) is run on a dedicated "Root Tablet Assignment" daemon thread for the root tablet, or handed to resourceManager (addMetaDataAssignment / addAssignment) for metadata and ordinary tablets. */ + @Override + public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) { + + try { + checkPermission(credentials, lock, true, "loadTablet"); + } catch (ThriftSecurityException e) { + log.error(e, e); + throw new RuntimeException(e); + } + + final KeyExtent extent = new KeyExtent(textent); + + synchronized (unopenedTablets) { + synchronized (openingTablets) { + synchronized (onlineTablets) { + + // checking if this exact tablet is in any of the sets + // below is not a strong enough check + // when splits and fix splits occurring + + Set unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets); + Set openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets); + Set onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets); + Set all = new HashSet(); + all.addAll(unopenedOverlapping); + all.addAll(openingOverlapping); + all.addAll(onlineOverlapping); + + if (!all.isEmpty()) { + if (all.size() != 1 || !all.contains(extent)) { + log.error("Tablet " + extent + " overlaps previously assigned " + unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping); + } + return; + } + + unopenedTablets.add(extent); + } + } + } + + // add the assignment job to the appropriate queue + log.info("Loading tablet " + extent); + + final Runnable ah = new LoggingRunnable(log, new AssignmentHandler(extent)); + // Root tablet assignment must take place immediately + if (extent.isRootTablet()) { + new Daemon("Root Tablet Assignment") { + @Override + public void run() { + ah.run(); + if (onlineTablets.containsKey(extent)) { + log.info("Root tablet loaded: " + extent); + }
else { + log.info("Root tablet failed to load"); + } + + } + }.start(); + } else { + if (extent.isMeta()) { + resourceManager.addMetaDataAssignment(ah); + } else { + resourceManager.addAssignment(ah); + } + } + } + /** Schedules an unload of the tablet described by textent: after checkPermission (a ThriftSecurityException is logged and rethrown as a RuntimeException), an UnloadTabletHandler carrying the save flag, wrapped in a LoggingRunnable, is handed to resourceManager.addMigration. */ + @Override + public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent, boolean save) { + try { + checkPermission(credentials, lock, true, "unloadTablet"); + } catch (ThriftSecurityException e) { + log.error(e, e); + throw new RuntimeException(e); + } + + KeyExtent extent = new KeyExtent(textent); + + resourceManager.addMigration(extent, new LoggingRunnable(log, new UnloadTabletHandler(extent, save))); + } + /** Flushes every online tablet that overlaps the KeyExtent built from tableId, endRow and startRow. The flush id is read once, from the first overlapping tablet, and reused for the rest; if that read throws NoNodeException the call logs it and returns without flushing anything. A ThriftSecurityException from checkPermission is logged and rethrown as a RuntimeException. */ + @Override + public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) { + try { + checkPermission(credentials, lock, true, "flush"); + } catch (ThriftSecurityException e) { + log.error(e, e); + throw new RuntimeException(e); + } + + ArrayList tabletsToFlush = new ArrayList(); + + KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow)); + + synchronized (onlineTablets) { + for (Tablet tablet : onlineTablets.values()) + if (ke.overlaps(tablet.getExtent())) + tabletsToFlush.add(tablet); + } + + Long flushID = null; + + for (Tablet tablet : tabletsToFlush) { + if (flushID == null) { + // read the flush id once from zookeeper instead of reading + // it for each tablet + try { + flushID = tablet.getFlushID(); + } catch (NoNodeException e) { + // table was probably deleted + log.info("Asked to flush table that has no flush id " + ke + " " + e.getMessage()); + return; + } + } + tablet.flush(flushID); + } + } + /** Flushes the single online tablet for textent, if this server has it, using that tablet's own flush id; a NoNodeException while reading the id is logged and otherwise ignored. A ThriftSecurityException from checkPermission is logged and rethrown as a RuntimeException. */ + @Override + public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException { + try { + checkPermission(credentials, lock, true, "flushTablet"); + } catch (ThriftSecurityException e) { + log.error(e, e); + throw new RuntimeException(e); +
} + + Tablet tablet = onlineTablets.get(new KeyExtent(textent)); + if (tablet != null) { + log.info("Flushing " + tablet.getExtent()); + try { + tablet.flush(tablet.getFlushID()); + } catch (NoNodeException nne) { + log.info("Asked to flush tablet that has no flush id " + new KeyExtent(textent) + " " + nne.getMessage()); + } + } + } + /** Stops this tablet server at the master's request. After checkPermission (a failure propagates as ThriftSecurityException), calls Halt.halt with status 0 and a Runnable that logs the request and GC info, sets serverStopRequested, and tries to release tabletServerLock; any exception from unlock is logged rather than propagated. */ + @Override + public void halt(TInfo tinfo, TCredentials credentials, String lock) throws ThriftSecurityException { + + checkPermission(credentials, lock, true, "halt"); + + Halt.halt(0, new Runnable() { + @Override + public void run() { + log.info("Master requested tablet server halt"); + logGCInfo(getSystemConfiguration()); + serverStopRequested = true; + try { + tabletServerLock.unlock(); + } catch (Exception e) { + log.error(e, e); + } + } + }); + } + /** Calls halt with the same arguments, but any exception it throws (including a failed permission check) is only logged as a warning and is not reported back to the caller. */ + @Override + public void fastHalt(TInfo info, TCredentials credentials, String lock) { + try { + halt(info, credentials, lock); + } catch (Exception e) { + log.warn("Error halting", e); + } + } + /** Returns statsKeeper.getTabletStats(). NOTE(review): no checkPermission call is made here, unlike most other RPCs in this handler; confirm that this is intended. */ + @Override + public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { + return statsKeeper.getTabletStats(); + } + /** Returns sessionManager.getActiveScans() after checkPermission with a null lock; a ThriftSecurityException is logged and rethrown unchanged rather than wrapped. */ + @Override + public List getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { + try { + checkPermission(credentials, null, true, "getScans"); + } catch (ThriftSecurityException e) { + log.error(e, e); + throw e; + } + + return sessionManager.getActiveScans(); + } + /** Calls chopFiles() on the online tablet for textent, doing nothing if this server has no such online tablet. A ThriftSecurityException from checkPermission is logged and rethrown as a RuntimeException. */ + @Override + public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent) throws TException { + try { + checkPermission(credentials, lock, true, "chop"); + } catch (ThriftSecurityException e) { + log.error(e, e); + throw new RuntimeException(e); + } + + KeyExtent ke = new KeyExtent(textent); + + Tablet tablet = onlineTablets.get(ke); + if (tablet != null) { + tablet.chopFiles(); + } + } + /** Calls compactAll(compactionId) on every online tablet overlapping the KeyExtent built from tableId, endRow and startRow. The compaction id is read once, from the first overlapping tablet; if that read throws NoNodeException the call logs it and returns without compacting. A ThriftSecurityException from checkPermission is logged and rethrown as a RuntimeException. */ + @Override + public void compact(TInfo tinfo, TCredentials credentials, String lock,
String tableId, ByteBuffer startRow, ByteBuffer endRow) + throws TException { + try { + checkPermission(credentials, lock, true, "compact"); + } catch (ThriftSecurityException e) { + log.error(e, e); + throw new RuntimeException(e); + } + + KeyExtent ke = new KeyExtent(new Text(tableId), ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow)); + + ArrayList tabletsToCompact = new ArrayList(); + synchronized (onlineTablets) { + for (Tablet tablet : onlineTablets.values()) + if (ke.overlaps(tablet.getExtent())) + tabletsToCompact.add(tablet); + } + + Long compactionId = null; + + for (Tablet tablet : tabletsToCompact) { + // all for the same table id, so only need to read + // compaction id once + if (compactionId == null) + try { + compactionId = tablet.getCompactionID().getFirst(); + } catch (NoNodeException e) { + log.info("Asked to compact table with no compaction id " + ke + " " + e.getMessage()); + return; + } + tablet.compactAll(compactionId); + } + + } + - /* - * (non-Javadoc) - * - * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.trace.thrift.TInfo, - * org.apache.accumulo.core.security.thrift.Credentials, java.util.List) - */ + @Override + public void removeLogs(TInfo tinfo, TCredentials credentials, List filenames) throws TException { + String myname = getClientAddressString(); + myname = myname.replace(':', '+'); + Path logDir = new Path(Constants.getWalDirectory(acuConf), myname); + Set loggers = new HashSet(); + logger.getLoggers(loggers); + nextFile: for (String filename : filenames) { + for (String logger : loggers) { + if (logger.contains(filename)) + continue nextFile; + } + List onlineTabletsCopy = new ArrayList(); + synchronized (onlineTablets) { + onlineTabletsCopy.addAll(onlineTablets.values()); + } + for (Tablet tablet : onlineTabletsCopy) { + for (String current : tablet.getCurrentLogs()) { + if (current.contains(filename)) { + log.info("Attempted to delete " + filename + " 
from tablet " + tablet.getExtent()); + continue nextFile; + } + } + } + try { + String source = logDir + "/" + filename; + if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) { + String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive"; + fs.mkdirs(new Path(walogArchive)); + String dest = walogArchive + "/" + filename; + log.info("Archiving walog " + source + " to " + dest); + if (!fs.rename(new Path(source), new Path(dest))) + log.error("rename is unsuccessful"); + } else { + log.info("Deleting walog " + filename); + Trash trash = new Trash(fs, fs.getConf()); + Path sourcePath = new Path(source); + if (!(!acuConf.getBoolean(Property.GC_TRASH_IGNORE) && trash.moveToTrash(sourcePath)) && !fs.delete(sourcePath, true)) + log.warn("Failed to delete walog " + source); + Path recoveryPath = new Path(Constants.getRecoveryDir(acuConf), filename); + try { + if (trash.moveToTrash(recoveryPath) || fs.delete(recoveryPath, true)) + log.info("Deleted any recovery log " + filename); + } catch (FileNotFoundException ex) { + // ignore + } + + } + } catch (IOException e) { + log.warn("Error attempting to delete write-ahead log " + filename + ": " + e); + } + } + } + + @Override + public List getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { + try { + checkPermission(credentials, null, true, "getActiveCompactions"); + } catch (ThriftSecurityException e) { + log.error(e, e); + throw e; + } + + List compactions = Compactor.getRunningCompactions(); + List ret = new ArrayList(compactions.size()); + + for (CompactionInfo compactionInfo : compactions) { + ret.add(compactionInfo.toThrift()); + } + + return ret; + } + } + + private class SplitRunner implements Runnable { + private Tablet tablet; + + public SplitRunner(Tablet tablet) { + this.tablet = tablet; + } + + @Override + public void run() { + if (majorCompactorDisabled) { + // this will make split task that were queued when shutdown was + // initiated exit + 
+ return; + } + + splitTablet(tablet); + } + } + /** Returns majorCompactorDisabled. SplitRunner.run() returns without splitting while this flag is set, and the MajorCompactor loop condition stops iterating once it is set. */ + boolean isMajorCompactionDisabled() { + return majorCompactorDisabled; + } + /** Hands a split of the given tablet to resourceManager.executeSplit, passing the tablet's extent and a SplitRunner wrapped in a LoggingRunnable. The runner skips the split if major compactions are disabled by the time it executes. */ + void executeSplit(Tablet tablet) { + resourceManager.executeSplit(tablet.getExtent(), new LoggingRunnable(log, new SplitRunner(tablet))); + } + /** Background task whose loop runs while majorCompactorDisabled is false: each pass sleeps for TSERV_MAJC_DELAY and then copies onlineTablets (while holding its lock) into a local TreeMap to work from. */ + private class MajorCompactor implements Runnable { + + @Override + public void run() { + while (!majorCompactorDisabled) { + try { + UtilWaitThread.sleep(getSystemConfiguration().getTimeInMillis(Property.TSERV_MAJC_DELAY)); + + TreeMap copyOnlineTablets = new TreeMap(); + + synchronized (onlineTablets) { + copyOnlineTablets.putAll(onlineTablets); // avoid + // concurrent + // modification + } + + int numMajorCompactions