From: elserj@apache.org
To: commits@accumulo.apache.org
Date: Fri, 07 Mar 2014 20:53:44 -0000
Message-Id: <17399eb68720422dbaa91ae9bf7c26a1@git.apache.org>
Subject: [30/50] [abbrv] Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/accumulo/blob/778fc985/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index 303449d,0000000..a6a5e88
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@@ -1,3922 -1,0 +1,3923 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.accumulo.server.tabletserver; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.impl.ScannerImpl; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationObserver; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.constraints.Violations; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.KeyValue; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.data.thrift.MapFileInfo; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.FileUtil; +import org.apache.accumulo.core.iterators.IterationInterruptedException; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter; +import org.apache.accumulo.core.iterators.system.DeletingIterator; +import org.apache.accumulo.core.iterators.system.InterruptibleIterator; +import org.apache.accumulo.core.iterators.system.MultiIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource; +import org.apache.accumulo.core.iterators.system.StatsIterator; +import org.apache.accumulo.core.iterators.system.VisibilityFilter; +import org.apache.accumulo.core.master.thrift.TabletLoadState; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.security.CredentialHelper; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.accumulo.core.util.MetadataTable.DataFileValue; +import org.apache.accumulo.core.util.Pair; +import 
org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.constraints.ConstraintChecker; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.tableOps.CompactRange.CompactionIterators; +import org.apache.accumulo.server.problems.ProblemReport; +import org.apache.accumulo.server.problems.ProblemReports; +import org.apache.accumulo.server.problems.ProblemType; +import org.apache.accumulo.server.security.SecurityConstants; +import org.apache.accumulo.server.tabletserver.Compactor.CompactionCanceledException; +import org.apache.accumulo.server.tabletserver.Compactor.CompactionEnv; +import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager; +import org.apache.accumulo.server.tabletserver.InMemoryMap.MemoryIterator; +import org.apache.accumulo.server.tabletserver.TabletServer.TservConstraintEnv; +import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager; +import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation; +import org.apache.accumulo.server.tabletserver.log.DfsLogger; +import org.apache.accumulo.server.tabletserver.log.MutationReceiver; +import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage; +import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics; +import org.apache.accumulo.server.trace.TraceFileSystem; +import org.apache.accumulo.server.util.MapCounter; +import org.apache.accumulo.server.util.MetadataTable; +import org.apache.accumulo.server.util.MetadataTable.LogEntry; +import org.apache.accumulo.server.util.TabletOperations; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Hex; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Trash; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; + + +/* + * We need to be able to have the master tell a tabletServer to + * close this file, and the tablet server to handle all pending client reads + * before closing + * + */ + +/** + * + * this class just provides an interface to read from a MapFile mostly takes care of reporting start and end keys + * + * need this because a single row extent can have multiple columns this manages all the columns (each handled by a store) for a single row-extent + * + * + */ + +public class Tablet { + + enum MajorCompactionReason { + // do not change the order, the order of this enum determines the order + // in which queued major compactions are executed + USER, + CHOP, + NORMAL, + IDLE + } + + enum MinorCompactionReason { + USER, SYSTEM, CLOSE + } + + public class CommitSession { + + private int seq; + private InMemoryMap memTable; + private int commitsInProgress; + private long 
maxCommittedTime = Long.MIN_VALUE; + + private CommitSession(int seq, InMemoryMap imm) { + this.seq = seq; + this.memTable = imm; + commitsInProgress = 0; + } + + public int getWALogSeq() { + return seq; + } + + private void decrementCommitsInProgress() { + if (commitsInProgress < 1) + throw new IllegalStateException("commitsInProgress = " + commitsInProgress); + + commitsInProgress--; + if (commitsInProgress == 0) + Tablet.this.notifyAll(); + } + + private void incrementCommitsInProgress() { + if (commitsInProgress < 0) + throw new IllegalStateException("commitsInProgress = " + commitsInProgress); + + commitsInProgress++; + } + + private void waitForCommitsToFinish() { + while (commitsInProgress > 0) { + try { + Tablet.this.wait(50); + } catch (InterruptedException e) { + log.warn(e, e); + } + } + } + + public void abortCommit(List value) { + Tablet.this.abortCommit(this, value); + } + + public void commit(List mutations) { + Tablet.this.commit(this, mutations); + } + + public Tablet getTablet() { + return Tablet.this; + } + + public boolean beginUpdatingLogsUsed(ArrayList copy, boolean mincFinish) { + return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish); + } + + public void finishUpdatingLogsUsed() { + Tablet.this.finishUpdatingLogsUsed(); + } + + public int getLogId() { + return logId; + } + + public KeyExtent getExtent() { + return extent; + } + + private void updateMaxCommittedTime(long time) { + maxCommittedTime = Math.max(time, maxCommittedTime); + } + + private long getMaxCommittedTime() { + if (maxCommittedTime == Long.MIN_VALUE) + throw new IllegalStateException("Tried to read max committed time when it was never set"); + return maxCommittedTime; + } + + } + + private class TabletMemory { + private InMemoryMap memTable; + private InMemoryMap otherMemTable; + private InMemoryMap deletingMemTable; + private int nextSeq = 1; + private CommitSession commitSession; + + TabletMemory() { + memTable = new InMemoryMap(tabletServer.getSystemConfiguration()); + commitSession = new CommitSession(nextSeq, memTable); + nextSeq += 2; + } + + InMemoryMap getMemTable() { + return memTable; + } + + InMemoryMap getMinCMemTable() { + return otherMemTable; + } + + CommitSession prepareForMinC() { + if (otherMemTable != null) { + throw new IllegalStateException(); + } + + if (deletingMemTable != null) { + throw new IllegalStateException(); + } + + otherMemTable = memTable; + memTable = new InMemoryMap(tabletServer.getSystemConfiguration()); + + CommitSession oldCommitSession = commitSession; + commitSession = new CommitSession(nextSeq, memTable); + nextSeq += 2; + + tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes()); + + return oldCommitSession; + } + + void finishedMinC() { + + if (otherMemTable == null) { + throw new IllegalStateException(); + } + + if (deletingMemTable != null) { + throw new IllegalStateException(); + } + + deletingMemTable = otherMemTable; + + otherMemTable = null; + Tablet.this.notifyAll(); + } + + void finalizeMinC() { + try { + deletingMemTable.delete(15000); + } finally { + synchronized (Tablet.this) { + if (otherMemTable != null) { + throw new IllegalStateException(); + } + + if (deletingMemTable == null) { + throw new IllegalStateException(); + } + + deletingMemTable = null; + + tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0); + } + } + } + + boolean memoryReservedForMinC() { + return otherMemTable != null || deletingMemTable != null; + } + + void waitForMinC() { + while 
(otherMemTable != null || deletingMemTable != null) { + try { + Tablet.this.wait(50); + } catch (InterruptedException e) { + log.warn(e, e); + } + } + } + + void mutate(CommitSession cm, List mutations) { + cm.memTable.mutate(mutations); + } + + void updateMemoryUsageStats() { + long other = 0; + if (otherMemTable != null) + other = otherMemTable.estimatedSizeInBytes(); + else if (deletingMemTable != null) + other = deletingMemTable.estimatedSizeInBytes(); + + tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other); + } + + List getIterators() { + List toReturn = new ArrayList(2); + toReturn.add(memTable.skvIterator()); + if (otherMemTable != null) + toReturn.add(otherMemTable.skvIterator()); + return toReturn; + } + + void returnIterators(List iters) { + for (MemoryIterator iter : iters) { + iter.close(); + } + } + + public long getNumEntries() { + if (otherMemTable != null) + return memTable.getNumEntries() + otherMemTable.getNumEntries(); + return memTable.getNumEntries(); + } + + CommitSession getCommitSession() { + return commitSession; + } + } + + private TabletMemory tabletMemory; + + private final TabletTime tabletTime; + private long persistedTime; + private final Object timeLock = new Object(); + + private final Path location; // absolute path of this tablet's dir + private TServerInstance lastLocation; + + private Configuration conf; + private FileSystem fs; + + private TableConfiguration acuTableConf; + + private volatile boolean tableDirChecked = false; + + private AtomicLong dataSourceDeletions = new AtomicLong(0); + private Set activeScans = new HashSet(); + + private volatile boolean closing = false; + private boolean closed = false; + private boolean closeComplete = false; + + private long lastFlushID = -1; + private long lastCompactID = -1; + + private KeyExtent extent; + + private TabletResourceManager tabletResources; + final private DatafileManager datafileManager; + private volatile boolean majorCompactionInProgress = false; + private volatile boolean majorCompactionWaitingToStart = false; + private Set majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class)); + private volatile boolean minorCompactionInProgress = false; + private volatile boolean minorCompactionWaitingToStart = false; + + private boolean updatingFlushID = false; + + private AtomicReference constraintChecker = new AtomicReference(); + + private final String tabletDirectory; + + private int writesInProgress = 0; + + private static final Logger log = Logger.getLogger(Tablet.class); + public TabletStatsKeeper timer; + + private Rate queryRate = new Rate(0.2); + private long queryCount = 0; + + private Rate queryByteRate = new Rate(0.2); + private long queryBytes = 0; + + private Rate ingestRate = new Rate(0.2); + private long ingestCount = 0; + + private Rate ingestByteRate = new Rate(0.2); + private long ingestBytes = 0; + + private byte[] defaultSecurityLabel = new byte[0]; + + private long lastMinorCompactionFinishTime; + private long lastMapFileImportTime; + + private volatile long numEntries; + private volatile long numEntriesInMemory; + + + // a count of the amount of data read by the iterators + private AtomicLong scannedCount = new AtomicLong(0); + private Rate scannedRate = new Rate(0.2); + + private ConfigurationObserver configObserver; + + private TabletServer tabletServer; + + private final int logId; + // ensure we only have one reader/writer of our bulk file notes at a time + public final Object bulkFileImportLock = new 
Object(); + + public int getLogId() { + return logId; + } + + public static class TabletClosedException extends RuntimeException { + public TabletClosedException(Exception e) { + super(e); + } + + public TabletClosedException() { + super(); + } + + private static final long serialVersionUID = 1L; + } + + String getNextMapFilename(String prefix) throws IOException { + String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent)); + checkTabletDir(); + return location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension; + } + + private void checkTabletDir() throws IOException { + if (!tableDirChecked) { + checkTabletDir(this.location); + tableDirChecked = true; + } + } + + private void checkTabletDir(Path tabletDir) throws IOException { + + FileStatus[] files = null; + try { + files = fs.listStatus(tabletDir); + } catch (FileNotFoundException ex) { + // ignored + } + + if (files == null) { + if (tabletDir.getName().startsWith("c-")) + log.debug("Tablet " + extent + " had no dir, creating " + tabletDir); // its a clone dir... + else + log.warn("Tablet " + extent + " had no dir, creating " + tabletDir); + + fs.mkdirs(tabletDir); + } + } + + private static String rel2abs(String relPath, KeyExtent extent) { + if (relPath.startsWith("../")) + return ServerConstants.getTablesDir() + relPath.substring(2); + else + return ServerConstants.getTablesDir() + "/" + extent.getTableId() + relPath; + } + + class DatafileManager { + // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles + final private Map datafileSizes = Collections.synchronizedMap(new TreeMap()); + + DatafileManager(SortedMap datafileSizes) { + for (Entry datafiles : datafileSizes.entrySet()) + this.datafileSizes.put(new Path(rel2abs(datafiles.getKey(), extent)), datafiles.getValue()); + } + + Path mergingMinorCompactionFile = null; + Set filesToDeleteAfterScan = new HashSet(); + Map> scanFileReservations = new HashMap>(); + MapCounter fileScanReferenceCounts = new MapCounter(); + long nextScanReservationId = 0; + boolean reservationsBlocked = false; + + Set majorCompactingFiles = new HashSet(); + + Pair> reserveFilesForScan() { + synchronized (Tablet.this) { + + while (reservationsBlocked) { + try { + Tablet.this.wait(50); + } catch (InterruptedException e) { + log.warn(e, e); + } + } + + Set absFilePaths = new HashSet(datafileSizes.keySet()); + + long rid = nextScanReservationId++; + + scanFileReservations.put(rid, absFilePaths); + + Map ret = new HashMap(); + + for (Path path : absFilePaths) { + fileScanReferenceCounts.increment(path, 1); + ret.put(path.toString(), datafileSizes.get(path)); + } + + return new Pair>(rid, ret); + } + } + + void returnFilesForScan(Long reservationId) { + + final Set filesToDelete = new HashSet(); + + synchronized (Tablet.this) { + Set absFilePaths = scanFileReservations.remove(reservationId); + + if (absFilePaths == null) + throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); + + boolean notify = false; + for (Path path : absFilePaths) { + long refCount = fileScanReferenceCounts.decrement(path, 1); + if (refCount == 0) { + if (filesToDeleteAfterScan.remove(path)) + filesToDelete.add(path); + notify = true; + } else if (refCount < 0) + throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); + } + + if (notify) + Tablet.this.notifyAll(); + } + + if (filesToDelete.size() > 0) { + log.debug("Removing scan refs from metadata " + extent + " " + 
abs2rel(filesToDelete)); + MetadataTable.removeScanFiles(extent, abs2rel(filesToDelete), SecurityConstants.getSystemCredentials(), tabletServer.getLock()); + } + } + + private void removeFilesAfterScanRel(Set relPaths) { + Set scanFiles = new HashSet(); + + for (String rpath : relPaths) + scanFiles.add(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId() + rpath)); + + removeFilesAfterScan(scanFiles); + } + + private void removeFilesAfterScan(Set scanFiles) { + if (scanFiles.size() == 0) + return; + + Set filesToDelete = new HashSet(); + + synchronized (Tablet.this) { + for (Path path : scanFiles) { + if (fileScanReferenceCounts.get(path) == 0) + filesToDelete.add(path); + else + filesToDeleteAfterScan.add(path); + } + } + + if (filesToDelete.size() > 0) { + log.debug("Removing scan refs from metadata " + extent + " " + abs2rel(filesToDelete)); + MetadataTable.removeScanFiles(extent, abs2rel(filesToDelete), SecurityConstants.getSystemCredentials(), tabletServer.getLock()); + } + } + + private TreeSet waitForScansToFinish(Set pathsToWaitFor, boolean blockNewScans, long maxWaitTime) { + long startTime = System.currentTimeMillis(); + TreeSet inUse = new TreeSet(); + + Span waitForScans = Trace.start("waitForScans"); + synchronized (Tablet.this) { + if (blockNewScans) { + if (reservationsBlocked) + throw new IllegalStateException(); + + reservationsBlocked = true; + } + + for (Path path : pathsToWaitFor) { + while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) { + try { + Tablet.this.wait(100); + } catch (InterruptedException e) { + log.warn(e, e); + } + } + } + + for (Path path : pathsToWaitFor) { + if (fileScanReferenceCounts.get(path) > 0) + inUse.add(path); + } + + if (blockNewScans) { + reservationsBlocked = false; + Tablet.this.notifyAll(); + } + + } + waitForScans.stop(); + return inUse; + } + + public void importMapFiles(long tid, Map pathsString, boolean setTime) throws IOException { + + String bulkDir = null; + + Map paths = new HashMap(); + for (Entry entry : pathsString.entrySet()) + paths.put(new Path(entry.getKey()), entry.getValue()); + + for (Path tpath : paths.keySet()) { + + if (!tpath.getParent().getParent().equals(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId()))) { + throw new IOException("Map file " + tpath + " not in table dir " + ServerConstants.getTablesDir() + "/" + extent.getTableId()); + } + + if (bulkDir == null) + bulkDir = tpath.getParent().toString(); + else if (!bulkDir.equals(tpath.getParent().toString())) + throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath); + + } + + if (extent.isRootTablet()) { + throw new IllegalArgumentException("Can not import files to root tablet"); + } + + synchronized (bulkFileImportLock) { + TCredentials auths = SecurityConstants.getSystemCredentials(); + Connector conn; + try { + conn = HdfsZooInstance.getInstance().getConnector(auths.getPrincipal(), CredentialHelper.extractToken(auths)); + } catch (Exception ex) { + throw new IOException(ex); + } + // Remove any bulk files we've previously loaded and compacted away + List files = MetadataTable.getBulkFilesLoaded(conn, extent, tid); + for (String file : files) + if (paths.keySet().remove(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId() + file))) + log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file); + + if (paths.size() > 0) { + long bulkTime = Long.MIN_VALUE; + if (setTime) { + for (DataFileValue dfv 
: paths.values()) { + long nextTime = tabletTime.getAndUpdateTime(); + if (nextTime < bulkTime) + throw new IllegalStateException("Time went backwards unexpectedly " + nextTime + " " + bulkTime); + bulkTime = nextTime; + dfv.setTime(bulkTime); + } + } + + synchronized (timeLock) { + if (bulkTime > persistedTime) + persistedTime = bulkTime; + + MetadataTable.updateTabletDataFile(tid, extent, abs2rel(paths), tabletTime.getMetadataValue(persistedTime), auths, tabletServer.getLock()); + } + } + } + + synchronized (Tablet.this) { + for (Entry tpath : paths.entrySet()) { + if (datafileSizes.containsKey(tpath.getKey())) { + log.error("Adding file that is already in set " + tpath.getKey()); + } + datafileSizes.put(tpath.getKey(), tpath.getValue()); + + } + + tabletResources.importedMapFiles(); + + computeNumEntries(); + } + + for (Entry entry : paths.entrySet()) { + log.log(TLevel.TABLET_HIST, extent + " import " + abs2rel(entry.getKey()) + " " + entry.getValue()); + } + } + + String reserveMergingMinorCompactionFile() { + if (mergingMinorCompactionFile != null) + throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved : " + mergingMinorCompactionFile); + + if (extent.isRootTablet()) + return null; + + int maxFiles = acuTableConf.getMaxFilesPerTablet(); + + // when a major compaction is running and we are at max files, write out + // one extra file... want to avoid the case where major compaction is + // compacting everything except for the largest file, and therefore the + // largest file is returned for merging.. the following check mostly + // avoids this case, except for the case where major compactions fail or + // are canceled + if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles) + return null; + + if (datafileSizes.size() >= maxFiles) { + // find the smallest file + + long min = Long.MAX_VALUE; + Path minName = null; + + for (Entry entry : datafileSizes.entrySet()) { + if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) { + min = entry.getValue().getSize(); + minName = entry.getKey(); + } + } + + if (minName == null) + return null; + + mergingMinorCompactionFile = minName; + return minName.toString(); + } + + return null; + } + + void unreserveMergingMinorCompactionFile(Path file) { + if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null) + || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile))) + throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile); + + mergingMinorCompactionFile = null; + } + + void bringMinorCompactionOnline(String tmpDatafile, String newDatafile, String absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) { + bringMinorCompactionOnline(new Path(tmpDatafile), new Path(newDatafile), absMergeFile == null ? 
null : new Path(absMergeFile), dfv, commitSession, + flushId); + } + + void bringMinorCompactionOnline(Path tmpDatafile, Path newDatafile, Path absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) { + + IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); + if (extent.isRootTablet()) { + try { + if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) { + throw new IllegalStateException(); + } + } catch (Exception e) { + throw new IllegalStateException("Can not bring major compaction online, lock not held", e); + } + } + + // rename before putting in metadata table, so files in metadata table should + // always exist + do { + try { + if (dfv.getNumEntries() == 0) { + fs.delete(tmpDatafile, true); + } else { + if (fs.exists(newDatafile)) { + log.warn("Target map file already exist " + newDatafile); + fs.delete(newDatafile, true); + } + - if (!fs.rename(tmpDatafile, newDatafile)) { - throw new IOException("rename fails"); - } ++ rename(fs, tmpDatafile, newDatafile); + } + break; + } catch (IOException ioe) { + log.warn("Tablet " + extent + " failed to rename " + abs2rel(newDatafile) + " after MinC, will retry in 60 secs...", ioe); + UtilWaitThread.sleep(60 * 1000); + } + } while (true); + + long t1, t2; + + // the code below always assumes merged files are in use by scans... this must be done + // because the in memory list of files is not updated until after the metadata table + // therefore the file is available to scans until memory is updated, but want to ensure + // the file is not available for garbage collection... if memory were updated + // before this point (like major compactions do), then the following code could wait + // for scans to finish like major compactions do.... used to wait for scans to finish + // here, but that was incorrect because a scan could start after waiting but before + // memory was updated... assuming the file is always in use by scans leads to + // one uneeded metadata update when it was not actually in use + Set filesInUseByScans = Collections.emptySet(); + if (absMergeFile != null) + filesInUseByScans = Collections.singleton(absMergeFile); + + // very important to write delete entries outside of log lock, because + // this !METADATA write does not go up... it goes sideways or to itself + if (absMergeFile != null) + MetadataTable.addDeleteEntries(extent, Collections.singleton(abs2rel(absMergeFile)), SecurityConstants.getSystemCredentials()); + + Set unusedWalLogs = beginClearingUnusedLogs(); + try { + // the order of writing to !METADATA and walog is important in the face of machine/process failures + // need to write to !METADATA before writing to walog, when things are done in the reverse order + // data could be lost... 
the minor compaction start even should be written before the following metadata + // write is made + TCredentials creds = SecurityConstants.getSystemCredentials(); + + synchronized (timeLock) { + if (commitSession.getMaxCommittedTime() > persistedTime) + persistedTime = commitSession.getMaxCommittedTime(); + + String time = tabletTime.getMetadataValue(persistedTime); + MetadataTable.updateTabletDataFile(extent, abs2rel(newDatafile), abs2rel(absMergeFile), dfv, time, creds, abs2rel(filesInUseByScans), + tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId); + } + + } finally { + finishClearingUnusedLogs(); + } + + do { + try { + // the purpose of making this update use the new commit session, instead of the old one passed in, + // is because the new one will reference the logs used by current memory... + + tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2); + break; + } catch (IOException e) { + log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e); + UtilWaitThread.sleep(1 * 1000); + } + } while (true); + + synchronized (Tablet.this) { + lastLocation = null; + + t1 = System.currentTimeMillis(); + if (datafileSizes.containsKey(newDatafile)) { + log.error("Adding file that is already in set " + newDatafile); + } + + if (dfv.getNumEntries() > 0) { + datafileSizes.put(newDatafile, dfv); + } + + if (absMergeFile != null) { + datafileSizes.remove(absMergeFile); + } + + unreserveMergingMinorCompactionFile(absMergeFile); + + dataSourceDeletions.incrementAndGet(); + tabletMemory.finishedMinC(); + + lastFlushID = flushId; + + computeNumEntries(); + t2 = System.currentTimeMillis(); + } + + // must do this after list of files in memory is updated above + removeFilesAfterScan(filesInUseByScans); + + if (absMergeFile != null) + log.log(TLevel.TABLET_HIST, extent + " MinC [" + abs2rel(absMergeFile) + ",memory] -> " + abs2rel(newDatafile)); + else + log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " + abs2rel(newDatafile)); + log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, getExtent().toString())); + if (dfv.getSize() > acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) { + log.debug(String.format("Minor Compaction wrote out file larger than split threshold. 
split threshold = %,d file size = %,d", + acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), dfv.getSize())); + } + + } + + private Map abs2rel(Map paths) { + TreeMap relMap = new TreeMap(); + + for (Entry entry : paths.entrySet()) + relMap.put(abs2rel(entry.getKey()), entry.getValue()); + + return relMap; + } + + private Set abs2rel(Set absPaths) { + Set relativePaths = new TreeSet(); + for (Path absPath : absPaths) + relativePaths.add(abs2rel(absPath)); + + return relativePaths; + } + + private Set string2path(Set strings) { + Set paths = new HashSet(); + for (String path : strings) + paths.add(new Path(path)); + + return paths; + } + + private String abs2rel(Path absPath) { + if (absPath == null) + return null; + + if (absPath.getParent().getParent().getName().equals(extent.getTableId().toString())) + return "/" + absPath.getParent().getName() + "/" + absPath.getName(); + else + return "../" + absPath.getParent().getParent().getName() + "/" + absPath.getParent().getName() + "/" + absPath.getName(); + } + + public void reserveMajorCompactingFiles(Set files) { + if (majorCompactingFiles.size() != 0) + throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles); + + Set mcf = string2path(files); + if (mergingMinorCompactionFile != null && mcf.contains(mergingMinorCompactionFile)) + throw new IllegalStateException("Major compaction tried to reserve file in use by minor compaction " + mergingMinorCompactionFile); + + majorCompactingFiles.addAll(mcf); + } + + public void clearMajorCompactingFile() { + majorCompactingFiles.clear(); + } + + void bringMajorCompactionOnline(Set oldDatafiles, String tmpDatafile, String newDatafile, Long compactionId, DataFileValue dfv) throws IOException { + bringMajorCompactionOnline(string2path(oldDatafiles), new Path(tmpDatafile), new Path(newDatafile), compactionId, dfv); + } + + void bringMajorCompactionOnline(Set oldDatafiles, Path tmpDatafile, Path newDatafile, Long compactionId, DataFileValue dfv) throws IOException { + long t1, t2; + + if (!extent.isRootTablet()) { + + if (fs.exists(newDatafile)) { + log.error("Target map file already exist " + newDatafile, new Exception()); + throw new IllegalStateException("Target map file already exist " + newDatafile); + } + + // rename before putting in metadata table, so files in metadata table should + // always exist - if (!fs.rename(tmpDatafile, newDatafile)) - log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false"); ++ rename(fs, tmpDatafile, newDatafile); + + if (dfv.getNumEntries() == 0) { + fs.delete(newDatafile, true); + } + } + + TServerInstance lastLocation = null; + synchronized (Tablet.this) { + + t1 = System.currentTimeMillis(); + + IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); + + dataSourceDeletions.incrementAndGet(); + + if (extent.isRootTablet()) { + + waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE); + + try { + if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) { + throw new IllegalStateException(); + } + } catch (Exception e) { + throw new IllegalStateException("Can not bring major compaction online, lock not held", e); + } + + // mark files as ready for deletion, but + // do not delete them until we successfully + // rename the compacted map file, in case + // the system goes down + + String compactName = newDatafile.getName(); + + for (Path path : oldDatafiles) { - fs.rename(path, new Path(location + "/delete+" + compactName + "+" + path.getName())); ++ rename(fs, path, new Path(location + 
"/delete+" + compactName + "+" + path.getName())); + } + + if (fs.exists(newDatafile)) { + log.error("Target map file already exist " + newDatafile, new Exception()); + throw new IllegalStateException("Target map file already exist " + newDatafile); + } + - if (!fs.rename(tmpDatafile, newDatafile)) - log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false"); ++ rename(fs, tmpDatafile, newDatafile); + + // start deleting files, if we do not finish they will be cleaned + // up later + Trash trash = new Trash(fs, fs.getConf()); + for (Path path : oldDatafiles) { + Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName()); + if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || !trash.moveToTrash(deleteFile)) + fs.delete(deleteFile, true); + } + } + + // atomically remove old files and add new file + for (Path oldDatafile : oldDatafiles) { + if (!datafileSizes.containsKey(oldDatafile)) { + log.error("file does not exist in set " + oldDatafile); + } + datafileSizes.remove(oldDatafile); + majorCompactingFiles.remove(oldDatafile); + } + + if (datafileSizes.containsKey(newDatafile)) { + log.error("Adding file that is already in set " + newDatafile); + } + + if (dfv.getNumEntries() > 0) { + datafileSizes.put(newDatafile, dfv); + } + + // could be used by a follow on compaction in a multipass compaction + majorCompactingFiles.add(newDatafile); + + computeNumEntries(); + + lastLocation = Tablet.this.lastLocation; + Tablet.this.lastLocation = null; + + if (compactionId != null) + lastCompactID = compactionId; + + t2 = System.currentTimeMillis(); + } + + if (!extent.isRootTablet()) { + Set filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000); + if (filesInUseByScans.size() > 0) + log.debug("Adding scan refs to metadata " + extent + " " + abs2rel(filesInUseByScans)); + MetadataTable.replaceDatafiles(extent, abs2rel(oldDatafiles), abs2rel(filesInUseByScans), abs2rel(newDatafile), compactionId, dfv, + SecurityConstants.getSystemCredentials(), tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock()); + removeFilesAfterScan(filesInUseByScans); + } + + log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0)); + log.log(TLevel.TABLET_HIST, extent + " MajC " + abs2rel(oldDatafiles) + " --> " + abs2rel(newDatafile)); + } + + public SortedMap getDatafileSizesRel() { + synchronized (Tablet.this) { + TreeMap files = new TreeMap(); + Set> es = datafileSizes.entrySet(); + + for (Entry entry : es) { + files.put(abs2rel(entry.getKey()), entry.getValue()); + } + + return Collections.unmodifiableSortedMap(files); + } + } + + public SortedMap getDatafileSizes() { + synchronized (Tablet.this) { + TreeMap files = new TreeMap(); + Set> es = datafileSizes.entrySet(); + + for (Entry entry : es) { + files.put(entry.getKey().toString(), entry.getValue()); + } + + return Collections.unmodifiableSortedMap(files); + } + } + + public Set getFiles() { + synchronized (Tablet.this) { + HashSet files = new HashSet(); + for (Path path : datafileSizes.keySet()) { + files.add(path.toString()); + } + return Collections.unmodifiableSet(files); + } + } + + } + + public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap tabletsKeyValues) + throws IOException { + this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), tabletsKeyValues); + splitCreationTime = 0; + } + + public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager 
trm, SortedMap datafiles, String time, + long initFlushID, long initCompactID) throws IOException { + this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), datafiles, time, initFlushID, initCompactID); + splitCreationTime = System.currentTimeMillis(); + } + + private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, + SortedMap tabletsKeyValues) throws IOException { + this(tabletServer, location, extent, trm, conf, TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())), + tabletsKeyValues); + } + + static private final List EMPTY = Collections.emptyList(); + + private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, + SortedMap datafiles, String time, long initFlushID, long initCompactID) throws IOException { + this(tabletServer, location, extent, trm, conf, TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())), EMPTY, + datafiles, time, null, new HashSet(), initFlushID, initCompactID); + } + + private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap tabletsKeyValues) { + SortedMap entries; + + if (extent.isRootTablet()) { + return null; + } else { + entries = new TreeMap(); + Text rowName = extent.getMetadataEntry(); + for (Entry entry : tabletsKeyValues.entrySet()) { + if (entry.getKey().compareRow(rowName) == 0 && Constants.METADATA_TIME_COLUMN.hasColumns(entry.getKey())) { + entries.put(new Key(entry.getKey()), new Value(entry.getValue())); + } + } + } + + // log.debug("extent : "+extent+" entries : "+entries); + + if (entries.size() == 1) + return entries.values().iterator().next().toString(); + return null; + } + + private static SortedMap lookupDatafiles(AccumuloConfiguration conf, Text locText, FileSystem fs, KeyExtent extent, + SortedMap tabletsKeyValues) throws IOException { + Path location = new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId().toString() + locText.toString()); + + TreeMap datafiles = new TreeMap(); + + if (extent.isRootTablet()) { // the meta0 tablet + // cleanUpFiles() has special handling for delete. 
files + FileStatus[] files = fs.listStatus(location); + Path[] paths = new Path[files.length]; + for (int i = 0; i < files.length; i++) { + paths[i] = files[i].getPath(); + } + Collection goodPaths = cleanUpFiles(fs, files, location, true); + for (String path : goodPaths) { + String filename = new Path(path).getName(); + DataFileValue dfv = new DataFileValue(0, 0); + datafiles.put(locText.toString() + "/" + filename, dfv); + } + } else { + + SortedMap datafilesMetadata; + + Text rowName = extent.getMetadataEntry(); + + ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), Constants.METADATA_TABLE_ID, + Constants.NO_AUTHS); + + // Commented out because when no data file is present, each tablet will scan through metadata table and return nothing + // reduced batch size to improve performance + // changed here after endKeys were implemented from 10 to 1000 + mdScanner.setBatchSize(1000); + + // leave these in, again, now using endKey for safety + mdScanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY); + + mdScanner.setRange(new Range(rowName)); + + datafilesMetadata = new TreeMap(); + + for (Entry entry : mdScanner) { + + if (entry.getKey().compareRow(rowName) != 0) { + break; + } + + datafilesMetadata.put(new Key(entry.getKey()), new Value(entry.getValue())); + } + + Iterator> dfmdIter = datafilesMetadata.entrySet().iterator(); + + while (dfmdIter.hasNext()) { + Entry entry = dfmdIter.next(); + + datafiles.put(entry.getKey().getColumnQualifier().toString(), new DataFileValue(entry.getValue().get())); + } + } + return datafiles; + } + + private static List lookupLogEntries(KeyExtent ke, SortedMap tabletsKeyValues) { + List logEntries = new ArrayList(); + + if (ke.isMeta()) { + try { + logEntries = MetadataTable.getLogEntries(SecurityConstants.getSystemCredentials(), ke); + } catch (Exception ex) { + throw new RuntimeException("Unable to read tablet log entries", ex); + } + } else { + log.debug("Looking at metadata " + tabletsKeyValues); + Text row = ke.getMetadataEntry(); + for (Entry entry : tabletsKeyValues.entrySet()) { + Key key = entry.getKey(); + if (key.getRow().equals(row)) { + if (key.getColumnFamily().equals(Constants.METADATA_LOG_COLUMN_FAMILY)) { + logEntries.add(MetadataTable.entryFromKeyValue(key, entry.getValue())); + } + } + } + } + + log.debug("got " + logEntries + " for logs for " + ke); + return logEntries; + } + + private static Set lookupScanFiles(KeyExtent extent, SortedMap tabletsKeyValues) { + HashSet scanFiles = new HashSet(); + + Text row = extent.getMetadataEntry(); + for (Entry entry : tabletsKeyValues.entrySet()) { + Key key = entry.getKey(); + if (key.getRow().equals(row) && key.getColumnFamily().equals(Constants.METADATA_SCANFILE_COLUMN_FAMILY)) { + scanFiles.add(key.getColumnQualifier().toString()); + } + } + + return scanFiles; + } + + private static long lookupFlushID(KeyExtent extent, SortedMap tabletsKeyValues) { + Text row = extent.getMetadataEntry(); + for (Entry entry : tabletsKeyValues.entrySet()) { + Key key = entry.getKey(); + if (key.getRow().equals(row) && Constants.METADATA_FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) + return Long.parseLong(entry.getValue().toString()); + } + + return -1; + } + + private static long lookupCompactID(KeyExtent extent, SortedMap tabletsKeyValues) { + Text row = extent.getMetadataEntry(); + for (Entry entry : tabletsKeyValues.entrySet()) { + Key key = entry.getKey(); + if (key.getRow().equals(row) && 
Constants.METADATA_COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) + return Long.parseLong(entry.getValue().toString()); + } + + return -1; + } + + private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, FileSystem fs, + SortedMap tabletsKeyValues) throws IOException { + this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(), + location, fs, extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent, + tabletsKeyValues), lookupScanFiles(extent, tabletsKeyValues), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues)); + } + + private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap tabletsKeyValues) { + for (Entry entry : tabletsKeyValues.entrySet()) { + if (entry.getKey().getColumnFamily().compareTo(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY) == 0) { + return new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier()); + } + } + return null; + } + + /** + * yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time + */ + private Tablet(final TabletServer tabletServer, final Text location, final KeyExtent extent, final TabletResourceManager trm, final Configuration conf, + final FileSystem fs, final List logEntries, final SortedMap datafiles, String time, final TServerInstance lastLocation, + Set scanFiles, long initFlushID, long initCompactID) throws IOException { + this.location = new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId().toString() + location.toString()); + this.lastLocation = lastLocation; + this.tabletDirectory = location.toString(); + this.conf = conf; + this.acuTableConf = tabletServer.getTableConfiguration(extent); + + this.fs = fs; + this.extent = extent; + this.tabletResources = trm; + + this.lastFlushID = initFlushID; + this.lastCompactID = initCompactID; + + if (extent.isRootTablet()) { + + long rtime = Long.MIN_VALUE; + for (String path : datafiles.keySet()) { + String filename = new Path(path).getName(); + + FileSKVIterator reader = FileOperations.getInstance().openReader(this.location + "/" + filename, true, fs, fs.getConf(), + tabletServer.getTableConfiguration(extent)); + long maxTime = -1; + try { + + while (reader.hasTop()) { + maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp()); + reader.next(); + } + + } finally { + reader.close(); + } + + if (maxTime > rtime) { + time = TabletTime.LOGICAL_TIME_ID + "" + maxTime; + rtime = maxTime; + } + } + } + + this.tabletServer = tabletServer; + this.logId = tabletServer.createLogId(extent); + + this.timer = new TabletStatsKeeper(); + + setupDefaultSecurityLabels(extent); + + tabletMemory = new TabletMemory(); + tabletTime = TabletTime.getInstance(time); + persistedTime = tabletTime.getTime(); + + acuTableConf.addObserver(configObserver = new ConfigurationObserver() { + + private void reloadConstraints() { + constraintChecker.set(new ConstraintChecker(getTableConfiguration())); + } + + public void propertiesChanged() { + reloadConstraints(); + + try { + setupDefaultSecurityLabels(extent); + } catch (Exception e) { + log.error("Failed to reload default security labels for extent: " + extent.toString()); + } + } + + public void propertyChanged(String prop) { + if 
(prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey())) + reloadConstraints(); + else if (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) { + try { + log.info("Default security labels changed for extent: " + extent.toString()); + setupDefaultSecurityLabels(extent); + } catch (Exception e) { + log.error("Failed to reload default security labels for extent: " + extent.toString()); + } + } + + } + + public void sessionExpired() { + log.debug("Session expired, no longer updating per table props..."); + } + + }); + // Force a load of any per-table properties + configObserver.propertiesChanged(); + + tabletResources.setTablet(this, acuTableConf); + + if (!logEntries.isEmpty()) { + log.info("Starting Write-Ahead Log recovery for " + this.extent); + final long[] count = new long[2]; + final CommitSession commitSession = tabletMemory.getCommitSession(); + count[1] = Long.MIN_VALUE; + try { + Set absPaths = new HashSet(); + for (String relPath : datafiles.keySet()) + absPaths.add(rel2abs(relPath, extent)); + + tabletServer.recover(this, logEntries, absPaths, new MutationReceiver() { + public void receive(Mutation m) { + // LogReader.printMutation(m); + Collection muts = m.getUpdates(); + for (ColumnUpdate columnUpdate : muts) { + if (!columnUpdate.hasTimestamp()) { + // if it is not a user set timestamp, it must have been set + // by the system + count[1] = Math.max(count[1], columnUpdate.getTimestamp()); + } + } + tabletMemory.mutate(commitSession, Collections.singletonList(m)); + count[0]++; + } + }); + + if (count[1] != Long.MIN_VALUE) { + tabletTime.useMaxTimeFromWALog(count[1]); + } + commitSession.updateMaxCommittedTime(tabletTime.getTime()); + + if (count[0] == 0) { + MetadataTable.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock()); + logEntries.clear(); + } + + } catch (Throwable t) { + if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE)) { + log.warn("Error recovering from log files: ", t); + } else { + throw new RuntimeException(t); + } + } + // make some closed references that represent the recovered logs + currentLogs = new HashSet(); + for (LogEntry logEntry : logEntries) { + for (String log : logEntry.logSet) { + String[] parts = log.split("/", 2); + currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, parts[1])); + } + } + + log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries() + + " entries created)"); + } + + String contextName = acuTableConf.get(Property.TABLE_CLASSPATH); + if (contextName != null && !contextName.equals("")) { + // initialize context classloader, instead of possibly waiting for it to initialize for a scan + // TODO this could hang, causing other tablets to fail to load - ACCUMULO-1292 + AccumuloVFSClassLoader.getContextManager().getClassLoader(contextName); + } + + // do this last after tablet is completely setup because it + // could cause major compaction to start + datafileManager = new DatafileManager(datafiles); + + computeNumEntries(); + + datafileManager.removeFilesAfterScanRel(scanFiles); + + // look for hints of a failure on the previous tablet server + if (!logEntries.isEmpty() || needsMajorCompaction(MajorCompactionReason.NORMAL)) { + // look for any temp files hanging around + 
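+ // (a *_tmp file is a partially written compaction output; one left behind by a previous tablet server is safe to delete)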
removeOldTemporaryFiles(); + } + + log.log(TLevel.TABLET_HIST, extent + " opened "); + } + + private void removeOldTemporaryFiles() { + // remove any temporary files created by a previous tablet server + try { + for (FileStatus tmp : fs.globStatus(new Path(location, "*_tmp"))){ + try { + fs.delete(tmp.getPath(), true); + } catch (IOException ex) { + log.error("Unable to remove old temp file " + tmp.getPath() + ": " + ex); + } + } + } catch (IOException ex) { + log.error("Error scanning for old temp files in " + location); + } + } + + private void setupDefaultSecurityLabels(KeyExtent extent) { + if (extent.isMeta()) { + defaultSecurityLabel = new byte[0]; + } else { + try { + ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY)); + this.defaultSecurityLabel = cv.getExpression(); + } catch (Exception e) { + log.error(e, e); + this.defaultSecurityLabel = new byte[0]; + } + } + } + + private static Collection cleanUpFiles(FileSystem fs, FileStatus[] files, Path location, boolean deleteTmp) throws IOException { + /* + * called in constructor and before major compactions + */ + Collection goodFiles = new ArrayList(files.length); + + for (FileStatus file : files) { + + String path = file.getPath().toString(); + String filename = file.getPath().getName(); + + // check for incomplete major compaction, this should only occur + // for root tablet + if (filename.startsWith("delete+")) { + String expectedCompactedFile = location.toString() + "/" + filename.split("\\+")[1]; + if (fs.exists(new Path(expectedCompactedFile))) { + // compaction finished, but did not finish deleting compacted files.. so delete it + if (!fs.delete(file.getPath(), true)) + log.warn("Delete of file: " + file.getPath().toString() + " return false"); + continue; + } + // compaction did not finish, so put files back + + // reset path and filename for rest of loop + filename = filename.split("\\+", 3)[2]; + path = location + "/" + filename; + - if (!fs.rename(file.getPath(), new Path(path))) - log.warn("Rename of " + file.getPath().toString() + " to " + path + " returned false"); ++ rename(fs, file.getPath(), new Path(path)); + } + + if (filename.endsWith("_tmp")) { + if (deleteTmp) { + log.warn("cleaning up old tmp file: " + path); + if (!fs.delete(file.getPath(), true)) + log.warn("Delete of tmp file: " + file.getPath().toString() + " return false"); + + } + continue; + } + + if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) { + log.error("unknown file in tablet" + path); + continue; + } + + goodFiles.add(path); + } + + return goodFiles; + } + + public static class KVEntry extends KeyValue { + public KVEntry(Key k, Value v) { + super(new Key(k), Arrays.copyOf(v.get(), v.get().length)); + } + + public String toString() { + return key.toString() + "=" + getValue(); + } + + int numBytes() { + return key.getSize() + getValue().get().length; + } + + int estimateMemoryUsed() { + return key.getSize() + getValue().get().length + (9 * 32); // overhead is 32 per object + } + } + + private LookupResult lookup(SortedKeyValueIterator mmfi, List ranges, HashSet columnSet, ArrayList results, + long maxResultsSize) throws IOException { + + LookupResult lookupResult = new LookupResult(); + + boolean exceededMemoryUsage = false; + boolean tabletClosed = false; + + Set cfset = null; + if (columnSet.size() > 0) + cfset = LocalityGroupUtil.families(columnSet); + + for (Range range : ranges) { + + if 
(exceededMemoryUsage || tabletClosed) { + lookupResult.unfinishedRanges.add(range); + continue; + } + + int entriesAdded = 0; + + try { + if (cfset != null) + mmfi.seek(range, cfset, true); + else + mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false); + + while (mmfi.hasTop()) { + Key key = mmfi.getTopKey(); + + KVEntry kve = new KVEntry(key, mmfi.getTopValue()); + results.add(kve); + entriesAdded++; + lookupResult.bytesAdded += kve.estimateMemoryUsed(); + lookupResult.dataSize += kve.numBytes(); + + exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize; + + if (exceededMemoryUsage) { + addUnfinishedRange(lookupResult, range, key, false); + break; + } + + mmfi.next(); + } + + } catch (TooManyFilesException tmfe) { + // treat this as a closed tablet, and let the client retry + log.warn("Tablet " + getExtent() + " has too many files, batch lookup can not run"); + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded); + tabletClosed = true; + } catch (IOException ioe) { + if (shutdownInProgress()) { + // assume HDFS shutdown hook caused this exception + log.debug("IOException while shutdown in progress ", ioe); + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded); + tabletClosed = true; + } else { + throw ioe; + } + } catch (IterationInterruptedException iie) { + if (isClosed()) { + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded); + tabletClosed = true; + } else { + throw iie; + } + } catch (TabletClosedException tce) { + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded); + tabletClosed = true; + } + + } + + return lookupResult; + } + + private void handleTabletClosedDuringScan(ArrayList results, LookupResult lookupResult, boolean exceededMemoryUsage, Range range, int entriesAdded) { + if (exceededMemoryUsage) + throw new IllegalStateException("tablet should not exceed memory usage or close, not both"); + + if (entriesAdded > 0) + addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).key, false); + else + lookupResult.unfinishedRanges.add(range); + + lookupResult.closed = true; + } + + private void addUnfinishedRange(LookupResult lookupResult, Range range, Key key, boolean inclusiveStartKey) { + if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) { + Range nlur = new Range(new Key(key), inclusiveStartKey, range.getEndKey(), range.isEndKeyInclusive()); + lookupResult.unfinishedRanges.add(nlur); + } + } + + public static interface KVReceiver { + void receive(List matches) throws IOException; + } + + class LookupResult { + List unfinishedRanges = new ArrayList(); + long bytesAdded = 0; + long dataSize = 0; + boolean closed = false; + } + + public LookupResult lookup(List ranges, HashSet columns, Authorizations authorizations, ArrayList results, long maxResultSize, + List ssiList, Map> ssio, AtomicBoolean interruptFlag) throws IOException { + + if (ranges.size() == 0) { + return new LookupResult(); + } + + ranges = Range.mergeOverlapping(ranges); + Collections.sort(ranges); + + Range tabletRange = extent.toDataRange(); + for (Range range : ranges) { + // do a test to see if this range falls within the tablet, if it does not + // then clip will throw an exception + tabletRange.clip(range); + } + + ScanDataSource dataSource = new ScanDataSource(authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag); + + LookupResult result = null; + + try { + 
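+ // the SourceSwitchingIterator lets the lookup keep running when the tablet's underlying data sources (in-memory maps and files) change mid-scan, by transparently switching to the current sources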
+
+  public LookupResult lookup(List<Range> ranges, HashSet<Column> columns, Authorizations authorizations, ArrayList<KVEntry> results, long maxResultSize,
+      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) throws IOException {
+
+    if (ranges.size() == 0) {
+      return new LookupResult();
+    }
+
+    ranges = Range.mergeOverlapping(ranges);
+    Collections.sort(ranges);
+
+    Range tabletRange = extent.toDataRange();
+    for (Range range : ranges) {
+      // do a test to see if this range falls within the tablet, if it does not
+      // then clip will throw an exception
+      tabletRange.clip(range);
+    }
+
+    ScanDataSource dataSource = new ScanDataSource(authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag);
+
+    LookupResult result = null;
+
+    try {
+      SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource);
+      result = lookup(iter, ranges, columns, results, maxResultSize);
+      return result;
+    } catch (IOException ioe) {
+      dataSource.close(true);
+      throw ioe;
+    } finally {
+      // code in finally block because always want
+      // to return mapfiles, even when exception is thrown
+      dataSource.close(false);
+
+      synchronized (this) {
+        queryCount += results.size();
+        if (result != null)
+          queryBytes += result.dataSize;
+      }
+    }
+  }
+
+  private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, HashSet<Column> columns) throws IOException {
+
+    // log.info("In nextBatch..");
+
+    List<KVEntry> results = new ArrayList<KVEntry>();
+    Key key = null;
+
+    Value value;
+    long resultSize = 0L;
+    long resultBytes = 0L;
+
+    long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
+
+    if (columns.size() == 0) {
+      iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
+    } else {
+      iter.seek(range, LocalityGroupUtil.families(columns), true);
+    }
+
+    Key continueKey = null;
+    boolean skipContinueKey = false;
+
+    boolean endOfTabletReached = false;
+    while (iter.hasTop()) {
+
+      value = (Value) iter.getTopValue();
+      key = iter.getTopKey();
+
+      KVEntry kvEntry = new KVEntry(key, value); // copies key and value
+      results.add(kvEntry);
+      resultSize += kvEntry.estimateMemoryUsed();
+      resultBytes += kvEntry.numBytes();
+
+      if (resultSize >= maxResultsSize || results.size() >= num) {
+        continueKey = new Key(key);
+        skipContinueKey = true;
+        break;
+      }
+
+      iter.next();
+    }
+
+    if (iter.hasTop() == false) {
+      endOfTabletReached = true;
+    }
+
+    Batch retBatch = new Batch();
+    retBatch.numBytes = resultBytes;
+
+    if (!endOfTabletReached) {
+      retBatch.continueKey = continueKey;
+      retBatch.skipContinueKey = skipContinueKey;
+    } else {
+      retBatch.continueKey = null;
+    }
+
+    if (endOfTabletReached && results.size() == 0)
+      retBatch.results = null;
+    else
+      retBatch.results = results;
+
+    return retBatch;
+  }
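
nextBatch reports where it stopped with a continue key plus a skip flag, and Scanner.read
(below) rebuilds the next seek range by inverting that flag into the start-key
inclusiveness. A sketch of the handoff (hypothetical keys; illustrative, not part of the
patch):

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.io.Text;

    public class ContinueKeyDemo {
      public static void main(String[] args) {
        Range range = new Range(new Text("a"), new Text("z"));

        // nextBatch hit its size cap right after returning this key,
        // so the resumed scan must skip it
        Key continueKey = new Key(new Text("m"));
        boolean skipContinueKey = true;

        // the same reconstruction Scanner.read performs
        Range next = new Range(continueKey, !skipContinueKey, range.getEndKey(), range.isEndKeyInclusive());
        System.out.println(next);
      }
    }
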
+
+  /**
+   * Determine if a JVM shutdown is in progress.
+   */
+  private boolean shutdownInProgress() {
+    try {
+      Runtime.getRuntime().removeShutdownHook(new Thread(new Runnable() {
+        @Override
+        public void run() {}
+      }));
+    } catch (IllegalStateException ise) {
+      return true;
+    }
+
+    return false;
+  }
+
+  private class Batch {
+    public boolean skipContinueKey;
+    public List<KVEntry> results;
+    public Key continueKey;
+    public long numBytes;
+  }
+
+  Scanner createScanner(Range range, int num, HashSet<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
+      Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag) {
+    // do a test to see if this range falls within the tablet, if it does not
+    // then clip will throw an exception
+    extent.toDataRange().clip(range);
+
+    ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated);
+    return new Scanner(range, opts);
+  }
+
+  class ScanBatch {
+    boolean more;
+    List<KVEntry> results;
+
+    ScanBatch(List<KVEntry> results, boolean more) {
+      this.results = results;
+      this.more = more;
+    }
+  }
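
shutdownInProgress leans on a documented Runtime behavior: once the JVM has begun
shutting down, removeShutdownHook throws IllegalStateException, so probing with a
throwaway hook that was never registered reveals the shutdown state without side effects.
A self-contained demonstration (not part of the patch):

    public class ShutdownProbeDemo {
      static boolean shutdownInProgress() {
        try {
          // removing an unregistered hook is a harmless no-op while the JVM is
          // running, but throws IllegalStateException once shutdown has started
          Runtime.getRuntime().removeShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {}
          }));
        } catch (IllegalStateException ise) {
          return true;
        }
        return false;
      }

      public static void main(String[] args) {
        System.out.println("shutting down? " + shutdownInProgress()); // false
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
          @Override
          public void run() {
            System.out.println("shutting down? " + shutdownInProgress()); // true
          }
        }));
      }
    }
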
+
+  class Scanner {
+
+    private ScanOptions options;
+    private Range range;
+    private SortedKeyValueIterator<Key,Value> isolatedIter;
+    private ScanDataSource isolatedDataSource;
+    private boolean sawException = false;
+    private boolean scanClosed = false;
+
+    Scanner(Range range, ScanOptions options) {
+      this.range = range;
+      this.options = options;
+    }
+
+    synchronized ScanBatch read() throws IOException, TabletClosedException {
+
+      if (sawException)
+        throw new IllegalStateException("Tried to use scanner after exception occurred.");
+
+      if (scanClosed)
+        throw new IllegalStateException("Tried to use scanner after it was closed.");
+
+      Batch results = null;
+
+      ScanDataSource dataSource;
+
+      if (options.isolated) {
+        if (isolatedDataSource == null)
+          isolatedDataSource = new ScanDataSource(options);
+        dataSource = isolatedDataSource;
+      } else {
+        dataSource = new ScanDataSource(options);
+      }
+
+      try {
+
+        SortedKeyValueIterator<Key,Value> iter;
+
+        if (options.isolated) {
+          if (isolatedIter == null)
+            isolatedIter = new SourceSwitchingIterator(dataSource, true);
+          else
+            isolatedDataSource.fileManager.reattach();
+          iter = isolatedIter;
+        } else {
+          iter = new SourceSwitchingIterator(dataSource, false);
+        }
+
+        results = nextBatch(iter, range, options.num, options.columnSet);
+
+        if (results.results == null) {
+          range = null;
+          return new ScanBatch(new ArrayList<KVEntry>(), false);
+        } else if (results.continueKey == null) {
+          return new ScanBatch(results.results, false);
+        } else {
+          range = new Range(results.continueKey, !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive());
+          return new ScanBatch(results.results, true);
+        }
+
+      } catch (IterationInterruptedException iie) {
+        sawException = true;
+        if (isClosed())
+          throw new TabletClosedException(iie);
+        else
+          throw iie;
+      } catch (IOException ioe) {
+        if (shutdownInProgress()) {
+          log.debug("IOException while shutdown in progress ", ioe);
+          throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook
+        }
+
+        sawException = true;
+        dataSource.close(true);
+        throw ioe;
+      } catch (RuntimeException re) {
+        sawException = true;
+        throw re;
+      } finally {
+        // code in finally block because always want
+        // to return mapfiles, even when exception is thrown
+        if (!options.isolated)
+          dataSource.close(false);
+        else if (dataSource.fileManager != null)
+          dataSource.fileManager.detach();
+
+        synchronized (Tablet.this) {
+          if (results != null && results.results != null) {
+            long more = results.results.size();
+            queryCount += more;
+            queryBytes += results.numBytes;
+          }
+        }
+      }
+    }
+
+    // close and read are synchronized because can not call close on the data source while it is in use
+    // this could lead to the case where file iterators that are in use by a thread are returned
+    // to the pool... this would be bad
+    void close() {
+      options.interruptFlag.set(true);
+      synchronized (this) {
+        scanClosed = true;
+        if (isolatedDataSource != null)
+          isolatedDataSource.close(false);
+      }
+    }
+  }
+
+  static class ScanOptions {
+
+    // scan options
+    Authorizations authorizations;
+    byte[] defaultLabels;
+    HashSet<Column> columnSet;
+    List<IterInfo> ssiList;
+    Map<String,Map<String,String>> ssio;
+    AtomicBoolean interruptFlag;
+    int num;
+    boolean isolated;
+
+    ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
+        Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
+      this.num = num;
+      this.authorizations = authorizations;
+      this.defaultLabels = defaultLabels;
+      this.columnSet = columnSet;
+      this.ssiList = ssiList;
+      this.ssio = ssio;
+      this.interruptFlag = interruptFlag;
+      this.isolated = isolated;
+    }
+
+  }
+
+  class ScanDataSource implements DataSource {
+
+    // data source state
+    private ScanFileManager fileManager;
+    private SortedKeyValueIterator<Key,Value> iter;
+    private long expectedDeletionCount;
+    private List<MemoryIterator> memIters = null;
+    private long fileReservationId;
+    private AtomicBoolean interruptFlag;
+    private StatsIterator statsIterator;
+
+    ScanOptions options;
+
+    ScanDataSource(Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+        AtomicBoolean interruptFlag) {
+      expectedDeletionCount = dataSourceDeletions.get();
+      this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false);
+      this.interruptFlag = interruptFlag;
+    }
+
+    ScanDataSource(ScanOptions options) {
+      expectedDeletionCount = dataSourceDeletions.get();
+      this.options = options;
+      this.interruptFlag = options.interruptFlag;
+    }
+
+    @Override
+    public DataSource getNewDataSource() {
+      if (!isCurrent()) {
+        // log.debug("Switching data sources during a scan");
+        if (memIters != null) {
+          tabletMemory.returnIterators(memIters);
+          memIters = null;
+          datafileManager.returnFilesForScan(fileReservationId);
+          fileReservationId = -1;
+        }
+
+        if (fileManager != null)
+          fileManager.releaseOpenFiles(false);
+
+        expectedDeletionCount = dataSourceDeletions.get();
+        iter = null;
+
+        return this;
+      } else
+        return this;
+    }
+
+    @Override
+    public boolean isCurrent() {
+      return expectedDeletionCount == dataSourceDeletions.get();
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+      if (iter == null)
+        iter = createIterator();
+      return iter;
+    }
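
isCurrent and getNewDataSource above implement a version-counter snapshot: each scan
records dataSourceDeletions when it reserves files and memory iterators, and treats those
reservations as stale as soon as the counter moves, for example after a compaction
deletes the files it merged. A minimal sketch of the pattern (names hypothetical; not
part of the patch):

    import java.util.concurrent.atomic.AtomicLong;

    public class SnapshotValidityDemo {
      static final AtomicLong dataSourceDeletions = new AtomicLong();

      static class Snapshot {
        final long expectedDeletionCount = dataSourceDeletions.get();

        boolean isCurrent() {
          return expectedDeletionCount == dataSourceDeletions.get();
        }
      }

      public static void main(String[] args) {
        Snapshot scan = new Snapshot();
        System.out.println(scan.isCurrent()); // true

        // a compaction finishes and deletes the files it merged
        dataSourceDeletions.incrementAndGet();
        System.out.println(scan.isCurrent()); // false: re-reserve before reading on
      }
    }
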
+
+    private SortedKeyValueIterator<Key,Value> createIterator() throws IOException {
+
+      Map<String,DataFileValue> files;
+
+      synchronized (Tablet.this) {
+
+        if (memIters != null)
+          throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory");
+
+        if (Tablet.this.closed)
+          throw new TabletClosedException();
+
+        if (interruptFlag.get())
+          throw new IterationInterruptedException(extent.toString() + " " + interruptFlag.hashCode());
+
+        // only acquire the file manager when we know the tablet is open
+        if (fileManager == null) {
+          fileManager = tabletResources.newScanFileManager();
+          activeScans.add(this);
+        }
+
+        if (fileManager.getNumOpenFiles() != 0)
+          throw new IllegalStateException("Tried to create new scan iterator w/o releasing files");
+
+        // set this before trying to get iterators in case
+        // getIterators() throws an exception
+        expectedDeletionCount = dataSourceDeletions.get();
+
+        memIters = tabletMemory.getIterators();
+        Pair<Long,Map<String,DataFileValue>> reservation = datafileManager.reserveFilesForScan();
+        fileReservationId = reservation.getFirst();
+        files = reservation.getSecond();
+      }
+
+      Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isolated);
+
+      List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size());
+
+      iters.addAll(mapfiles);
+      iters.addAll(memIters);
+
+      for (SortedKeyValueIterator<Key,Value> skvi : iters)
+        ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
+
+      MultiIterator multiIter = new MultiIterator(iters, extent);
+
+      TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, acuTableConf, fileManager, files);
+
+      statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, scannedCount);
+
+      DeletingIterator delIter = new DeletingIterator(statsIterator, false);
+
+      ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+
+      ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.columnSet);
+
+      VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.authorizations, options.defaultLabels);
+
+      return iterEnv.getTopLevelIterator(IteratorUtil
+          .loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.ssiList, options.ssio, iterEnv));
+    }
+
+    private void close(boolean sawErrors) {
+
+      if (memIters != null) {
+        tabletMemory.returnIterators(memIters);
+        memIters = null;
+        datafileManager.returnFilesForScan(fileReservationId);
+        fileReservationId = -1;
+      }
+
+      synchronized (Tablet.this) {
+        activeScans.remove(this);
+        if (activeScans.size() == 0)
+          Tablet.this.notifyAll();
+      }
+
+      if (fileManager != null) {
+        fileManager.releaseOpenFiles(sawErrors);
+        fileManager = null;
+      }
+
+      if (statsIterator != null) {
+        statsIterator.report();
+      }
+
+    }
+
+    public void interrupt() {
+      interruptFlag.set(true);
+    }
+
+    @Override
+    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+
+  }
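
createIterator above composes the system iterators in a fixed bottom-to-top order: the
MultiIterator merging files with the in-memory map, then stats, delete handling,
column-family skipping, column-qualifier filtering, visibility filtering, and finally the
table's configured scan iterators on top. A simplified sketch of that layering with
hypothetical types (illustrative only, not part of the patch):

    interface Layer {
      String describe();
    }

    class Wrapper implements Layer {
      private final String name;
      private final Layer source;

      Wrapper(String name, Layer source) {
        this.name = name;
        this.source = source;
      }

      public String describe() {
        return name + " -> " + source.describe();
      }
    }

    public class IteratorStackDemo {
      public static void main(String[] args) {
        Layer files = new Layer() {
          public String describe() {
            return "MultiIterator(files + in-memory map)";
          }
        };
        // same order as createIterator, bottom to top
        Layer stack = new Wrapper("VisibilityFilter",
            new Wrapper("ColumnQualifierFilter",
                new Wrapper("ColumnFamilySkippingIterator",
                    new Wrapper("DeletingIterator",
                        new Wrapper("StatsIterator", files)))));
        System.out.println(stack.describe());
      }
    }
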
+
+  private DataFileValue minorCompact(Configuration conf, FileSystem fs, InMemoryMap memTable, String tmpDatafile, String newDatafile, String mergeFile,
+      boolean hasQueueTime, long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
+    boolean failed = false;
+    long start = System.currentTimeMillis();
+    timer.incrementStatusMinor();
+
+    long count = 0;
+
+    try {
+      Span span = Trace.start("write");
+      count = memTable.getNumEntries();
+
+      DataFileValue dfv = null;
+      if (mergeFile != null)
+        dfv = datafileManager.getDatafileSizes().get(mergeFile);
+
+      MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, mincReason);
+      CompactionStats stats = compactor.call();
+
+      span.stop();
+      span = Trace.start("bringOnline");
+      datafileManager.bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()),
+          commitSession, flushId);
+      span.stop();
+      return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten());
+    } catch (RuntimeException E) {
+      failed = true;
+      throw E;
+    } catch (Error E) {
+      // Weird errors like "OutOfMemoryError" when trying to create the thread for the compaction
+      failed = true;
+      throw new RuntimeException(E);
+    } finally {
+      try {
+        tabletMemory.finalizeMinC();
+      } catch (Throwable t) {
+        log.error("Failed to free tablet memory", t);
+      }
+
+      if (!failed) {
+        lastMinorCompactionFinishTime = System.currentTimeMillis();
+      }
+      if (tabletServer.mincMetrics.isEnabled())
+        tabletServer.mincMetrics.add(TabletServerMinCMetrics.minc, (lastMinorCompactionFinishTime - start));
+      if (hasQueueTime) {
+        timer.updateTime(Operation.MINOR, queued, start, count, failed);
+        if (tabletServer.mincMetrics.isEnabled())
+          tabletServer.mincMetrics.add(TabletServerMinCMetrics.queue, (start - queued));
+      } else
+        timer.updateTime(Operation.MINOR, start, count, failed);
+    }
+  }
+
+  private class MinorCompactionTask implements Runnable {
+
+    private long queued;
+    private CommitSession commitSession;
+    private DataFileValue stats;
+    private String mergeFile;
+    private long flushId;
+    private MinorCompactionReason mincReason;
+
+    MinorCompactionTask(String mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
+      queued = System.currentTimeMillis();
+      minorCompactionWaitingToStart = true;
+      this.commitSession = commitSession;
+      this.mergeFile = mergeFile;
+      this.flushId = flushId;
+      this.mincReason = mincReason;
+    }
+
+    public void run() {
+      minorCompactionWaitingToStart = false;
+      minorCompactionInProgress = true;
+      Span minorCompaction = Trace.on("minorCompaction");
+      try {
+        String newMapfileLocation = getNextMapFilename(mergeFile == null ? "F" : "M");
+        Span span = Trace.start("waitForCommits");
+        synchronized (Tablet.this) {
+          commitSession.waitForCommitsToFinish();
+        }
+        span.stop();
+        span = Trace.start("start");
+        while (true) {
+          try {
+            // the purpose of the minor compaction start event is to keep track of the filename... in the case
+            // where the metadata table write for the minor compaction finishes and the process dies before
+            // writing the minor compaction finish event, then the start event+filename in metadata table will
+            // prevent recovery of duplicate data... the minor compaction start event could be written at any time
+            // before the metadata write for the minor compaction
+            tabletServer.minorCompactionStarted(commitSession, commitSession.getWALogSeq() + 1, newMapfileLocation);
+            break;
+          } catch (IOException e) {
+            log.warn("Failed to write to write ahead log " + e.getMessage(), e);
+          }
+        }
+        span.stop();
+        span = Trace.start("compact");
+        this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), newMapfileLocation + "_tmp", newMapfileLocation, mergeFile, true, queued,
+            commitSession, flushId, mincReason);
+        span.stop();
+
+        if (needsSplit()) {
+          tabletServer.executeSplit(Tablet.this);
+        } else {
+          initiateMajorCompaction(MajorCompactionReason.NORMAL);
+        }
+      } catch (Throwable t) {
+        log.error("Unknown error during minor compaction for extent: " + getExtent(), t);
+        throw new RuntimeException(t);
+      } finally {
+        minorCompactionInProgress = false;
+        minorCompaction.data("extent", extent.toString());
+        minorCompaction.data("numEntries", Long.toSt