Return-Path: X-Original-To: apmail-accumulo-dev-archive@www.apache.org Delivered-To: apmail-accumulo-dev-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DC7ED10B37 for ; Sat, 30 Nov 2013 22:16:33 +0000 (UTC) Received: (qmail 39956 invoked by uid 500); 30 Nov 2013 22:16:33 -0000 Delivered-To: apmail-accumulo-dev-archive@accumulo.apache.org Received: (qmail 39922 invoked by uid 500); 30 Nov 2013 22:16:33 -0000 Mailing-List: contact dev-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list dev@accumulo.apache.org Received: (qmail 39914 invoked by uid 99); 30 Nov 2013 22:16:33 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 30 Nov 2013 22:16:33 +0000 X-ASF-Spam-Status: No, hits=-0.7 required=5.0 tests=RCVD_IN_DNSWL_LOW,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of josh.elser@gmail.com designates 209.85.213.50 as permitted sender) Received: from [209.85.213.50] (HELO mail-yh0-f50.google.com) (209.85.213.50) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 30 Nov 2013 22:16:24 +0000 Received: by mail-yh0-f50.google.com with SMTP id b6so7580080yha.23 for ; Sat, 30 Nov 2013 14:16:03 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=message-id:date:from:user-agent:mime-version:to:subject:references :in-reply-to:content-type:content-transfer-encoding; bh=9/eqcFgYN8FduRfFPTtW/zV/R4bPy9TIIyslUpqx0r0=; b=d+uP7dYJ2dCd9RKt1/qPq6B0nzJDj1YNllljrN+ZqbxWaPw70n/Vtsx6mpeIHwWcEk p6gUrUMU9kCIpIpYoU2sM2QmpCGb6Iy91EEv4yxeJWWohisVWnLJsj4lSYrsL0In5Syo y7xqvsJR0CFn/GLOzlyXH13OFibqlpUK0tQdvn36aHPVmg97K3y3XsheaiuVfh82Ilhi 9bwkhc/SaBk99cBAj80J67aMpEBEZVVDyIgSedL1DbFFQa0Kni58CMnBCldO1n2nK5Ax bD3fmt6bBHpMpG1VkcRfNV4wf9R1OKneCbBLbJZwv2g0wDdZBU7m4yQg0KDViGy4u8nT k0dg== X-Received: by 10.236.125.102 with SMTP id y66mr7830255yhh.58.1385849763123; Sat, 30 Nov 2013 14:16:03 -0800 (PST) Received: from [192.168.2.38] (pool-173-69-177-34.bltmmd.fios.verizon.net. [173.69.177.34]) by mx.google.com with ESMTPSA id g25sm30104626yhg.6.2013.11.30.14.16.01 for (version=TLSv1 cipher=ECDHE-RSA-RC4-SHA bits=128/128); Sat, 30 Nov 2013 14:16:02 -0800 (PST) Message-ID: <529A63B0.2060106@gmail.com> Date: Sat, 30 Nov 2013 17:16:16 -0500 From: Josh Elser User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:24.0) Gecko/20100101 Thunderbird/24.1.1 MIME-Version: 1.0 To: dev@accumulo.apache.org Subject: Re: [11/15] ACCUMULO-1940 do not expose a new tablet to the memory manager until after it is online References: <8748a79cd42542a38f179318a574bdab@git.apache.org> <52997466.9030503@gmail.com> In-Reply-To: Content-Type: text/plain; charset=ISO-8859-1; format=flowed Content-Transfer-Encoding: 7bit X-Virus-Checked: Checked by ClamAV on apache.org On 11/30/2013 5:06 PM, Eric Newton wrote: > There was a merge "conflict" in 1.5 or 1.6. There was an extra line of > whitespace, or a line missing. Strange. Maybe John hit the nail on the head that the email notifications for Git aren't always correct? > It is annoying to maintain 1.4, 1.5, and the largely unnecessary 1.6 (just > use master). > > However, I think this chore comes with software maturity and a larger user > base. > I'd agree. I was mostly whining anyways. Having multiple versions in use is better than not having people use anything. Perhaps the 1.6.0 and master split is something we can revisit for the next release. If no one is working on features for the "next" release while we test the "current" release, there isn't a point in having two branches. > > > On Sat, Nov 30, 2013 at 12:15 AM, Josh Elser wrote: > >> Actually, I was kind of confused when I saw your commit*s* on this ticket. >> What did you actually do? You have two commits that do the same changes: >> >> 82477f08aa64e2a8a1cf7f6af0db5ce954801ac8 (in 1.4, 1.5 and 1.6) >> 9b6b9cf104ff332cffdd4900d8057557e64e0ec8 (only in 1.6)\ >> >> I would've only expected to see one email with a diff, followed by 2 >> "merge" emails, e.g. >> >> ---------------------------------------------------------------------- >> .../tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java | 2 -- >> 1 file changed, 2 deletions(-) >> ---------------------------------------------------------------------- >> >> Although, I will admit that dealing with 3 active branches is a big pain. >> However, I don't know of a better way to handle this in a way that doesn't >> make Git super confused and thus limit us in being able to answer questions >> like "where was a problem introduced" (git-bisect) and "where does this >> change exist" (and not having multiple commits that perform the same >> changes). >> >> On 11/29/13, 8:31 PM, Eric Newton wrote: >> >>> I changed one line of this file... git seems to be having a conniption. I >>> find the volume of git traffic to be so useless that I ignore it. >>> >>> Anyone else? >>> >>> >>> >>> >>> On Fri, Nov 29, 2013 at 1:24 PM, wrote: >>> >>> >>>> http://git-wip-us.apache.org/repos/asf/accumulo/blob/ >>>> 9b6b9cf1/server/tserver/src/main/java/org/apache/accumulo/ >>>> tserver/Tablet.java >>>> ---------------------------------------------------------------------- >>>> diff --cc >>>> server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java >>>> index ee3b243,0000000..fd76415 >>>> mode 100644,000000..100644 >>>> --- a/server/tserver/src/main/java/org/apache/accumulo/ >>>> tserver/Tablet.java >>>> +++ b/server/tserver/src/main/java/org/apache/accumulo/ >>>> tserver/Tablet.java >>>> @@@ -1,3868 -1,0 +1,3866 @@@ >>>> +/* >>>> + * Licensed to the Apache Software Foundation (ASF) under one or more >>>> + * contributor license agreements. See the NOTICE file distributed >>>> with >>>> + * this work for additional information regarding copyright ownership. >>>> + * The ASF licenses this file to You under the Apache License, Version >>>> 2.0 >>>> + * (the "License"); you may not use this file except in compliance >>>> with >>>> + * the License. You may obtain a copy of the License at >>>> + * >>>> + * http://www.apache.org/licenses/LICENSE-2.0 >>>> + * >>>> + * Unless required by applicable law or agreed to in writing, software >>>> + * distributed under the License is distributed on an "AS IS" BASIS, >>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or >>>> implied. >>>> + * See the License for the specific language governing permissions and >>>> + * limitations under the License. >>>> + */ >>>> +package org.apache.accumulo.tserver; >>>> + >>>> +import java.io.ByteArrayInputStream; >>>> +import java.io.DataInputStream; >>>> +import java.io.FileNotFoundException; >>>> +import java.io.IOException; >>>> +import java.util.ArrayList; >>>> +import java.util.Arrays; >>>> +import java.util.Collection; >>>> +import java.util.Collections; >>>> +import java.util.Comparator; >>>> +import java.util.EnumSet; >>>> +import java.util.HashMap; >>>> +import java.util.HashSet; >>>> +import java.util.Iterator; >>>> +import java.util.List; >>>> +import java.util.Map; >>>> +import java.util.Map.Entry; >>>> +import java.util.PriorityQueue; >>>> +import java.util.Set; >>>> +import java.util.SortedMap; >>>> +import java.util.TreeMap; >>>> +import java.util.TreeSet; >>>> +import java.util.concurrent.atomic.AtomicBoolean; >>>> +import java.util.concurrent.atomic.AtomicLong; >>>> +import java.util.concurrent.atomic.AtomicReference; >>>> +import java.util.concurrent.locks.ReentrantLock; >>>> + >>>> +import org.apache.accumulo.core.Constants; >>>> +import org.apache.accumulo.core.client.Connector; >>>> +import org.apache.accumulo.core.client.IteratorSetting; >>>> +import org.apache.accumulo.core.client.impl.ScannerImpl; >>>> +import org.apache.accumulo.core.conf.AccumuloConfiguration; >>>> +import org.apache.accumulo.core.conf.ConfigurationCopy; >>>> +import org.apache.accumulo.core.conf.ConfigurationObserver; >>>> +import org.apache.accumulo.core.conf.Property; >>>> +import org.apache.accumulo.core.constraints.Violations; >>>> +import org.apache.accumulo.core.data.ByteSequence; >>>> +import org.apache.accumulo.core.data.Column; >>>> +import org.apache.accumulo.core.data.ColumnUpdate; >>>> +import org.apache.accumulo.core.data.Key; >>>> +import org.apache.accumulo.core.data.KeyExtent; >>>> +import org.apache.accumulo.core.data.KeyValue; >>>> +import org.apache.accumulo.core.data.Mutation; >>>> +import org.apache.accumulo.core.data.Range; >>>> +import org.apache.accumulo.core.data.Value; >>>> +import org.apache.accumulo.core.data.thrift.IterInfo; >>>> +import org.apache.accumulo.core.data.thrift.MapFileInfo; >>>> +import org.apache.accumulo.core.file.FileOperations; >>>> +import org.apache.accumulo.core.file.FileSKVIterator; >>>> +import org.apache.accumulo.core.iterators. >>>> IterationInterruptedException; >>>> +import org.apache.accumulo.core.iterators.IteratorEnvironment; >>>> +import org.apache.accumulo.core.iterators.IteratorUtil; >>>> +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; >>>> +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; >>>> +import >>>> org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; >>>> +import org.apache.accumulo.core.iterators.system. >>>> ColumnQualifierFilter; >>>> +import org.apache.accumulo.core.iterators.system.DeletingIterator; >>>> +import org.apache.accumulo.core.iterators.system. >>>> InterruptibleIterator; >>>> +import org.apache.accumulo.core.iterators.system.MultiIterator; >>>> +import org.apache.accumulo.core.iterators.system. >>>> SourceSwitchingIterator; >>>> +import >>>> org.apache.accumulo.core.iterators.system.SourceSwitchingIterator. >>>> DataSource; >>>> +import org.apache.accumulo.core.iterators.system.StatsIterator; >>>> +import org.apache.accumulo.core.iterators.system.VisibilityFilter; >>>> +import org.apache.accumulo.core.master.thrift.TabletLoadState; >>>> +import org.apache.accumulo.core.metadata.MetadataTable; >>>> +import org.apache.accumulo.core.metadata.RootTable; >>>> +import org.apache.accumulo.core.metadata.schema.DataFileValue; >>>> +import >>>> org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; >>>> +import >>>> org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection. >>>> DataFileColumnFamily; >>>> +import >>>> org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection. >>>> LogColumnFamily; >>>> +import >>>> org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection. >>>> ScanFileColumnFamily; >>>> +import org.apache.accumulo.core.security.Authorizations; >>>> +import org.apache.accumulo.core.security.ColumnVisibility; >>>> +import org.apache.accumulo.core.security.Credentials; >>>> +import org.apache.accumulo.core.tabletserver.log.LogEntry; >>>> +import org.apache.accumulo.core.util.CachedConfiguration; >>>> +import org.apache.accumulo.core.util.LocalityGroupUtil; >>>> +import >>>> org.apache.accumulo.core.util.LocalityGroupUtil. >>>> LocalityGroupConfigurationError; >>>> +import org.apache.accumulo.core.util.MapCounter; >>>> +import org.apache.accumulo.core.util.Pair; >>>> +import org.apache.accumulo.core.util.UtilWaitThread; >>>> +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; >>>> +import org.apache.accumulo.server.ServerConstants; >>>> +import org.apache.accumulo.server.client.HdfsZooInstance; >>>> +import org.apache.accumulo.server.conf.TableConfiguration; >>>> +import org.apache.accumulo.server.fs.FileRef; >>>> +import org.apache.accumulo.server.fs.VolumeManager; >>>> +import org.apache.accumulo.server.fs.VolumeManager.FileType; >>>> +import org.apache.accumulo.server.fs.VolumeManagerImpl; >>>> +import org.apache.accumulo.server.master.state.TServerInstance; >>>> +import org.apache.accumulo.server.master.tableOps. >>>> CompactionIterators; >>>> +import org.apache.accumulo.server.problems.ProblemReport; >>>> +import org.apache.accumulo.server.problems.ProblemReports; >>>> +import org.apache.accumulo.server.problems.ProblemType; >>>> +import org.apache.accumulo.server.security.SystemCredentials; >>>> +import org.apache.accumulo.server.tablets.TabletTime; >>>> +import org.apache.accumulo.server.tablets.UniqueNameAllocator; >>>> +import org.apache.accumulo.server.util.FileUtil; >>>> +import org.apache.accumulo.server.util.MasterMetadataUtil; >>>> +import org.apache.accumulo.server.util.MetadataTableUtil; >>>> +import org.apache.accumulo.server.util.TabletOperations; >>>> +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; >>>> +import org.apache.accumulo.start.classloader.vfs. >>>> AccumuloVFSClassLoader; >>>> +import org.apache.accumulo.trace.instrument.Span; >>>> +import org.apache.accumulo.trace.instrument.Trace; >>>> +import org.apache.accumulo.tserver.Compactor. >>>> CompactionCanceledException; >>>> +import org.apache.accumulo.tserver.Compactor.CompactionEnv; >>>> +import org.apache.accumulo.tserver.FileManager.ScanFileManager; >>>> +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator; >>>> +import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv; >>>> +import >>>> org.apache.accumulo.tserver.TabletServerResourceManager. >>>> TabletResourceManager; >>>> +import org.apache.accumulo.tserver.TabletStatsKeeper.Operation; >>>> +import org.apache.accumulo.tserver.compaction.CompactionPlan; >>>> +import org.apache.accumulo.tserver.compaction.CompactionStrategy; >>>> +import org.apache.accumulo.tserver.compaction. >>>> DefaultCompactionStrategy; >>>> +import org.apache.accumulo.tserver.compaction.MajorCompactionReason; >>>> +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest; >>>> +import org.apache.accumulo.tserver.compaction.WriteParameters; >>>> +import org.apache.accumulo.tserver.constraints.ConstraintChecker; >>>> +import org.apache.accumulo.tserver.log.DfsLogger; >>>> +import org.apache.accumulo.tserver.log.MutationReceiver; >>>> +import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage; >>>> +import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics; >>>> +import org.apache.commons.codec.DecoderException; >>>> +import org.apache.commons.codec.binary.Hex; >>>> +import org.apache.hadoop.conf.Configuration; >>>> +import org.apache.hadoop.fs.FileStatus; >>>> +import org.apache.hadoop.fs.FileSystem; >>>> +import org.apache.hadoop.fs.Path; >>>> +import org.apache.hadoop.io.Text; >>>> +import org.apache.log4j.Logger; >>>> +import org.apache.zookeeper.KeeperException; >>>> +import org.apache.zookeeper.KeeperException.NoNodeException; >>>> + >>>> +/* >>>> + * We need to be able to have the master tell a tabletServer to >>>> + * close this file, and the tablet server to handle all pending client >>>> reads >>>> + * before closing >>>> + * >>>> + */ >>>> + >>>> +/** >>>> + * >>>> + * this class just provides an interface to read from a MapFile mostly >>>> takes care of reporting start and end keys >>>> + * >>>> + * need this because a single row extent can have multiple columns >>>> this >>>> manages all the columns (each handled by a store) for a single row-extent >>>> + * >>>> + * >>>> + */ >>>> + >>>> +public class Tablet { >>>> + >>>> + enum MinorCompactionReason { >>>> + USER, SYSTEM, CLOSE >>>> + } >>>> + >>>> + public class CommitSession { >>>> + >>>> + private int seq; >>>> + private InMemoryMap memTable; >>>> + private int commitsInProgress; >>>> + private long maxCommittedTime = Long.MIN_VALUE; >>>> + >>>> + private CommitSession(int seq, InMemoryMap imm) { >>>> + this.seq = seq; >>>> + this.memTable = imm; >>>> + commitsInProgress = 0; >>>> + } >>>> + >>>> + public int getWALogSeq() { >>>> + return seq; >>>> + } >>>> + >>>> + private void decrementCommitsInProgress() { >>>> + if (commitsInProgress < 1) >>>> + throw new IllegalStateException("commitsInProgress = " + >>>> commitsInProgress); >>>> + >>>> + commitsInProgress--; >>>> + if (commitsInProgress == 0) >>>> + Tablet.this.notifyAll(); >>>> + } >>>> + >>>> + private void incrementCommitsInProgress() { >>>> + if (commitsInProgress < 0) >>>> + throw new IllegalStateException("commitsInProgress = " + >>>> commitsInProgress); >>>> + >>>> + commitsInProgress++; >>>> + } >>>> + >>>> + private void waitForCommitsToFinish() { >>>> + while (commitsInProgress > 0) { >>>> + try { >>>> + Tablet.this.wait(50); >>>> + } catch (InterruptedException e) { >>>> + log.warn(e, e); >>>> + } >>>> + } >>>> + } >>>> + >>>> + public void abortCommit(List value) { >>>> + Tablet.this.abortCommit(this, value); >>>> + } >>>> + >>>> + public void commit(List mutations) { >>>> + Tablet.this.commit(this, mutations); >>>> + } >>>> + >>>> + public Tablet getTablet() { >>>> + return Tablet.this; >>>> + } >>>> + >>>> + public boolean beginUpdatingLogsUsed(ArrayList copy, >>>> boolean mincFinish) { >>>> + return Tablet.this.beginUpdatingLogsUsed(memTable, copy, >>>> mincFinish); >>>> + } >>>> + >>>> + public void finishUpdatingLogsUsed() { >>>> + Tablet.this.finishUpdatingLogsUsed(); >>>> + } >>>> + >>>> + public int getLogId() { >>>> + return logId; >>>> + } >>>> + >>>> + public KeyExtent getExtent() { >>>> + return extent; >>>> + } >>>> + >>>> + private void updateMaxCommittedTime(long time) { >>>> + maxCommittedTime = Math.max(time, maxCommittedTime); >>>> + } >>>> + >>>> + private long getMaxCommittedTime() { >>>> + if (maxCommittedTime == Long.MIN_VALUE) >>>> + throw new IllegalStateException("Tried to read max committed >>>> time when it was never set"); >>>> + return maxCommittedTime; >>>> + } >>>> + >>>> + } >>>> + >>>> + private class TabletMemory { >>>> + private InMemoryMap memTable; >>>> + private InMemoryMap otherMemTable; >>>> + private InMemoryMap deletingMemTable; >>>> + private int nextSeq = 1; >>>> + private CommitSession commitSession; >>>> + >>>> + TabletMemory() { >>>> + try { >>>> + memTable = new InMemoryMap(acuTableConf); >>>> + } catch (LocalityGroupConfigurationError e) { >>>> + throw new RuntimeException(e); >>>> + } >>>> + commitSession = new CommitSession(nextSeq, memTable); >>>> + nextSeq += 2; >>>> + } >>>> + >>>> + InMemoryMap getMemTable() { >>>> + return memTable; >>>> + } >>>> + >>>> + InMemoryMap getMinCMemTable() { >>>> + return otherMemTable; >>>> + } >>>> + >>>> + CommitSession prepareForMinC() { >>>> + if (otherMemTable != null) { >>>> + throw new IllegalStateException(); >>>> + } >>>> + >>>> + if (deletingMemTable != null) { >>>> + throw new IllegalStateException(); >>>> + } >>>> + >>>> + otherMemTable = memTable; >>>> + try { >>>> + memTable = new InMemoryMap(acuTableConf); >>>> + } catch (LocalityGroupConfigurationError e) { >>>> + throw new RuntimeException(e); >>>> + } >>>> + >>>> + CommitSession oldCommitSession = commitSession; >>>> + commitSession = new CommitSession(nextSeq, memTable); >>>> + nextSeq += 2; >>>> + >>>> + >>>> tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes( >>>> ), >>>> otherMemTable.estimatedSizeInBytes()); >>>> + >>>> + return oldCommitSession; >>>> + } >>>> + >>>> + void finishedMinC() { >>>> + >>>> + if (otherMemTable == null) { >>>> + throw new IllegalStateException(); >>>> + } >>>> + >>>> + if (deletingMemTable != null) { >>>> + throw new IllegalStateException(); >>>> + } >>>> + >>>> + deletingMemTable = otherMemTable; >>>> + >>>> + otherMemTable = null; >>>> + Tablet.this.notifyAll(); >>>> + } >>>> + >>>> + void finalizeMinC() { >>>> + try { >>>> + deletingMemTable.delete(15000); >>>> + } finally { >>>> + synchronized (Tablet.this) { >>>> + if (otherMemTable != null) { >>>> + throw new IllegalStateException(); >>>> + } >>>> + >>>> + if (deletingMemTable == null) { >>>> + throw new IllegalStateException(); >>>> + } >>>> + >>>> + deletingMemTable = null; >>>> + >>>> + >>>> tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), >>>> 0); >>>> + } >>>> + } >>>> + } >>>> + >>>> + boolean memoryReservedForMinC() { >>>> + return otherMemTable != null || deletingMemTable != null; >>>> + } >>>> + >>>> + void waitForMinC() { >>>> + while (otherMemTable != null || deletingMemTable != null) { >>>> + try { >>>> + Tablet.this.wait(50); >>>> + } catch (InterruptedException e) { >>>> + log.warn(e, e); >>>> + } >>>> + } >>>> + } >>>> + >>>> + void mutate(CommitSession cm, List mutations) { >>>> + cm.memTable.mutate(mutations); >>>> + } >>>> + >>>> + void updateMemoryUsageStats() { >>>> + long other = 0; >>>> + if (otherMemTable != null) >>>> + other = otherMemTable.estimatedSizeInBytes(); >>>> + else if (deletingMemTable != null) >>>> + other = deletingMemTable.estimatedSizeInBytes(); >>>> + >>>> + >>>> tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes( >>>> ), >>>> other); >>>> + } >>>> + >>>> + List getIterators() { >>>> + List toReturn = new >>>> ArrayList(2); >>>> + toReturn.add(memTable.skvIterator()); >>>> + if (otherMemTable != null) >>>> + toReturn.add(otherMemTable.skvIterator()); >>>> + return toReturn; >>>> + } >>>> + >>>> + void returnIterators(List iters) { >>>> + for (MemoryIterator iter : iters) { >>>> + iter.close(); >>>> + } >>>> + } >>>> + >>>> + public long getNumEntries() { >>>> + if (otherMemTable != null) >>>> + return memTable.getNumEntries() + >>>> otherMemTable.getNumEntries(); >>>> + return memTable.getNumEntries(); >>>> + } >>>> + >>>> + CommitSession getCommitSession() { >>>> + return commitSession; >>>> + } >>>> + } >>>> + >>>> + private TabletMemory tabletMemory; >>>> + >>>> + private final TabletTime tabletTime; >>>> + private long persistedTime; >>>> + private final Object timeLock = new Object(); >>>> + >>>> + private final Path location; // absolute path of this tablets dir >>>> + private TServerInstance lastLocation; >>>> + >>>> + private Configuration conf; >>>> + private VolumeManager fs; >>>> + >>>> + private TableConfiguration acuTableConf; >>>> + >>>> + private volatile boolean tableDirChecked = false; >>>> + >>>> + private AtomicLong dataSourceDeletions = new AtomicLong(0); >>>> + private Set activeScans = new >>>> HashSet(); >>>> + >>>> + private volatile boolean closing = false; >>>> + private boolean closed = false; >>>> + private boolean closeComplete = false; >>>> + >>>> + private long lastFlushID = -1; >>>> + private long lastCompactID = -1; >>>> + >>>> + private KeyExtent extent; >>>> + >>>> + private TabletResourceManager tabletResources; >>>> + final private DatafileManager datafileManager; >>>> + private volatile boolean majorCompactionInProgress = false; >>>> + private volatile boolean majorCompactionWaitingToStart = false; >>>> + private Set majorCompactionQueued = >>>> Collections.synchronizedSet(EnumSet.noneOf( >>>> MajorCompactionReason.class)); >>>> + private volatile boolean minorCompactionInProgress = false; >>>> + private volatile boolean minorCompactionWaitingToStart = false; >>>> + >>>> + private boolean updatingFlushID = false; >>>> + >>>> + private AtomicReference constraintChecker = new >>>> AtomicReference(); >>>> + >>>> + private final String tabletDirectory; >>>> + >>>> + private int writesInProgress = 0; >>>> + >>>> + private static final Logger log = Logger.getLogger(Tablet.class); >>>> + public TabletStatsKeeper timer; >>>> + >>>> + private Rate queryRate = new Rate(0.2); >>>> + private long queryCount = 0; >>>> + >>>> + private Rate queryByteRate = new Rate(0.2); >>>> + private long queryBytes = 0; >>>> + >>>> + private Rate ingestRate = new Rate(0.2); >>>> + private long ingestCount = 0; >>>> + >>>> + private Rate ingestByteRate = new Rate(0.2); >>>> + private long ingestBytes = 0; >>>> + >>>> + private byte[] defaultSecurityLabel = new byte[0]; >>>> + >>>> + private long lastMinorCompactionFinishTime; >>>> + private long lastMapFileImportTime; >>>> + >>>> + private volatile long numEntries; >>>> + private volatile long numEntriesInMemory; >>>> + >>>> + // a count of the amount of data read by the iterators >>>> + private AtomicLong scannedCount = new AtomicLong(0); >>>> + private Rate scannedRate = new Rate(0.2); >>>> + >>>> + private ConfigurationObserver configObserver; >>>> + >>>> + private TabletServer tabletServer; >>>> + >>>> + private final int logId; >>>> + // ensure we only have one reader/writer of our bulk file notes at >>>> at >>>> time >>>> + public final Object bulkFileImportLock = new Object(); >>>> + >>>> + public int getLogId() { >>>> + return logId; >>>> + } >>>> + >>>> + public static class TabletClosedException extends RuntimeException { >>>> + public TabletClosedException(Exception e) { >>>> + super(e); >>>> + } >>>> + >>>> + public TabletClosedException() { >>>> + super(); >>>> + } >>>> + >>>> + private static final long serialVersionUID = 1L; >>>> + } >>>> + >>>> + FileRef getNextMapFilename(String prefix) throws IOException { >>>> + String extension = >>>> FileOperations.getNewFileExtension(tabletServer. >>>> getTableConfiguration(extent)); >>>> + checkTabletDir(); >>>> + return new FileRef(location.toString() + "/" + prefix + >>>> UniqueNameAllocator.getInstance().getNextName() + "." + extension); >>>> + } >>>> + >>>> + private void checkTabletDir() throws IOException { >>>> + if (!tableDirChecked) { >>>> + checkTabletDir(this.location); >>>> + tableDirChecked = true; >>>> + } >>>> + } >>>> + >>>> + private void checkTabletDir(Path tabletDir) throws IOException { >>>> + >>>> + FileStatus[] files = null; >>>> + try { >>>> + files = fs.listStatus(tabletDir); >>>> + } catch (FileNotFoundException ex) { >>>> + // ignored >>>> + } >>>> + >>>> + if (files == null) { >>>> + if (tabletDir.getName().startsWith("c-")) >>>> + log.debug("Tablet " + extent + " had no dir, creating " + >>>> tabletDir); // its a clone dir... >>>> + else >>>> + log.warn("Tablet " + extent + " had no dir, creating " + >>>> tabletDir); >>>> + >>>> + fs.mkdirs(tabletDir); >>>> + } >>>> + } >>>> + >>>> + class DatafileManager { >>>> + // access to datafilesizes needs to be synchronized: see >>>> CompactionRunner#getNumFiles >>>> + final private Map datafileSizes = >>>> Collections.synchronizedMap(new TreeMap()); >>>> + >>>> + DatafileManager(SortedMap datafileSizes) { >>>> + for (Entry datafiles : >>>> datafileSizes.entrySet()) >>>> + this.datafileSizes.put(datafiles.getKey(), >>>> datafiles.getValue()); >>>> + } >>>> + >>>> + FileRef mergingMinorCompactionFile = null; >>>> + Set filesToDeleteAfterScan = new HashSet(); >>>> + Map> scanFileReservations = new >>>> HashMap>(); >>>> + MapCounter fileScanReferenceCounts = new >>>> MapCounter(); >>>> + long nextScanReservationId = 0; >>>> + boolean reservationsBlocked = false; >>>> + >>>> + Set majorCompactingFiles = new HashSet(); >>>> + >>>> + Pair> reserveFilesForScan() { >>>> + synchronized (Tablet.this) { >>>> + >>>> + while (reservationsBlocked) { >>>> + try { >>>> + Tablet.this.wait(50); >>>> + } catch (InterruptedException e) { >>>> + log.warn(e, e); >>>> + } >>>> + } >>>> + >>>> + Set absFilePaths = new >>>> HashSet(datafileSizes.keySet()); >>>> + >>>> + long rid = nextScanReservationId++; >>>> + >>>> + scanFileReservations.put(rid, absFilePaths); >>>> + >>>> + Map ret = new >>>> HashMap(); >>>> + >>>> + for (FileRef path : absFilePaths) { >>>> + fileScanReferenceCounts.increment(path, 1); >>>> + ret.put(path, datafileSizes.get(path)); >>>> + } >>>> + >>>> + return new Pair>(rid, ret); >>>> + } >>>> + } >>>> + >>>> + void returnFilesForScan(Long reservationId) { >>>> + >>>> + final Set filesToDelete = new HashSet(); >>>> + >>>> + synchronized (Tablet.this) { >>>> + Set absFilePaths = >>>> scanFileReservations.remove(reservationId); >>>> + >>>> + if (absFilePaths == null) >>>> + throw new IllegalArgumentException("Unknown scan >>>> reservation >>>> id " + reservationId); >>>> + >>>> + boolean notify = false; >>>> + for (FileRef path : absFilePaths) { >>>> + long refCount = fileScanReferenceCounts.decrement(path, 1); >>>> + if (refCount == 0) { >>>> + if (filesToDeleteAfterScan.remove(path)) >>>> + filesToDelete.add(path); >>>> + notify = true; >>>> + } else if (refCount < 0) >>>> + throw new IllegalStateException("Scan ref count for " + >>>> path >>>> + " is " + refCount); >>>> + } >>>> + >>>> + if (notify) >>>> + Tablet.this.notifyAll(); >>>> + } >>>> + >>>> + if (filesToDelete.size() > 0) { >>>> + log.debug("Removing scan refs from metadata " + extent + " " + >>>> filesToDelete); >>>> + MetadataTableUtil.removeScanFiles(extent, filesToDelete, >>>> SystemCredentials.get(), tabletServer.getLock()); >>>> + } >>>> + } >>>> + >>>> + private void removeFilesAfterScan(Set scanFiles) { >>>> + if (scanFiles.size() == 0) >>>> + return; >>>> + >>>> + Set filesToDelete = new HashSet(); >>>> + >>>> + synchronized (Tablet.this) { >>>> + for (FileRef path : scanFiles) { >>>> + if (fileScanReferenceCounts.get(path) == 0) >>>> + filesToDelete.add(path); >>>> + else >>>> + filesToDeleteAfterScan.add(path); >>>> + } >>>> + } >>>> + >>>> + if (filesToDelete.size() > 0) { >>>> + log.debug("Removing scan refs from metadata " + extent + " " + >>>> filesToDelete); >>>> + MetadataTableUtil.removeScanFiles(extent, filesToDelete, >>>> SystemCredentials.get(), tabletServer.getLock()); >>>> + } >>>> + } >>>> + >>>> + private TreeSet waitForScansToFinish(Set >>>> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) { >>>> + long startTime = System.currentTimeMillis(); >>>> + TreeSet inUse = new TreeSet(); >>>> + >>>> + Span waitForScans = Trace.start("waitForScans"); >>>> + try { >>>> + synchronized (Tablet.this) { >>>> + if (blockNewScans) { >>>> + if (reservationsBlocked) >>>> + throw new IllegalStateException(); >>>> + >>>> + reservationsBlocked = true; >>>> + } >>>> + >>>> + for (FileRef path : pathsToWaitFor) { >>>> + while (fileScanReferenceCounts.get(path) > 0 && >>>> System.currentTimeMillis() - startTime < maxWaitTime) { >>>> + try { >>>> + Tablet.this.wait(100); >>>> + } catch (InterruptedException e) { >>>> + log.warn(e, e); >>>> + } >>>> + } >>>> + } >>>> + >>>> + for (FileRef path : pathsToWaitFor) { >>>> + if (fileScanReferenceCounts.get(path) > 0) >>>> + inUse.add(path); >>>> + } >>>> + >>>> + if (blockNewScans) { >>>> + reservationsBlocked = false; >>>> + Tablet.this.notifyAll(); >>>> + } >>>> + >>>> + } >>>> + } finally { >>>> + waitForScans.stop(); >>>> + } >>>> + return inUse; >>>> + } >>>> + >>>> + public void importMapFiles(long tid, Map >>>> pathsString, boolean setTime) throws IOException { >>>> + >>>> + String bulkDir = null; >>>> + >>>> + Map paths = new >>>> HashMap(); >>>> + for (Entry entry : >>>> pathsString.entrySet()) >>>> + paths.put(entry.getKey(), entry.getValue()); >>>> + >>>> + for (FileRef tpath : paths.keySet()) { >>>> + >>>> + boolean inTheRightDirectory = false; >>>> + Path parent = tpath.path().getParent().getParent(); >>>> + for (String tablesDir : ServerConstants.getTablesDirs()) { >>>> + if (parent.equals(new Path(tablesDir, >>>> extent.getTableId().toString()))) { >>>> + inTheRightDirectory = true; >>>> + break; >>>> + } >>>> + } >>>> + if (!inTheRightDirectory) { >>>> + throw new IOException("Data file " + tpath + " not in table >>>> dirs"); >>>> + } >>>> + >>>> + if (bulkDir == null) >>>> + bulkDir = tpath.path().getParent().toString(); >>>> + else if (!bulkDir.equals(tpath.path(). >>>> getParent().toString())) >>>> + throw new IllegalArgumentException("bulk files in different >>>> dirs " + bulkDir + " " + tpath); >>>> + >>>> + } >>>> + >>>> + if (extent.isRootTablet()) { >>>> + throw new IllegalArgumentException("Can not import files to >>>> root >>>> tablet"); >>>> + } >>>> + >>>> + synchronized (bulkFileImportLock) { >>>> + Credentials creds = SystemCredentials.get(); >>>> + Connector conn; >>>> + try { >>>> + conn = >>>> HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), >>>> creds.getToken()); >>>> + } catch (Exception ex) { >>>> + throw new IOException(ex); >>>> + } >>>> + // Remove any bulk files we've previously loaded and compacted >>>> away >>>> + List files = MetadataTableUtil. >>>> getBulkFilesLoaded(conn, >>>> extent, tid); >>>> + >>>> + for (FileRef file : files) >>>> + if (paths.keySet().remove(file.path())) >>>> + log.debug("Ignoring request to re-import a file already >>>> imported: " + extent + ": " + file); >>>> + >>>> + if (paths.size() > 0) { >>>> + long bulkTime = Long.MIN_VALUE; >>>> + if (setTime) { >>>> + for (DataFileValue dfv : paths.values()) { >>>> + long nextTime = tabletTime.getAndUpdateTime(); >>>> + if (nextTime < bulkTime) >>>> + throw new IllegalStateException("Time went backwards >>>> unexpectedly " + nextTime + " " + bulkTime); >>>> + bulkTime = nextTime; >>>> + dfv.setTime(bulkTime); >>>> + } >>>> + } >>>> + >>>> + synchronized (timeLock) { >>>> + if (bulkTime > persistedTime) >>>> + persistedTime = bulkTime; >>>> + >>>> + MetadataTableUtil.updateTabletDataFile(tid, extent, >>>> paths, >>>> tabletTime.getMetadataValue(persistedTime), creds, >>>> tabletServer.getLock()); >>>> + } >>>> + } >>>> + } >>>> + >>>> + synchronized (Tablet.this) { >>>> + for (Entry tpath : paths.entrySet()) { >>>> + if (datafileSizes.containsKey(tpath.getKey())) { >>>> + log.error("Adding file that is already in set " + >>>> tpath.getKey()); >>>> + } >>>> + datafileSizes.put(tpath.getKey(), tpath.getValue()); >>>> + >>>> + } >>>> + >>>> + tabletResources.importedMapFiles(); >>>> + >>>> + computeNumEntries(); >>>> + } >>>> + >>>> + for (FileRef tpath : paths.keySet()) { >>>> + log.log(TLevel.TABLET_HIST, extent + " import " + tpath + " " >>>> + >>>> paths.get(tpath)); >>>> + } >>>> + } >>>> + >>>> + FileRef reserveMergingMinorCompactionFile() { >>>> + if (mergingMinorCompactionFile != null) >>>> + throw new IllegalStateException("Tried to reserve merging >>>> minor >>>> compaction file when already reserved : " + mergingMinorCompactionFile); >>>> + >>>> + if (extent.isRootTablet()) >>>> + return null; >>>> + >>>> + int maxFiles = acuTableConf.getMaxFilesPerTablet(); >>>> + >>>> + // when a major compaction is running and we are at max files, >>>> write out >>>> + // one extra file... want to avoid the case where major >>>> compaction >>>> is >>>> + // compacting everything except for the largest file, and >>>> therefore the >>>> + // largest file is returned for merging.. the following check >>>> mostly >>>> + // avoids this case, except for the case where major compactions >>>> fail or >>>> + // are canceled >>>> + if (majorCompactingFiles.size() > 0 && datafileSizes.size() == >>>> maxFiles) >>>> + return null; >>>> + >>>> + if (datafileSizes.size() >= maxFiles) { >>>> + // find the smallest file >>>> + >>>> + long min = Long.MAX_VALUE; >>>> + FileRef minName = null; >>>> + >>>> + for (Entry entry : >>>> datafileSizes.entrySet()) { >>>> + if (entry.getValue().getSize() < min && >>>> !majorCompactingFiles.contains(entry.getKey())) { >>>> + min = entry.getValue().getSize(); >>>> + minName = entry.getKey(); >>>> + } >>>> + } >>>> + >>>> + if (minName == null) >>>> + return null; >>>> + >>>> + mergingMinorCompactionFile = minName; >>>> + return minName; >>>> + } >>>> + >>>> + return null; >>>> + } >>>> + >>>> + void unreserveMergingMinorCompactionFile(FileRef file) { >>>> + if ((file == null && mergingMinorCompactionFile != null) || >>>> (file >>>> != null && mergingMinorCompactionFile == null) >>>> + || (file != null && mergingMinorCompactionFile != null && >>>> !file.equals(mergingMinorCompactionFile))) >>>> + throw new IllegalStateException("Disagreement " + file + " " >>>> + >>>> mergingMinorCompactionFile); >>>> + >>>> + mergingMinorCompactionFile = null; >>>> + } >>>> + >>>> + void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef >>>> newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession >>>> commitSession, long flushId) >>>> + throws IOException { >>>> + >>>> + IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); >>>> + if (extent.isRootTablet()) { >>>> + try { >>>> + if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) { >>>> + throw new IllegalStateException(); >>>> + } >>>> + } catch (Exception e) { >>>> + throw new IllegalStateException("Can not bring major >>>> compaction online, lock not held", e); >>>> + } >>>> + } >>>> + >>>> + // rename before putting in metadata table, so files in metadata >>>> table should >>>> + // always exist >>>> + do { >>>> + try { >>>> + if (dfv.getNumEntries() == 0) { >>>> + fs.deleteRecursively(tmpDatafile.path()); >>>> + } else { >>>> + if (fs.exists(newDatafile.path())) { >>>> + log.warn("Target map file already exist " + >>>> newDatafile); >>>> + fs.deleteRecursively(newDatafile.path()); >>>> + } >>>> + >>>> + if (!fs.rename(tmpDatafile.path(), newDatafile.path())) { >>>> + throw new IOException("rename fails"); >>>> + } >>>> + } >>>> + break; >>>> + } catch (IOException ioe) { >>>> + log.warn("Tablet " + extent + " failed to rename " + >>>> newDatafile + " after MinC, will retry in 60 secs...", ioe); >>>> + UtilWaitThread.sleep(60 * 1000); >>>> + } >>>> + } while (true); >>>> + >>>> + long t1, t2; >>>> + >>>> + // the code below always assumes merged files are in use by >>>> scans... this must be done >>>> + // because the in memory list of files is not updated until >>>> after >>>> the metadata table >>>> + // therefore the file is available to scans until memory is >>>> updated, but want to ensure >>>> + // the file is not available for garbage collection... if memory >>>> were updated >>>> + // before this point (like major compactions do), then the >>>> following code could wait >>>> + // for scans to finish like major compactions do.... used to >>>> wait >>>> for scans to finish >>>> + // here, but that was incorrect because a scan could start after >>>> waiting but before >>>> + // memory was updated... assuming the file is always in use by >>>> scans leads to >>>> + // one uneeded metadata update when it was not actually in use >>>> + Set filesInUseByScans = Collections.emptySet(); >>>> + if (absMergeFile != null) >>>> + filesInUseByScans = Collections.singleton(absMergeFile); >>>> + >>>> + // very important to write delete entries outside of log lock, >>>> because >>>> + // this !METADATA write does not go up... it goes sideways or to >>>> itself >>>> + if (absMergeFile != null) >>>> + MetadataTableUtil.addDeleteEntries(extent, >>>> Collections.singleton(absMergeFile), SystemCredentials.get()); >>>> + >>>> + Set unusedWalLogs = beginClearingUnusedLogs(); >>>> + try { >>>> + // the order of writing to !METADATA and walog is important in >>>> the face of machine/process failures >>>> + // need to write to !METADATA before writing to walog, when >>>> things are done in the reverse order >>>> + // data could be lost... the minor compaction start even >>>> should >>>> be written before the following metadata >>>> + // write is made >>>> + >>>> + synchronized (timeLock) { >>>> + if (commitSession.getMaxCommittedTime() > persistedTime) >>>> + persistedTime = commitSession.getMaxCommittedTime(); >>>> + >>>> + String time = tabletTime.getMetadataValue(persistedTime); >>>> + MasterMetadataUtil.updateTabletDataFile(extent, >>>> newDatafile, >>>> absMergeFile, dfv, time, SystemCredentials.get(), filesInUseByScans, >>>> + tabletServer.getClientAddressString(), >>>> tabletServer.getLock(), unusedWalLogs, lastLocation, flushId); >>>> + } >>>> + >>>> + } finally { >>>> + finishClearingUnusedLogs(); >>>> + } >>>> + >>>> + do { >>>> + try { >>>> + // the purpose of making this update use the new commit >>>> session, instead of the old one passed in, >>>> + // is because the new one will reference the logs used by >>>> current memory... >>>> + >>>> + >>>> tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), >>>> newDatafile.toString(), commitSession.getWALogSeq() + 2); >>>> + break; >>>> + } catch (IOException e) { >>>> + log.error("Failed to write to write-ahead log " + >>>> e.getMessage() + " will retry", e); >>>> + UtilWaitThread.sleep(1 * 1000); >>>> + } >>>> + } while (true); >>>> + >>>> + synchronized (Tablet.this) { >>>> + lastLocation = null; >>>> + >>>> + t1 = System.currentTimeMillis(); >>>> + if (datafileSizes.containsKey(newDatafile)) { >>>> + log.error("Adding file that is already in set " + >>>> newDatafile); >>>> + } >>>> + >>>> + if (dfv.getNumEntries() > 0) { >>>> + datafileSizes.put(newDatafile, dfv); >>>> + } >>>> + >>>> + if (absMergeFile != null) { >>>> + datafileSizes.remove(absMergeFile); >>>> + } >>>> + >>>> + unreserveMergingMinorCompactionFile(absMergeFile); >>>> + >>>> + dataSourceDeletions.incrementAndGet(); >>>> + tabletMemory.finishedMinC(); >>>> + >>>> + lastFlushID = flushId; >>>> + >>>> + computeNumEntries(); >>>> + t2 = System.currentTimeMillis(); >>>> + } >>>> + >>>> + // must do this after list of files in memory is updated above >>>> + removeFilesAfterScan(filesInUseByScans); >>>> + >>>> + if (absMergeFile != null) >>>> + log.log(TLevel.TABLET_HIST, extent + " MinC [" + absMergeFile >>>> + >>>> ",memory] -> " + newDatafile); >>>> + else >>>> + log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " + >>>> newDatafile); >>>> + log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - >>>> t1) >>>> / 1000.0, getExtent().toString())); >>>> + if (dfv.getSize() > >>>> acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) { >>>> + log.debug(String.format("Minor Compaction wrote out file >>>> larger >>>> than split threshold. split threshold = %,d file size = %,d", >>>> + >>>> acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), >>>> dfv.getSize())); >>>> + } >>>> + >>>> + } >>>> + >>>> + public void reserveMajorCompactingFiles(Collection >>>> files) { >>>> + if (majorCompactingFiles.size() != 0) >>>> + throw new IllegalStateException("Major compacting files not >>>> empty " + majorCompactingFiles); >>>> + >>>> + if (mergingMinorCompactionFile != null && >>>> files.contains(mergingMinorCompactionFile)) >>>> + throw new IllegalStateException("Major compaction tried to >>>> resrve file in use by minor compaction " + mergingMinorCompactionFile); >>>> + >>>> + majorCompactingFiles.addAll(files); >>>> + } >>>> + >>>> + public void clearMajorCompactingFile() { >>>> + majorCompactingFiles.clear(); >>>> + } >>>> + >>>> + void bringMajorCompactionOnline(Set oldDatafiles, >>>> FileRef >>>> tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv) >>>> + throws IOException { >>>> + long t1, t2; >>>> + >>>> + if (!extent.isRootTablet()) { >>>> + >>>> + if (fs.exists(newDatafile.path())) { >>>> + log.error("Target map file already exist " + newDatafile, >>>> new >>>> Exception()); >>>> + throw new IllegalStateException("Target map file already >>>> exist >>>> " + newDatafile); >>>> + } >>>> + >>>> + // rename before putting in metadata table, so files in >>>> metadata >>>> table should >>>> + // always exist >>>> + if (!fs.rename(tmpDatafile.path(), newDatafile.path())) >>>> + log.warn("Rename of " + tmpDatafile + " to " + newDatafile >>>> + " >>>> returned false"); >>>> + >>>> + if (dfv.getNumEntries() == 0) { >>>> + fs.deleteRecursively(newDatafile.path()); >>>> + } >>>> + } >>>> + >>>> + TServerInstance lastLocation = null; >>>> + synchronized (Tablet.this) { >>>> + >>>> + t1 = System.currentTimeMillis(); >>>> + >>>> + IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); >>>> + >>>> + dataSourceDeletions.incrementAndGet(); >>>> + >>>> + if (extent.isRootTablet()) { >>>> + >>>> + waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE); >>>> + >>>> + try { >>>> + if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) >>>> { >>>> + throw new IllegalStateException(); >>>> + } >>>> + } catch (Exception e) { >>>> + throw new IllegalStateException("Can not bring major >>>> compaction online, lock not held", e); >>>> + } >>>> + >>>> + // mark files as ready for deletion, but >>>> + // do not delete them until we successfully >>>> + // rename the compacted map file, in case >>>> + // the system goes down >>>> + >>>> + String compactName = newDatafile.path().getName(); >>>> + >>>> + for (FileRef ref : oldDatafiles) { >>>> + Path path = ref.path(); >>>> + fs.rename(path, new Path(location + "/delete+" + >>>> compactName >>>> + "+" + path.getName())); >>>> + } >>>> + >>>> + if (fs.exists(newDatafile.path())) { >>>> + log.error("Target map file already exist " + newDatafile, >>>> new Exception()); >>>> + throw new IllegalStateException("Target map file already >>>> exist " + newDatafile); >>>> + } >>>> + >>>> + if (!fs.rename(tmpDatafile.path(), newDatafile.path())) >>>> + log.warn("Rename of " + tmpDatafile + " to " + >>>> newDatafile + >>>> " returned false"); >>>> + >>>> + // start deleting files, if we do not finish they will be >>>> cleaned >>>> + // up later >>>> + for (FileRef ref : oldDatafiles) { >>>> + Path path = ref.path(); >>>> + Path deleteFile = new Path(location + "/delete+" + >>>> compactName + "+" + path.getName()); >>>> + if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || >>>> !fs.moveToTrash(deleteFile)) >>>> + fs.deleteRecursively(deleteFile); >>>> + } >>>> + } >>>> + >>>> + // atomically remove old files and add new file >>>> + for (FileRef oldDatafile : oldDatafiles) { >>>> + if (!datafileSizes.containsKey(oldDatafile)) { >>>> + log.error("file does not exist in set " + oldDatafile); >>>> + } >>>> + datafileSizes.remove(oldDatafile); >>>> + majorCompactingFiles.remove(oldDatafile); >>>> + } >>>> + >>>> + if (datafileSizes.containsKey(newDatafile)) { >>>> + log.error("Adding file that is already in set " + >>>> newDatafile); >>>> + } >>>> + >>>> + if (dfv.getNumEntries() > 0) { >>>> + datafileSizes.put(newDatafile, dfv); >>>> + } >>>> + >>>> + // could be used by a follow on compaction in a multipass >>>> compaction >>>> + majorCompactingFiles.add(newDatafile); >>>> + >>>> + computeNumEntries(); >>>> + >>>> + lastLocation = Tablet.this.lastLocation; >>>> + Tablet.this.lastLocation = null; >>>> + >>>> + if (compactionId != null) >>>> + lastCompactID = compactionId; >>>> + >>>> + t2 = System.currentTimeMillis(); >>>> + } >>>> + >>>> + if (!extent.isRootTablet()) { >>>> + Set filesInUseByScans = >>>> waitForScansToFinish(oldDatafiles, false, 10000); >>>> + if (filesInUseByScans.size() > 0) >>>> + log.debug("Adding scan refs to metadata " + extent + " " + >>>> filesInUseByScans); >>>> + MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, >>>> filesInUseByScans, newDatafile, compactionId, dfv, >>>> SystemCredentials.get(), >>>> + tabletServer.getClientAddressString(), lastLocation, >>>> tabletServer.getLock()); >>>> + removeFilesAfterScan(filesInUseByScans); >>>> + } >>>> + >>>> + log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) >>>> / >>>> 1000.0)); >>>> + log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " >>>> --> " + newDatafile); >>>> + } >>>> + >>>> + public SortedMap getDatafileSizes() { >>>> + synchronized (Tablet.this) { >>>> + TreeMap copy = new >>>> TreeMap(datafileSizes); >>>> + return Collections.unmodifiableSortedMap(copy); >>>> + } >>>> + } >>>> + >>>> + public Set getFiles() { >>>> + synchronized (Tablet.this) { >>>> + HashSet files = new >>>> HashSet(datafileSizes.keySet()); >>>> + return Collections.unmodifiableSet(files); >>>> + } >>>> + } >>>> + >>>> + } >>>> + >>>> + public Tablet(TabletServer tabletServer, Text location, KeyExtent >>>> extent, TabletResourceManager trm, SortedMap tabletsKeyValues) >>>> + throws IOException { >>>> + this(tabletServer, location, extent, trm, >>>> CachedConfiguration.getInstance(), tabletsKeyValues); >>>> + splitCreationTime = 0; >>>> + } >>>> + >>>> + public Tablet(TabletServer tabletServer, Text location, KeyExtent >>>> extent, TabletResourceManager trm, SortedMap >>>> datafiles, String time, >>>> + long initFlushID, long initCompactID) throws IOException { >>>> + this(tabletServer, location, extent, trm, >>>> CachedConfiguration.getInstance(), datafiles, time, initFlushID, >>>> initCompactID); >>>> + splitCreationTime = System.currentTimeMillis(); >>>> + } >>>> + >>>> + private Tablet(TabletServer tabletServer, Text location, KeyExtent >>>> extent, TabletResourceManager trm, Configuration conf, >>>> + SortedMap tabletsKeyValues) throws IOException { >>>> + this(tabletServer, location, extent, trm, conf, >>>> VolumeManagerImpl.get(), tabletsKeyValues); >>>> + } >>>> + >>>> + static private final List EMPTY = Collections.emptyList(); >>>> + >>>> + private Tablet(TabletServer tabletServer, Text location, KeyExtent >>>> extent, TabletResourceManager trm, Configuration conf, >>>> + SortedMap datafiles, String time, long >>>> initFlushID, long initCompactID) throws IOException { >>>> + this(tabletServer, location, extent, trm, conf, >>>> VolumeManagerImpl.get(), EMPTY, datafiles, time, null, new >>>> HashSet(), initFlushID, initCompactID); >>>> + } >>>> + >>>> + private static String lookupTime(AccumuloConfiguration conf, >>>> KeyExtent >>>> extent, SortedMap tabletsKeyValues) { >>>> + SortedMap entries; >>>> + >>>> + if (extent.isRootTablet()) { >>>> + return null; >>>> + } else { >>>> + entries = new TreeMap(); >>>> + Text rowName = extent.getMetadataEntry(); >>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>> + if (entry.getKey().compareRow(rowName) == 0 && >>>> TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) >>>> { >>>> + entries.put(new Key(entry.getKey()), new >>>> Value(entry.getValue())); >>>> + } >>>> + } >>>> + } >>>> + >>>> + // log.debug("extent : "+extent+" entries : "+entries); >>>> + >>>> + if (entries.size() == 1) >>>> + return entries.values().iterator().next().toString(); >>>> + return null; >>>> + } >>>> + >>>> + private static SortedMap >>>> lookupDatafiles(AccumuloConfiguration conf, VolumeManager fs, KeyExtent >>>> extent, >>>> + SortedMap tabletsKeyValues) throws IOException { >>>> + >>>> + TreeMap datafiles = new >>>> TreeMap(); >>>> + >>>> + if (extent.isRootTablet()) { // the meta0 tablet >>>> + Path location = new Path(MetadataTableUtil.getRootTabletDir()); >>>> + // cleanUpFiles() has special handling for delete. files >>>> + FileStatus[] files = fs.listStatus(location); >>>> + Collection goodPaths = cleanUpFiles(fs, files, true); >>>> + for (String good : goodPaths) { >>>> + Path path = new Path(good); >>>> + String filename = path.getName(); >>>> + FileRef ref = new FileRef(location.toString() + "/" + >>>> filename, >>>> path); >>>> + DataFileValue dfv = new DataFileValue(0, 0); >>>> + datafiles.put(ref, dfv); >>>> + } >>>> + } else { >>>> + >>>> + Text rowName = extent.getMetadataEntry(); >>>> + >>>> + String tableId = extent.isMeta() ? RootTable.ID : >>>> MetadataTable.ID; >>>> + ScannerImpl mdScanner = new >>>> ScannerImpl(HdfsZooInstance.getInstance(), SystemCredentials.get(), >>>> tableId, Authorizations.EMPTY); >>>> + >>>> + // Commented out because when no data file is present, each >>>> tablet >>>> will scan through metadata table and return nothing >>>> + // reduced batch size to improve performance >>>> + // changed here after endKeys were implemented from 10 to 1000 >>>> + mdScanner.setBatchSize(1000); >>>> + >>>> + // leave these in, again, now using endKey for safety >>>> + mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME); >>>> + >>>> + mdScanner.setRange(new Range(rowName)); >>>> + >>>> + for (Entry entry : mdScanner) { >>>> + >>>> + if (entry.getKey().compareRow(rowName) != 0) { >>>> + break; >>>> + } >>>> + >>>> + FileRef ref = new >>>> FileRef(entry.getKey().getColumnQualifier().toString(), >>>> fs.getFullPath(entry.getKey())); >>>> + datafiles.put(ref, new DataFileValue(entry.getValue() >>>> .get())); >>>> + } >>>> + } >>>> + return datafiles; >>>> + } >>>> + >>>> + private static List lookupLogEntries(KeyExtent ke, >>>> SortedMap tabletsKeyValues) { >>>> + List logEntries = new ArrayList(); >>>> + >>>> + if (ke.isMeta()) { >>>> + try { >>>> + logEntries = >>>> MetadataTableUtil.getLogEntries(SystemCredentials.get(), ke); >>>> + } catch (Exception ex) { >>>> + throw new RuntimeException("Unable to read tablet log >>>> entries", >>>> ex); >>>> + } >>>> + } else { >>>> + log.debug("Looking at metadata " + tabletsKeyValues); >>>> + Text row = ke.getMetadataEntry(); >>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>> + Key key = entry.getKey(); >>>> + if (key.getRow().equals(row)) { >>>> + if (key.getColumnFamily().equals(LogColumnFamily.NAME)) { >>>> + logEntries.add(LogEntry.fromKeyValue(key, >>>> entry.getValue())); >>>> + } >>>> + } >>>> + } >>>> + } >>>> + >>>> + log.debug("got " + logEntries + " for logs for " + ke); >>>> + return logEntries; >>>> + } >>>> + >>>> + private static Set lookupScanFiles(KeyExtent extent, >>>> SortedMap tabletsKeyValues, VolumeManager fs) throws >>>> IOException >>>> { >>>> + HashSet scanFiles = new HashSet(); >>>> + >>>> + Text row = extent.getMetadataEntry(); >>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>> + Key key = entry.getKey(); >>>> + if (key.getRow().equals(row) && >>>> key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) { >>>> + String meta = key.getColumnQualifier().toString(); >>>> + Path path = fs.getFullPath(extent.getTableId().toString(), >>>> meta); >>>> + scanFiles.add(new FileRef(meta, path)); >>>> + } >>>> + } >>>> + >>>> + return scanFiles; >>>> + } >>>> + >>>> + private static long lookupFlushID(KeyExtent extent, >>>> SortedMap tabletsKeyValues) { >>>> + Text row = extent.getMetadataEntry(); >>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>> + Key key = entry.getKey(); >>>> + if (key.getRow().equals(row) && >>>> TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key. >>>> getColumnFamily(), >>>> key.getColumnQualifier())) >>>> + return Long.parseLong(entry.getValue().toString()); >>>> + } >>>> + >>>> + return -1; >>>> + } >>>> + >>>> + private static long lookupCompactID(KeyExtent extent, >>>> SortedMap tabletsKeyValues) { >>>> + Text row = extent.getMetadataEntry(); >>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>> + Key key = entry.getKey(); >>>> + if (key.getRow().equals(row) && >>>> TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key. >>>> getColumnFamily(), >>>> key.getColumnQualifier())) >>>> + return Long.parseLong(entry.getValue().toString()); >>>> + } >>>> + >>>> + return -1; >>>> + } >>>> + >>>> + private Tablet(TabletServer tabletServer, Text location, KeyExtent >>>> extent, TabletResourceManager trm, Configuration conf, VolumeManager fs, >>>> + SortedMap tabletsKeyValues) throws IOException { >>>> + this(tabletServer, location, extent, trm, conf, fs, >>>> lookupLogEntries(extent, tabletsKeyValues), >>>> lookupDatafiles(tabletServer.getSystemConfiguration(), fs, >>>> + extent, tabletsKeyValues), >>>> lookupTime(tabletServer.getSystemConfiguration(), extent, >>>> tabletsKeyValues), lookupLastServer(extent, tabletsKeyValues), >>>> + lookupScanFiles(extent, tabletsKeyValues, fs), >>>> lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, >>>> tabletsKeyValues)); >>>> + } >>>> + >>>> + private static TServerInstance lookupLastServer(KeyExtent extent, >>>> SortedMap tabletsKeyValues) { >>>> + for (Entry entry : tabletsKeyValues.entrySet()) { >>>> + if (entry.getKey().getColumnFamily().compareTo( >>>> TabletsSection.LastLocationColumnFamily.NAME) == 0) { >>>> + return new TServerInstance(entry.getValue(), >>>> entry.getKey().getColumnQualifier()); >>>> + } >>>> + } >>>> + return null; >>>> + } >>>> + >>>> + /** >>>> + * yet another constructor - this one allows us to avoid costly >>>> lookups into the Metadata table if we already know the files we need - as >>>> at split time >>>> + */ >>>> + private Tablet(final TabletServer tabletServer, final Text location, >>>> final KeyExtent extent, final TabletResourceManager trm, final >>>> Configuration conf, >>>> + final VolumeManager fs, final List logEntries, final >>>> SortedMap datafiles, String time, >>>> + final TServerInstance lastLocation, Set scanFiles, long >>>> initFlushID, long initCompactID) throws IOException { >>>> + Path locationPath; >>>> + if (location.find(":") >= 0) { >>>> + locationPath = new Path(location.toString()); >>>> + } else { >>>> + locationPath = fs.getFullPath(FileType.TABLE, >>>> extent.getTableId().toString() + location.toString()); >>>> + } >>>> + FileSystem fsForPath = fs.getFileSystemByPath(locationPath); >>>> + this.location = locationPath.makeQualified(fsForPath.getUri(), >>>> fsForPath.getWorkingDirectory()); >>>> + this.lastLocation = lastLocation; >>>> + this.tabletDirectory = location.toString(); >>>> + this.conf = conf; >>>> + this.acuTableConf = tabletServer.getTableConfiguration(extent); >>>> + >>>> + this.fs = fs; >>>> + this.extent = extent; >>>> + this.tabletResources = trm; >>>> + >>>> + this.lastFlushID = initFlushID; >>>> + this.lastCompactID = initCompactID; >>>> + >>>> + if (extent.isRootTablet()) { >>>> + long rtime = Long.MIN_VALUE; >>>> + for (FileRef ref : datafiles.keySet()) { >>>> + Path path = ref.path(); >>>> + FileSystem ns = fs.getFileSystemByPath(path); >>>> + FileSKVIterator reader = >>>> FileOperations.getInstance().openReader(path.toString(), true, ns, >>>> ns.getConf(), tabletServer.getTableConfiguration(extent)); >>>> + long maxTime = -1; >>>> + try { >>>> + >>>> + while (reader.hasTop()) { >>>> + maxTime = Math.max(maxTime, >>>> reader.getTopKey().getTimestamp()); >>>> + reader.next(); >>>> + } >>>> + >>>> + } finally { >>>> + reader.close(); >>>> + } >>>> + >>>> + if (maxTime > rtime) { >>>> + time = TabletTime.LOGICAL_TIME_ID + "" + maxTime; >>>> + rtime = maxTime; >>>> + } >>>> + } >>>> + } >>>> + if (time == null && datafiles.isEmpty() && >>>> extent.equals(RootTable.OLD_EXTENT)) { >>>> + // recovery... old root tablet has no data, so time doesn't >>>> matter: >>>> + time = TabletTime.LOGICAL_TIME_ID + "" + Long.MIN_VALUE; >>>> + } >>>> + >>>> + this.tabletServer = tabletServer; >>>> + this.logId = tabletServer.createLogId(extent); >>>> + >>>> + this.timer = new TabletStatsKeeper(); >>>> + >>>> + setupDefaultSecurityLabels(extent); >>>> + >>>> + tabletMemory = new TabletMemory(); >>>> + tabletTime = TabletTime.getInstance(time); >>>> + persistedTime = tabletTime.getTime(); >>>> + >>>> + acuTableConf.addObserver(configObserver = new >>>> ConfigurationObserver() { >>>> + >>>> + private void reloadConstraints() { >>>> + constraintChecker.set(new >>>> ConstraintChecker(getTableConfiguration())); >>>> + } >>>> + >>>> + @Override >>>> + public void propertiesChanged() { >>>> + reloadConstraints(); >>>> + >>>> + try { >>>> + setupDefaultSecurityLabels(extent); >>>> + } catch (Exception e) { >>>> + log.error("Failed to reload default security labels for >>>> extent: " + extent.toString()); >>>> + } >>>> + } >>>> + >>>> + @Override >>>> + public void propertyChanged(String prop) { >>>> + if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX. >>>> getKey())) >>>> + reloadConstraints(); >>>> + else if >>>> (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) { >>>> + try { >>>> + log.info("Default security labels changed for extent: " + >>>> extent.toString()); >>>> + setupDefaultSecurityLabels(extent); >>>> + } catch (Exception e) { >>>> + log.error("Failed to reload default security labels for >>>> extent: " + extent.toString()); >>>> + } >>>> + } >>>> + >>>> + } >>>> + >>>> + @Override >>>> + public void sessionExpired() { >>>> + log.debug("Session expired, no longer updating per table >>>> props..."); >>>> + } >>>> + >>>> + }); >>>> + // Force a load of any per-table properties >>>> + configObserver.propertiesChanged(); >>>> + >>>> + tabletResources.setTablet(this, acuTableConf); >>>> + if (!logEntries.isEmpty()) { >>>> + log.info("Starting Write-Ahead Log recovery for " + >>>> this.extent); >>>> + final long[] count = new long[2]; >>>> + final CommitSession commitSession = >>>> tabletMemory.getCommitSession(); >>>> + count[1] = Long.MIN_VALUE; >>>> + try { >>>> + Set absPaths = new HashSet(); >>>> + for (FileRef ref : datafiles.keySet()) >>>> + absPaths.add(ref.path().toString()); >>>> + >>>> + tabletServer.recover(this.tabletServer.getFileSystem(), this, >>>> logEntries, absPaths, new MutationReceiver() { >>>> + @Override >>>> + public void receive(Mutation m) { >>>> + // LogReader.printMutation(m); >>>> + Collection muts = m.getUpdates(); >>>> + for (ColumnUpdate columnUpdate : muts) { >>>> + if (!columnUpdate.hasTimestamp()) { >>>> + // if it is not a user set timestamp, it must have >>>> been >>>> set >>>> + // by the system >>>> + count[1] = Math.max(count[1], >>>> columnUpdate.getTimestamp()); >>>> + } >>>> + } >>>> + tabletMemory.mutate(commitSession, >>>> Collections.singletonList(m)); >>>> + count[0]++; >>>> + } >>>> + }); >>>> + >>>> + if (count[1] != Long.MIN_VALUE) { >>>> + tabletTime.useMaxTimeFromWALog(count[1]); >>>> + } >>>> + commitSession.updateMaxCommittedTime(tabletTime.getTime()); >>>> + >>>> - tabletMemory.updateMemoryUsageStats(); >>>> - >>>> + if (count[0] == 0) { >>>> + MetadataTableUtil.removeUnusedWALEntries(extent, >>>> logEntries, >>>> tabletServer.getLock()); >>>> + logEntries.clear(); >>>> + } >>>> + >>>> + } catch (Throwable t) { >>>> + if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE)) >>>> { >>>> + log.warn("Error recovering from log files: ", t); >>>> + } else { >>>> + throw new RuntimeException(t); >>>> + } >>>> + } >>>> + // make some closed references that represent the recovered logs >>>> + currentLogs = new HashSet(); >>>> + for (LogEntry logEntry : logEntries) { >>>> + for (String log : logEntry.logSet) { >>>> + currentLogs.add(new DfsLogger(tabletServer. >>>> getServerConfig(), >>>> log)); >>>> + } >>>> + } >>>> + >>>> + log.info("Write-Ahead Log recovery complete for " + >>>> this.extent + >>>> " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries() >>>> + + " entries created)"); >>>> + } >>>> + >>>> + String contextName = acuTableConf.get(Property.TABLE_CLASSPATH); >>>> + if (contextName != null && !contextName.equals("")) { >>>> + // initialize context classloader, instead of possibly waiting >>>> for >>>> it to initialize for a scan >>>> + // TODO this could hang, causing other tablets to fail to load - >>>> ACCUMULO-1292 >>>> + >>>> AccumuloVFSClassLoader.getContextManager(). >>>> getClassLoader(contextName); >>>> + } >>>> + >>>> + // do this last after tablet is completely setup because it >>>> + // could cause major compaction to start >>>> + datafileManager = new DatafileManager(datafiles); >>>> + >>>> + computeNumEntries(); >>>> + >>>> + datafileManager.removeFilesAfterScan(scanFiles); >>>> + >>>> + // look for hints of a failure on the previous tablet server >>>> + if (!logEntries.isEmpty() || >>>> needsMajorCompaction(MajorCompactionReason.NORMAL)) { >>>> + // look for any temp files hanging around >>>> + removeOldTemporaryFiles(); >>>> + } >>>> + >>>> + log.log(TLevel.TABLET_HIST, extent + " opened"); >>>> + } >>>> + >>>> + private void removeOldTemporaryFiles() { >>>> + // remove any temporary files created by a previous tablet server >>>> + try { >>>> + for (FileStatus tmp : fs.globStatus(new Path(location, >>>> "*_tmp"))) { >>>> + try { >>>> + log.debug("Removing old temp file " + tmp.getPath()); >>>> + fs.delete(tmp.getPath()); >>>> + } catch (IOException ex) { >>>> + log.error("Unable to remove old temp file " + tmp.getPath() >>>> + >>>> ": " + ex); >>>> + } >>>> + } >>>> + } catch (IOException ex) { >>>> + log.error("Error scanning for old temp files in " + location); >>>> + } >>>> + } >>>> + >>>> + private void setupDefaultSecurityLabels(KeyExtent extent) { >>>> + if (extent.isMeta()) { >>>> + defaultSecurityLabel = new byte[0]; >>>> + } else { >>>> + try { >>>> + ColumnVisibility cv = new >>>> ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_ >>>> SCANTIME_VISIBILITY)); >>>> + this.defaultSecurityLabel = cv.getExpression(); >>>> + } catch (Exception e) { >>>> + log.error(e, e); >>>> + this.defaultSecurityLabel = new byte[0]; >>>> + } >>>> + } >>>> + } >>>> + >>>> + private static Collection cleanUpFiles(VolumeManager fs, >>>> FileStatus[] files, boolean deleteTmp) throws IOException { >>>> + /* >>>> + * called in constructor and before major compactions >>>> + */ >>>> + Collection goodFiles = new ArrayList(files. >>>> length); >>>> + >>>> + for (FileStatus file : files) { >>>> + >>>> + String path = file.getPath().toString(); >>>> + String filename = file.getPath().getName(); >>>> + >>>> + // check for incomplete major compaction, this should only occur >>>> + // for root tablet >>>> + if (filename.startsWith("delete+")) { >>>> + String expectedCompactedFile = path.substring(0, >>>> path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1]; >>>> + if (fs.exists(new Path(expectedCompactedFile))) { >>>> + // compaction finished, but did not finish deleting >>>> compacted >>>> files.. so delete it >>>> + if (!fs.deleteRecursively(file.getPath())) >>>> + log.warn("Delete of file: " + file.getPath().toString() + >>>> " >>>> return false"); >>>> + continue; >>>> + } >>>> + // compaction did not finish, so put files back >>>> + >>>> + // reset path and filename for rest of loop >>>> + filename = filename.split("\\+", 3)[2]; >>>> + path = path.substring(0, path.lastIndexOf("/delete+")) + "/" + >>>> filename; >>>> + >>>> + if (!fs.rename(file.getPath(), new Path(path))) >>>> + log.warn("Rename of " + file.getPath().toString() + " to " + >>>> path + " returned false"); >>>> + } >>>> + >>>> + if (filename.endsWith("_tmp")) { >>>> + if (deleteTmp) { >>>> + log.warn("cleaning up old tmp file: " + path); >>>> + if (!fs.deleteRecursively(file.getPath())) >>>> + log.warn("Delete of tmp file: " + >>>> file.getPath().toString() >>>> + " return false"); >>>> + >>>> + } >>>> + continue; >>>> + } >>>> + >>>> + if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && >>>> !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) >>>> { >>>> + log.error("unknown file in tablet" + path); >>>> + continue; >>>> + } >>>> + >>>> + goodFiles.add(path); >>>> + } >>>> + >>>> + return goodFiles; >>>> + } >>>> + >>>> + public static class KVEntry extends KeyValue { >>>> + public KVEntry(Key k, Value v) { >>>> + super(new Key(k), Arrays.copyOf(v.get(), v.get().length)); >>>> + } >>>> + >>>> + @Override >>> >>> >