From: elserj@apache.org
To: commits@accumulo.apache.org
Date: Sat, 10 Jan 2015 00:59:35 -0000
Subject: [6/9] accumulo git commit: Merge branch '1.5' into 1.6

http://git-wip-us.apache.org/repos/asf/accumulo/blob/14f9f00e/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index 163d761,0000000..7420ec4
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@@ -1,3857 -1,0 +1,3859 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.accumulo.tserver; + +import static com.google.common.base.Charsets.UTF_8; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.impl.ScannerImpl; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.ConfigurationObserver; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.constraints.Violations; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.KeyValue; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.data.thrift.MapFileInfo; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.iterators.IterationInterruptedException; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter; +import org.apache.accumulo.core.iterators.system.DeletingIterator; +import org.apache.accumulo.core.iterators.system.InterruptibleIterator; +import org.apache.accumulo.core.iterators.system.MultiIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource; +import org.apache.accumulo.core.iterators.system.StatsIterator; +import org.apache.accumulo.core.iterators.system.VisibilityFilter; +import org.apache.accumulo.core.master.thrift.TabletLoadState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; +import org.apache.accumulo.core.util.MapCounter; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManager.FileType; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.fs.VolumeUtil; +import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.tableOps.CompactionIterators; +import org.apache.accumulo.server.problems.ProblemReport; +import org.apache.accumulo.server.problems.ProblemReports; +import org.apache.accumulo.server.problems.ProblemType; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.tablets.TabletTime; +import org.apache.accumulo.server.tablets.UniqueNameAllocator; +import org.apache.accumulo.server.util.FileUtil; +import org.apache.accumulo.server.util.MasterMetadataUtil; +import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.accumulo.server.util.TabletOperations; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.accumulo.tserver.Compactor.CompactionCanceledException; +import org.apache.accumulo.tserver.Compactor.CompactionEnv; +import org.apache.accumulo.tserver.FileManager.ScanFileManager; +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator; +import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv; +import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager; +import org.apache.accumulo.tserver.TabletStatsKeeper.Operation; +import org.apache.accumulo.tserver.compaction.CompactionPlan; +import org.apache.accumulo.tserver.compaction.CompactionStrategy; +import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy; +import org.apache.accumulo.tserver.compaction.MajorCompactionReason; +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest; +import org.apache.accumulo.tserver.compaction.WriteParameters; +import org.apache.accumulo.tserver.constraints.ConstraintChecker; +import org.apache.accumulo.tserver.log.DfsLogger; +import org.apache.accumulo.tserver.log.MutationReceiver; +import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage; +import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics; +import 
org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Hex; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; + +import com.google.common.annotations.VisibleForTesting; + +/* + * We need to be able to have the master tell a tabletServer to + * close this file, and the tablet server to handle all pending client reads + * before closing + * + */ + +/** + * + * this class just provides an interface to read from a MapFile mostly takes care of reporting start and end keys + * + * need this because a single row extent can have multiple columns this manages all the columns (each handled by a store) for a single row-extent + * + * + */ + +public class Tablet { + + enum MinorCompactionReason { + USER, SYSTEM, CLOSE, RECOVERY + } + + public class CommitSession { + + private int seq; + private InMemoryMap memTable; + private int commitsInProgress; + private long maxCommittedTime = Long.MIN_VALUE; + + private CommitSession(int seq, InMemoryMap imm) { + this.seq = seq; + this.memTable = imm; + commitsInProgress = 0; + } + + public int getWALogSeq() { + return seq; + } + + private void decrementCommitsInProgress() { + if (commitsInProgress < 1) + throw new IllegalStateException("commitsInProgress = " + commitsInProgress); + + commitsInProgress--; + if (commitsInProgress == 0) + Tablet.this.notifyAll(); + } + + private void incrementCommitsInProgress() { + if (commitsInProgress < 0) + throw new IllegalStateException("commitsInProgress = " + commitsInProgress); + + commitsInProgress++; + } + + private void waitForCommitsToFinish() { + while (commitsInProgress > 0) { + try { + Tablet.this.wait(50); + } catch (InterruptedException e) { + log.warn(e, e); + } + } + } + + public void abortCommit(List value) { + Tablet.this.abortCommit(this, value); + } + + public void commit(List mutations) { + Tablet.this.commit(this, mutations); + } + + public Tablet getTablet() { + return Tablet.this; + } + + public boolean beginUpdatingLogsUsed(ArrayList copy, boolean mincFinish) { + return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish); + } + + public void finishUpdatingLogsUsed() { + Tablet.this.finishUpdatingLogsUsed(); + } + + public int getLogId() { + return logId; + } + + public KeyExtent getExtent() { + return extent; + } + + private void updateMaxCommittedTime(long time) { + maxCommittedTime = Math.max(time, maxCommittedTime); + } + + private long getMaxCommittedTime() { + if (maxCommittedTime == Long.MIN_VALUE) + throw new IllegalStateException("Tried to read max committed time when it was never set"); + return maxCommittedTime; + } + + } + + private class TabletMemory { + private InMemoryMap memTable; + private InMemoryMap otherMemTable; + private InMemoryMap deletingMemTable; + private int nextSeq = 1; + private CommitSession commitSession; + + TabletMemory() { + try { + memTable = new InMemoryMap(acuTableConf); + } catch (LocalityGroupConfigurationError e) { + throw new RuntimeException(e); + } + commitSession = new CommitSession(nextSeq, memTable); + nextSeq += 2; + } + + InMemoryMap getMemTable() { + return memTable; + } + + InMemoryMap getMinCMemTable() { + return otherMemTable; + } + + CommitSession prepareForMinC() { + if (otherMemTable != null) { + throw new 
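
[Editor's note] The commitsInProgress bookkeeping in CommitSession above is a plain wait/notify reference count: each writer registers before appending to the in-memory map, and minor compaction blocks until the count drains to zero. A minimal standalone sketch of that pattern follows; the class and method names are illustrative, not Accumulo's.

// Illustrative sketch of the wait/notify reference count used by
// CommitSession; not Accumulo code. All methods synchronize on one monitor.
public class CommitGate {
    private int commitsInProgress = 0;

    public synchronized void begin() {
        if (commitsInProgress < 0)
            throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
        commitsInProgress++;
    }

    public synchronized void end() {
        if (commitsInProgress < 1)
            throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
        if (--commitsInProgress == 0)
            notifyAll(); // wake any thread parked in awaitQuiescent()
    }

    public synchronized void awaitQuiescent() {
        while (commitsInProgress > 0) {
            try {
                wait(50); // bounded wait, matching the 50 ms poll in the code above
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
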
IllegalStateException(); + } + + if (deletingMemTable != null) { + throw new IllegalStateException(); + } + + otherMemTable = memTable; + try { + memTable = new InMemoryMap(acuTableConf); + } catch (LocalityGroupConfigurationError e) { + throw new RuntimeException(e); + } + + CommitSession oldCommitSession = commitSession; + commitSession = new CommitSession(nextSeq, memTable); + nextSeq += 2; + + tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes()); + + return oldCommitSession; + } + + void finishedMinC() { + + if (otherMemTable == null) { + throw new IllegalStateException(); + } + + if (deletingMemTable != null) { + throw new IllegalStateException(); + } + + deletingMemTable = otherMemTable; + + otherMemTable = null; + Tablet.this.notifyAll(); + } + + void finalizeMinC() { + try { + deletingMemTable.delete(15000); + } finally { + synchronized (Tablet.this) { + if (otherMemTable != null) { + throw new IllegalStateException(); + } + + if (deletingMemTable == null) { + throw new IllegalStateException(); + } + + deletingMemTable = null; + + tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0); + } + } + } + + boolean memoryReservedForMinC() { + return otherMemTable != null || deletingMemTable != null; + } + + void waitForMinC() { + while (otherMemTable != null || deletingMemTable != null) { + try { + Tablet.this.wait(50); + } catch (InterruptedException e) { + log.warn(e, e); + } + } + } + + void mutate(CommitSession cm, List mutations) { + cm.memTable.mutate(mutations); + } + + void updateMemoryUsageStats() { + long other = 0; + if (otherMemTable != null) + other = otherMemTable.estimatedSizeInBytes(); + else if (deletingMemTable != null) + other = deletingMemTable.estimatedSizeInBytes(); + + tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other); + } + + List getIterators() { + List toReturn = new ArrayList(2); + toReturn.add(memTable.skvIterator()); + if (otherMemTable != null) + toReturn.add(otherMemTable.skvIterator()); + return toReturn; + } + + void returnIterators(List iters) { + for (MemoryIterator iter : iters) { + iter.close(); + } + } + + public long getNumEntries() { + if (otherMemTable != null) + return memTable.getNumEntries() + otherMemTable.getNumEntries(); + return memTable.getNumEntries(); + } + + CommitSession getCommitSession() { + return commitSession; + } + } + + private TabletMemory tabletMemory; + + private final TabletTime tabletTime; + private long persistedTime; + private final Object timeLock = new Object(); + + private final Path location; // absolute path of this tablets dir + private TServerInstance lastLocation; + + private Configuration conf; + private VolumeManager fs; + + private TableConfiguration acuTableConf; + + private volatile boolean tableDirChecked = false; + + private AtomicLong dataSourceDeletions = new AtomicLong(0); + private Set activeScans = new HashSet(); + + private volatile boolean closing = false; + private boolean closed = false; + private boolean closeComplete = false; + + private long lastFlushID = -1; + private long lastCompactID = -1; + + private static class CompactionWaitInfo { + long flushID = -1; + long compactionID = -1; + } + + // stores info about user initiated major compaction that is waiting on a minor compaction to finish + private CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo(); + + private KeyExtent extent; + + private TabletResourceManager tabletResources; + final private DatafileManager 
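
[Editor's note] TabletMemory above rotates through three slots: the active memTable, an otherMemTable frozen for minor compaction, and a deletingMemTable being torn down. A compressed sketch of that state machine, with hypothetical names and a plain Map standing in for InMemoryMap:

// Sketch of TabletMemory's three-slot rotation; names and the Map stand-in
// are illustrative, not Accumulo's InMemoryMap.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MemTableRotation {
    private Map<String,String> active = new ConcurrentHashMap<>();
    private Map<String,String> compacting; // otherMemTable: frozen, being flushed
    private Map<String,String> deleting;   // deletingMemTable: flushed, being freed

    // Freeze the active table and open a fresh one; returns the frozen table.
    public synchronized Map<String,String> prepareForMinC() {
        if (compacting != null || deleting != null)
            throw new IllegalStateException("minor compaction already in flight");
        compacting = active;
        active = new ConcurrentHashMap<>();
        return compacting;
    }

    // The flush wrote a file; the frozen table now only awaits deletion.
    public synchronized void finishedMinC() {
        if (compacting == null || deleting != null)
            throw new IllegalStateException();
        deleting = compacting;
        compacting = null;
        notifyAll();
    }

    public synchronized void finalizeMinC() {
        if (compacting != null || deleting == null)
            throw new IllegalStateException();
        deleting = null; // real code first calls delete() on the table, outside the lock
    }
}
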
datafileManager; + private volatile boolean majorCompactionInProgress = false; + private volatile boolean majorCompactionWaitingToStart = false; + private Set majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class)); + private volatile boolean minorCompactionInProgress = false; + private volatile boolean minorCompactionWaitingToStart = false; + + private boolean updatingFlushID = false; + + private AtomicReference constraintChecker = new AtomicReference(); + + private final String tabletDirectory; + + private int writesInProgress = 0; + + private static final Logger log = Logger.getLogger(Tablet.class); + public TabletStatsKeeper timer; + + private Rate queryRate = new Rate(0.95); + private long queryCount = 0; + + private Rate queryByteRate = new Rate(0.95); + private long queryBytes = 0; + + private Rate ingestRate = new Rate(0.95); + private long ingestCount = 0; + + private Rate ingestByteRate = new Rate(0.95); + private long ingestBytes = 0; + + private byte[] defaultSecurityLabel = new byte[0]; + + private long lastMinorCompactionFinishTime; + private long lastMapFileImportTime; + + private volatile long numEntries; + private volatile long numEntriesInMemory; + + // a count of the amount of data read by the iterators + private AtomicLong scannedCount = new AtomicLong(0); + private Rate scannedRate = new Rate(0.95); + + private ConfigurationObserver configObserver; + + private TabletServer tabletServer; + + private final int logId; + // ensure we only have one reader/writer of our bulk file notes at at time + public final Object bulkFileImportLock = new Object(); + + public int getLogId() { + return logId; + } + + public static class TabletClosedException extends RuntimeException { + public TabletClosedException(Exception e) { + super(e); + } + + public TabletClosedException() { + super(); + } + + private static final long serialVersionUID = 1L; + } + + FileRef getNextMapFilename(String prefix) throws IOException { + String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent)); + checkTabletDir(); + return new FileRef(location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension); + } + + private void checkTabletDir() throws IOException { + if (!tableDirChecked) { + checkTabletDir(this.location); + tableDirChecked = true; + } + } + + private void checkTabletDir(Path tabletDir) throws IOException { + + FileStatus[] files = null; + try { + files = fs.listStatus(tabletDir); + } catch (FileNotFoundException ex) { + // ignored + } + + if (files == null) { + if (tabletDir.getName().startsWith(Constants.CLONE_PREFIX)) + log.debug("Tablet " + extent + " had no dir, creating " + tabletDir); // its a clone dir... 
+ else + log.warn("Tablet " + extent + " had no dir, creating " + tabletDir); + + fs.mkdirs(tabletDir); + } + } + + class DatafileManager { + // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles + final private Map datafileSizes = Collections.synchronizedMap(new TreeMap()); + + DatafileManager(SortedMap datafileSizes) { + for (Entry datafiles : datafileSizes.entrySet()) + this.datafileSizes.put(datafiles.getKey(), datafiles.getValue()); + } + + FileRef mergingMinorCompactionFile = null; + Set filesToDeleteAfterScan = new HashSet(); + Map> scanFileReservations = new HashMap>(); + MapCounter fileScanReferenceCounts = new MapCounter(); + long nextScanReservationId = 0; + boolean reservationsBlocked = false; + + Set majorCompactingFiles = new HashSet(); + + Pair> reserveFilesForScan() { + synchronized (Tablet.this) { + + while (reservationsBlocked) { + try { + Tablet.this.wait(50); + } catch (InterruptedException e) { + log.warn(e, e); + } + } + + Set absFilePaths = new HashSet(datafileSizes.keySet()); + + long rid = nextScanReservationId++; + + scanFileReservations.put(rid, absFilePaths); + + Map ret = new HashMap(); + + for (FileRef path : absFilePaths) { + fileScanReferenceCounts.increment(path, 1); + ret.put(path, datafileSizes.get(path)); + } + + return new Pair>(rid, ret); + } + } + + void returnFilesForScan(Long reservationId) { + + final Set filesToDelete = new HashSet(); + + synchronized (Tablet.this) { + Set absFilePaths = scanFileReservations.remove(reservationId); + + if (absFilePaths == null) + throw new IllegalArgumentException("Unknown scan reservation id " + reservationId); + + boolean notify = false; + for (FileRef path : absFilePaths) { + long refCount = fileScanReferenceCounts.decrement(path, 1); + if (refCount == 0) { + if (filesToDeleteAfterScan.remove(path)) + filesToDelete.add(path); + notify = true; + } else if (refCount < 0) + throw new IllegalStateException("Scan ref count for " + path + " is " + refCount); + } + + if (notify) + Tablet.this.notifyAll(); + } + + if (filesToDelete.size() > 0) { + log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete); + MetadataTableUtil.removeScanFiles(extent, filesToDelete, SystemCredentials.get(), tabletServer.getLock()); + } + } + + private void removeFilesAfterScan(Set scanFiles) { + if (scanFiles.size() == 0) + return; + + Set filesToDelete = new HashSet(); + + synchronized (Tablet.this) { + for (FileRef path : scanFiles) { + if (fileScanReferenceCounts.get(path) == 0) + filesToDelete.add(path); + else + filesToDeleteAfterScan.add(path); + } + } + + if (filesToDelete.size() > 0) { + log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete); + MetadataTableUtil.removeScanFiles(extent, filesToDelete, SystemCredentials.get(), tabletServer.getLock()); + } + } + + private TreeSet waitForScansToFinish(Set pathsToWaitFor, boolean blockNewScans, long maxWaitTime) { + long startTime = System.currentTimeMillis(); + TreeSet inUse = new TreeSet(); + + Span waitForScans = Trace.start("waitForScans"); + try { + synchronized (Tablet.this) { + if (blockNewScans) { + if (reservationsBlocked) + throw new IllegalStateException(); + + reservationsBlocked = true; + } + + for (FileRef path : pathsToWaitFor) { + while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) { + try { + Tablet.this.wait(100); + } catch (InterruptedException e) { + log.warn(e, e); + } + } + } + + for (FileRef path : pathsToWaitFor) { + if 
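
[Editor's note] reserveFilesForScan/returnFilesForScan above give each scan a numbered snapshot of the current file set and keep per-file reference counts so that file deletion is deferred until the last reader returns. A minimal sketch of that scheme, omitting the reservationsBlocked gate shown above; names are illustrative:

// Sketch of the scan-file reservation scheme; not Accumulo code.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class FileReservations {
    private final Map<Long,Set<String>> reservations = new HashMap<>();
    private final Map<String,Integer> refCounts = new HashMap<>();
    private final Set<String> deleteAfterScan = new HashSet<>();
    private long nextId = 0;

    public synchronized long reserve(Set<String> currentFiles) {
        long rid = nextId++;
        reservations.put(rid, new HashSet<>(currentFiles));
        for (String f : currentFiles)
            refCounts.merge(f, 1, Integer::sum);
        return rid;
    }

    // Returns the files whose deferred delete is now safe to perform.
    public synchronized Set<String> release(long rid) {
        Set<String> files = reservations.remove(rid);
        if (files == null)
            throw new IllegalArgumentException("Unknown reservation " + rid);
        Set<String> deletable = new HashSet<>();
        for (String f : files) {
            int rc = refCounts.merge(f, -1, Integer::sum);
            if (rc < 0)
                throw new IllegalStateException("ref count for " + f + " is " + rc);
            if (rc == 0 && deleteAfterScan.remove(f))
                deletable.add(f);
        }
        return deletable;
    }
}
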
(fileScanReferenceCounts.get(path) > 0) + inUse.add(path); + } + + if (blockNewScans) { + reservationsBlocked = false; + Tablet.this.notifyAll(); + } + + } + } finally { + waitForScans.stop(); + } + return inUse; + } + + public void importMapFiles(long tid, Map pathsString, boolean setTime) throws IOException { + + String bulkDir = null; + + Map paths = new HashMap(); + for (Entry entry : pathsString.entrySet()) + paths.put(entry.getKey(), entry.getValue()); + + for (FileRef tpath : paths.keySet()) { + + boolean inTheRightDirectory = false; + Path parent = tpath.path().getParent().getParent(); + for (String tablesDir : ServerConstants.getTablesDirs()) { + if (parent.equals(new Path(tablesDir, extent.getTableId().toString()))) { + inTheRightDirectory = true; + break; + } + } + if (!inTheRightDirectory) { + throw new IOException("Data file " + tpath + " not in table dirs"); + } + + if (bulkDir == null) + bulkDir = tpath.path().getParent().toString(); + else if (!bulkDir.equals(tpath.path().getParent().toString())) + throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath); + + } + + if (extent.isRootTablet()) { + throw new IllegalArgumentException("Can not import files to root tablet"); + } + + synchronized (bulkFileImportLock) { + Credentials creds = SystemCredentials.get(); + Connector conn; + try { + conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken()); + } catch (Exception ex) { + throw new IOException(ex); + } + // Remove any bulk files we've previously loaded and compacted away + List files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid); + + for (FileRef file : files) + if (paths.keySet().remove(file)) + log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file); + + if (paths.size() > 0) { + long bulkTime = Long.MIN_VALUE; + if (setTime) { + for (DataFileValue dfv : paths.values()) { + long nextTime = tabletTime.getAndUpdateTime(); + if (nextTime < bulkTime) + throw new IllegalStateException("Time went backwards unexpectedly " + nextTime + " " + bulkTime); + bulkTime = nextTime; + dfv.setTime(bulkTime); + } + } + + synchronized (timeLock) { + if (bulkTime > persistedTime) + persistedTime = bulkTime; + + MetadataTableUtil.updateTabletDataFile(tid, extent, paths, tabletTime.getMetadataValue(persistedTime), creds, tabletServer.getLock()); + } + } + } + + synchronized (Tablet.this) { + for (Entry tpath : paths.entrySet()) { + if (datafileSizes.containsKey(tpath.getKey())) { + log.error("Adding file that is already in set " + tpath.getKey()); + } + datafileSizes.put(tpath.getKey(), tpath.getValue()); + + } + + tabletResources.importedMapFiles(); + + computeNumEntries(); + } + + for (Entry entry : paths.entrySet()) { + log.log(TLevel.TABLET_HIST, extent + " import " + entry.getKey() + " " + entry.getValue()); + } + } + + FileRef reserveMergingMinorCompactionFile() { + if (mergingMinorCompactionFile != null) + throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved : " + mergingMinorCompactionFile); + + if (extent.isRootTablet()) + return null; + + int maxFiles = acuTableConf.getMaxFilesPerTablet(); + + // when a major compaction is running and we are at max files, write out + // one extra file... want to avoid the case where major compaction is + // compacting everything except for the largest file, and therefore the + // largest file is returned for merging.. 
the following check mostly + // avoids this case, except for the case where major compactions fail or + // are canceled + if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles) + return null; + + if (datafileSizes.size() >= maxFiles) { + // find the smallest file + + long min = Long.MAX_VALUE; + FileRef minName = null; + + for (Entry entry : datafileSizes.entrySet()) { + if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) { + min = entry.getValue().getSize(); + minName = entry.getKey(); + } + } + + if (minName == null) + return null; + + mergingMinorCompactionFile = minName; + return minName; + } + + return null; + } + + void unreserveMergingMinorCompactionFile(FileRef file) { + if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null) + || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile))) + throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile); + + mergingMinorCompactionFile = null; + } + + void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) + throws IOException { + + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + if (extent.isRootTablet()) { + try { + if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) { + throw new IllegalStateException(); + } + } catch (Exception e) { + throw new IllegalStateException("Can not bring major compaction online, lock not held", e); + } + } + + // rename before putting in metadata table, so files in metadata table should + // always exist + do { + try { + if (dfv.getNumEntries() == 0) { + fs.deleteRecursively(tmpDatafile.path()); + } else { + if (fs.exists(newDatafile.path())) { + log.warn("Target map file already exist " + newDatafile); + fs.deleteRecursively(newDatafile.path()); + } + + rename(fs, tmpDatafile.path(), newDatafile.path()); + } + break; + } catch (IOException ioe) { + log.warn("Tablet " + extent + " failed to rename " + newDatafile + " after MinC, will retry in 60 secs...", ioe); + UtilWaitThread.sleep(60 * 1000); + } + } while (true); + + long t1, t2; + + // the code below always assumes merged files are in use by scans... this must be done + // because the in memory list of files is not updated until after the metadata table + // therefore the file is available to scans until memory is updated, but want to ensure + // the file is not available for garbage collection... if memory were updated + // before this point (like major compactions do), then the following code could wait + // for scans to finish like major compactions do.... used to wait for scans to finish + // here, but that was incorrect because a scan could start after waiting but before + // memory was updated... assuming the file is always in use by scans leads to + // one uneeded metadata update when it was not actually in use + Set filesInUseByScans = Collections.emptySet(); + if (absMergeFile != null) + filesInUseByScans = Collections.singleton(absMergeFile); + + // very important to write delete entries outside of log lock, because + // this metadata write does not go up... 
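
[Editor's note] The file selection in reserveMergingMinorCompactionFile above reduces to: at the per-tablet file cap, merge the smallest file that a major compaction has not already claimed. A sketch of just that choice, with illustrative names:

// Sketch of the merging-minor-compaction file choice; not Accumulo code.
import java.util.Map;
import java.util.Set;

public final class MergeFileChooser {
    public static String chooseSmallest(Map<String,Long> fileSizes, Set<String> majorCompacting, int maxFiles) {
        // At exactly maxFiles with a major compaction running, write one
        // extra file rather than merge (mirrors the guard in the code above).
        if (!majorCompacting.isEmpty() && fileSizes.size() == maxFiles)
            return null;
        if (fileSizes.size() < maxFiles)
            return null;
        String minName = null;
        long min = Long.MAX_VALUE;
        for (Map.Entry<String,Long> e : fileSizes.entrySet()) {
            if (e.getValue() < min && !majorCompacting.contains(e.getKey())) {
                min = e.getValue();
                minName = e.getKey();
            }
        }
        return minName; // null if every candidate is major-compacting
    }
}
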
it goes sideways or to itself + if (absMergeFile != null) + MetadataTableUtil.addDeleteEntries(extent, Collections.singleton(absMergeFile), SystemCredentials.get()); + + Set unusedWalLogs = beginClearingUnusedLogs(); + try { + // the order of writing to metadata and walog is important in the face of machine/process failures + // need to write to metadata before writing to walog, when things are done in the reverse order + // data could be lost... the minor compaction start even should be written before the following metadata + // write is made + + synchronized (timeLock) { + if (commitSession.getMaxCommittedTime() > persistedTime) + persistedTime = commitSession.getMaxCommittedTime(); + + String time = tabletTime.getMetadataValue(persistedTime); + MasterMetadataUtil.updateTabletDataFile(extent, newDatafile, absMergeFile, dfv, time, SystemCredentials.get(), filesInUseByScans, + tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId); + } + + } finally { + finishClearingUnusedLogs(); + } + + do { + try { + // the purpose of making this update use the new commit session, instead of the old one passed in, + // is because the new one will reference the logs used by current memory... + + tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2); + break; + } catch (IOException e) { + log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e); + UtilWaitThread.sleep(1 * 1000); + } + } while (true); + + synchronized (Tablet.this) { + lastLocation = null; + + t1 = System.currentTimeMillis(); + if (datafileSizes.containsKey(newDatafile)) { + log.error("Adding file that is already in set " + newDatafile); + } + + if (dfv.getNumEntries() > 0) { + datafileSizes.put(newDatafile, dfv); + } + + if (absMergeFile != null) { + datafileSizes.remove(absMergeFile); + } + + unreserveMergingMinorCompactionFile(absMergeFile); + + dataSourceDeletions.incrementAndGet(); + tabletMemory.finishedMinC(); + + lastFlushID = flushId; + + computeNumEntries(); + t2 = System.currentTimeMillis(); + } + + // must do this after list of files in memory is updated above + removeFilesAfterScan(filesInUseByScans); + + if (absMergeFile != null) + log.log(TLevel.TABLET_HIST, extent + " MinC [" + absMergeFile + ",memory] -> " + newDatafile); + else + log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " + newDatafile); + log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, getExtent().toString())); + if (dfv.getSize() > acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) { + log.debug(String.format("Minor Compaction wrote out file larger than split threshold. 
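
[Editor's note] The rename loop and the minor-compaction-finished write above share one retry shape: attempt, break on success, sleep and retry on IOException. A generic sketch of that loop (the real code logs the failure with context before sleeping):

// Sketch of the retry-until-success loop used twice above; illustrative.
import java.io.IOException;

public final class Retry {
    interface Action {
        void run() throws IOException;
    }

    public static void untilSuccess(Action action, long sleepMillis) throws InterruptedException {
        while (true) {
            try {
                action.run();
                return;
            } catch (IOException ioe) {
                // real code logs the failure, then retries after the pause
                Thread.sleep(sleepMillis);
            }
        }
    }
}
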
split threshold = %,d file size = %,d", + acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), dfv.getSize())); + } + + } + + public void reserveMajorCompactingFiles(Collection files) { + if (majorCompactingFiles.size() != 0) + throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles); + + if (mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile)) + throw new IllegalStateException("Major compaction tried to resrve file in use by minor compaction " + mergingMinorCompactionFile); + + majorCompactingFiles.addAll(files); + } + + public void clearMajorCompactingFile() { + majorCompactingFiles.clear(); + } + + void bringMajorCompactionOnline(Set oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv) + throws IOException { + long t1, t2; + + if (!extent.isRootTablet()) { + + if (fs.exists(newDatafile.path())) { + log.error("Target map file already exist " + newDatafile, new Exception()); + throw new IllegalStateException("Target map file already exist " + newDatafile); + } + + // rename before putting in metadata table, so files in metadata table should + // always exist + rename(fs, tmpDatafile.path(), newDatafile.path()); + + if (dfv.getNumEntries() == 0) { + fs.deleteRecursively(newDatafile.path()); + } + } + + TServerInstance lastLocation = null; + synchronized (Tablet.this) { + + t1 = System.currentTimeMillis(); + + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + + dataSourceDeletions.incrementAndGet(); + + if (extent.isRootTablet()) { + + waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE); + + try { + if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) { + throw new IllegalStateException(); + } + } catch (Exception e) { + throw new IllegalStateException("Can not bring major compaction online, lock not held", e); + } + + // mark files as ready for deletion, but + // do not delete them until we successfully + // rename the compacted map file, in case + // the system goes down + + RootFiles.replaceFiles(acuTableConf, fs, location, oldDatafiles, tmpDatafile, newDatafile); + } + + // atomically remove old files and add new file + for (FileRef oldDatafile : oldDatafiles) { + if (!datafileSizes.containsKey(oldDatafile)) { + log.error("file does not exist in set " + oldDatafile); + } + datafileSizes.remove(oldDatafile); + majorCompactingFiles.remove(oldDatafile); + } + + if (datafileSizes.containsKey(newDatafile)) { + log.error("Adding file that is already in set " + newDatafile); + } + + if (dfv.getNumEntries() > 0) { + datafileSizes.put(newDatafile, dfv); + } + + // could be used by a follow on compaction in a multipass compaction + majorCompactingFiles.add(newDatafile); + + computeNumEntries(); + + lastLocation = Tablet.this.lastLocation; + Tablet.this.lastLocation = null; + + if (compactionId != null) + lastCompactID = compactionId; + + t2 = System.currentTimeMillis(); + } + + if (!extent.isRootTablet()) { + Set filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000); + if (filesInUseByScans.size() > 0) + log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans); + MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(), + tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock()); + removeFilesAfterScan(filesInUseByScans); + } + + log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0)); + log.log(TLevel.TABLET_HIST, extent 
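
[Editor's note] The heart of bringMajorCompactionOnline above is the synchronized block that swaps the file set: old files leave and the new file enters under one lock, so no scan observes both. A strict sketch of that swap (the real code logs inconsistencies and continues; throwing here keeps the sketch simple):

// Sketch of the atomic file-set swap; names are illustrative.
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class LiveFileSet {
    private final Map<String,Long> files = new TreeMap<>();

    public synchronized void replace(Set<String> oldFiles, String newFile, long newSize) {
        for (String f : oldFiles) {
            if (files.remove(f) == null)
                throw new IllegalStateException("file not in live set: " + f);
        }
        if (newSize > 0 && files.put(newFile, newSize) != null)
            throw new IllegalStateException("file already in live set: " + newFile);
    }
}
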
+ " MajC " + oldDatafiles + " --> " + newDatafile); + } + + public SortedMap getDatafileSizes() { + synchronized (Tablet.this) { + TreeMap copy = new TreeMap(datafileSizes); + return Collections.unmodifiableSortedMap(copy); + } + } + + public Set getFiles() { + synchronized (Tablet.this) { + HashSet files = new HashSet(datafileSizes.keySet()); + return Collections.unmodifiableSet(files); + } + } + + } + + public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap tabletsKeyValues) + throws IOException { + this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), tabletsKeyValues); + splitCreationTime = 0; + } + + public Tablet(KeyExtent extent, TabletServer tabletServer, TabletResourceManager trm, SplitInfo info) throws IOException { + this(tabletServer, new Text(info.dir), extent, trm, CachedConfiguration.getInstance(), info.datafiles, info.time, info.initFlushID, info.initCompactID, + info.lastLocation); + splitCreationTime = System.currentTimeMillis(); + } + + /** + * Only visibile for testing + */ + @VisibleForTesting + protected Tablet(TabletTime tabletTime, String tabletDirectory, int logId, Path location, DatafileManager datafileManager) { + this.tabletTime = tabletTime; + this.tabletDirectory = tabletDirectory; + this.logId = logId; + this.location = location; + this.datafileManager = datafileManager; + } + + private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, + SortedMap tabletsKeyValues) throws IOException { + this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), tabletsKeyValues); + } + + static private final List EMPTY = Collections.emptyList(); + + private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, + SortedMap datafiles, String time, long initFlushID, long initCompactID, TServerInstance last) throws IOException { + this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), EMPTY, datafiles, time, last, new HashSet(), initFlushID, initCompactID); + } + + private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap tabletsKeyValues) { + SortedMap entries; + + if (extent.isRootTablet()) { + return null; + } else { + entries = new TreeMap(); + Text rowName = extent.getMetadataEntry(); + for (Entry entry : tabletsKeyValues.entrySet()) { + if (entry.getKey().compareRow(rowName) == 0 && TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) { + entries.put(new Key(entry.getKey()), new Value(entry.getValue())); + } + } + } + + // log.debug("extent : "+extent+" entries : "+entries); + + if (entries.size() == 1) + return entries.values().iterator().next().toString(); + return null; + } + + private static SortedMap lookupDatafiles(AccumuloConfiguration conf, VolumeManager fs, KeyExtent extent, + SortedMap tabletsKeyValues) throws IOException { + + TreeMap datafiles = new TreeMap(); + + if (extent.isRootTablet()) { // the meta0 tablet + Path location = new Path(MetadataTableUtil.getRootTabletDir()); + + // cleanUpFiles() has special handling for delete. 
files + FileStatus[] files = fs.listStatus(location); + Collection goodPaths = RootFiles.cleanupReplacement(fs, files, true); + for (String good : goodPaths) { + Path path = new Path(good); + String filename = path.getName(); + FileRef ref = new FileRef(location.toString() + "/" + filename, path); + DataFileValue dfv = new DataFileValue(0, 0); + datafiles.put(ref, dfv); + } + } else { + + Text rowName = extent.getMetadataEntry(); + + String tableId = extent.isMeta() ? RootTable.ID : MetadataTable.ID; + ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SystemCredentials.get(), tableId, Authorizations.EMPTY); + + // Commented out because when no data file is present, each tablet will scan through metadata table and return nothing + // reduced batch size to improve performance + // changed here after endKeys were implemented from 10 to 1000 + mdScanner.setBatchSize(1000); + + // leave these in, again, now using endKey for safety + mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME); + + mdScanner.setRange(new Range(rowName)); + + for (Entry entry : mdScanner) { + + if (entry.getKey().compareRow(rowName) != 0) { + break; + } + + FileRef ref = new FileRef(fs, entry.getKey()); + datafiles.put(ref, new DataFileValue(entry.getValue().get())); + } + } + return datafiles; + } + + private static List lookupLogEntries(KeyExtent ke, SortedMap tabletsKeyValues) { + List logEntries = new ArrayList(); + + if (ke.isMeta()) { + try { + logEntries = MetadataTableUtil.getLogEntries(SystemCredentials.get(), ke); + } catch (Exception ex) { + throw new RuntimeException("Unable to read tablet log entries", ex); + } + } else { + log.debug("Looking at metadata " + tabletsKeyValues); + Text row = ke.getMetadataEntry(); + for (Entry entry : tabletsKeyValues.entrySet()) { + Key key = entry.getKey(); + if (key.getRow().equals(row)) { + if (key.getColumnFamily().equals(LogColumnFamily.NAME)) { + logEntries.add(LogEntry.fromKeyValue(key, entry.getValue())); + } + } + } + } + + log.debug("got " + logEntries + " for logs for " + ke); + return logEntries; + } + + private static Set lookupScanFiles(KeyExtent extent, SortedMap tabletsKeyValues, VolumeManager fs) throws IOException { + HashSet scanFiles = new HashSet(); + + Text row = extent.getMetadataEntry(); + for (Entry entry : tabletsKeyValues.entrySet()) { + Key key = entry.getKey(); + if (key.getRow().equals(row) && key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) { + scanFiles.add(new FileRef(fs, key)); + } + } + + return scanFiles; + } + + private static long lookupFlushID(KeyExtent extent, SortedMap tabletsKeyValues) { + Text row = extent.getMetadataEntry(); + for (Entry entry : tabletsKeyValues.entrySet()) { + Key key = entry.getKey(); + if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) + return Long.parseLong(entry.getValue().toString()); + } + + return -1; + } + + private static long lookupCompactID(KeyExtent extent, SortedMap tabletsKeyValues) { + Text row = extent.getMetadataEntry(); + for (Entry entry : tabletsKeyValues.entrySet()) { + Key key = entry.getKey(); + if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) + return Long.parseLong(entry.getValue().toString()); + } + + return -1; + } + + private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, VolumeManager fs, + SortedMap tabletsKeyValues) 
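
[Editor's note] The lookup helpers above (lookupTime, lookupFlushID, lookupCompactID, lookupScanFiles) all share one pattern: walk the cached metadata entries and keep those matching the tablet's row and a target column. A sketch of that pattern using Accumulo's public Key/Value types; the helper name and explicit family/qualifier parameters are illustrative, where the real code matches via ServerColumnFamily column constants:

// Sketch of the metadata-entry filtering pattern; not Accumulo code.
import java.util.Map;
import java.util.SortedMap;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;

public final class MetadataLookups {
    public static String lookupSingleValue(SortedMap<Key,Value> entries, Text row, Text family, Text qualifier) {
        for (Map.Entry<Key,Value> e : entries.entrySet()) {
            Key k = e.getKey();
            if (k.compareRow(row) == 0 && k.getColumnFamily().equals(family)
                    && k.getColumnQualifier().equals(qualifier))
                return e.getValue().toString();
        }
        return null; // absent, like the -1 sentinel in lookupFlushID above
    }
}
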
throws IOException { + this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(), fs, + extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent, tabletsKeyValues), + lookupScanFiles(extent, tabletsKeyValues, fs), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues)); + } + + private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap tabletsKeyValues) { + for (Entry entry : tabletsKeyValues.entrySet()) { + if (entry.getKey().getColumnFamily().compareTo(TabletsSection.LastLocationColumnFamily.NAME) == 0) { + return new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier()); + } + } + return null; + } + + /** + * yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time + */ + private Tablet(final TabletServer tabletServer, final Text location, final KeyExtent extent, final TabletResourceManager trm, final Configuration conf, + final VolumeManager fs, final List rawLogEntries, final SortedMap rawDatafiles, String time, + final TServerInstance lastLocation, Set scanFiles, long initFlushID, long initCompactID) throws IOException { + + TabletFiles tabletPaths = VolumeUtil.updateTabletVolumes(tabletServer.getLock(), fs, extent, new TabletFiles(location.toString(), rawLogEntries, + rawDatafiles)); + + Path locationPath; + + if (tabletPaths.dir.contains(":")) { + locationPath = new Path(tabletPaths.dir.toString()); + } else { + locationPath = fs.getFullPath(FileType.TABLE, extent.getTableId().toString() + tabletPaths.dir.toString()); + } + + final List logEntries = tabletPaths.logEntries; + final SortedMap datafiles = tabletPaths.datafiles; + + this.location = locationPath; + this.lastLocation = lastLocation; + this.tabletDirectory = tabletPaths.dir; + this.conf = conf; + this.acuTableConf = tabletServer.getTableConfiguration(extent); + + this.fs = fs; + this.extent = extent; + this.tabletResources = trm; + + this.lastFlushID = initFlushID; + this.lastCompactID = initCompactID; + + if (extent.isRootTablet()) { + long rtime = Long.MIN_VALUE; + for (FileRef ref : datafiles.keySet()) { + Path path = ref.path(); + FileSystem ns = fs.getVolumeByPath(path).getFileSystem(); + FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), tabletServer.getTableConfiguration(extent)); + long maxTime = -1; + try { + + while (reader.hasTop()) { + maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp()); + reader.next(); + } + + } finally { + reader.close(); + } + + if (maxTime > rtime) { + time = TabletTime.LOGICAL_TIME_ID + "" + maxTime; + rtime = maxTime; + } + } + } + if (time == null && datafiles.isEmpty() && extent.equals(RootTable.OLD_EXTENT)) { + // recovery... 
old root tablet has no data, so time doesn't matter: + time = TabletTime.LOGICAL_TIME_ID + "" + Long.MIN_VALUE; + } + + this.tabletServer = tabletServer; + this.logId = tabletServer.createLogId(extent); + + this.timer = new TabletStatsKeeper(); + + setupDefaultSecurityLabels(extent); + + tabletMemory = new TabletMemory(); + tabletTime = TabletTime.getInstance(time); + persistedTime = tabletTime.getTime(); + + acuTableConf.addObserver(configObserver = new ConfigurationObserver() { + + private void reloadConstraints() { + constraintChecker.set(new ConstraintChecker(acuTableConf)); + } + + @Override + public void propertiesChanged() { + reloadConstraints(); + + try { + setupDefaultSecurityLabels(extent); + } catch (Exception e) { + log.error("Failed to reload default security labels for extent: " + extent.toString()); + } + } + + @Override + public void propertyChanged(String prop) { + if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey())) + reloadConstraints(); + else if (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) { + try { + log.info("Default security labels changed for extent: " + extent.toString()); + setupDefaultSecurityLabels(extent); + } catch (Exception e) { + log.error("Failed to reload default security labels for extent: " + extent.toString()); + } + } + + } + + @Override + public void sessionExpired() { + log.debug("Session expired, no longer updating per table props..."); + } + + }); + + acuTableConf.getNamespaceConfiguration().addObserver(configObserver); + + // Force a load of any per-table properties + configObserver.propertiesChanged(); + + tabletResources.setTablet(this, acuTableConf); + if (!logEntries.isEmpty()) { + log.info("Starting Write-Ahead Log recovery for " + this.extent); + // count[0] = entries used on tablet + // count[1] = track max time from walog entries wihtout timestamps + final long[] count = new long[2]; + final CommitSession commitSession = tabletMemory.getCommitSession(); + count[1] = Long.MIN_VALUE; + try { + Set absPaths = new HashSet(); + for (FileRef ref : datafiles.keySet()) + absPaths.add(ref.path().toString()); + + tabletServer.recover(this.tabletServer.getFileSystem(), this, logEntries, absPaths, new MutationReceiver() { + @Override + public void receive(Mutation m) { + // LogReader.printMutation(m); + Collection muts = m.getUpdates(); + for (ColumnUpdate columnUpdate : muts) { + if (!columnUpdate.hasTimestamp()) { + // if it is not a user set timestamp, it must have been set + // by the system + count[1] = Math.max(count[1], columnUpdate.getTimestamp()); + } + } + tabletMemory.mutate(commitSession, Collections.singletonList(m)); + count[0]++; + } + }); + + if (count[1] != Long.MIN_VALUE) { + tabletTime.useMaxTimeFromWALog(count[1]); + } + commitSession.updateMaxCommittedTime(tabletTime.getTime()); + + if (count[0] == 0) { + log.debug("No replayed mutations applied, removing unused entries for " + extent); + MetadataTableUtil.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock()); + logEntries.clear(); + } + + } catch (Throwable t) { + if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE)) { + log.warn("Error recovering from log files: ", t); + } else { + throw new RuntimeException(t); + } + } + // make some closed references that represent the recovered logs + currentLogs = new HashSet(); + for (LogEntry logEntry : logEntries) { + for (String log : logEntry.logSet) { + currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log, logEntry.getColumnQualifier().toString())); + } + } + + 
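
[Editor's note] The MutationReceiver in the recovery block above does two jobs at once: it replays each mutation into memory and tracks the largest system-assigned timestamp seen, so logical time can be advanced past anything already written. A sketch of just that accounting, using Accumulo's public Mutation/ColumnUpdate API; the class name is illustrative:

// Sketch of the WAL-replay accounting; not Accumulo code.
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Mutation;

public class RecoveryStats {
    private long applied = 0;
    private long maxSystemTime = Long.MIN_VALUE;

    public void receive(Mutation m) {
        for (ColumnUpdate cu : m.getUpdates()) {
            // updates without a user-set timestamp were stamped by the system
            if (!cu.hasTimestamp())
                maxSystemTime = Math.max(maxSystemTime, cu.getTimestamp());
        }
        applied++;
    }

    public boolean anyApplied() { return applied > 0; }
    public long maxSystemTime() { return maxSystemTime; }
}
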
log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries() + + " entries created)"); + } + + String contextName = acuTableConf.get(Property.TABLE_CLASSPATH); + if (contextName != null && !contextName.equals("")) { + // initialize context classloader, instead of possibly waiting for it to initialize for a scan + // TODO this could hang, causing other tablets to fail to load - ACCUMULO-1292 + AccumuloVFSClassLoader.getContextManager().getClassLoader(contextName); + } + + // do this last after tablet is completely setup because it + // could cause major compaction to start + datafileManager = new DatafileManager(datafiles); + + computeNumEntries(); + + datafileManager.removeFilesAfterScan(scanFiles); + + // look for hints of a failure on the previous tablet server + if (!logEntries.isEmpty() || needsMajorCompaction(MajorCompactionReason.NORMAL)) { + // look for any temp files hanging around + removeOldTemporaryFiles(); + } + + log.log(TLevel.TABLET_HIST, extent + " opened"); + } + + private void removeOldTemporaryFiles() { + // remove any temporary files created by a previous tablet server + try { + for (FileStatus tmp : fs.globStatus(new Path(location, "*_tmp"))) { + try { + log.debug("Removing old temp file " + tmp.getPath()); + fs.delete(tmp.getPath()); + } catch (IOException ex) { + log.error("Unable to remove old temp file " + tmp.getPath() + ": " + ex); + } + } + } catch (IOException ex) { + log.error("Error scanning for old temp files in " + location); + } + } + + private void setupDefaultSecurityLabels(KeyExtent extent) { + if (extent.isMeta()) { + defaultSecurityLabel = new byte[0]; + } else { + try { + ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY)); + this.defaultSecurityLabel = cv.getExpression(); + } catch (Exception e) { + log.error(e, e); + this.defaultSecurityLabel = new byte[0]; + } + } + } + + public static class KVEntry extends KeyValue { + private static final long serialVersionUID = 1L; + + public KVEntry(Key k, Value v) { + super(new Key(k), Arrays.copyOf(v.get(), v.get().length)); + } + + int numBytes() { + return getKey().getSize() + getValue().get().length; + } + + int estimateMemoryUsed() { + return getKey().getSize() + getValue().get().length + (9 * 32); // overhead is 32 per object + } + } + + private LookupResult lookup(SortedKeyValueIterator mmfi, List ranges, HashSet columnSet, ArrayList results, + long maxResultsSize) throws IOException { + + LookupResult lookupResult = new LookupResult(); + + boolean exceededMemoryUsage = false; + boolean tabletClosed = false; + + Set cfset = null; + if (columnSet.size() > 0) + cfset = LocalityGroupUtil.families(columnSet); + + for (Range range : ranges) { + + if (exceededMemoryUsage || tabletClosed) { + lookupResult.unfinishedRanges.add(range); + continue; + } + + int entriesAdded = 0; + + try { + if (cfset != null) + mmfi.seek(range, cfset, true); + else + mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false); + + while (mmfi.hasTop()) { + Key key = mmfi.getTopKey(); + + KVEntry kve = new KVEntry(key, mmfi.getTopValue()); + results.add(kve); + entriesAdded++; + lookupResult.bytesAdded += kve.estimateMemoryUsed(); + lookupResult.dataSize += kve.numBytes(); + + exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize; + + if (exceededMemoryUsage) { + addUnfinishedRange(lookupResult, range, key, false); + break; + } + + mmfi.next(); + } + + } catch (TooManyFilesException tmfe) { + // 
treat this as a closed tablet, and let the client retry + log.warn("Tablet " + getExtent() + " has too many files, batch lookup can not run"); + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded); + tabletClosed = true; + } catch (IOException ioe) { + if (shutdownInProgress()) { + // assume HDFS shutdown hook caused this exception + log.debug("IOException while shutdown in progress ", ioe); + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded); + tabletClosed = true; + } else { + throw ioe; + } + } catch (IterationInterruptedException iie) { + if (isClosed()) { + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded); + tabletClosed = true; + } else { + throw iie; + } + } catch (TabletClosedException tce) { + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded); + tabletClosed = true; + } + + } + + return lookupResult; + } + + private void handleTabletClosedDuringScan(ArrayList results, LookupResult lookupResult, boolean exceededMemoryUsage, Range range, int entriesAdded) { + if (exceededMemoryUsage) + throw new IllegalStateException("tablet should not exceed memory usage or close, not both"); + + if (entriesAdded > 0) + addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).getKey(), false); + else + lookupResult.unfinishedRanges.add(range); + + lookupResult.closed = true; + } + + private void addUnfinishedRange(LookupResult lookupResult, Range range, Key key, boolean inclusiveStartKey) { + if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) { + Range nlur = new Range(new Key(key), inclusiveStartKey, range.getEndKey(), range.isEndKeyInclusive()); + lookupResult.unfinishedRanges.add(nlur); + } + } + + public static interface KVReceiver { + void receive(List matches) throws IOException; + } + + class LookupResult { + List unfinishedRanges = new ArrayList(); + long bytesAdded = 0; + long dataSize = 0; + boolean closed = false; + } + + public LookupResult lookup(List ranges, HashSet columns, Authorizations authorizations, ArrayList results, long maxResultSize, + List ssiList, Map> ssio, AtomicBoolean interruptFlag) throws IOException { + + if (ranges.size() == 0) { + return new LookupResult(); + } + + ranges = Range.mergeOverlapping(ranges); - Collections.sort(ranges); ++ if (ranges.size() > 1) { ++ Collections.sort(ranges); ++ } + + Range tabletRange = extent.toDataRange(); + for (Range range : ranges) { + // do a test to see if this range falls within the tablet, if it does not + // then clip will throw an exception + tabletRange.clip(range); + } + + ScanDataSource dataSource = new ScanDataSource(authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag); + + LookupResult result = null; + + try { + SortedKeyValueIterator iter = new SourceSwitchingIterator(dataSource); + result = lookup(iter, ranges, columns, results, maxResultSize); + return result; + } catch (IOException ioe) { + dataSource.close(true); + throw ioe; + } finally { + // code in finally block because always want + // to return mapfiles, even when exception is thrown + dataSource.close(false); + + synchronized (this) { + queryCount += results.size(); + if (result != null) + queryBytes += result.dataSize; + } + } + } + + private Batch nextBatch(SortedKeyValueIterator iter, Range range, int num, Set columns) throws IOException { + + // log.info("In nextBatch.."); + + List results = new ArrayList(); + Key key 
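
[Editor's note] addUnfinishedRange above computes the remainder of a range when a scan stops early at some key, so the client can resume where it left off. An isolated sketch of that computation with Accumulo's public Range/Key types; the helper name is illustrative:

// Sketch of unfinished-range construction; not Accumulo code.
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;

public final class Ranges {
    public static Range remainderAfter(Range range, Key key, boolean inclusiveStart) {
        // nothing left if we already reached or passed the range's end key
        if (range.getEndKey() != null && key.compareTo(range.getEndKey()) >= 0)
            return null;
        return new Range(new Key(key), inclusiveStart, range.getEndKey(), range.isEndKeyInclusive());
    }
}
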
= null; + + Value value; + long resultSize = 0L; + long resultBytes = 0L; + + long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM); + + if (columns.size() == 0) { + iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false); + } else { + iter.seek(range, LocalityGroupUtil.families(columns), true); + } + + Key continueKey = null; + boolean skipContinueKey = false; + + boolean endOfTabletReached = false; + while (iter.hasTop()) { + + value = iter.getTopValue(); + key = iter.getTopKey(); + + KVEntry kvEntry = new KVEntry(key, value); // copies key and value + results.add(kvEntry); + resultSize += kvEntry.estimateMemoryUsed(); + resultBytes += kvEntry.numBytes(); + + if (resultSize >= maxResultsSize || results.size() >= num) { + continueKey = new Key(key); + skipContinueKey = true; + break; + } + + iter.next(); + } + + if (iter.hasTop() == false) { + endOfTabletReached = true; + } + + Batch retBatch = new Batch(); + retBatch.numBytes = resultBytes; + + if (!endOfTabletReached) { + retBatch.continueKey = continueKey; + retBatch.skipContinueKey = skipContinueKey; + } else { + retBatch.continueKey = null; + } + + if (endOfTabletReached && results.size() == 0) + retBatch.results = null; + else + retBatch.results = results; + + return retBatch; + } + + /** + * Determine if a JVM shutdown is in progress. + * + */ + private boolean shutdownInProgress() { + try { + Runtime.getRuntime().removeShutdownHook(new Thread(new Runnable() { + @Override + public void run() {} + })); + } catch (IllegalStateException ise) { + return true; + } + + return false; + } + + private class Batch { + public boolean skipContinueKey; + public List results; + public Key continueKey; + public long numBytes; + } + + Scanner createScanner(Range range, int num, Set columns, Authorizations authorizations, List ssiList, Map> ssio, + boolean isolated, AtomicBoolean interruptFlag) { + // do a test to see if this range falls within the tablet, if it does not + // then clip will throw an exception + extent.toDataRange().clip(range); + + ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated); + return new Scanner(range, opts); + } + + class ScanBatch { + boolean more; + List results; + + ScanBatch(List results, boolean more) { + this.results = results; + this.more = more; + } + } + + class Scanner { + + private ScanOptions options; + private Range range; + private SortedKeyValueIterator isolatedIter; + private ScanDataSource isolatedDataSource; + private boolean sawException = false; + private boolean scanClosed = false; + + Scanner(Range range, ScanOptions options) { + this.range = range; + this.options = options; + } + + synchronized ScanBatch read() throws IOException, TabletClosedException { + + if (sawException) + throw new IllegalStateException("Tried to use scanner after exception occurred."); + + if (scanClosed) + throw new IllegalStateException("Tried to use scanner after it was closed."); + + Batch results = null; + + ScanDataSource dataSource; + + if (options.isolated) { + if (isolatedDataSource == null) + isolatedDataSource = new ScanDataSource(options); + dataSource = isolatedDataSource; + } else { + dataSource = new ScanDataSource(options); + } + + try { + + SortedKeyValueIterator iter; + + if (options.isolated) { + if (isolatedIter == null) + isolatedIter = new SourceSwitchingIterator(dataSource, true); + else + isolatedDataSource.fileManager.reattach(); + iter = isolatedIter; + } else { + iter = new 
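
[Editor's note] shutdownInProgress() above relies on a documented JDK behavior: Runtime.removeShutdownHook throws IllegalStateException once the JVM has begun shutting down, so removing a never-registered hook doubles as a shutdown probe. A standalone version of the idiom:

// Standalone version of the shutdown-hook probe used above.
public final class JvmShutdown {
    public static boolean inProgress() {
        try {
            // the hook was never added; normally this just returns false
            Runtime.getRuntime().removeShutdownHook(new Thread(() -> {}));
        } catch (IllegalStateException ise) {
            return true; // "Shutdown in progress"
        }
        return false;
    }
}
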
+      Runtime.getRuntime().removeShutdownHook(new Thread(new Runnable() {
+        @Override
+        public void run() {}
+      }));
+    } catch (IllegalStateException ise) {
+      return true;
+    }
+
+    return false;
+  }
+
+  private class Batch {
+    public boolean skipContinueKey;
+    public List<KVEntry> results;
+    public Key continueKey;
+    public long numBytes;
+  }
+
+  Scanner createScanner(Range range, int num, Set<Column> columns, Authorizations authorizations, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+      boolean isolated, AtomicBoolean interruptFlag) {
+    // check that this range falls within the tablet; if it does not,
+    // clip will throw an exception
+    extent.toDataRange().clip(range);
+
+    ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated);
+    return new Scanner(range, opts);
+  }
+
+  class ScanBatch {
+    boolean more;
+    List<KVEntry> results;
+
+    ScanBatch(List<KVEntry> results, boolean more) {
+      this.results = results;
+      this.more = more;
+    }
+  }
+
+  class Scanner {
+
+    private ScanOptions options;
+    private Range range;
+    private SortedKeyValueIterator<Key,Value> isolatedIter;
+    private ScanDataSource isolatedDataSource;
+    private boolean sawException = false;
+    private boolean scanClosed = false;
+
+    Scanner(Range range, ScanOptions options) {
+      this.range = range;
+      this.options = options;
+    }
+
+    synchronized ScanBatch read() throws IOException, TabletClosedException {
+
+      if (sawException)
+        throw new IllegalStateException("Tried to use scanner after exception occurred.");
+
+      if (scanClosed)
+        throw new IllegalStateException("Tried to use scanner after it was closed.");
+
+      Batch results = null;
+
+      ScanDataSource dataSource;
+
+      if (options.isolated) {
+        if (isolatedDataSource == null)
+          isolatedDataSource = new ScanDataSource(options);
+        dataSource = isolatedDataSource;
+      } else {
+        dataSource = new ScanDataSource(options);
+      }
+
+      try {
+
+        SortedKeyValueIterator<Key,Value> iter;
+
+        if (options.isolated) {
+          if (isolatedIter == null)
+            isolatedIter = new SourceSwitchingIterator(dataSource, true);
+          else
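+            // an isolated scan is resuming; re-attach the files reserved on the
+            // first read so the scan keeps reading from the same set of files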
+            isolatedDataSource.fileManager.reattach();
+          iter = isolatedIter;
+        } else {
+          iter = new SourceSwitchingIterator(dataSource, false);
+        }
+
+        results = nextBatch(iter, range, options.num, options.columnSet);
+
+        if (results.results == null) {
+          range = null;
+          return new ScanBatch(new ArrayList<KVEntry>(), false);
+        } else if (results.continueKey == null) {
+          return new ScanBatch(results.results, false);
+        } else {
+          range = new Range(results.continueKey, !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive());
+          return new ScanBatch(results.results, true);
+        }
+
+      } catch (IterationInterruptedException iie) {
+        sawException = true;
+        if (isClosed())
+          throw new TabletClosedException(iie);
+        else
+          throw iie;
+      } catch (IOException ioe) {
+        if (shutdownInProgress()) {
+          log.debug("IOException while shutdown in progress ", ioe);
+          throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook
+        }
+
+        sawException = true;
+        dataSource.close(true);
+        throw ioe;
+      } catch (RuntimeException re) {
+        sawException = true;
+        throw re;
+      } finally {
+        // code is in the finally block because we always want
+        // to return the mapfiles, even when an exception is thrown
+        if (!options.isolated)
+          dataSource.close(false);
+        else if (dataSource.fileManager != null)
+          dataSource.fileManager.detach();
+
+        synchronized (Tablet.this) {
+          if (results != null && results.results != null) {
+            long more = results.results.size();
+            queryCount += more;
+            queryBytes += results.numBytes;
+          }
+        }
+      }
+    }
+
+    // close and read are synchronized because we cannot call close on the data source while it is in use;
+    // this could lead to the case where file iterators that are in use by a thread are returned
+    // to the pool... this would be bad
+    void close() {
+      options.interruptFlag.set(true);
+      synchronized (this) {
+        scanClosed = true;
+        if (isolatedDataSource != null)
+          isolatedDataSource.close(false);
+      }
+    }
+  }
+
+  static class ScanOptions {
+
+    // scan options
+    Authorizations authorizations;
+    byte[] defaultLabels;
+    Set<Column> columnSet;
+    List<IterInfo> ssiList;
+    Map<String,Map<String,String>> ssio;
+    AtomicBoolean interruptFlag;
+    int num;
+    boolean isolated;
+
+    ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList,
+        Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
+      this.num = num;
+      this.authorizations = authorizations;
+      this.defaultLabels = defaultLabels;
+      this.columnSet = columnSet;
+      this.ssiList = ssiList;
+      this.ssio = ssio;
+      this.interruptFlag = interruptFlag;
+      this.isolated = isolated;
+    }
+
+  }
+
+  class ScanDataSource implements DataSource {
+
+    // data source state
+    private ScanFileManager fileManager;
+    private SortedKeyValueIterator<Key,Value> iter;
+    private long expectedDeletionCount;
+    private List<MemoryIterator> memIters = null;
+    private long fileReservationId;
+    private AtomicBoolean interruptFlag;
+    private StatsIterator statsIterator;
+
+    ScanOptions options;
+
+    ScanDataSource(Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
+        Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) {
+      expectedDeletionCount = dataSourceDeletions.get();
+      this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false);
+      this.interruptFlag = interruptFlag;
+    }
+
+    ScanDataSource(ScanOptions options) {
+      expectedDeletionCount = dataSourceDeletions.get();
+      this.options = options;
+      this.interruptFlag = options.interruptFlag;
+    }
+
+    @Override
+    public DataSource getNewDataSource() {
+      if (!isCurrent()) {
+        // log.debug("Switching data sources during a scan");
+        if (memIters != null) {
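+          // the tablet's file set changed while this iterator stack was in use;
+          // return the memory iterators and release the file reservation before
+          // the stack is rebuilt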
+          tabletMemory.returnIterators(memIters);
+          memIters = null;
+          datafileManager.returnFilesForScan(fileReservationId);
+          fileReservationId = -1;
+        }
+
+        if (fileManager != null)
+          fileManager.releaseOpenFiles(false);
+
+        expectedDeletionCount = dataSourceDeletions.get();
+        iter = null;
+
+        return this;
+      } else
+        return this;
+    }
+
+    @Override
+    public boolean isCurrent() {
+      return expectedDeletionCount == dataSourceDeletions.get();
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+      if (iter == null)
+        iter = createIterator();
+      return iter;
+    }
+
+    private SortedKeyValueIterator<Key,Value> createIterator() throws IOException {
+
+      Map<FileRef,DataFileValue> files;
+
+      synchronized (Tablet.this) {
+
+        if (memIters != null)
+          throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory");
+
+        if (Tablet.this.closed)
+          throw new TabletClosedException();
+
+        if (interruptFlag.get())
+          throw new IterationInterruptedException(extent.toString() + " " + interruptFlag.hashCode());
+
+        // only acquire the file manager when we know the tablet is open
+        if (fileManager == null) {
+          fileManager = tabletResources.newScanFileManager();
+          activeScans.add(this);
+        }
+
+        if (fileManager.getNumOpenFiles() != 0)
+          throw new IllegalStateException("Tried to create new scan iterator w/o releasing files");
+
+        // set this before trying to get iterators in case
+        // getIterators() throws an exception
+        expectedDeletionCount = dataSourceDeletions.get();
+
+        memIters = tabletMemory.getIterators();
+        Pair<Long,Map<FileRef,DataFileValue>> reservation = datafileManager.reserveFilesForScan();
+        fileReservationId = reservation.getFirst();
+        files = reservation.getSecond();
+      }
+
+      Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isolated);
+
+      List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size());
+
+      iters.addAll(mapfiles);
+      iters.addAll(memIters);
+
+      for (SortedKeyValueIterator<Key,Value> skvi : iters)
+        ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
+
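+      // assemble the system iterator stack: merge all sources, gather scan stats,
+      // suppress deleted entries, skip unwanted column families, then filter column
+      // qualifiers and visibilities before applying table-configured scan iterators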
+      MultiIterator multiIter = new MultiIterator(iters, extent);
+
+      TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, acuTableConf, fileManager, files);
+
+      statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, scannedCount);
+
+      DeletingIterator delIter = new DeletingIterator(statsIterator, false);
+
+      ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+
+      ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.columnSet);
+
+      VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.authorizations, options.defaultLabels);
+
+      return iterEnv.getTopLevelIterator(IteratorUtil
+          .loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.ssiList, options.ssio, iterEnv));
+    }
+
+    private void close(boolean sawErrors) {
+
+      if (memIters != null) {
+        tabletMemory.returnIterators(memIters);
+        memIters = null;
+        datafileManager.returnFilesForScan(fileReservationId);
+        fileReservationId = -1;
+      }
+
+      synchronized (Tablet.this) {
+        activeScans.remove(this);
+        if (activeScans.size() == 0)
+          Tablet.this.notifyAll();
+      }
+
+      if (fileManager != null) {
+        fileManager.releaseOpenFiles(sawErrors);
+        fileManager = null;
+      }
+
+      if (statsIterator != null) {
+        statsIterator.report();
+      }
+
+    }
+
+    public void interrupt() {
+      interruptFlag.set(true);
+    }
+
+    @Override
+    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
+  private DataFileValue minorCompact(Configuration conf, VolumeManager fs, InMemoryMap memTable, FileRef tmpDatafile, FileRef newDatafile, FileRef mergeFile,
+      boolean hasQueueTime, long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
+    boolean failed = false;
+    long start = System.currentTimeMillis();
+    timer.incrementStatusMinor();
+
+    long count = 0;
+
+    try {
+      Span span = Trace.start("write");
+      CompactionStats stats;
+      try {
+        count = memTable.getNumEntries();
+
+        DataFileValue dfv = null;
+        if (mergeFile != null)
+          dfv = datafileManager.getDatafileSizes().get(mergeFile);
+
+        MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, mincReason);
+        stats = compactor.call();
+      } finally {
+        span.stop();
+      }
+      span = Trace.start("bringOnline");
+      try {
+        datafileManager.bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()),
+            commitSession, flushId);
+      } finally {
+        span.stop();
+      }
+      return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten());
+    } catch (Exception E) {
+      failed = true;
+      throw new RuntimeException(E);
+    } catch (Error E) {
+      // Weird errors like "OutOfMemoryError" when trying to create the thread for the compaction
+      failed = true;
+      throw new RuntimeException(E);
+    } finally {
+      try {
+        tabletMemory.finalizeMinC();
+      } catch (Throwable t) {
+        log.error("Failed to free tablet memory", t);
+      }
+
+      if (!failed) {
+        lastMinorCompactionFinishTime = System.currentTimeMillis();
+      }
+      if (tabletServer.mincMetrics.isEnabled())
+        tabletServer.mincMetrics.add(TabletServerMinCMetrics.minc, (lastMinorCompactionFinishTime - start));
+      if (hasQueueTime) {
+        timer.updateTime(Operation.MINOR, queued, start, count, failed);
+        if (tabletServer.mincMetrics.isEnabled())
+          tabletServer.mincMetrics.add(TabletServerMinCMetrics.queue, (start - queued));
+      } else
+        timer.updateTime(Operation.MINOR, start, count, failed);
+    }
+  }
+
+  private class MinorCompactionTask implements Runnable {
+
+    private long queued;
+    private CommitSession commitSession;
+    private DataFileValue stats;
+    private FileRef mergeFile;
+    private long flushId;
+    private MinorCompactionReason mincReason;
+
+    MinorCompactionTask(FileRef mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
+      queued = System.currentTimeMillis();
+      minorCompactionWaitingToStart = true;
+      this.commitSession = commitSession;
+      this.mergeFile = mergeFile;
+      this.flushId = flushId;
+      this.mincReason = mincReason;
+    }
+
+    @Override
+    public void run() {
+      minorCompactionWaitingToStart = false;
+      minorCompactionInProgress = true;
+      Span minorCompaction = Trace.on("minorCompaction");
+      try {
+        FileRef newMapfileLocation = getNextMapFilename(mergeFile == null ? "F" : "M");
+        FileRef tmpFileRef = new FileRef(newMapfileLocation.path() + "_tmp");
+        Span span = Trace.start("waitForCommits");
+        synchronized (Tablet.this) {
+          commitSession.waitForCommitsToFinish();
+        }
+        span.stop();
+        span = Trace.start("start");
+        while (true) {
+          try {
+            // the purpose of the minor compaction start event is to keep track of the filename... in the case
+            // where the metadata table write for the minor compaction finishes and the process dies before
+            // writing the minor compaction finish event, the start event plus filename in the metadata table will
+            // prevent recovery of duplicate data... the minor compaction start event could be written at any time
+            // before the metadata write for the minor compaction
+            tabletServer.minorCompactionStarted(commitSession, commitSession.getWALogSeq() + 1, newMapfileLocation.path().toString());
+            break;
+          } catch (IOException e) {
+            log.warn("Failed to write to write ahead log " + e.getMessage(), e);
+          }
+        }
+        span.stop();
+        span = Trace.start("compact");
+        this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), tmpFileRef, newMapfileLocation, mergeFile, true, queued, commitSession, flushId,
+            mincReason);
+        span.stop();
+
+        if (needsSplit()) {
+          tabletServer.executeSplit(Tablet.this);
+        } else {
+          initiateMajorCompaction(MajorCompactionReason.NORMAL);
+        }
+      } catch (Throwable t) {
+        log.error("Unknown error during minor compaction for extent: " + getExtent(), t);
+        throw new RuntimeException(t);
+      } finally {
+        minorCompactionInProgress = false;
+        minorCompaction.data("extent", extent.toString());
+        minorCompaction.data("numEntries", Long.toString(this.stats.getNumEntries()));
+        minorCompaction.data("size", Long.toString(this.stats.getSize()));
+        minorCompaction.stop();
+      }
+    }
+  }
+
+  private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) {
+    CommitSession oldCommitSession = tabletMemory.prepareForMinC();
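+    // swap the write-ahead log sets: the logs that cover the memory being flushed
+    // are remembered as otherLogs until the minor compaction completes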
+    otherLogs = currentLogs;
+    currentLogs = new HashSet<DfsLogger>();
+
+    FileRef mergeFile = null;
+    if (mincReason != MinorCompactionReason.RECOVERY) {
+      mergeFile = datafileManager.reserveMergingMinorCompactionFile();
+    }
+
+    return new MinorCompactionTask(mergeFile, oldCommitSession, flushId, mincReason);
+
+  }
+
+  void flush(long tableFlushID) {
+    boolean updateMetadata = false;
+    boolean initiateMinor = false;
+
+    try {
+
+      synchronized (this) {
+
+        // only one thing at a time should update the flush ID, to ensure that the metadata table and the tablet's in-memory state stay consistent
+        if (updatingFlushID)
+          return;
+
+        if (lastFlushID >= tableFlushID)
+          return;
+
+        if (closing || closed || tabletMemory.memoryReservedForMinC())
+          return;
+
+        if (tabletMemory.getMemTable().getNumEntries() == 0) {
+          lastFlushID = tableFlushID;
+          updatingFlushID = true;
+          updateMetadata = true;
+        } else
+          initiateMinor = true;
+      }
+
+      if (updateMetadata) {
+        Credentials creds = SystemCredentials.get();
+        // if multiple threads were allowed to update this outside of a sync block, it would be
+        // a race condition
+        MetadataTableUtil.updateTabletFlushID(extent, tableFlushID, creds, tabletServer.getLock());
+      } else if (initiateMinor)
+        initiateMinorCompaction(tableFlushID, MinorCompactionReason.USER);
+
+    } finally {
+      if (updateMetadata) {
+        synchronized (this) {
+          updatingFlushID = false;
+          this.notifyAll();
+        }
+      }
+    }
+
+  }
+
+  boolean initiateMinorCompaction(MinorCompactionReason mincReason) {
+    if (isClosed()) {
+      // don't bother trying to get the flush id if closed... the tablet could be closed after this check, but that is ok... just trying to cut down on unneeded log messages
+      return false;
+    }
+
+    // get the flush id before the new memmap is made available for write
+    long flushId;
+    try {
+      flushId = getFlushID();
+    } catch (NoNodeException e) {
+      log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " " + e.getMessage());
+      return false;
+    }
+    return initiateMinorCompaction(flushId, mincReason);
+  }
+
+  boolean minorCompactNow(MinorCompactionReason mincReason) {
+    long flushId;
+    try {
+      flushId = getFlushID();
+    } catch (NoNodeException e) {
+      log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " " + e.getMessage());
+      return false;
+    }
+    MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason);
+    if (mct == null)
+      return false;
+    mct.run();
+    return true;
+  }
+
+  boolean initiateMinorCompaction(long flushId, MinorCompactionReason mincReason) {
+    MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason);
+    if (mct == null)
+      return false;
+    tabletResources.executeMinorCompaction(mct);
+    return true;
+  }
+
+  private MinorCompactionTask createMinorCompactionTask(long flushId, MinorCompactionReason mincReason) {
+    MinorCompactionTask mct;
+    long t1, t2;
+
+    StringBuilder logMessage = null;
+
+    try {
+      synchronized (this) {
+        t1 = System.currentTimeMillis();
+
+        if (closing || closed || majorCompactionWaitingToStart || tabletMemory.memoryReservedForMinC() || tabletMemory.getMemTable().getNumEntries() == 0
+            || updatingFlushID) {
+
+          logMessage = new StringBuilder();
+
+          logMessage.append(extent.toString());
+          logMessage.append(" closing " + closing);
+          logMessage.append(" closed " + closed);
+          logMessage.append(" majorCompactionWaitingToStart " + majorCompactionWaitingToStart);
+          if (tabletMemory != null)
+            logMessage.append(" tabletMemory.memoryReservedForMinC() " + tabletMemory.memoryReservedForMinC());
+          if (tabletMemory != null && tabletMemory.getMemTable() != null)
+            logMessage.append(" tabletMemory.getMemTable().getNumEntries() " + tabletMemory.getMemTable().getNumEntries());
+          logMessage.append(" updatingFlushID " + updatingFlushID);
+
+          return null;
+        }
+
+        mct = prepareForMinC(flushId, mincReason);
+        t2 = System.currentTimeMillis();
+      }
+    } finally {
+      // log outside of the sync block
+      if (logMessage != null && log.isDebugEnabled())
+        log.debug(logMessage);
+    }
+
+    log.debug(String.format("MinC initiate lock %.2f secs", (t2 - t1) / 1000.0));
+    return mct;
+  }
+
+  long getFlushID() throws NoNodeException {
+    try {
+      String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
+          + Constants.ZTABLE_FLUSH_ID;
+      return Long.parseLong(new String(ZooReaderWriter.getInstance().getData(zTablePath, null), UTF_8));
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } catch (NumberFormatException nfe) {
+      throw new RuntimeException(nfe);
+    } catch (KeeperException ke) {
+      if (ke instanceof NoNodeException) {
+        throw (NoNodeException) ke;
+      } else {
+        throw new RuntimeException(ke);
+      }
+    }
+  }
+
+  long getCompactionCancelID() {
+    String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
+        + Constants.ZTABLE_COMPACT_CANCEL_ID;
+
+    try {
+      return Long.parseLong(new String(ZooReaderWriter.getInstance().getData(zTablePath, null), UTF_8));
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException
http://git-wip-us.apache.org/repos/asf/accumulo/blob/14f9f00e/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/BatchVerify.java ----------------------------------------------------------------------