From: John Vines
Date: Sat, 30 Nov 2013 17:19:39 -0500
Subject: Re: [11/15] ACCUMULO-1940 do not expose a new tablet to the memory manager until after it is online
To: Accumulo Dev List

Perhaps after this release we should do another retro to do lessons learned, to help streamline releases/multiple versions even further in the future.

On Sat, Nov 30, 2013 at 5:16 PM, Josh Elser wrote:

> On 11/30/2013 5:06 PM, Eric Newton wrote:
>
>> There was a merge "conflict" in 1.5 or 1.6. There was an extra line of
>> whitespace, or a line missing.
>
> Strange. Maybe John hit the nail on the head that the email notifications
> for Git aren't always correct?
>
>> It is annoying to maintain 1.4, 1.5, and the largely unnecessary 1.6 (just
>> use master).
>>
>> However, I think this chore comes with software maturity and a larger user
>> base.
>
> I'd agree. I was mostly whining anyways. Having multiple versions in use
> is better than not having people use anything. Perhaps the 1.6.0 and master
> split is something we can revisit for the next release. If no one is
> working on features for the "next" release while we test the "current"
> release, there isn't a point in having two branches.
>
>> On Sat, Nov 30, 2013 at 12:15 AM, Josh Elser wrote:
>>
>>> Actually, I was kind of confused when I saw your commit*s* on this ticket.
>>> What did you actually do? You have two commits that do the same changes:
>>>
>>> 82477f08aa64e2a8a1cf7f6af0db5ce954801ac8 (in 1.4, 1.5 and 1.6)
>>> 9b6b9cf104ff332cffdd4900d8057557e64e0ec8 (only in 1.6)
>>>
>>> I would've only expected to see one email with a diff, followed by 2
>>> "merge" emails, e.g.
>>>
>>> ----------------------------------------------------------------------
>>> .../tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java | 2 --
>>> 1 file changed, 2 deletions(-)
>>> ----------------------------------------------------------------------
>>>
>>> Although, I will admit that dealing with 3 active branches is a big pain.
>>> However, I don't know of a better way to handle this in a way that doesn't
>>> make Git super confused and thus limit us in being able to answer questions
>>> like "where was a problem introduced" (git-bisect) and "where does this
>>> change exist" (and not having multiple commits that perform the same
>>> changes).
>>>
>>> On 11/29/13, 8:31 PM, Eric Newton wrote:
>>>
>>>> I changed one line of this file... git seems to be having a conniption. I
>>>> find the volume of git traffic to be so useless that I ignore it.
>>>>
>>>> Anyone else?
>>>>
>>>> On Fri, Nov 29, 2013 at 1:24 PM, wrote:
>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b6b9cf1/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>>>> index ee3b243,0000000..fd76415
>>>>> mode 100644,000000..100644
>>>>> --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>>>> +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>>>> @@@ -1,3868 -1,0 +1,3866 @@@
>>>>> +/*
>>>>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>>>>> + * contributor license agreements. See the NOTICE file distributed with
>>>>> + * this work for additional information regarding copyright ownership.
>>>>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>>>>> + * (the "License"); you may not use this file except in compliance with
>>>>> + * the License. You may obtain a copy of the License at
>>>>> + *
>>>>> + * http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing, software
>>>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>>> + * See the License for the specific language governing permissions and
>>>>> + * limitations under the License.
>>>>> + */
>>>>> +package org.apache.accumulo.tserver;
>>>>> +
>>>>> +import java.io.ByteArrayInputStream;
>>>>> +import java.io.DataInputStream;
>>>>> +import java.io.FileNotFoundException;
>>>>> +import java.io.IOException;
>>>>> +import java.util.ArrayList;
>>>>> +import java.util.Arrays;
>>>>> +import java.util.Collection;
>>>>> +import java.util.Collections;
>>>>> +import java.util.Comparator;
>>>>> +import java.util.EnumSet;
>>>>> +import java.util.HashMap;
>>>>> +import java.util.HashSet;
>>>>> +import java.util.Iterator;
>>>>> +import java.util.List;
>>>>> +import java.util.Map;
>>>>> +import java.util.Map.Entry;
>>>>> +import java.util.PriorityQueue;
>>>>> +import java.util.Set;
>>>>> +import java.util.SortedMap;
>>>>> +import java.util.TreeMap;
>>>>> +import java.util.TreeSet;
>>>>> +import java.util.concurrent.atomic.AtomicBoolean;
>>>>> +import java.util.concurrent.atomic.AtomicLong;
>>>>> +import java.util.concurrent.atomic.AtomicReference;
>>>>> +import java.util.concurrent.locks.ReentrantLock;
>>>>> +
>>>>> +import org.apache.accumulo.core.Constants;
>>>>> +import org.apache.accumulo.core.client.Connector;
>>>>> +import org.apache.accumulo.core.client.IteratorSetting;
>>>>> +import org.apache.accumulo.core.client.impl.ScannerImpl;
>>>>> +import org.apache.accumulo.core.conf.AccumuloConfiguration;
>>>>> +import org.apache.accumulo.core.conf.ConfigurationCopy;
>>>>> +import org.apache.accumulo.core.conf.ConfigurationObserver;
>>>>> +import org.apache.accumulo.core.conf.Property;
>>>>> +import org.apache.accumulo.core.constraints.Violations;
>>>>> +import org.apache.accumulo.core.data.ByteSequence;
>>>>> +import org.apache.accumulo.core.data.Column;
>>>>> +import org.apache.accumulo.core.data.ColumnUpdate;
>>>>> +import org.apache.accumulo.core.data.Key;
>>>>> +import org.apache.accumulo.core.data.KeyExtent;
>>>>> +import org.apache.accumulo.core.data.KeyValue;
>>>>> +import org.apache.accumulo.core.data.Mutation;
>>>>> +import org.apache.accumulo.core.data.Range;
>>>>> +import org.apache.accumulo.core.data.Value;
>>>>> +import org.apache.accumulo.core.data.thrift.IterInfo;
>>>>> +import org.apache.accumulo.core.data.thrift.MapFileInfo;
>>>>> +import org.apache.accumulo.core.file.FileOperations;
>>>>> +import org.apache.accumulo.core.file.FileSKVIterator;
>>>>> +import org.apache.accumulo.core.iterators.IterationInterruptedException;
>>>>> +import org.apache.accumulo.core.iterators.IteratorEnvironment;
>>>>> +import org.apache.accumulo.core.iterators.IteratorUtil;
>>>>> +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
>>>>> +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
>>>>> +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
>>>>> +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
>>>>> +import org.apache.accumulo.core.iterators.system.DeletingIterator;
>>>>> +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
>>>>> +import org.apache.accumulo.core.iterators.system.MultiIterator;
>>>>> +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
>>>>> +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
>>>>> +import org.apache.accumulo.core.iterators.system.StatsIterator;
>>>>> +import org.apache.accumulo.core.iterators.system.VisibilityFilter;
>>>>> +import org.apache.accumulo.core.master.thrift.TabletLoadState;
>>>>> +import org.apache.accumulo.core.metadata.MetadataTable;
>>>>> +import org.apache.accumulo.core.metadata.RootTable;
>>>>> +import org.apache.accumulo.core.metadata.schema.DataFileValue;
>>>>> +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
>>>>> +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
>>>>> +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
>>>>> +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
>>>>> +import org.apache.accumulo.core.security.Authorizations;
>>>>> +import org.apache.accumulo.core.security.ColumnVisibility;
>>>>> +import org.apache.accumulo.core.security.Credentials;
>>>>> +import org.apache.accumulo.core.tabletserver.log.LogEntry;
>>>>> +import org.apache.accumulo.core.util.CachedConfiguration;
>>>>> +import org.apache.accumulo.core.util.LocalityGroupUtil;
>>>>> +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
>>>>> +import org.apache.accumulo.core.util.MapCounter;
>>>>> +import org.apache.accumulo.core.util.Pair;
>>>>> +import org.apache.accumulo.core.util.UtilWaitThread;
>>>>> +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
>>>>> +import org.apache.accumulo.server.ServerConstants;
>>>>> +import org.apache.accumulo.server.client.HdfsZooInstance;
>>>>> +import org.apache.accumulo.server.conf.TableConfiguration;
>>>>> +import org.apache.accumulo.server.fs.FileRef;
>>>>> +import org.apache.accumulo.server.fs.VolumeManager;
>>>>> +import org.apache.accumulo.server.fs.VolumeManager.FileType;
>>>>> +import org.apache.accumulo.server.fs.VolumeManagerImpl;
>>>>> +import org.apache.accumulo.server.master.state.TServerInstance;
>>>>> +import org.apache.accumulo.server.master.tableOps.CompactionIterators;
>>>>> +import org.apache.accumulo.server.problems.ProblemReport;
>>>>> +import org.apache.accumulo.server.problems.ProblemReports;
>>>>> +import org.apache.accumulo.server.problems.ProblemType;
>>>>> +import org.apache.accumulo.server.security.SystemCredentials;
>>>>> +import org.apache.accumulo.server.tablets.TabletTime;
>>>>> +import org.apache.accumulo.server.tablets.UniqueNameAllocator;
>>>>> +import org.apache.accumulo.server.util.FileUtil;
>>>>> +import org.apache.accumulo.server.util.MasterMetadataUtil;
>>>>> +import org.apache.accumulo.server.util.MetadataTableUtil;
>>>>> +import org.apache.accumulo.server.util.TabletOperations;
>>>>> +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
>>>>> +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
>>>>> +import org.apache.accumulo.trace.instrument.Span;
>>>>> +import org.apache.accumulo.trace.instrument.Trace;
>>>>> +import org.apache.accumulo.tserver.Compactor.CompactionCanceledException;
>>>>> +import org.apache.accumulo.tserver.Compactor.CompactionEnv;
>>>>> +import org.apache.accumulo.tserver.FileManager.ScanFileManager;
>>>>> +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
>>>>> +import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv;
>>>>> +import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
>>>>> +import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
>>>>> +import org.apache.accumulo.tserver.compaction.CompactionPlan;
>>>>> +import org.apache.accumulo.tserver.compaction.CompactionStrategy;
>>>>> +import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
>>>>> +import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
>>>>> +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
>>>>> +import org.apache.accumulo.tserver.compaction.WriteParameters;
>>>>> +import org.apache.accumulo.tserver.constraints.ConstraintChecker;
>>>>> +import org.apache.accumulo.tserver.log.DfsLogger;
>>>>> +import org.apache.accumulo.tserver.log.MutationReceiver;
>>>>> +import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
>>>>> +import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
>>>>> +import org.apache.commons.codec.DecoderException;
>>>>> +import org.apache.commons.codec.binary.Hex;
>>>>> +import org.apache.hadoop.conf.Configuration;
>>>>> +import org.apache.hadoop.fs.FileStatus;
>>>>> +import org.apache.hadoop.fs.FileSystem;
>>>>> +import org.apache.hadoop.fs.Path;
>>>>> +import org.apache.hadoop.io.Text;
>>>>> +import org.apache.log4j.Logger;
>>>>> +import org.apache.zookeeper.KeeperException;
>>>>> +import org.apache.zookeeper.KeeperException.NoNodeException;
>>>>> +
>>>>> +/*
>>>>> + * We need to be able to have the master tell a tabletServer to
>>>>> + * close this file, and the tablet server to handle all pending client reads
>>>>> + * before closing
>>>>> + *
>>>>> + */
>>>>> +
>>>>> +/**
>>>>> + *
>>>>> + * this class just provides an interface to read from a MapFile mostly takes care of reporting start and end keys
>>>>> + *
>>>>> + * need this because a single row extent can have multiple columns this manages all the columns (each handled by a store) for a single row-extent
>>>>> + *
>>>>> + *
>>>>> + */
>>>>> +
>>>>> +public class Tablet {
>>>>> +
>>>>> +  enum MinorCompactionReason {
>>>>> +    USER, SYSTEM, CLOSE
>>>>> +  }
>>>>> +
>>>>> +  public class CommitSession {
>>>>> +
>>>>> +    private int seq;
>>>>> +    private InMemoryMap memTable;
>>>>> +    private int commitsInProgress;
>>>>> +    private long maxCommittedTime = Long.MIN_VALUE;
>>>>> +
>>>>> +    private CommitSession(int seq, InMemoryMap imm) {
>>>>> +      this.seq = seq;
>>>>> +
this.memTable = imm; >>>>> + commitsInProgress = 0; >>>>> + } >>>>> + >>>>> + public int getWALogSeq() { >>>>> + return seq; >>>>> + } >>>>> + >>>>> + private void decrementCommitsInProgress() { >>>>> + if (commitsInProgress < 1) >>>>> + throw new IllegalStateException("commitsInProgress = " + >>>>> commitsInProgress); >>>>> + >>>>> + commitsInProgress--; >>>>> + if (commitsInProgress == 0) >>>>> + Tablet.this.notifyAll(); >>>>> + } >>>>> + >>>>> + private void incrementCommitsInProgress() { >>>>> + if (commitsInProgress < 0) >>>>> + throw new IllegalStateException("commitsInProgress = " + >>>>> commitsInProgress); >>>>> + >>>>> + commitsInProgress++; >>>>> + } >>>>> + >>>>> + private void waitForCommitsToFinish() { >>>>> + while (commitsInProgress > 0) { >>>>> + try { >>>>> + Tablet.this.wait(50); >>>>> + } catch (InterruptedException e) { >>>>> + log.warn(e, e); >>>>> + } >>>>> + } >>>>> + } >>>>> + >>>>> + public void abortCommit(List value) { >>>>> + Tablet.this.abortCommit(this, value); >>>>> + } >>>>> + >>>>> + public void commit(List mutations) { >>>>> + Tablet.this.commit(this, mutations); >>>>> + } >>>>> + >>>>> + public Tablet getTablet() { >>>>> + return Tablet.this; >>>>> + } >>>>> + >>>>> + public boolean beginUpdatingLogsUsed(ArrayList >>>>> copy, >>>>> boolean mincFinish) { >>>>> + return Tablet.this.beginUpdatingLogsUsed(memTable, copy, >>>>> mincFinish); >>>>> + } >>>>> + >>>>> + public void finishUpdatingLogsUsed() { >>>>> + Tablet.this.finishUpdatingLogsUsed(); >>>>> + } >>>>> + >>>>> + public int getLogId() { >>>>> + return logId; >>>>> + } >>>>> + >>>>> + public KeyExtent getExtent() { >>>>> + return extent; >>>>> + } >>>>> + >>>>> + private void updateMaxCommittedTime(long time) { >>>>> + maxCommittedTime = Math.max(time, maxCommittedTime); >>>>> + } >>>>> + >>>>> + private long getMaxCommittedTime() { >>>>> + if (maxCommittedTime == Long.MIN_VALUE) >>>>> + throw new IllegalStateException("Tried to read max >>>>> committed >>>>> time when it 
was never set"); >>>>> + return maxCommittedTime; >>>>> + } >>>>> + >>>>> + } >>>>> + >>>>> + private class TabletMemory { >>>>> + private InMemoryMap memTable; >>>>> + private InMemoryMap otherMemTable; >>>>> + private InMemoryMap deletingMemTable; >>>>> + private int nextSeq = 1; >>>>> + private CommitSession commitSession; >>>>> + >>>>> + TabletMemory() { >>>>> + try { >>>>> + memTable = new InMemoryMap(acuTableConf); >>>>> + } catch (LocalityGroupConfigurationError e) { >>>>> + throw new RuntimeException(e); >>>>> + } >>>>> + commitSession = new CommitSession(nextSeq, memTable); >>>>> + nextSeq += 2; >>>>> + } >>>>> + >>>>> + InMemoryMap getMemTable() { >>>>> + return memTable; >>>>> + } >>>>> + >>>>> + InMemoryMap getMinCMemTable() { >>>>> + return otherMemTable; >>>>> + } >>>>> + >>>>> + CommitSession prepareForMinC() { >>>>> + if (otherMemTable != null) { >>>>> + throw new IllegalStateException(); >>>>> + } >>>>> + >>>>> + if (deletingMemTable != null) { >>>>> + throw new IllegalStateException(); >>>>> + } >>>>> + >>>>> + otherMemTable = memTable; >>>>> + try { >>>>> + memTable = new InMemoryMap(acuTableConf); >>>>> + } catch (LocalityGroupConfigurationError e) { >>>>> + throw new RuntimeException(e); >>>>> + } >>>>> + >>>>> + CommitSession oldCommitSession = commitSession; >>>>> + commitSession = new CommitSession(nextSeq, memTable); >>>>> + nextSeq += 2; >>>>> + >>>>> + >>>>> tabletResources.updateMemoryUsageStats( >>>>> memTable.estimatedSizeInBytes( >>>>> ), >>>>> otherMemTable.estimatedSizeInBytes()); >>>>> + >>>>> + return oldCommitSession; >>>>> + } >>>>> + >>>>> + void finishedMinC() { >>>>> + >>>>> + if (otherMemTable == null) { >>>>> + throw new IllegalStateException(); >>>>> + } >>>>> + >>>>> + if (deletingMemTable != null) { >>>>> + throw new IllegalStateException(); >>>>> + } >>>>> + >>>>> + deletingMemTable = otherMemTable; >>>>> + >>>>> + otherMemTable = null; >>>>> + Tablet.this.notifyAll(); >>>>> + } >>>>> + >>>>> + void finalizeMinC() { 
>>>>> + try { >>>>> + deletingMemTable.delete(15000); >>>>> + } finally { >>>>> + synchronized (Tablet.this) { >>>>> + if (otherMemTable != null) { >>>>> + throw new IllegalStateException(); >>>>> + } >>>>> + >>>>> + if (deletingMemTable == null) { >>>>> + throw new IllegalStateException(); >>>>> + } >>>>> + >>>>> + deletingMemTable = null; >>>>> + >>>>> + >>>>> tabletResources.updateMemoryUsageStats( >>>>> memTable.estimatedSizeInBytes(), >>>>> 0); >>>>> + } >>>>> + } >>>>> + } >>>>> + >>>>> + boolean memoryReservedForMinC() { >>>>> + return otherMemTable != null || deletingMemTable != null; >>>>> + } >>>>> + >>>>> + void waitForMinC() { >>>>> + while (otherMemTable != null || deletingMemTable != null) { >>>>> + try { >>>>> + Tablet.this.wait(50); >>>>> + } catch (InterruptedException e) { >>>>> + log.warn(e, e); >>>>> + } >>>>> + } >>>>> + } >>>>> + >>>>> + void mutate(CommitSession cm, List mutations) { >>>>> + cm.memTable.mutate(mutations); >>>>> + } >>>>> + >>>>> + void updateMemoryUsageStats() { >>>>> + long other = 0; >>>>> + if (otherMemTable != null) >>>>> + other = otherMemTable.estimatedSizeInBytes(); >>>>> + else if (deletingMemTable != null) >>>>> + other = deletingMemTable.estimatedSizeInBytes(); >>>>> + >>>>> + >>>>> tabletResources.updateMemoryUsageStats( >>>>> memTable.estimatedSizeInBytes( >>>>> ), >>>>> other); >>>>> + } >>>>> + >>>>> + List getIterators() { >>>>> + List toReturn = new >>>>> ArrayList(2); >>>>> + toReturn.add(memTable.skvIterator()); >>>>> + if (otherMemTable != null) >>>>> + toReturn.add(otherMemTable.skvIterator()); >>>>> + return toReturn; >>>>> + } >>>>> + >>>>> + void returnIterators(List iters) { >>>>> + for (MemoryIterator iter : iters) { >>>>> + iter.close(); >>>>> + } >>>>> + } >>>>> + >>>>> + public long getNumEntries() { >>>>> + if (otherMemTable != null) >>>>> + return memTable.getNumEntries() + >>>>> otherMemTable.getNumEntries(); >>>>> + return memTable.getNumEntries(); >>>>> + } >>>>> + >>>>> + CommitSession 
getCommitSession() { >>>>> + return commitSession; >>>>> + } >>>>> + } >>>>> + >>>>> + private TabletMemory tabletMemory; >>>>> + >>>>> + private final TabletTime tabletTime; >>>>> + private long persistedTime; >>>>> + private final Object timeLock = new Object(); >>>>> + >>>>> + private final Path location; // absolute path of this tablets dir >>>>> + private TServerInstance lastLocation; >>>>> + >>>>> + private Configuration conf; >>>>> + private VolumeManager fs; >>>>> + >>>>> + private TableConfiguration acuTableConf; >>>>> + >>>>> + private volatile boolean tableDirChecked = false; >>>>> + >>>>> + private AtomicLong dataSourceDeletions = new AtomicLong(0); >>>>> + private Set activeScans = new >>>>> HashSet(); >>>>> + >>>>> + private volatile boolean closing = false; >>>>> + private boolean closed = false; >>>>> + private boolean closeComplete = false; >>>>> + >>>>> + private long lastFlushID = -1; >>>>> + private long lastCompactID = -1; >>>>> + >>>>> + private KeyExtent extent; >>>>> + >>>>> + private TabletResourceManager tabletResources; >>>>> + final private DatafileManager datafileManager; >>>>> + private volatile boolean majorCompactionInProgress = false; >>>>> + private volatile boolean majorCompactionWaitingToStart = false; >>>>> + private Set majorCompactionQueued = >>>>> Collections.synchronizedSet(EnumSet.noneOf( >>>>> MajorCompactionReason.class)); >>>>> + private volatile boolean minorCompactionInProgress = false; >>>>> + private volatile boolean minorCompactionWaitingToStart = false; >>>>> + >>>>> + private boolean updatingFlushID = false; >>>>> + >>>>> + private AtomicReference constraintChecker = >>>>> new >>>>> AtomicReference(); >>>>> + >>>>> + private final String tabletDirectory; >>>>> + >>>>> + private int writesInProgress = 0; >>>>> + >>>>> + private static final Logger log = Logger.getLogger(Tablet.class) >>>>> ; >>>>> + public TabletStatsKeeper timer; >>>>> + >>>>> + private Rate queryRate = new Rate(0.2); >>>>> + private long 
queryCount = 0; >>>>> + >>>>> + private Rate queryByteRate = new Rate(0.2); >>>>> + private long queryBytes = 0; >>>>> + >>>>> + private Rate ingestRate = new Rate(0.2); >>>>> + private long ingestCount = 0; >>>>> + >>>>> + private Rate ingestByteRate = new Rate(0.2); >>>>> + private long ingestBytes = 0; >>>>> + >>>>> + private byte[] defaultSecurityLabel = new byte[0]; >>>>> + >>>>> + private long lastMinorCompactionFinishTime; >>>>> + private long lastMapFileImportTime; >>>>> + >>>>> + private volatile long numEntries; >>>>> + private volatile long numEntriesInMemory; >>>>> + >>>>> + // a count of the amount of data read by the iterators >>>>> + private AtomicLong scannedCount = new AtomicLong(0); >>>>> + private Rate scannedRate = new Rate(0.2); >>>>> + >>>>> + private ConfigurationObserver configObserver; >>>>> + >>>>> + private TabletServer tabletServer; >>>>> + >>>>> + private final int logId; >>>>> + // ensure we only have one reader/writer of our bulk file notes >>>>> at >>>>> at >>>>> time >>>>> + public final Object bulkFileImportLock = new Object(); >>>>> + >>>>> + public int getLogId() { >>>>> + return logId; >>>>> + } >>>>> + >>>>> + public static class TabletClosedException extends >>>>> RuntimeException { >>>>> + public TabletClosedException(Exception e) { >>>>> + super(e); >>>>> + } >>>>> + >>>>> + public TabletClosedException() { >>>>> + super(); >>>>> + } >>>>> + >>>>> + private static final long serialVersionUID = 1L; >>>>> + } >>>>> + >>>>> + FileRef getNextMapFilename(String prefix) throws IOException { >>>>> + String extension = >>>>> FileOperations.getNewFileExtension(tabletServer. >>>>> getTableConfiguration(extent)); >>>>> + checkTabletDir(); >>>>> + return new FileRef(location.toString() + "/" + prefix + >>>>> UniqueNameAllocator.getInstance().getNextName() + "." 
+ extension); >>>>> + } >>>>> + >>>>> + private void checkTabletDir() throws IOException { >>>>> + if (!tableDirChecked) { >>>>> + checkTabletDir(this.location); >>>>> + tableDirChecked = true; >>>>> + } >>>>> + } >>>>> + >>>>> + private void checkTabletDir(Path tabletDir) throws IOException { >>>>> + >>>>> + FileStatus[] files = null; >>>>> + try { >>>>> + files = fs.listStatus(tabletDir); >>>>> + } catch (FileNotFoundException ex) { >>>>> + // ignored >>>>> + } >>>>> + >>>>> + if (files == null) { >>>>> + if (tabletDir.getName().startsWith("c-")) >>>>> + log.debug("Tablet " + extent + " had no dir, creating " + >>>>> tabletDir); // its a clone dir... >>>>> + else >>>>> + log.warn("Tablet " + extent + " had no dir, creating " + >>>>> tabletDir); >>>>> + >>>>> + fs.mkdirs(tabletDir); >>>>> + } >>>>> + } >>>>> + >>>>> + class DatafileManager { >>>>> + // access to datafilesizes needs to be synchronized: see >>>>> CompactionRunner#getNumFiles >>>>> + final private Map datafileSizes = >>>>> Collections.synchronizedMap(new TreeMap()); >>>>> + >>>>> + DatafileManager(SortedMap >>>>> datafileSizes) { >>>>> + for (Entry datafiles : >>>>> datafileSizes.entrySet()) >>>>> + this.datafileSizes.put(datafiles.getKey(), >>>>> datafiles.getValue()); >>>>> + } >>>>> + >>>>> + FileRef mergingMinorCompactionFile = null; >>>>> + Set filesToDeleteAfterScan = new HashSet(); >>>>> + Map> scanFileReservations = new >>>>> HashMap>(); >>>>> + MapCounter fileScanReferenceCounts = new >>>>> MapCounter(); >>>>> + long nextScanReservationId = 0; >>>>> + boolean reservationsBlocked = false; >>>>> + >>>>> + Set majorCompactingFiles = new HashSet(); >>>>> + >>>>> + Pair> reserveFilesForScan() { >>>>> + synchronized (Tablet.this) { >>>>> + >>>>> + while (reservationsBlocked) { >>>>> + try { >>>>> + Tablet.this.wait(50); >>>>> + } catch (InterruptedException e) { >>>>> + log.warn(e, e); >>>>> + } >>>>> + } >>>>> + >>>>> + Set absFilePaths = new >>>>> HashSet(datafileSizes.keySet()); >>>>> + >>>>> + 
long rid = nextScanReservationId++; >>>>> + >>>>> + scanFileReservations.put(rid, absFilePaths); >>>>> + >>>>> + Map ret = new >>>>> HashMap(); >>>>> + >>>>> + for (FileRef path : absFilePaths) { >>>>> + fileScanReferenceCounts.increment(path, 1); >>>>> + ret.put(path, datafileSizes.get(path)); >>>>> + } >>>>> + >>>>> + return new Pair>(rid, >>>>> ret); >>>>> + } >>>>> + } >>>>> + >>>>> + void returnFilesForScan(Long reservationId) { >>>>> + >>>>> + final Set filesToDelete = new HashSet(); >>>>> + >>>>> + synchronized (Tablet.this) { >>>>> + Set absFilePaths = >>>>> scanFileReservations.remove(reservationId); >>>>> + >>>>> + if (absFilePaths == null) >>>>> + throw new IllegalArgumentException("Unknown scan >>>>> reservation >>>>> id " + reservationId); >>>>> + >>>>> + boolean notify = false; >>>>> + for (FileRef path : absFilePaths) { >>>>> + long refCount = fileScanReferenceCounts.decrement(path, >>>>> 1); >>>>> + if (refCount == 0) { >>>>> + if (filesToDeleteAfterScan.remove(path)) >>>>> + filesToDelete.add(path); >>>>> + notify = true; >>>>> + } else if (refCount < 0) >>>>> + throw new IllegalStateException("Scan ref count for " + >>>>> path >>>>> + " is " + refCount); >>>>> + } >>>>> + >>>>> + if (notify) >>>>> + Tablet.this.notifyAll(); >>>>> + } >>>>> + >>>>> + if (filesToDelete.size() > 0) { >>>>> + log.debug("Removing scan refs from metadata " + extent + " >>>>> " + >>>>> filesToDelete); >>>>> + MetadataTableUtil.removeScanFiles(extent, filesToDelete, >>>>> SystemCredentials.get(), tabletServer.getLock()); >>>>> + } >>>>> + } >>>>> + >>>>> + private void removeFilesAfterScan(Set scanFiles) { >>>>> + if (scanFiles.size() == 0) >>>>> + return; >>>>> + >>>>> + Set filesToDelete = new HashSet(); >>>>> + >>>>> + synchronized (Tablet.this) { >>>>> + for (FileRef path : scanFiles) { >>>>> + if (fileScanReferenceCounts.get(path) == 0) >>>>> + filesToDelete.add(path); >>>>> + else >>>>> + filesToDeleteAfterScan.add(path); >>>>> + } >>>>> + } >>>>> + >>>>> + if 
(filesToDelete.size() > 0) { >>>>> + log.debug("Removing scan refs from metadata " + extent + " >>>>> " + >>>>> filesToDelete); >>>>> + MetadataTableUtil.removeScanFiles(extent, filesToDelete, >>>>> SystemCredentials.get(), tabletServer.getLock()); >>>>> + } >>>>> + } >>>>> + >>>>> + private TreeSet waitForScansToFinish(Set >>>>> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) { >>>>> + long startTime = System.currentTimeMillis(); >>>>> + TreeSet inUse = new TreeSet(); >>>>> + >>>>> + Span waitForScans = Trace.start("waitForScans"); >>>>> + try { >>>>> + synchronized (Tablet.this) { >>>>> + if (blockNewScans) { >>>>> + if (reservationsBlocked) >>>>> + throw new IllegalStateException(); >>>>> + >>>>> + reservationsBlocked = true; >>>>> + } >>>>> + >>>>> + for (FileRef path : pathsToWaitFor) { >>>>> + while (fileScanReferenceCounts.get(path) > 0 && >>>>> System.currentTimeMillis() - startTime < maxWaitTime) { >>>>> + try { >>>>> + Tablet.this.wait(100); >>>>> + } catch (InterruptedException e) { >>>>> + log.warn(e, e); >>>>> + } >>>>> + } >>>>> + } >>>>> + >>>>> + for (FileRef path : pathsToWaitFor) { >>>>> + if (fileScanReferenceCounts.get(path) > 0) >>>>> + inUse.add(path); >>>>> + } >>>>> + >>>>> + if (blockNewScans) { >>>>> + reservationsBlocked = false; >>>>> + Tablet.this.notifyAll(); >>>>> + } >>>>> + >>>>> + } >>>>> + } finally { >>>>> + waitForScans.stop(); >>>>> + } >>>>> + return inUse; >>>>> + } >>>>> + >>>>> + public void importMapFiles(long tid, Map >>>>> pathsString, boolean setTime) throws IOException { >>>>> + >>>>> + String bulkDir = null; >>>>> + >>>>> + Map paths = new >>>>> HashMap(); >>>>> + for (Entry entry : >>>>> pathsString.entrySet()) >>>>> + paths.put(entry.getKey(), entry.getValue()); >>>>> + >>>>> + for (FileRef tpath : paths.keySet()) { >>>>> + >>>>> + boolean inTheRightDirectory = false; >>>>> + Path parent = tpath.path().getParent().getParent(); >>>>> + for (String tablesDir : ServerConstants.getTablesDirs()) { >>>>> + if 
(parent.equals(new Path(tablesDir, >>>>> extent.getTableId().toString()))) { >>>>> + inTheRightDirectory = true; >>>>> + break; >>>>> + } >>>>> + } >>>>> + if (!inTheRightDirectory) { >>>>> + throw new IOException("Data file " + tpath + " not in >>>>> table >>>>> dirs"); >>>>> + } >>>>> + >>>>> + if (bulkDir == null) >>>>> + bulkDir = tpath.path().getParent().toString(); >>>>> + else if (!bulkDir.equals(tpath.path(). >>>>> getParent().toString())) >>>>> + throw new IllegalArgumentException("bulk files in >>>>> different >>>>> dirs " + bulkDir + " " + tpath); >>>>> + >>>>> + } >>>>> + >>>>> + if (extent.isRootTablet()) { >>>>> + throw new IllegalArgumentException("Can not import files to >>>>> root >>>>> tablet"); >>>>> + } >>>>> + >>>>> + synchronized (bulkFileImportLock) { >>>>> + Credentials creds = SystemCredentials.get(); >>>>> + Connector conn; >>>>> + try { >>>>> + conn = >>>>> HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), >>>>> creds.getToken()); >>>>> + } catch (Exception ex) { >>>>> + throw new IOException(ex); >>>>> + } >>>>> + // Remove any bulk files we've previously loaded and >>>>> compacted >>>>> away >>>>> + List files = MetadataTableUtil. 
>>>>> getBulkFilesLoaded(conn, >>>>> extent, tid); >>>>> + >>>>> + for (FileRef file : files) >>>>> + if (paths.keySet().remove(file.path())) >>>>> + log.debug("Ignoring request to re-import a file already >>>>> imported: " + extent + ": " + file); >>>>> + >>>>> + if (paths.size() > 0) { >>>>> + long bulkTime = Long.MIN_VALUE; >>>>> + if (setTime) { >>>>> + for (DataFileValue dfv : paths.values()) { >>>>> + long nextTime = tabletTime.getAndUpdateTime(); >>>>> + if (nextTime < bulkTime) >>>>> + throw new IllegalStateException("Time went >>>>> backwards >>>>> unexpectedly " + nextTime + " " + bulkTime); >>>>> + bulkTime = nextTime; >>>>> + dfv.setTime(bulkTime); >>>>> + } >>>>> + } >>>>> + >>>>> + synchronized (timeLock) { >>>>> + if (bulkTime > persistedTime) >>>>> + persistedTime = bulkTime; >>>>> + >>>>> + MetadataTableUtil.updateTabletDataFile(tid, extent, >>>>> paths, >>>>> tabletTime.getMetadataValue(persistedTime), creds, >>>>> tabletServer.getLock()); >>>>> + } >>>>> + } >>>>> + } >>>>> + >>>>> + synchronized (Tablet.this) { >>>>> + for (Entry tpath : >>>>> paths.entrySet()) { >>>>> + if (datafileSizes.containsKey(tpath.getKey())) { >>>>> + log.error("Adding file that is already in set " + >>>>> tpath.getKey()); >>>>> + } >>>>> + datafileSizes.put(tpath.getKey(), tpath.getValue()); >>>>> + >>>>> + } >>>>> + >>>>> + tabletResources.importedMapFiles(); >>>>> + >>>>> + computeNumEntries(); >>>>> + } >>>>> + >>>>> + for (FileRef tpath : paths.keySet()) { >>>>> + log.log(TLevel.TABLET_HIST, extent + " import " + tpath + >>>>> " " >>>>> + >>>>> paths.get(tpath)); >>>>> + } >>>>> + } >>>>> + >>>>> + FileRef reserveMergingMinorCompactionFile() { >>>>> + if (mergingMinorCompactionFile != null) >>>>> + throw new IllegalStateException("Tried to reserve merging >>>>> minor >>>>> compaction file when already reserved : " + >>>>> mergingMinorCompactionFile); >>>>> + >>>>> + if (extent.isRootTablet()) >>>>> + return null; >>>>> + >>>>> + int maxFiles = 
acuTableConf.getMaxFilesPerTablet(); >>>>> + >>>>> + // when a major compaction is running and we are at max >>>>> files, >>>>> write out >>>>> + // one extra file... want to avoid the case where major >>>>> compaction >>>>> is >>>>> + // compacting everything except for the largest file, and >>>>> therefore the >>>>> + // largest file is returned for merging.. the following check >>>>> mostly >>>>> + // avoids this case, except for the case where major >>>>> compactions >>>>> fail or >>>>> + // are canceled >>>>> + if (majorCompactingFiles.size() > 0 && datafileSizes.size() >>>>> == >>>>> maxFiles) >>>>> + return null; >>>>> + >>>>> + if (datafileSizes.size() >= maxFiles) { >>>>> + // find the smallest file >>>>> + >>>>> + long min = Long.MAX_VALUE; >>>>> + FileRef minName = null; >>>>> + >>>>> + for (Entry entry : >>>>> datafileSizes.entrySet()) { >>>>> + if (entry.getValue().getSize() < min && >>>>> !majorCompactingFiles.contains(entry.getKey())) { >>>>> + min = entry.getValue().getSize(); >>>>> + minName = entry.getKey(); >>>>> + } >>>>> + } >>>>> + >>>>> + if (minName == null) >>>>> + return null; >>>>> + >>>>> + mergingMinorCompactionFile = minName; >>>>> + return minName; >>>>> + } >>>>> + >>>>> + return null; >>>>> + } >>>>> + >>>>> + void unreserveMergingMinorCompactionFile(FileRef file) { >>>>> + if ((file == null && mergingMinorCompactionFile != null) || >>>>> (file >>>>> != null && mergingMinorCompactionFile == null) >>>>> + || (file != null && mergingMinorCompactionFile != null && >>>>> !file.equals(mergingMinorCompactionFile))) >>>>> + throw new IllegalStateException("Disagreement " + file + >>>>> " " >>>>> + >>>>> mergingMinorCompactionFile); >>>>> + >>>>> + mergingMinorCompactionFile = null; >>>>> + } >>>>> + >>>>> + void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef >>>>> newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession >>>>> commitSession, long flushId) >>>>> + throws IOException { >>>>> + >>>>> + IZooReaderWriter zoo 
= ZooReaderWriter. >>>>> getRetryingInstance(); >>>>> + if (extent.isRootTablet()) { >>>>> + try { >>>>> + if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) >>>>> { >>>>> + throw new IllegalStateException(); >>>>> + } >>>>> + } catch (Exception e) { >>>>> + throw new IllegalStateException("Can not bring major >>>>> compaction online, lock not held", e); >>>>> + } >>>>> + } >>>>> + >>>>> + // rename before putting in metadata table, so files in >>>>> metadata >>>>> table should >>>>> + // always exist >>>>> + do { >>>>> + try { >>>>> + if (dfv.getNumEntries() == 0) { >>>>> + fs.deleteRecursively(tmpDatafile.path()); >>>>> + } else { >>>>> + if (fs.exists(newDatafile.path())) { >>>>> + log.warn("Target map file already exist " + >>>>> newDatafile); >>>>> + fs.deleteRecursively(newDatafile.path()); >>>>> + } >>>>> + >>>>> + if (!fs.rename(tmpDatafile.path(), >>>>> newDatafile.path())) { >>>>> + throw new IOException("rename fails"); >>>>> + } >>>>> + } >>>>> + break; >>>>> + } catch (IOException ioe) { >>>>> + log.warn("Tablet " + extent + " failed to rename " + >>>>> newDatafile + " after MinC, will retry in 60 secs...", ioe); >>>>> + UtilWaitThread.sleep(60 * 1000); >>>>> + } >>>>> + } while (true); >>>>> + >>>>> + long t1, t2; >>>>> + >>>>> + // the code below always assumes merged files are in use by >>>>> scans... this must be done >>>>> + // because the in memory list of files is not updated until >>>>> after >>>>> the metadata table >>>>> + // therefore the file is available to scans until memory is >>>>> updated, but want to ensure >>>>> + // the file is not available for garbage collection... if >>>>> memory >>>>> were updated >>>>> + // before this point (like major compactions do), then the >>>>> following code could wait >>>>> + // for scans to finish like major compactions do.... 
used to >>>>> wait >>>>> for scans to finish >>>>> + // here, but that was incorrect because a scan could start >>>>> after >>>>> waiting but before >>>>> + // memory was updated... assuming the file is always in use >>>>> by >>>>> scans leads to >>>>> + // one unneeded metadata update when it was not actually in >>>>> use >>>>> + Set filesInUseByScans = Collections.emptySet(); >>>>> + if (absMergeFile != null) >>>>> + filesInUseByScans = Collections.singleton(absMergeFile); >>>>> + >>>>> + // very important to write delete entries outside of log >>>>> lock, >>>>> because >>>>> + // this !METADATA write does not go up... it goes sideways >>>>> or to >>>>> itself >>>>> + if (absMergeFile != null) >>>>> + MetadataTableUtil.addDeleteEntries(extent, >>>>> Collections.singleton(absMergeFile), SystemCredentials.get()); >>>>> + >>>>> + Set unusedWalLogs = beginClearingUnusedLogs(); >>>>> + try { >>>>> + // the order of writing to !METADATA and walog is >>>>> important in >>>>> the face of machine/process failures >>>>> + // need to write to !METADATA before writing to walog, when >>>>> things are done in the reverse order >>>>> + // data could be lost... 
the minor compaction start event >>>>> should >>>>> be written before the following metadata >>>>> + // write is made >>>>> + >>>>> + synchronized (timeLock) { >>>>> + if (commitSession.getMaxCommittedTime() > persistedTime) >>>>> + persistedTime = commitSession.getMaxCommittedTime(); >>>>> + >>>>> + String time = tabletTime.getMetadataValue( >>>>> persistedTime); >>>>> + MasterMetadataUtil.updateTabletDataFile(extent, >>>>> newDatafile, >>>>> absMergeFile, dfv, time, SystemCredentials.get(), filesInUseByScans, >>>>> + tabletServer.getClientAddressString(), >>>>> tabletServer.getLock(), unusedWalLogs, lastLocation, flushId); >>>>> + } >>>>> + >>>>> + } finally { >>>>> + finishClearingUnusedLogs(); >>>>> + } >>>>> + >>>>> + do { >>>>> + try { >>>>> + // the purpose of making this update use the new commit >>>>> session, instead of the old one passed in, >>>>> + // is because the new one will reference the logs used by >>>>> current memory... >>>>> + >>>>> + >>>>> tabletServer.minorCompactionFinished(tabletMemory.getCommitSession( >>>>> ), >>>>> newDatafile.toString(), commitSession.getWALogSeq() + 2); >>>>> + break; >>>>> + } catch (IOException e) { >>>>> + log.error("Failed to write to write-ahead log " + >>>>> e.getMessage() + " will retry", e); >>>>> + UtilWaitThread.sleep(1 * 1000); >>>>> + } >>>>> + } while (true); >>>>> + >>>>> + synchronized (Tablet.this) { >>>>> + lastLocation = null; >>>>> + >>>>> + t1 = System.currentTimeMillis(); >>>>> + if (datafileSizes.containsKey(newDatafile)) { >>>>> + log.error("Adding file that is already in set " + >>>>> newDatafile); >>>>> + } >>>>> + >>>>> + if (dfv.getNumEntries() > 0) { >>>>> + datafileSizes.put(newDatafile, dfv); >>>>> + } >>>>> + >>>>> + if (absMergeFile != null) { >>>>> + datafileSizes.remove(absMergeFile); >>>>> + } >>>>> + >>>>> + unreserveMergingMinorCompactionFile(absMergeFile); >>>>> + >>>>> + dataSourceDeletions.incrementAndGet(); >>>>> + tabletMemory.finishedMinC(); >>>>> + >>>>> + lastFlushID = 
flushId; >>>>> + >>>>> + computeNumEntries(); >>>>> + t2 = System.currentTimeMillis(); >>>>> + } >>>>> + >>>>> + // must do this after list of files in memory is updated >>>>> above >>>>> + removeFilesAfterScan(filesInUseByScans); >>>>> + >>>>> + if (absMergeFile != null) >>>>> + log.log(TLevel.TABLET_HIST, extent + " MinC [" + >>>>> absMergeFile >>>>> + >>>>> ",memory] -> " + newDatafile); >>>>> + else >>>>> + log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " + >>>>> newDatafile); >>>>> + log.debug(String.format("MinC finish lock %.2f secs %s", (t2 >>>>> - >>>>> t1) >>>>> / 1000.0, getExtent().toString())); >>>>> + if (dfv.getSize() > >>>>> acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) { >>>>> + log.debug(String.format("Minor Compaction wrote out file >>>>> larger >>>>> than split threshold. split threshold = %,d file size = %,d", >>>>> + >>>>> acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), >>>>> dfv.getSize())); >>>>> + } >>>>> + >>>>> + } >>>>> + >>>>> + public void reserveMajorCompactingFiles(Collection >>>>> files) { >>>>> + if (majorCompactingFiles.size() != 0) >>>>> + throw new IllegalStateException("Major compacting files not >>>>> empty " + majorCompactingFiles); >>>>> + >>>>> + if (mergingMinorCompactionFile != null && >>>>> files.contains(mergingMinorCompactionFile)) >>>>> + throw new IllegalStateException("Major compaction tried to >>>>> reserve file in use by minor compaction " + mergingMinorCompactionFile); >>>>> + >>>>> + majorCompactingFiles.addAll(files); >>>>> + } >>>>> + >>>>> + public void clearMajorCompactingFile() { >>>>> + majorCompactingFiles.clear(); >>>>> + } >>>>> + >>>>> + void bringMajorCompactionOnline(Set oldDatafiles, >>>>> FileRef >>>>> tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv) >>>>> + throws IOException { >>>>> + long t1, t2; >>>>> + >>>>> + if (!extent.isRootTablet()) { >>>>> + >>>>> + if (fs.exists(newDatafile.path())) { >>>>> + log.error("Target map file 
already exist " + newDatafile, >>>>> new >>>>> Exception()); >>>>> + throw new IllegalStateException("Target map file already >>>>> exist >>>>> " + newDatafile); >>>>> + } >>>>> + >>>>> + // rename before putting in metadata table, so files in >>>>> metadata >>>>> table should >>>>> + // always exist >>>>> + if (!fs.rename(tmpDatafile.path(), newDatafile.path())) >>>>> + log.warn("Rename of " + tmpDatafile + " to " + >>>>> newDatafile >>>>> + " >>>>> returned false"); >>>>> + >>>>> + if (dfv.getNumEntries() == 0) { >>>>> + fs.deleteRecursively(newDatafile.path()); >>>>> + } >>>>> + } >>>>> + >>>>> + TServerInstance lastLocation = null; >>>>> + synchronized (Tablet.this) { >>>>> + >>>>> + t1 = System.currentTimeMillis(); >>>>> + >>>>> + IZooReaderWriter zoo = ZooReaderWriter. >>>>> getRetryingInstance(); >>>>> + >>>>> + dataSourceDeletions.incrementAndGet(); >>>>> + >>>>> + if (extent.isRootTablet()) { >>>>> + >>>>> + waitForScansToFinish(oldDatafiles, true, >>>>> Long.MAX_VALUE); >>>>> + >>>>> + try { >>>>> + if (!zoo.isLockHeld(tabletServer. 
>>>>> getLock().getLockID())) >>>>> { >>>>> + throw new IllegalStateException(); >>>>> + } >>>>> + } catch (Exception e) { >>>>> + throw new IllegalStateException("Can not bring major >>>>> compaction online, lock not held", e); >>>>> + } >>>>> + >>>>> + // mark files as ready for deletion, but >>>>> + // do not delete them until we successfully >>>>> + // rename the compacted map file, in case >>>>> + // the system goes down >>>>> + >>>>> + String compactName = newDatafile.path().getName(); >>>>> + >>>>> + for (FileRef ref : oldDatafiles) { >>>>> + Path path = ref.path(); >>>>> + fs.rename(path, new Path(location + "/delete+" + >>>>> compactName >>>>> + "+" + path.getName())); >>>>> + } >>>>> + >>>>> + if (fs.exists(newDatafile.path())) { >>>>> + log.error("Target map file already exist " + >>>>> newDatafile, >>>>> new Exception()); >>>>> + throw new IllegalStateException("Target map file >>>>> already >>>>> exist " + newDatafile); >>>>> + } >>>>> + >>>>> + if (!fs.rename(tmpDatafile.path(), newDatafile.path())) >>>>> + log.warn("Rename of " + tmpDatafile + " to " + >>>>> newDatafile + >>>>> " returned false"); >>>>> + >>>>> + // start deleting files, if we do not finish they will be >>>>> cleaned >>>>> + // up later >>>>> + for (FileRef ref : oldDatafiles) { >>>>> + Path path = ref.path(); >>>>> + Path deleteFile = new Path(location + "/delete+" + >>>>> compactName + "+" + path.getName()); >>>>> + if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) >>>>> || >>>>> !fs.moveToTrash(deleteFile)) >>>>> + fs.deleteRecursively(deleteFile); >>>>> + } >>>>> + } >>>>> + >>>>> + // atomically remove old files and add new file >>>>> + for (FileRef oldDatafile : oldDatafiles) { >>>>> + if (!datafileSizes.containsKey(oldDatafile)) { >>>>> + log.error("file does not exist in set " + oldDatafile); >>>>> + } >>>>> + datafileSizes.remove(oldDatafile); >>>>> + majorCompactingFiles.remove(oldDatafile); >>>>> + } >>>>> + >>>>> + if (datafileSizes.containsKey(newDatafile)) { >>>>> + 
log.error("Adding file that is already in set " + >>>>> newDatafile); >>>>> + } >>>>> + >>>>> + if (dfv.getNumEntries() > 0) { >>>>> + datafileSizes.put(newDatafile, dfv); >>>>> + } >>>>> + >>>>> + // could be used by a follow on compaction in a multipass >>>>> compaction >>>>> + majorCompactingFiles.add(newDatafile); >>>>> + >>>>> + computeNumEntries(); >>>>> + >>>>> + lastLocation = Tablet.this.lastLocation; >>>>> + Tablet.this.lastLocation = null; >>>>> + >>>>> + if (compactionId != null) >>>>> + lastCompactID = compactionId; >>>>> + >>>>> + t2 = System.currentTimeMillis(); >>>>> + } >>>>> + >>>>> + if (!extent.isRootTablet()) { >>>>> + Set filesInUseByScans = >>>>> waitForScansToFinish(oldDatafiles, false, 10000); >>>>> + if (filesInUseByScans.size() > 0) >>>>> + log.debug("Adding scan refs to metadata " + extent + " " >>>>> + >>>>> filesInUseByScans); >>>>> + MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, >>>>> filesInUseByScans, newDatafile, compactionId, dfv, >>>>> SystemCredentials.get(), >>>>> + tabletServer.getClientAddressString(), lastLocation, >>>>> tabletServer.getLock()); >>>>> + removeFilesAfterScan(filesInUseByScans); >>>>> + } >>>>> + >>>>> + log.debug(String.format("MajC finish lock %.2f secs", (t2 - >>>>> t1) >>>>> / >>>>> 1000.0)); >>>>> + log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles >>>>> + " >>>>> --> " + newDatafile); >>>>> + } >>>>> + >>>>> + public SortedMap getDatafileSizes() { >>>>> + synchronized (Tablet.this) { >>>>> + TreeMap copy = new >>>>> TreeMap(datafileSizes); >>>>> + return Collections.unmodifiableSortedMap(copy); >>>>> + } >>>>> + } >>>>> + >>>>> + public Set getFiles() { >>>>> + synchronized (Tablet.this) { >>>>> + HashSet files = new >>>>> HashSet(datafileSizes.keySet()); >>>>> + return Collections.unmodifiableSet(files); >>>>> + } >>>>> + } >>>>> + >>>>> + } >>>>> + >>>>> + public Tablet(TabletServer tabletServer, Text location, KeyExtent >>>>> extent, TabletResourceManager trm, SortedMap >>>>> 
tabletsKeyValues) >>>>> + throws IOException { >>>>> + this(tabletServer, location, extent, trm, >>>>> CachedConfiguration.getInstance(), tabletsKeyValues); >>>>> + splitCreationTime = 0; >>>>> + } >>>>> + >>>>> + public Tablet(TabletServer tabletServer, Text location, KeyExtent >>>>> extent, TabletResourceManager trm, SortedMap >>>>> datafiles, String time, >>>>> + long initFlushID, long initCompactID) throws IOException { >>>>> + this(tabletServer, location, extent, trm, >>>>> CachedConfiguration.getInstance(), datafiles, time, initFlushID, >>>>> initCompactID); >>>>> + splitCreationTime = System.currentTimeMillis(); >>>>> + } >>>>> + >>>>> + private Tablet(TabletServer tabletServer, Text location, >>>>> KeyExtent >>>>> extent, TabletResourceManager trm, Configuration conf, >>>>> + SortedMap tabletsKeyValues) throws IOException { >>>>> + this(tabletServer, location, extent, trm, conf, >>>>> VolumeManagerImpl.get(), tabletsKeyValues); >>>>> + } >>>>> + >>>>> + static private final List EMPTY = >>>>> Collections.emptyList(); >>>>> + >>>>> + private Tablet(TabletServer tabletServer, Text location, >>>>> KeyExtent >>>>> extent, TabletResourceManager trm, Configuration conf, >>>>> + SortedMap datafiles, String time, >>>>> long >>>>> initFlushID, long initCompactID) throws IOException { >>>>> + this(tabletServer, location, extent, trm, conf, >>>>> VolumeManagerImpl.get(), EMPTY, datafiles, time, null, new >>>>> HashSet(), initFlushID, initCompactID); >>>>> + } >>>>> + >>>>> + private static String lookupTime(AccumuloConfiguration conf, >>>>> KeyExtent >>>>> extent, SortedMap tabletsKeyValues) { >>>>> + SortedMap entries; >>>>> + >>>>> + if (extent.isRootTablet()) { >>>>> + return null; >>>>> + } else { >>>>> + entries = new TreeMap(); >>>>> + Text rowName = extent.getMetadataEntry(); >>>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>>> + if (entry.getKey().compareRow(rowName) == 0 && >>>>> TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry. 
>>>>> getKey())) >>>>> { >>>>> + entries.put(new Key(entry.getKey()), new >>>>> Value(entry.getValue())); >>>>> + } >>>>> + } >>>>> + } >>>>> + >>>>> + // log.debug("extent : "+extent+" entries : "+entries); >>>>> + >>>>> + if (entries.size() == 1) >>>>> + return entries.values().iterator().next().toString(); >>>>> + return null; >>>>> + } >>>>> + >>>>> + private static SortedMap >>>>> lookupDatafiles(AccumuloConfiguration conf, VolumeManager fs, >>>>> KeyExtent >>>>> extent, >>>>> + SortedMap tabletsKeyValues) throws IOException { >>>>> + >>>>> + TreeMap datafiles = new >>>>> TreeMap(); >>>>> + >>>>> + if (extent.isRootTablet()) { // the meta0 tablet >>>>> + Path location = new Path(MetadataTableUtil. >>>>> getRootTabletDir()); >>>>> + // cleanUpFiles() has special handling for delete. files >>>>> + FileStatus[] files = fs.listStatus(location); >>>>> + Collection goodPaths = cleanUpFiles(fs, files, true); >>>>> + for (String good : goodPaths) { >>>>> + Path path = new Path(good); >>>>> + String filename = path.getName(); >>>>> + FileRef ref = new FileRef(location.toString() + "/" + >>>>> filename, >>>>> path); >>>>> + DataFileValue dfv = new DataFileValue(0, 0); >>>>> + datafiles.put(ref, dfv); >>>>> + } >>>>> + } else { >>>>> + >>>>> + Text rowName = extent.getMetadataEntry(); >>>>> + >>>>> + String tableId = extent.isMeta() ? 
RootTable.ID : >>>>> MetadataTable.ID; >>>>> + ScannerImpl mdScanner = new >>>>> ScannerImpl(HdfsZooInstance.getInstance(), SystemCredentials.get(), >>>>> tableId, Authorizations.EMPTY); >>>>> + >>>>> + // Commented out because when no data file is present, each >>>>> tablet >>>>> will scan through metadata table and return nothing >>>>> + // reduced batch size to improve performance >>>>> + // changed here after endKeys were implemented from 10 to >>>>> 1000 >>>>> + mdScanner.setBatchSize(1000); >>>>> + >>>>> + // leave these in, again, now using endKey for safety >>>>> + mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME); >>>>> + >>>>> + mdScanner.setRange(new Range(rowName)); >>>>> + >>>>> + for (Entry entry : mdScanner) { >>>>> + >>>>> + if (entry.getKey().compareRow(rowName) != 0) { >>>>> + break; >>>>> + } >>>>> + >>>>> + FileRef ref = new >>>>> FileRef(entry.getKey().getColumnQualifier().toString(), >>>>> fs.getFullPath(entry.getKey())); >>>>> + datafiles.put(ref, new DataFileValue(entry.getValue() >>>>> .get())); >>>>> + } >>>>> + } >>>>> + return datafiles; >>>>> + } >>>>> + >>>>> + private static List lookupLogEntries(KeyExtent ke, >>>>> SortedMap tabletsKeyValues) { >>>>> + List logEntries = new ArrayList(); >>>>> + >>>>> + if (ke.isMeta()) { >>>>> + try { >>>>> + logEntries = >>>>> MetadataTableUtil.getLogEntries(SystemCredentials.get(), ke); >>>>> + } catch (Exception ex) { >>>>> + throw new RuntimeException("Unable to read tablet log >>>>> entries", >>>>> ex); >>>>> + } >>>>> + } else { >>>>> + log.debug("Looking at metadata " + tabletsKeyValues); >>>>> + Text row = ke.getMetadataEntry(); >>>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>>> + Key key = entry.getKey(); >>>>> + if (key.getRow().equals(row)) { >>>>> + if (key.getColumnFamily().equals(LogColumnFamily.NAME)) >>>>> { >>>>> + logEntries.add(LogEntry.fromKeyValue(key, >>>>> entry.getValue())); >>>>> + } >>>>> + } >>>>> + } >>>>> + } >>>>> + >>>>> + log.debug("got " + logEntries 
+ " for logs for " + ke); >>>>> + return logEntries; >>>>> + } >>>>> + >>>>> + private static Set lookupScanFiles(KeyExtent extent, >>>>> SortedMap tabletsKeyValues, VolumeManager fs) throws >>>>> IOException >>>>> { >>>>> + HashSet scanFiles = new HashSet(); >>>>> + >>>>> + Text row = extent.getMetadataEntry(); >>>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>>> + Key key = entry.getKey(); >>>>> + if (key.getRow().equals(row) && >>>>> key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) { >>>>> + String meta = key.getColumnQualifier().toString(); >>>>> + Path path = fs.getFullPath(extent.getTableId().toString(), >>>>> meta); >>>>> + scanFiles.add(new FileRef(meta, path)); >>>>> + } >>>>> + } >>>>> + >>>>> + return scanFiles; >>>>> + } >>>>> + >>>>> + private static long lookupFlushID(KeyExtent extent, >>>>> SortedMap tabletsKeyValues) { >>>>> + Text row = extent.getMetadataEntry(); >>>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>>> + Key key = entry.getKey(); >>>>> + if (key.getRow().equals(row) && >>>>> TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key. >>>>> getColumnFamily(), >>>>> key.getColumnQualifier())) >>>>> + return Long.parseLong(entry.getValue().toString()); >>>>> + } >>>>> + >>>>> + return -1; >>>>> + } >>>>> + >>>>> + private static long lookupCompactID(KeyExtent extent, >>>>> SortedMap tabletsKeyValues) { >>>>> + Text row = extent.getMetadataEntry(); >>>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>>> + Key key = entry.getKey(); >>>>> + if (key.getRow().equals(row) && >>>>> TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key. 
>>>>> getColumnFamily(), >>>>> key.getColumnQualifier())) >>>>> + return Long.parseLong(entry.getValue().toString()); >>>>> + } >>>>> + >>>>> + return -1; >>>>> + } >>>>> + >>>>> + private Tablet(TabletServer tabletServer, Text location, >>>>> KeyExtent >>>>> extent, TabletResourceManager trm, Configuration conf, VolumeManager >>>>> fs, >>>>> + SortedMap tabletsKeyValues) throws IOException { >>>>> + this(tabletServer, location, extent, trm, conf, fs, >>>>> lookupLogEntries(extent, tabletsKeyValues), >>>>> lookupDatafiles(tabletServer.getSystemConfiguration(), fs, >>>>> + extent, tabletsKeyValues), >>>>> lookupTime(tabletServer.getSystemConfiguration(), extent, >>>>> tabletsKeyValues), lookupLastServer(extent, tabletsKeyValues), >>>>> + lookupScanFiles(extent, tabletsKeyValues, fs), >>>>> lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, >>>>> tabletsKeyValues)); >>>>> + } >>>>> + >>>>> + private static TServerInstance lookupLastServer(KeyExtent extent, >>>>> SortedMap tabletsKeyValues) { >>>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>>> + if (entry.getKey().getColumnFamily().compareTo( >>>>> TabletsSection.LastLocationColumnFamily.NAME) == 0) { >>>>> + return new TServerInstance(entry.getValue(), >>>>> entry.getKey().getColumnQualifier()); >>>>> + } >>>>> + } >>>>> + return null; >>>>> + } >>>>> + >>>>> + /** >>>>> + * yet another constructor - this one allows us to avoid costly >>>>> lookups into the Metadata table if we already know the files we need - >>>>> as >>>>> at split time >>>>> + */ >>>>> + private Tablet(final TabletServer tabletServer, final Text >>>>> location, >>>>> final KeyExtent extent, final TabletResourceManager trm, final >>>>> Configuration conf, >>>>> + final VolumeManager fs, final List logEntries, >>>>> final >>>>> SortedMap datafiles, String time, >>>>> + final TServerInstance lastLocation, Set scanFiles, >>>>> long >>>>> initFlushID, long initCompactID) throws IOException { >>>>> + Path locationPath; >>>>> + 
if (location.find(":") >= 0) { >>>>> + locationPath = new Path(location.toString()); >>>>> + } else { >>>>> + locationPath = fs.getFullPath(FileType.TABLE, >>>>> extent.getTableId().toString() + location.toString()); >>>>> + } >>>>> + FileSystem fsForPath = fs.getFileSystemByPath(locationPath); >>>>> + this.location = locationPath.makeQualified(fsForPath.getUri(), >>>>> fsForPath.getWorkingDirectory()); >>>>> + this.lastLocation = lastLocation; >>>>> + this.tabletDirectory = location.toString(); >>>>> + this.conf = conf; >>>>> + this.acuTableConf = tabletServer. >>>>> getTableConfiguration(extent); >>>>> + >>>>> + this.fs = fs; >>>>> + this.extent = extent; >>>>> + this.tabletResources = trm; >>>>> + >>>>> + this.lastFlushID = initFlushID; >>>>> + this.lastCompactID = initCompactID; >>>>> + >>>>> + if (extent.isRootTablet()) { >>>>> + long rtime = Long.MIN_VALUE; >>>>> + for (FileRef ref : datafiles.keySet()) { >>>>> + Path path = ref.path(); >>>>> + FileSystem ns = fs.getFileSystemByPath(path); >>>>> + FileSKVIterator reader = >>>>> FileOperations.getInstance().openReader(path.toString(), true, ns, >>>>> ns.getConf(), tabletServer.getTableConfiguration(extent)); >>>>> + long maxTime = -1; >>>>> + try { >>>>> + >>>>> + while (reader.hasTop()) { >>>>> + maxTime = Math.max(maxTime, >>>>> reader.getTopKey().getTimestamp()); >>>>> + reader.next(); >>>>> + } >>>>> + >>>>> + } finally { >>>>> + reader.close(); >>>>> + } >>>>> + >>>>> + if (maxTime > rtime) { >>>>> + time = TabletTime.LOGICAL_TIME_ID + "" + maxTime; >>>>> + rtime = maxTime; >>>>> + } >>>>> + } >>>>> + } >>>>> + if (time == null && datafiles.isEmpty() && >>>>> extent.equals(RootTable.OLD_EXTENT)) { >>>>> + // recovery... 
old root tablet has no data, so time doesn't >>>>> matter: >>>>> + time = TabletTime.LOGICAL_TIME_ID + "" + Long.MIN_VALUE; >>>>> + } >>>>> + >>>>> + this.tabletServer = tabletServer; >>>>> + this.logId = tabletServer.createLogId(extent); >>>>> + >>>>> + this.timer = new TabletStatsKeeper(); >>>>> + >>>>> + setupDefaultSecurityLabels(extent); >>>>> + >>>>> + tabletMemory = new TabletMemory(); >>>>> + tabletTime = TabletTime.getInstance(time); >>>>> + persistedTime = tabletTime.getTime(); >>>>> + >>>>> + acuTableConf.addObserver(configObserver = new >>>>> ConfigurationObserver() { >>>>> + >>>>> + private void reloadConstraints() { >>>>> + constraintChecker.set(new >>>>> ConstraintChecker(getTableConfiguration())); >>>>> + } >>>>> + >>>>> + @Override >>>>> + public void propertiesChanged() { >>>>> + reloadConstraints(); >>>>> + >>>>> + try { >>>>> + setupDefaultSecurityLabels(extent); >>>>> + } catch (Exception e) { >>>>> + log.error("Failed to reload default security labels for >>>>> extent: " + extent.toString()); >>>>> + } >>>>> + } >>>>> + >>>>> + @Override >>>>> + public void propertyChanged(String prop) { >>>>> + if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX. 
>>>>> getKey())) >>>>> + reloadConstraints(); >>>>> + else if >>>>> (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) { >>>>> + try { >>>>> + log.info("Default security labels changed for extent: >>>>> " + >>>>> extent.toString()); >>>>> + setupDefaultSecurityLabels(extent); >>>>> + } catch (Exception e) { >>>>> + log.error("Failed to reload default security labels for >>>>> extent: " + extent.toString()); >>>>> + } >>>>> + } >>>>> + >>>>> + } >>>>> + >>>>> + @Override >>>>> + public void sessionExpired() { >>>>> + log.debug("Session expired, no longer updating per table >>>>> props..."); >>>>> + } >>>>> + >>>>> + }); >>>>> + // Force a load of any per-table properties >>>>> + configObserver.propertiesChanged(); >>>>> + >>>>> + tabletResources.setTablet(this, acuTableConf); >>>>> + if (!logEntries.isEmpty()) { >>>>> + log.info("Starting Write-Ahead Log recovery for " + >>>>> this.extent); >>>>> + final long[] count = new long[2]; >>>>> + final CommitSession commitSession = >>>>> tabletMemory.getCommitSession(); >>>>> + count[1] = Long.MIN_VALUE; >>>>> + try { >>>>> + Set absPaths = new HashSet(); >>>>> + for (FileRef ref : datafiles.keySet()) >>>>> + absPaths.add(ref.path().toString()); >>>>> + >>>>> + tabletServer.recover(this.tabletServer.getFileSystem(), >>>>> this, >>>>> logEntries, absPaths, new MutationReceiver() { >>>>> + @Override >>>>> + public void receive(Mutation m) { >>>>> + // LogReader.printMutation(m); >>>>> + Collection muts = m.getUpdates(); >>>>> + for (ColumnUpdate columnUpdate : muts) { >>>>> + if (!columnUpdate.hasTimestamp()) { >>>>> + // if it is not a user set timestamp, it must have >>>>> been >>>>> set >>>>> + // by the system >>>>> + count[1] = Math.max(count[1], >>>>> columnUpdate.getTimestamp()); >>>>> + } >>>> >>>> --001a113372601c808204ec6c5aa9--