accumulo-dev mailing list archives

From John Vines <vi...@apache.org>
Subject Re: [11/15] ACCUMULO-1940 do not expose a new tablet to the memory manager until after it is online
Date Sat, 30 Nov 2013 22:19:39 GMT
Perhaps after this release we should do another retro to capture lessons
learned, to help streamline releases and maintaining multiple versions even
further in the future.


On Sat, Nov 30, 2013 at 5:16 PM, Josh Elser <josh.elser@gmail.com> wrote:

> On 11/30/2013 5:06 PM, Eric Newton wrote:
>
>> There was a merge "conflict" in 1.5 or 1.6. There was an extra line of
>> whitespace, or a line missing.
>>
>
> Strange. Maybe John hit the nail on the head that the email notifications
> for Git aren't always correct?
>
>> It is annoying to maintain 1.4, 1.5, and the largely unnecessary 1.6 (just
>> use master).
>>
>> However, I think this chore comes with software maturity and a larger user
>> base.
>>
>>
> I'd agree. I was mostly whining anyways. Having multiple versions in use
> is better than not having people use anything. Perhaps the 1.6.0 and master
> split is something we can revisit for the next release. If no one is
> working on features for the "next" release while we test the "current"
> release, there isn't a point in having two branches.
>
>
>>
>> On Sat, Nov 30, 2013 at 12:15 AM, Josh Elser <josh.elser@gmail.com>
>> wrote:
>>
>>> Actually, I was kind of confused when I saw your commit*s* on this
>>> ticket. What did you actually do? You have two commits that do the same changes:
>>>
>>> 82477f08aa64e2a8a1cf7f6af0db5ce954801ac8 (in 1.4, 1.5 and 1.6)
>>> 9b6b9cf104ff332cffdd4900d8057557e64e0ec8 (only in 1.6)
>>>
>>> I would've only expected to see one email with a diff, followed by 2
>>> "merge" emails, e.g.
>>>
>>> ----------------------------------------------------------------------
>>>   .../tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java  | 2 --
>>>   1 file changed, 2 deletions(-)
>>> ----------------------------------------------------------------------
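>>>
>>> For illustration, the commit-to-oldest-then-merge-forward flow sketched
>>> below is the kind of sequence that yields one diff email plus two merge
>>> emails (the branch names here are assumptions, not necessarily our exact
>>> refs):
>>>
>>>   git checkout 1.4                    # commit the fix on the oldest affected branch
>>>   git commit -am "ACCUMULO-1940 ..."  # one commit -> one diff email
>>>   git checkout 1.5 && git merge 1.4   # merge commit -> "merge" email, no duplicate diff
>>>   git checkout 1.6 && git merge 1.5   # merge commit -> "merge" email, no duplicate diff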
>>>
>>> Although, I will admit that dealing with 3 active branches is a big pain.
>>> However, I don't know of a better way to handle this in a way that doesn't
>>> make Git super confused and thus limit us in being able to answer questions
>>> like "where was a problem introduced" (git-bisect) and "where does this
>>> change exist" (and not having multiple commits that perform the same
>>> changes).
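>>>
>>> Concretely, those two questions map to commands along the lines of:
>>>
>>>   git bisect start <bad-commit> <good-commit>   # where was a problem introduced
>>>   git branch --contains 82477f08aa64e2a8a1cf7f6af0db5ce954801ac8   # where does this change exist
>>>
>>> and both only give clean answers when a change exists as a single commit
>>> reachable from each branch, rather than as duplicated copies.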
>>>
>>> On 11/29/13, 8:31 PM, Eric Newton wrote:
>>>
>>>> I changed one line of this file... git seems to be having a conniption. I
>>>> find the volume of git traffic to be so useless that I ignore it.
>>>>
>>>> Anyone else?
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Nov 29, 2013 at 1:24 PM, <ecn@apache.org> wrote:
>>>>
>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b6b9cf1/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>>>> index ee3b243,0000000..fd76415
>>>>> mode 100644,000000..100644
>>>>> --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>>>> +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>>>> @@@ -1,3868 -1,0 +1,3866 @@@
>>>>>    +/*
>>>>>    + * Licensed to the Apache Software Foundation (ASF) under one or more
>>>>>    + * contributor license agreements.  See the NOTICE file distributed with
>>>>>    + * this work for additional information regarding copyright ownership.
>>>>>    + * The ASF licenses this file to You under the Apache License, Version 2.0
>>>>>    + * (the "License"); you may not use this file except in compliance with
>>>>>    + * the License.  You may obtain a copy of the License at
>>>>>    + *
>>>>>    + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>>    + *
>>>>>    + * Unless required by applicable law or agreed to in writing, software
>>>>>    + * distributed under the License is distributed on an "AS IS" BASIS,
>>>>>    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>>>    + * See the License for the specific language governing permissions and
>>>>>    + * limitations under the License.
>>>>>    + */
>>>>>    +package org.apache.accumulo.tserver;
>>>>>    +
>>>>>    +import java.io.ByteArrayInputStream;
>>>>>    +import java.io.DataInputStream;
>>>>>    +import java.io.FileNotFoundException;
>>>>>    +import java.io.IOException;
>>>>>    +import java.util.ArrayList;
>>>>>    +import java.util.Arrays;
>>>>>    +import java.util.Collection;
>>>>>    +import java.util.Collections;
>>>>>    +import java.util.Comparator;
>>>>>    +import java.util.EnumSet;
>>>>>    +import java.util.HashMap;
>>>>>    +import java.util.HashSet;
>>>>>    +import java.util.Iterator;
>>>>>    +import java.util.List;
>>>>>    +import java.util.Map;
>>>>>    +import java.util.Map.Entry;
>>>>>    +import java.util.PriorityQueue;
>>>>>    +import java.util.Set;
>>>>>    +import java.util.SortedMap;
>>>>>    +import java.util.TreeMap;
>>>>>    +import java.util.TreeSet;
>>>>>    +import java.util.concurrent.atomic.AtomicBoolean;
>>>>>    +import java.util.concurrent.atomic.AtomicLong;
>>>>>    +import java.util.concurrent.atomic.AtomicReference;
>>>>>    +import java.util.concurrent.locks.ReentrantLock;
>>>>>    +
>>>>>    +import org.apache.accumulo.core.Constants;
>>>>>    +import org.apache.accumulo.core.client.Connector;
>>>>>    +import org.apache.accumulo.core.client.IteratorSetting;
>>>>>    +import org.apache.accumulo.core.client.impl.ScannerImpl;
>>>>>    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
>>>>>    +import org.apache.accumulo.core.conf.ConfigurationCopy;
>>>>>    +import org.apache.accumulo.core.conf.ConfigurationObserver;
>>>>>    +import org.apache.accumulo.core.conf.Property;
>>>>>    +import org.apache.accumulo.core.constraints.Violations;
>>>>>    +import org.apache.accumulo.core.data.ByteSequence;
>>>>>    +import org.apache.accumulo.core.data.Column;
>>>>>    +import org.apache.accumulo.core.data.ColumnUpdate;
>>>>>    +import org.apache.accumulo.core.data.Key;
>>>>>    +import org.apache.accumulo.core.data.KeyExtent;
>>>>>    +import org.apache.accumulo.core.data.KeyValue;
>>>>>    +import org.apache.accumulo.core.data.Mutation;
>>>>>    +import org.apache.accumulo.core.data.Range;
>>>>>    +import org.apache.accumulo.core.data.Value;
>>>>>    +import org.apache.accumulo.core.data.thrift.IterInfo;
>>>>>    +import org.apache.accumulo.core.data.thrift.MapFileInfo;
>>>>>    +import org.apache.accumulo.core.file.FileOperations;
>>>>>    +import org.apache.accumulo.core.file.FileSKVIterator;
>>>>>    +import org.apache.accumulo.core.iterators.IterationInterruptedException;
>>>>>    +import org.apache.accumulo.core.iterators.IteratorEnvironment;
>>>>>    +import org.apache.accumulo.core.iterators.IteratorUtil;
>>>>>    +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
>>>>>    +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
>>>>>    +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
>>>>>    +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
>>>>>    +import org.apache.accumulo.core.iterators.system.DeletingIterator;
>>>>>    +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
>>>>>    +import org.apache.accumulo.core.iterators.system.MultiIterator;
>>>>>    +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
>>>>>    +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
>>>>>    +import org.apache.accumulo.core.iterators.system.StatsIterator;
>>>>>    +import org.apache.accumulo.core.iterators.system.VisibilityFilter;
>>>>>    +import org.apache.accumulo.core.master.thrift.TabletLoadState;
>>>>>    +import org.apache.accumulo.core.metadata.MetadataTable;
>>>>>    +import org.apache.accumulo.core.metadata.RootTable;
>>>>>    +import org.apache.accumulo.core.metadata.schema.DataFileValue;
>>>>>    +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
>>>>>    +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
>>>>>    +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
>>>>>    +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
>>>>>    +import org.apache.accumulo.core.security.Authorizations;
>>>>>    +import org.apache.accumulo.core.security.ColumnVisibility;
>>>>>    +import org.apache.accumulo.core.security.Credentials;
>>>>>    +import org.apache.accumulo.core.tabletserver.log.LogEntry;
>>>>>    +import org.apache.accumulo.core.util.CachedConfiguration;
>>>>>    +import org.apache.accumulo.core.util.LocalityGroupUtil;
>>>>>    +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
>>>>>    +import org.apache.accumulo.core.util.MapCounter;
>>>>>    +import org.apache.accumulo.core.util.Pair;
>>>>>    +import org.apache.accumulo.core.util.UtilWaitThread;
>>>>>    +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
>>>>>    +import org.apache.accumulo.server.ServerConstants;
>>>>>    +import org.apache.accumulo.server.client.HdfsZooInstance;
>>>>>    +import org.apache.accumulo.server.conf.TableConfiguration;
>>>>>    +import org.apache.accumulo.server.fs.FileRef;
>>>>>    +import org.apache.accumulo.server.fs.VolumeManager;
>>>>>    +import org.apache.accumulo.server.fs.VolumeManager.FileType;
>>>>>    +import org.apache.accumulo.server.fs.VolumeManagerImpl;
>>>>>    +import org.apache.accumulo.server.master.state.TServerInstance;
>>>>>    +import org.apache.accumulo.server.master.tableOps.CompactionIterators;
>>>>>    +import org.apache.accumulo.server.problems.ProblemReport;
>>>>>    +import org.apache.accumulo.server.problems.ProblemReports;
>>>>>    +import org.apache.accumulo.server.problems.ProblemType;
>>>>>    +import org.apache.accumulo.server.security.SystemCredentials;
>>>>>    +import org.apache.accumulo.server.tablets.TabletTime;
>>>>>    +import org.apache.accumulo.server.tablets.UniqueNameAllocator;
>>>>>    +import org.apache.accumulo.server.util.FileUtil;
>>>>>    +import org.apache.accumulo.server.util.MasterMetadataUtil;
>>>>>    +import org.apache.accumulo.server.util.MetadataTableUtil;
>>>>>    +import org.apache.accumulo.server.util.TabletOperations;
>>>>>    +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
>>>>>    +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
>>>>>    +import org.apache.accumulo.trace.instrument.Span;
>>>>>    +import org.apache.accumulo.trace.instrument.Trace;
>>>>>    +import org.apache.accumulo.tserver.Compactor.CompactionCanceledException;
>>>>>    +import org.apache.accumulo.tserver.Compactor.CompactionEnv;
>>>>>    +import org.apache.accumulo.tserver.FileManager.ScanFileManager;
>>>>>    +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
>>>>>    +import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv;
>>>>>    +import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
>>>>>    +import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
>>>>>    +import org.apache.accumulo.tserver.compaction.CompactionPlan;
>>>>>    +import org.apache.accumulo.tserver.compaction.CompactionStrategy;
>>>>>    +import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
>>>>>    +import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
>>>>>    +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
>>>>>    +import org.apache.accumulo.tserver.compaction.WriteParameters;
>>>>>    +import org.apache.accumulo.tserver.constraints.ConstraintChecker;
>>>>>    +import org.apache.accumulo.tserver.log.DfsLogger;
>>>>>    +import org.apache.accumulo.tserver.log.MutationReceiver;
>>>>>    +import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
>>>>>    +import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
>>>>>    +import org.apache.commons.codec.DecoderException;
>>>>>    +import org.apache.commons.codec.binary.Hex;
>>>>>    +import org.apache.hadoop.conf.Configuration;
>>>>>    +import org.apache.hadoop.fs.FileStatus;
>>>>>    +import org.apache.hadoop.fs.FileSystem;
>>>>>    +import org.apache.hadoop.fs.Path;
>>>>>    +import org.apache.hadoop.io.Text;
>>>>>    +import org.apache.log4j.Logger;
>>>>>    +import org.apache.zookeeper.KeeperException;
>>>>>    +import org.apache.zookeeper.KeeperException.NoNodeException;
>>>>>    +
>>>>>    +/*
>>>>>    + * We need to be able to have the master tell a tabletServer to
>>>>>    + * close this file, and the tablet server to handle all pending client reads
>>>>>    + * before closing
>>>>>    + *
>>>>>    + */
>>>>>    +
>>>>>    +/**
>>>>>    + *
>>>>>    + * this class just provides an interface to read from a MapFile; mostly takes care of reporting start and end keys
>>>>>    + *
>>>>>    + * need this because a single row extent can have multiple columns; this manages all the columns (each handled by a store) for a single row-extent
>>>>>    + *
>>>>>    + *
>>>>>    + */
>>>>>    +
>>>>>    +public class Tablet {
>>>>>    +
>>>>>    +  enum MinorCompactionReason {
>>>>>    +    USER, SYSTEM, CLOSE
>>>>>    +  }
>>>>>    +
>>>>>    +  public class CommitSession {
>>>>>    +
>>>>>    +    private int seq;
>>>>>    +    private InMemoryMap memTable;
>>>>>    +    private int commitsInProgress;
>>>>>    +    private long maxCommittedTime = Long.MIN_VALUE;
>>>>>    +
>>>>>    +    private CommitSession(int seq, InMemoryMap imm) {
>>>>>    +      this.seq = seq;
>>>>>    +      this.memTable = imm;
>>>>>    +      commitsInProgress = 0;
>>>>>    +    }
>>>>>    +
>>>>>    +    public int getWALogSeq() {
>>>>>    +      return seq;
>>>>>    +    }
>>>>>    +
>>>>>    +    private void decrementCommitsInProgress() {
>>>>>    +      if (commitsInProgress < 1)
>>>>>    +        throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
>>>>>    +
>>>>>    +      commitsInProgress--;
>>>>>    +      if (commitsInProgress == 0)
>>>>>    +        Tablet.this.notifyAll();
>>>>>    +    }
>>>>>    +
>>>>>    +    private void incrementCommitsInProgress() {
>>>>>    +      if (commitsInProgress < 0)
>>>>>    +        throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
>>>>>    +
>>>>>    +      commitsInProgress++;
>>>>>    +    }
>>>>>    +
>>>>>    +    private void waitForCommitsToFinish() {
>>>>>    +      while (commitsInProgress > 0) {
>>>>>    +        try {
>>>>>    +          Tablet.this.wait(50);
>>>>>    +        } catch (InterruptedException e) {
>>>>>    +          log.warn(e, e);
>>>>>    +        }
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    public void abortCommit(List<Mutation> value) {
>>>>>    +      Tablet.this.abortCommit(this, value);
>>>>>    +    }
>>>>>    +
>>>>>    +    public void commit(List<Mutation> mutations) {
>>>>>    +      Tablet.this.commit(this, mutations);
>>>>>    +    }
>>>>>    +
>>>>>    +    public Tablet getTablet() {
>>>>>    +      return Tablet.this;
>>>>>    +    }
>>>>>    +
>>>>>    +    public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
>>>>>    +      return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
>>>>>    +    }
>>>>>    +
>>>>>    +    public void finishUpdatingLogsUsed() {
>>>>>    +      Tablet.this.finishUpdatingLogsUsed();
>>>>>    +    }
>>>>>    +
>>>>>    +    public int getLogId() {
>>>>>    +      return logId;
>>>>>    +    }
>>>>>    +
>>>>>    +    public KeyExtent getExtent() {
>>>>>    +      return extent;
>>>>>    +    }
>>>>>    +
>>>>>    +    private void updateMaxCommittedTime(long time) {
>>>>>    +      maxCommittedTime = Math.max(time, maxCommittedTime);
>>>>>    +    }
>>>>>    +
>>>>>    +    private long getMaxCommittedTime() {
>>>>>    +      if (maxCommittedTime == Long.MIN_VALUE)
>>>>>    +        throw new IllegalStateException("Tried to read max committed time when it was never set");
>>>>>    +      return maxCommittedTime;
>>>>>    +    }
>>>>>    +
>>>>>    +  }
>>>>>    +
>>>>>    +  private class TabletMemory {
>>>>>    +    private InMemoryMap memTable;
>>>>>    +    private InMemoryMap otherMemTable;
>>>>>    +    private InMemoryMap deletingMemTable;
>>>>>    +    private int nextSeq = 1;
>>>>>    +    private CommitSession commitSession;
>>>>>    +
>>>>>    +    TabletMemory() {
>>>>>    +      try {
>>>>>    +        memTable = new InMemoryMap(acuTableConf);
>>>>>    +      } catch (LocalityGroupConfigurationError e) {
>>>>>    +        throw new RuntimeException(e);
>>>>>    +      }
>>>>>    +      commitSession = new CommitSession(nextSeq, memTable);
>>>>>    +      nextSeq += 2;
>>>>>    +    }
>>>>>    +
>>>>>    +    InMemoryMap getMemTable() {
>>>>>    +      return memTable;
>>>>>    +    }
>>>>>    +
>>>>>    +    InMemoryMap getMinCMemTable() {
>>>>>    +      return otherMemTable;
>>>>>    +    }
>>>>>    +
>>>>>    +    CommitSession prepareForMinC() {
>>>>>    +      if (otherMemTable != null) {
>>>>>    +        throw new IllegalStateException();
>>>>>    +      }
>>>>>    +
>>>>>    +      if (deletingMemTable != null) {
>>>>>    +        throw new IllegalStateException();
>>>>>    +      }
>>>>>    +
>>>>>    +      otherMemTable = memTable;
>>>>>    +      try {
>>>>>    +        memTable = new InMemoryMap(acuTableConf);
>>>>>    +      } catch (LocalityGroupConfigurationError e) {
>>>>>    +        throw new RuntimeException(e);
>>>>>    +      }
>>>>>    +
>>>>>    +      CommitSession oldCommitSession = commitSession;
>>>>>    +      commitSession = new CommitSession(nextSeq, memTable);
>>>>>    +      nextSeq += 2;
>>>>>    +
>>>>>    +      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes());
>>>>>    +
>>>>>    +      return oldCommitSession;
>>>>>    +    }
>>>>>    +
>>>>>    +    void finishedMinC() {
>>>>>    +
>>>>>    +      if (otherMemTable == null) {
>>>>>    +        throw new IllegalStateException();
>>>>>    +      }
>>>>>    +
>>>>>    +      if (deletingMemTable != null) {
>>>>>    +        throw new IllegalStateException();
>>>>>    +      }
>>>>>    +
>>>>>    +      deletingMemTable = otherMemTable;
>>>>>    +
>>>>>    +      otherMemTable = null;
>>>>>    +      Tablet.this.notifyAll();
>>>>>    +    }
>>>>>    +
>>>>>    +    void finalizeMinC() {
>>>>>    +      try {
>>>>>    +        deletingMemTable.delete(15000);
>>>>>    +      } finally {
>>>>>    +        synchronized (Tablet.this) {
>>>>>    +          if (otherMemTable != null) {
>>>>>    +            throw new IllegalStateException();
>>>>>    +          }
>>>>>    +
>>>>>    +          if (deletingMemTable == null) {
>>>>>    +            throw new IllegalStateException();
>>>>>    +          }
>>>>>    +
>>>>>    +          deletingMemTable = null;
>>>>>    +
>>>>>    +          tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
>>>>>    +        }
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    boolean memoryReservedForMinC() {
>>>>>    +      return otherMemTable != null || deletingMemTable != null;
>>>>>    +    }
>>>>>    +
>>>>>    +    void waitForMinC() {
>>>>>    +      while (otherMemTable != null || deletingMemTable != null) {
>>>>>    +        try {
>>>>>    +          Tablet.this.wait(50);
>>>>>    +        } catch (InterruptedException e) {
>>>>>    +          log.warn(e, e);
>>>>>    +        }
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    void mutate(CommitSession cm, List<Mutation> mutations) {
>>>>>    +      cm.memTable.mutate(mutations);
>>>>>    +    }
>>>>>    +
>>>>>    +    void updateMemoryUsageStats() {
>>>>>    +      long other = 0;
>>>>>    +      if (otherMemTable != null)
>>>>>    +        other = otherMemTable.estimatedSizeInBytes();
>>>>>    +      else if (deletingMemTable != null)
>>>>>    +        other = deletingMemTable.estimatedSizeInBytes();
>>>>>    +
>>>>>    +      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
>>>>>    +    }
>>>>>    +
>>>>>    +    List<MemoryIterator> getIterators() {
>>>>>    +      List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
>>>>>    +      toReturn.add(memTable.skvIterator());
>>>>>    +      if (otherMemTable != null)
>>>>>    +        toReturn.add(otherMemTable.skvIterator());
>>>>>    +      return toReturn;
>>>>>    +    }
>>>>>    +
>>>>>    +    void returnIterators(List<MemoryIterator> iters) {
>>>>>    +      for (MemoryIterator iter : iters) {
>>>>>    +        iter.close();
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    public long getNumEntries() {
>>>>>    +      if (otherMemTable != null)
>>>>>    +        return memTable.getNumEntries() + otherMemTable.getNumEntries();
>>>>>    +      return memTable.getNumEntries();
>>>>>    +    }
>>>>>    +
>>>>>    +    CommitSession getCommitSession() {
>>>>>    +      return commitSession;
>>>>>    +    }
>>>>>    +  }
>>>>>    +
>>>>>    +  private TabletMemory tabletMemory;
>>>>>    +
>>>>>    +  private final TabletTime tabletTime;
>>>>>    +  private long persistedTime;
>>>>>    +  private final Object timeLock = new Object();
>>>>>    +
>>>>>    +  private final Path location; // absolute path of this tablet's dir
>>>>>    +  private TServerInstance lastLocation;
>>>>>    +
>>>>>    +  private Configuration conf;
>>>>>    +  private VolumeManager fs;
>>>>>    +
>>>>>    +  private TableConfiguration acuTableConf;
>>>>>    +
>>>>>    +  private volatile boolean tableDirChecked = false;
>>>>>    +
>>>>>    +  private AtomicLong dataSourceDeletions = new AtomicLong(0);
>>>>>    +  private Set<ScanDataSource> activeScans = new HashSet<ScanDataSource>();
>>>>>    +
>>>>>    +  private volatile boolean closing = false;
>>>>>    +  private boolean closed = false;
>>>>>    +  private boolean closeComplete = false;
>>>>>    +
>>>>>    +  private long lastFlushID = -1;
>>>>>    +  private long lastCompactID = -1;
>>>>>    +
>>>>>    +  private KeyExtent extent;
>>>>>    +
>>>>>    +  private TabletResourceManager tabletResources;
>>>>>    +  final private DatafileManager datafileManager;
>>>>>    +  private volatile boolean majorCompactionInProgress = false;
>>>>>    +  private volatile boolean majorCompactionWaitingToStart = false;
>>>>>    +  private Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
>>>>>    +  private volatile boolean minorCompactionInProgress = false;
>>>>>    +  private volatile boolean minorCompactionWaitingToStart = false;
>>>>>    +
>>>>>    +  private boolean updatingFlushID = false;
>>>>>    +
>>>>>    +  private AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<ConstraintChecker>();
>>>>>    +
>>>>>    +  private final String tabletDirectory;
>>>>>    +
>>>>>    +  private int writesInProgress = 0;
>>>>>    +
>>>>>    +  private static final Logger log = Logger.getLogger(Tablet.class);
>>>>>    +  public TabletStatsKeeper timer;
>>>>>    +
>>>>>    +  private Rate queryRate = new Rate(0.2);
>>>>>    +  private long queryCount = 0;
>>>>>    +
>>>>>    +  private Rate queryByteRate = new Rate(0.2);
>>>>>    +  private long queryBytes = 0;
>>>>>    +
>>>>>    +  private Rate ingestRate = new Rate(0.2);
>>>>>    +  private long ingestCount = 0;
>>>>>    +
>>>>>    +  private Rate ingestByteRate = new Rate(0.2);
>>>>>    +  private long ingestBytes = 0;
>>>>>    +
>>>>>    +  private byte[] defaultSecurityLabel = new byte[0];
>>>>>    +
>>>>>    +  private long lastMinorCompactionFinishTime;
>>>>>    +  private long lastMapFileImportTime;
>>>>>    +
>>>>>    +  private volatile long numEntries;
>>>>>    +  private volatile long numEntriesInMemory;
>>>>>    +
>>>>>    +  // a count of the amount of data read by the iterators
>>>>>    +  private AtomicLong scannedCount = new AtomicLong(0);
>>>>>    +  private Rate scannedRate = new Rate(0.2);
>>>>>    +
>>>>>    +  private ConfigurationObserver configObserver;
>>>>>    +
>>>>>    +  private TabletServer tabletServer;
>>>>>    +
>>>>>    +  private final int logId;
>>>>>    +  // ensure we only have one reader/writer of our bulk file notes at a time
>>>>>    +  public final Object bulkFileImportLock = new Object();
>>>>>    +
>>>>>    +  public int getLogId() {
>>>>>    +    return logId;
>>>>>    +  }
>>>>>    +
>>>>>    +  public static class TabletClosedException extends RuntimeException {
>>>>>    +    public TabletClosedException(Exception e) {
>>>>>    +      super(e);
>>>>>    +    }
>>>>>    +
>>>>>    +    public TabletClosedException() {
>>>>>    +      super();
>>>>>    +    }
>>>>>    +
>>>>>    +    private static final long serialVersionUID = 1L;
>>>>>    +  }
>>>>>    +
>>>>>    +  FileRef getNextMapFilename(String prefix) throws IOException {
>>>>>    +    String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent));
>>>>>    +    checkTabletDir();
>>>>>    +    return new FileRef(location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension);
>>>>>    +  }
>>>>>    +
>>>>>    +  private void checkTabletDir() throws IOException {
>>>>>    +    if (!tableDirChecked) {
>>>>>    +      checkTabletDir(this.location);
>>>>>    +      tableDirChecked = true;
>>>>>    +    }
>>>>>    +  }
>>>>>    +
>>>>>    +  private void checkTabletDir(Path tabletDir) throws IOException {
>>>>>    +
>>>>>    +    FileStatus[] files = null;
>>>>>    +    try {
>>>>>    +      files = fs.listStatus(tabletDir);
>>>>>    +    } catch (FileNotFoundException ex) {
>>>>>    +      // ignored
>>>>>    +    }
>>>>>    +
>>>>>    +    if (files == null) {
>>>>>    +      if (tabletDir.getName().startsWith("c-"))
>>>>>    +        log.debug("Tablet " + extent + " had no dir, creating " + tabletDir); // it's a clone dir...
>>>>>    +      else
>>>>>    +        log.warn("Tablet " + extent + " had no dir, creating " + tabletDir);
>>>>>    +
>>>>>    +      fs.mkdirs(tabletDir);
>>>>>    +    }
>>>>>    +  }
>>>>>    +
>>>>>    +  class DatafileManager {
>>>>>    +    // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles
>>>>>    +    final private Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
>>>>>    +
>>>>>    +    DatafileManager(SortedMap<FileRef,DataFileValue> datafileSizes) {
>>>>>    +      for (Entry<FileRef,DataFileValue> datafiles : datafileSizes.entrySet())
>>>>>    +        this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
>>>>>    +    }
>>>>>    +
>>>>>    +    FileRef mergingMinorCompactionFile = null;
>>>>>    +    Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>();
>>>>>    +    Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>();
>>>>>    +    MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>();
>>>>>    +    long nextScanReservationId = 0;
>>>>>    +    boolean reservationsBlocked = false;
>>>>>    +
>>>>>    +    Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
>>>>>    +
>>>>>    +    Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() {
>>>>>    +      synchronized (Tablet.this) {
>>>>>    +
>>>>>    +        while (reservationsBlocked) {
>>>>>    +          try {
>>>>>    +            Tablet.this.wait(50);
>>>>>    +          } catch (InterruptedException e) {
>>>>>    +            log.warn(e, e);
>>>>>    +          }
>>>>>    +        }
>>>>>    +
>>>>>    +        Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet());
>>>>>    +
>>>>>    +        long rid = nextScanReservationId++;
>>>>>    +
>>>>>    +        scanFileReservations.put(rid, absFilePaths);
>>>>>    +
>>>>>    +        Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
>>>>>    +
>>>>>    +        for (FileRef path : absFilePaths) {
>>>>>    +          fileScanReferenceCounts.increment(path, 1);
>>>>>    +          ret.put(path, datafileSizes.get(path));
>>>>>    +        }
>>>>>    +
>>>>>    +        return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    void returnFilesForScan(Long reservationId) {
>>>>>    +
>>>>>    +      final Set<FileRef> filesToDelete = new HashSet<FileRef>();
>>>>>    +
>>>>>    +      synchronized (Tablet.this) {
>>>>>    +        Set<FileRef> absFilePaths = scanFileReservations.remove(reservationId);
>>>>>    +
>>>>>    +        if (absFilePaths == null)
>>>>>    +          throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
>>>>>    +
>>>>>    +        boolean notify = false;
>>>>>    +        for (FileRef path : absFilePaths) {
>>>>>    +          long refCount = fileScanReferenceCounts.decrement(path, 1);
>>>>>    +          if (refCount == 0) {
>>>>>    +            if (filesToDeleteAfterScan.remove(path))
>>>>>    +              filesToDelete.add(path);
>>>>>    +            notify = true;
>>>>>    +          } else if (refCount < 0)
>>>>>    +            throw new IllegalStateException("Scan ref count for " + path + " is " + refCount);
>>>>>    +        }
>>>>>    +
>>>>>    +        if (notify)
>>>>>    +          Tablet.this.notifyAll();
>>>>>    +      }
>>>>>    +
>>>>>    +      if (filesToDelete.size() > 0) {
>>>>>    +        log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete);
>>>>>    +        MetadataTableUtil.removeScanFiles(extent, filesToDelete, SystemCredentials.get(), tabletServer.getLock());
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    private void removeFilesAfterScan(Set<FileRef> scanFiles) {
>>>>>    +      if (scanFiles.size() == 0)
>>>>>    +        return;
>>>>>    +
>>>>>    +      Set<FileRef> filesToDelete = new HashSet<FileRef>();
>>>>>    +
>>>>>    +      synchronized (Tablet.this) {
>>>>>    +        for (FileRef path : scanFiles) {
>>>>>    +          if (fileScanReferenceCounts.get(path) == 0)
>>>>>    +            filesToDelete.add(path);
>>>>>    +          else
>>>>>    +            filesToDeleteAfterScan.add(path);
>>>>>    +        }
>>>>>    +      }
>>>>>    +
>>>>>    +      if (filesToDelete.size() > 0) {
>>>>>    +        log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete);
>>>>>    +        MetadataTableUtil.removeScanFiles(extent, filesToDelete, SystemCredentials.get(), tabletServer.getLock());
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
>>>>>    +      long startTime = System.currentTimeMillis();
>>>>>    +      TreeSet<FileRef> inUse = new TreeSet<FileRef>();
>>>>>    +
>>>>>    +      Span waitForScans = Trace.start("waitForScans");
>>>>>    +      try {
>>>>>    +        synchronized (Tablet.this) {
>>>>>    +          if (blockNewScans) {
>>>>>    +            if (reservationsBlocked)
>>>>>    +              throw new IllegalStateException();
>>>>>    +
>>>>>    +            reservationsBlocked = true;
>>>>>    +          }
>>>>>    +
>>>>>    +          for (FileRef path : pathsToWaitFor) {
>>>>>    +            while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
>>>>>    +              try {
>>>>>    +                Tablet.this.wait(100);
>>>>>    +              } catch (InterruptedException e) {
>>>>>    +                log.warn(e, e);
>>>>>    +              }
>>>>>    +            }
>>>>>    +          }
>>>>>    +
>>>>>    +          for (FileRef path : pathsToWaitFor) {
>>>>>    +            if (fileScanReferenceCounts.get(path) > 0)
>>>>>    +              inUse.add(path);
>>>>>    +          }
>>>>>    +
>>>>>    +          if (blockNewScans) {
>>>>>    +            reservationsBlocked = false;
>>>>>    +            Tablet.this.notifyAll();
>>>>>    +          }
>>>>>    +
>>>>>    +        }
>>>>>    +      } finally {
>>>>>    +        waitForScans.stop();
>>>>>    +      }
>>>>>    +      return inUse;
>>>>>    +    }
>>>>>    +
>>>>>    +    public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException {
>>>>>    +
>>>>>    +      String bulkDir = null;
>>>>>    +
>>>>>    +      Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
>>>>>    +      for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet())
>>>>>    +        paths.put(entry.getKey(), entry.getValue());
>>>>>    +
>>>>>    +      for (FileRef tpath : paths.keySet()) {
>>>>>    +
>>>>>    +        boolean inTheRightDirectory = false;
>>>>>    +        Path parent = tpath.path().getParent().getParent();
>>>>>    +        for (String tablesDir : ServerConstants.getTablesDirs()) {
>>>>>    +          if (parent.equals(new Path(tablesDir, extent.getTableId().toString()))) {
>>>>>    +            inTheRightDirectory = true;
>>>>>    +            break;
>>>>>    +          }
>>>>>    +        }
>>>>>    +        if (!inTheRightDirectory) {
>>>>>    +          throw new IOException("Data file " + tpath + " not in table dirs");
>>>>>    +        }
>>>>>    +
>>>>>    +        if (bulkDir == null)
>>>>>    +          bulkDir = tpath.path().getParent().toString();
>>>>>    +        else if (!bulkDir.equals(tpath.path().getParent().toString()))
>>>>>    +          throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);
>>>>>    +
>>>>>    +      }
>>>>>    +
>>>>>    +      if (extent.isRootTablet()) {
>>>>>    +        throw new IllegalArgumentException("Can not import files to root tablet");
>>>>>    +      }
>>>>>    +
>>>>>    +      synchronized (bulkFileImportLock) {
>>>>>    +        Credentials creds = SystemCredentials.get();
>>>>>    +        Connector conn;
>>>>>    +        try {
>>>>>    +          conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken());
>>>>>    +        } catch (Exception ex) {
>>>>>    +          throw new IOException(ex);
>>>>>    +        }
>>>>>    +        // Remove any bulk files we've previously loaded and compacted away
>>>>>    +        List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);
>>>>>    +
>>>>>    +        for (FileRef file : files)
>>>>>    +          if (paths.keySet().remove(file.path()))
>>>>>    +            log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file);
>>>>>    +
>>>>>    +        if (paths.size() > 0) {
>>>>>    +          long bulkTime = Long.MIN_VALUE;
>>>>>    +          if (setTime) {
>>>>>    +            for (DataFileValue dfv : paths.values()) {
>>>>>    +              long nextTime = tabletTime.getAndUpdateTime();
>>>>>    +              if (nextTime < bulkTime)
>>>>>    +                throw new IllegalStateException("Time went backwards unexpectedly " + nextTime + " " + bulkTime);
>>>>>    +              bulkTime = nextTime;
>>>>>    +              dfv.setTime(bulkTime);
>>>>>    +            }
>>>>>    +          }
>>>>>    +
>>>>>    +          synchronized (timeLock) {
>>>>>    +            if (bulkTime > persistedTime)
>>>>>    +              persistedTime = bulkTime;
>>>>>    +
>>>>>    +            MetadataTableUtil.updateTabletDataFile(tid, extent, paths, tabletTime.getMetadataValue(persistedTime), creds, tabletServer.getLock());
>>>>>    +          }
>>>>>    +        }
>>>>>    +      }
>>>>>    +
>>>>>    +      synchronized (Tablet.this) {
>>>>>    +        for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
>>>>>    +          if (datafileSizes.containsKey(tpath.getKey())) {
>>>>>    +            log.error("Adding file that is already in set " + tpath.getKey());
>>>>>    +          }
>>>>>    +          datafileSizes.put(tpath.getKey(), tpath.getValue());
>>>>>    +
>>>>>    +        }
>>>>>    +
>>>>>    +        tabletResources.importedMapFiles();
>>>>>    +
>>>>>    +        computeNumEntries();
>>>>>    +      }
>>>>>    +
>>>>>    +      for (FileRef tpath : paths.keySet()) {
>>>>>    +        log.log(TLevel.TABLET_HIST, extent + " import " + tpath + " " + paths.get(tpath));
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    FileRef reserveMergingMinorCompactionFile() {
>>>>>    +      if (mergingMinorCompactionFile != null)
>>>>>    +        throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved  : " + mergingMinorCompactionFile);
>>>>>    +
>>>>>    +      if (extent.isRootTablet())
>>>>>    +        return null;
>>>>>    +
>>>>>    +      int maxFiles = acuTableConf.getMaxFilesPerTablet();
>>>>>    +
>>>>>    +      // when a major compaction is running and we are at max files, write out
>>>>>    +      // one extra file... want to avoid the case where major compaction is
>>>>>    +      // compacting everything except for the largest file, and therefore the
>>>>>    +      // largest file is returned for merging.. the following check mostly
>>>>>    +      // avoids this case, except for the case where major compactions fail or
>>>>>    +      // are canceled
>>>>>    +      if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
>>>>>    +        return null;
>>>>>    +
>>>>>    +      if (datafileSizes.size() >= maxFiles) {
>>>>>    +        // find the smallest file
>>>>>    +
>>>>>    +        long min = Long.MAX_VALUE;
>>>>>    +        FileRef minName = null;
>>>>>    +
>>>>>    +        for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
>>>>>    +          if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
>>>>>    +            min = entry.getValue().getSize();
>>>>>    +            minName = entry.getKey();
>>>>>    +          }
>>>>>    +        }
>>>>>    +
>>>>>    +        if (minName == null)
>>>>>    +          return null;
>>>>>    +
>>>>>    +        mergingMinorCompactionFile = minName;
>>>>>    +        return minName;
>>>>>    +      }
>>>>>    +
>>>>>    +      return null;
>>>>>    +    }
>>>>>    +
>>>>>    +    void unreserveMergingMinorCompactionFile(FileRef file) {
>>>>>    +      if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
>>>>>    +          || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
>>>>>    +        throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
>>>>>    +
>>>>>    +      mergingMinorCompactionFile = null;
>>>>>    +    }
>>>>>    +
>>>>>    +    void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
>>>>>    +        throws IOException {
>>>>>    +
>>>>>    +      IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
>>>>>    +      if (extent.isRootTablet()) {
>>>>>    +        try {
>>>>>    +          if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
>>>>>    +            throw new IllegalStateException();
>>>>>    +          }
>>>>>    +        } catch (Exception e) {
>>>>>    +          throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
>>>>>    +        }
>>>>>    +      }
>>>>>    +
>>>>>    +      // rename before putting in metadata table, so files in metadata table should
>>>>>    +      // always exist
>>>>>    +      do {
>>>>>    +        try {
>>>>>    +          if (dfv.getNumEntries() == 0) {
>>>>>    +            fs.deleteRecursively(tmpDatafile.path());
>>>>>    +          } else {
>>>>>    +            if (fs.exists(newDatafile.path())) {
>>>>>    +              log.warn("Target map file already exist " + newDatafile);
>>>>>    +              fs.deleteRecursively(newDatafile.path());
>>>>>    +            }
>>>>>    +
>>>>>    +            if (!fs.rename(tmpDatafile.path(), newDatafile.path())) {
>>>>>    +              throw new IOException("rename fails");
>>>>>    +            }
>>>>>    +          }
>>>>>    +          break;
>>>>>    +        } catch (IOException ioe) {
>>>>>    +          log.warn("Tablet " + extent + " failed to rename " + newDatafile + " after MinC, will retry in 60 secs...", ioe);
>>>>>    +          UtilWaitThread.sleep(60 * 1000);
>>>>>    +        }
>>>>>    +      } while (true);
>>>>>    +
>>>>>    +      long t1, t2;
>>>>>    +
>>>>>    +      // the code below always assumes merged files are in use by scans... this must be done
>>>>>    +      // because the in memory list of files is not updated until after the metadata table
>>>>>    +      // therefore the file is available to scans until memory is updated, but want to ensure
>>>>>    +      // the file is not available for garbage collection... if memory were updated
>>>>>    +      // before this point (like major compactions do), then the following code could wait
>>>>>    +      // for scans to finish like major compactions do.... used to wait for scans to finish
>>>>>    +      // here, but that was incorrect because a scan could start after waiting but before
>>>>>    +      // memory was updated... assuming the file is always in use by scans leads to
>>>>>    +      // one unneeded metadata update when it was not actually in use
>>>>>    +      Set<FileRef> filesInUseByScans = Collections.emptySet();
>>>>>    +      if (absMergeFile != null)
>>>>>    +        filesInUseByScans = Collections.singleton(absMergeFile);
>>>>>    +
>>>>>    +      // very important to write delete entries outside of log lock, because
>>>>>    +      // this !METADATA write does not go up... it goes sideways or to itself
>>>>>    +      if (absMergeFile != null)
>>>>>    +        MetadataTableUtil.addDeleteEntries(extent, Collections.singleton(absMergeFile), SystemCredentials.get());
>>>>>    +
>>>>>    +      Set<String> unusedWalLogs = beginClearingUnusedLogs();
>>>>>    +      try {
>>>>>    +        // the order of writing to !METADATA and walog is important in the face of machine/process failures
>>>>>    +        // need to write to !METADATA before writing to walog, when things are done in the reverse order
>>>>>    +        // data could be lost... the minor compaction start event should be written before the following metadata
>>>>>    +        // write is made
>>>>>    +
>>>>>    +        synchronized (timeLock) {
>>>>>    +          if (commitSession.getMaxCommittedTime() > persistedTime)
>>>>>    +            persistedTime = commitSession.getMaxCommittedTime();
>>>>>    +
>>>>>    +          String time = tabletTime.getMetadataValue(persistedTime);
>>>>>    +          MasterMetadataUtil.updateTabletDataFile(extent, newDatafile, absMergeFile, dfv, time, SystemCredentials.get(), filesInUseByScans,
>>>>>    +              tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
>>>>>    +        }
>>>>>    +
>>>>>    +      } finally {
>>>>>    +        finishClearingUnusedLogs();
>>>>>    +      }
>>>>>    +
>>>>>    +      do {
>>>>>    +        try {
>>>>>    +          // the purpose of making this update use the new commit session, instead of the old one passed in,
>>>>>    +          // is because the new one will reference the logs used by current memory...
>>>>>    +
>>>>>    +          tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
>>>>>    +          break;
>>>>>    +        } catch (IOException e) {
>>>>>    +          log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e);
>>>>>    +          UtilWaitThread.sleep(1 * 1000);
>>>>>    +        }
>>>>>    +      } while (true);
>>>>>    +
>>>>>    +      synchronized (Tablet.this) {
>>>>>    +        lastLocation = null;
>>>>>    +
>>>>>    +        t1 = System.currentTimeMillis();
>>>>>    +        if (datafileSizes.containsKey(newDatafile)) {
>>>>>    +          log.error("Adding file that is already in set " + newDatafile);
>>>>>    +        }
>>>>>    +
>>>>>    +        if (dfv.getNumEntries() > 0) {
>>>>>    +          datafileSizes.put(newDatafile, dfv);
>>>>>    +        }
>>>>>    +
>>>>>    +        if (absMergeFile != null) {
>>>>>    +          datafileSizes.remove(absMergeFile);
>>>>>    +        }
>>>>>    +
>>>>>    +        unreserveMergingMinorCompactionFile(absMergeFile);
>>>>>    +
>>>>>    +        dataSourceDeletions.incrementAndGet();
>>>>>    +        tabletMemory.finishedMinC();
>>>>>    +
>>>>>    +        lastFlushID = flushId;
>>>>>    +
>>>>>    +        computeNumEntries();
>>>>>    +        t2 = System.currentTimeMillis();
>>>>>    +      }
>>>>>    +
>>>>>    +      // must do this after list of files in memory is updated above
>>>>>    +      removeFilesAfterScan(filesInUseByScans);
>>>>>    +
>>>>>    +      if (absMergeFile != null)
>>>>>    +        log.log(TLevel.TABLET_HIST, extent + " MinC [" + absMergeFile + ",memory] -> " + newDatafile);
>>>>>    +      else
>>>>>    +        log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " + newDatafile);
>>>>>    +      log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, getExtent().toString()));
>>>>>    +      if (dfv.getSize() > acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) {
>>>>>    +        log.debug(String.format("Minor Compaction wrote out file larger than split threshold.  split threshold = %,d  file size = %,d",
>>>>>    +            acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), dfv.getSize()));
>>>>>    +      }
>>>>>    +
>>>>>    +    }
>>>>>    +
>>>>>    +    public void reserveMajorCompactingFiles(Collection<FileRef> files) {
>>>>>    +      if (majorCompactingFiles.size() != 0)
>>>>>    +        throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
>>>>>    +
>>>>>    +      if (mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile))
>>>>>    +        throw new IllegalStateException("Major compaction tried to reserve file in use by minor compaction " + mergingMinorCompactionFile);
>>>>>    +
>>>>>    +      majorCompactingFiles.addAll(files);
>>>>>    +    }
>>>>>    +
>>>>>    +    public void clearMajorCompactingFile() {
>>>>>    +      majorCompactingFiles.clear();
>>>>>    +    }
>>>>>    +
>>>>>    +    void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
>>>>>    +        throws IOException {
>>>>>    +      long t1, t2;
>>>>>    +
>>>>>    +      if (!extent.isRootTablet()) {
>>>>>    +
>>>>>    +        if (fs.exists(newDatafile.path())) {
>>>>>    +          log.error("Target map file already exist " + newDatafile, new Exception());
>>>>>    +          throw new IllegalStateException("Target map file already exist " + newDatafile);
>>>>>    +        }
>>>>>    +
>>>>>    +        // rename before putting in metadata table, so files in metadata table should
>>>>>    +        // always exist
>>>>>    +        if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
>>>>>    +          log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false");
>>>>>    +
>>>>>    +        if (dfv.getNumEntries() == 0) {
>>>>>    +          fs.deleteRecursively(newDatafile.path());
>>>>>    +        }
>>>>>    +      }
>>>>>    +
>>>>>    +      TServerInstance lastLocation = null;
>>>>>    +      synchronized (Tablet.this) {
>>>>>    +
>>>>>    +        t1 = System.currentTimeMillis();
>>>>>    +
>>>>>    +        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
>>>>>    +
>>>>>    +        dataSourceDeletions.incrementAndGet();
>>>>>    +
>>>>>    +        if (extent.isRootTablet()) {
>>>>>    +
>>>>>    +          waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
>>>>>    +
>>>>>    +          try {
>>>>>    +            if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
>>>>>    +              throw new IllegalStateException();
>>>>>    +            }
>>>>>    +          } catch (Exception e) {
>>>>>    +            throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
>>>>>    +          }
>>>>>    +
>>>>>    +          // mark files as ready for deletion, but
>>>>>    +          // do not delete them until we successfully
>>>>>    +          // rename the compacted map file, in case
>>>>>    +          // the system goes down
>>>>>    +
>>>>>    +          String compactName = newDatafile.path().getName();
>>>>>    +
>>>>>    +          for (FileRef ref : oldDatafiles) {
>>>>>    +            Path path = ref.path();
>>>>>    +            fs.rename(path, new Path(location + "/delete+" + compactName + "+" + path.getName()));
>>>>>    +          }
>>>>>    +
>>>>>    +          if (fs.exists(newDatafile.path())) {
>>>>>    +            log.error("Target map file already exist " + newDatafile, new Exception());
>>>>>    +            throw new IllegalStateException("Target map file already exist " + newDatafile);
>>>>>    +          }
>>>>>    +
>>>>>    +          if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
>>>>>    +            log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false");
>>>>>    +
>>>>>    +          // start deleting files, if we do not finish they will be cleaned
>>>>>    +          // up later
>>>>>    +          for (FileRef ref : oldDatafiles) {
>>>>>    +            Path path = ref.path();
>>>>>    +            Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName());
>>>>>    +            if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || !fs.moveToTrash(deleteFile))
>>>>>    +              fs.deleteRecursively(deleteFile);
>>>>>    +          }
>>>>>    +        }
>>>>>    +
>>>>>    +        // atomically remove old files and add new file
>>>>>    +        for (FileRef oldDatafile : oldDatafiles) {
>>>>>    +          if (!datafileSizes.containsKey(oldDatafile)) {
>>>>>    +            log.error("file does not exist in set " + oldDatafile);
>>>>>    +          }
>>>>>    +          datafileSizes.remove(oldDatafile);
>>>>>    +          majorCompactingFiles.remove(oldDatafile);
>>>>>    +        }
>>>>>    +
>>>>>    +        if (datafileSizes.containsKey(newDatafile)) {
>>>>>    +          log.error("Adding file that is already in set " + newDatafile);
>>>>>    +        }
>>>>>    +
>>>>>    +        if (dfv.getNumEntries() > 0) {
>>>>>    +          datafileSizes.put(newDatafile, dfv);
>>>>>    +        }
>>>>>    +
>>>>>    +        // could be used by a follow on compaction in a multipass compaction
>>>>>    +        majorCompactingFiles.add(newDatafile);
>>>>>    +
>>>>>    +        computeNumEntries();
>>>>>    +
>>>>>    +        lastLocation = Tablet.this.lastLocation;
>>>>>    +        Tablet.this.lastLocation = null;
>>>>>    +
>>>>>    +        if (compactionId != null)
>>>>>    +          lastCompactID = compactionId;
>>>>>    +
>>>>>    +        t2 = System.currentTimeMillis();
>>>>>    +      }
>>>>>    +
>>>>>    +      if (!extent.isRootTablet()) {
>>>>>    +        Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
>>>>>    +        if (filesInUseByScans.size() > 0)
>>>>>    +          log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
>>>>>    +        MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(),
>>>>>    +            tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock());
>>>>>    +        removeFilesAfterScan(filesInUseByScans);
>>>>>    +      }
>>>>>    +
>>>>>    +      log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0));
>>>>>    +      log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " --> " + newDatafile);
>>>>>    +    }
>>>>>    +
>>>>>    +    public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
>>>>>    +      synchronized (Tablet.this) {
>>>>>    +        TreeMap<FileRef,DataFileValue> copy = new TreeMap<FileRef,DataFileValue>(datafileSizes);
>>>>>    +        return Collections.unmodifiableSortedMap(copy);
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    public Set<FileRef> getFiles() {
>>>>>    +      synchronized (Tablet.this) {
>>>>>    +        HashSet<FileRef> files = new HashSet<FileRef>(datafileSizes.keySet());
>>>>>    +        return Collections.unmodifiableSet(files);
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +  }
>>>>>    +
>>>>>    +  public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<Key,Value> tabletsKeyValues)
>>>>>    +      throws IOException {
>>>>>    +    this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), tabletsKeyValues);
>>>>>    +    splitCreationTime = 0;
>>>>>    +  }
>>>>>    +
>>>>>    +  public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<FileRef,DataFileValue> datafiles, String time,
>>>>>    +      long initFlushID, long initCompactID) throws IOException {
>>>>>    +    this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), datafiles, time, initFlushID, initCompactID);
>>>>>    +    splitCreationTime = System.currentTimeMillis();
>>>>>    +  }
>>>>>    +
>>>>>    +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
>>>>>    +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>>>>    +    this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), tabletsKeyValues);
>>>>>    +  }
>>>>>    +
>>>>>    +  static private final List<LogEntry> EMPTY = Collections.emptyList();
>>>>>    +
>>>>>    +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
>>>>>    +      SortedMap<FileRef,DataFileValue> datafiles, String time, long initFlushID, long initCompactID) throws IOException {
>>>>>    +    this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), EMPTY, datafiles, time, null, new HashSet<FileRef>(), initFlushID, initCompactID);
>>>>>    +  }
>>>>>    +
>>>>>    +  private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>>>>    +    SortedMap<Key,Value> entries;
>>>>>    +
>>>>>    +    if (extent.isRootTablet()) {
>>>>>    +      return null;
>>>>>    +    } else {
>>>>>    +      entries = new TreeMap<Key,Value>();
>>>>>    +      Text rowName = extent.getMetadataEntry();
>>>>>    +      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>>>    +        if (entry.getKey().compareRow(rowName) == 0 && TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
>>>>>    +          entries.put(new Key(entry.getKey()), new Value(entry.getValue()));
>>>>>    +        }
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    // log.debug("extent : "+extent+"   entries : "+entries);
>>>>>    +
>>>>>    +    if (entries.size() == 1)
>>>>>    +      return entries.values().iterator().next().toString();
>>>>>    +    return null;
>>>>>    +  }
>>>>>    +
>>>>>    +  private static SortedMap<FileRef,DataFileValue> lookupDatafiles(AccumuloConfiguration conf, VolumeManager fs, KeyExtent extent,
>>>>>    +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>>>>    +
>>>>>    +    TreeMap<FileRef,DataFileValue> datafiles = new TreeMap<FileRef,DataFileValue>();
>>>>>    +
>>>>>    +    if (extent.isRootTablet()) { // the meta0 tablet
>>>>>    +      Path location = new Path(MetadataTableUtil.getRootTabletDir());
>>>>>    +      // cleanUpFiles() has special handling for delete. files
>>>>>    +      FileStatus[] files = fs.listStatus(location);
>>>>>    +      Collection<String> goodPaths = cleanUpFiles(fs, files, true);
>>>>>    +      for (String good : goodPaths) {
>>>>>    +        Path path = new Path(good);
>>>>>    +        String filename = path.getName();
>>>>>    +        FileRef ref = new FileRef(location.toString() + "/" + filename, path);
>>>>>    +        DataFileValue dfv = new DataFileValue(0, 0);
>>>>>    +        datafiles.put(ref, dfv);
>>>>>    +      }
>>>>>    +    } else {
>>>>>    +
>>>>>    +      Text rowName = extent.getMetadataEntry();
>>>>>    +
>>>>>    +      String tableId = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
>>>>>    +      ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SystemCredentials.get(), tableId, Authorizations.EMPTY);
>>>>>    +
>>>>>    +      // Commented out because when no data file is present, each tablet will scan through metadata table and return nothing
>>>>>    +      // reduced batch size to improve performance
>>>>>    +      // changed here after endKeys were implemented from 10 to 1000
>>>>>    +      mdScanner.setBatchSize(1000);
>>>>>    +
>>>>>    +      // leave these in, again, now using endKey for safety
>>>>>    +      mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
>>>>>    +
>>>>>    +      mdScanner.setRange(new Range(rowName));
>>>>>    +
>>>>>    +      for (Entry<Key,Value> entry : mdScanner) {
>>>>>    +
>>>>>    +        if (entry.getKey().compareRow(rowName) != 0) {
>>>>>    +          break;
>>>>>    +        }
>>>>>    +
>>>>>    +        FileRef ref = new FileRef(entry.getKey().getColumnQualifier().toString(), fs.getFullPath(entry.getKey()));
>>>>>    +        datafiles.put(ref, new DataFileValue(entry.getValue().get()));
>>>>>    +      }
>>>>>    +    }
>>>>>    +    return datafiles;
>>>>>    +  }
>>>>>    +
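>>>>>    +  // Collects the write-ahead log entries recorded for this tablet: metadata
>>>>>    +  // tablets go through MetadataTableUtil, all others read the pre-fetched
>>>>>    +  // metadata entries directly.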
>>>>>    +  private static List<LogEntry> lookupLogEntries(KeyExtent ke, SortedMap<Key,Value> tabletsKeyValues) {
>>>>>    +    List<LogEntry> logEntries = new ArrayList<LogEntry>();
>>>>>    +
>>>>>    +    if (ke.isMeta()) {
>>>>>    +      try {
>>>>>    +        logEntries = MetadataTableUtil.getLogEntries(SystemCredentials.get(), ke);
>>>>>    +      } catch (Exception ex) {
>>>>>    +        throw new RuntimeException("Unable to read tablet log entries", ex);
>>>>>    +      }
>>>>>    +    } else {
>>>>>    +      log.debug("Looking at metadata " + tabletsKeyValues);
>>>>>    +      Text row = ke.getMetadataEntry();
>>>>>    +      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>>>    +        Key key = entry.getKey();
>>>>>    +        if (key.getRow().equals(row)) {
>>>>>    +          if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
>>>>>    +            logEntries.add(LogEntry.fromKeyValue(key, entry.getValue()));
>>>>>    +          }
>>>>>    +        }
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    log.debug("got " + logEntries + " for logs for " + ke);
>>>>>    +    return logEntries;
>>>>>    +  }
>>>>>    +
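>>>>>    +  // Scan file entries mark data files still referenced by active scans (see
>>>>>    +  // the removeFilesAfterScan call above); they are gathered here so the
>>>>>    +  // tablet can clean them up once those scans finish.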
>>>>>    +  private static Set<FileRef> lookupScanFiles(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) throws IOException {
>>>>>    +    HashSet<FileRef> scanFiles = new HashSet<FileRef>();
>>>>>    +
>>>>>    +    Text row = extent.getMetadataEntry();
>>>>>    +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>>>    +      Key key = entry.getKey();
>>>>>    +      if (key.getRow().equals(row) && key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) {
>>>>>    +        String meta = key.getColumnQualifier().toString();
>>>>>    +        Path path = fs.getFullPath(extent.getTableId().toString(), meta);
>>>>>    +        scanFiles.add(new FileRef(meta, path));
>>>>>    +      }
>>>>>    +    }
>>>>>    +
>>>>>    +    return scanFiles;
>>>>>    +  }
>>>>>    +
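>>>>>    +  // The flush and compact IDs below are read from the FLUSH_COLUMN and
>>>>>    +  // COMPACT_COLUMN metadata entries; -1 is returned when no entry has been
>>>>>    +  // persisted yet.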
>>>>>    +  private static long lookupFlushID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>>>>    +    Text row = extent.getMetadataEntry();
>>>>>    +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>>>    +      Key key = entry.getKey();
>>>>>    +      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
>>>>>    +        return Long.parseLong(entry.getValue().toString());
>>>>>    +    }
>>>>>    +
>>>>>    +    return -1;
>>>>>    +  }
>>>>>    +
>>>>>    +  private static long lookupCompactID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>>>>    +    Text row = extent.getMetadataEntry();
>>>>>    +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>>>    +      Key key = entry.getKey();
>>>>>    +      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
>>>>>    +        return Long.parseLong(entry.getValue().toString());
>>>>>    +    }
>>>>>    +
>>>>>    +    return -1;
>>>>>    +  }
>>>>>    +
>>>>>    +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, VolumeManager fs,
>>>>>    +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>>>>    +    this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(), fs,
>>>>>    +        extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent, tabletsKeyValues),
>>>>>    +        lookupScanFiles(extent, tabletsKeyValues, fs), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues));
>>>>>    +  }
>>>>>    +
>>>>>    +  private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>>>>    +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>>>    +      if (entry.getKey().getColumnFamily().compareTo(TabletsSection.LastLocationColumnFamily.NAME) == 0) {
>>>>>    +        return new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
>>>>>    +      }
>>>>>    +    }
>>>>>    +    return null;
>>>>>    +  }
>>>>>    +
>>>>>    +  /**
>>>>>    +   * yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time
>>>>>    +   */
>>>>>    +  private Tablet(final TabletServer tabletServer, final Text location, final KeyExtent extent, final TabletResourceManager trm, final Configuration conf,
>>>>>    +      final VolumeManager fs, final List<LogEntry> logEntries, final SortedMap<FileRef,DataFileValue> datafiles, String time,
>>>>>    +      final TServerInstance lastLocation, Set<FileRef> scanFiles, long initFlushID, long initCompactID) throws IOException {
>>>>>    +    Path locationPath;
>>>>>    +    if (location.find(":") >= 0) {
>>>>>    +      locationPath = new Path(location.toString());
>>>>>    +    } else {
>>>>>    +      locationPath = fs.getFullPath(FileType.TABLE, extent.getTableId().toString() + location.toString());
>>>>>    +    }
>>>>>    +    FileSystem fsForPath = fs.getFileSystemByPath(locationPath);
>>>>>    +    this.location = locationPath.makeQualified(fsForPath.getUri(), fsForPath.getWorkingDirectory());
>>>>>    +    this.lastLocation = lastLocation;
>>>>>    +    this.tabletDirectory = location.toString();
>>>>>    +    this.conf = conf;
>>>>>    +    this.acuTableConf = tabletServer.getTableConfiguration(extent);
>>>>>    +
>>>>>    +    this.fs = fs;
>>>>>    +    this.extent = extent;
>>>>>    +    this.tabletResources = trm;
>>>>>    +
>>>>>    +    this.lastFlushID = initFlushID;
>>>>>    +    this.lastCompactID = initCompactID;
>>>>>    +
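>>>>>    +    // The root tablet's time is not kept in the metadata table (lookupTime
>>>>>    +    // returns null for it), so recover it here by scanning each of its files
>>>>>    +    // for the maximum timestamp and using that as the logical time.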
>>>>>    +    if (extent.isRootTablet()) {
>>>>>    +      long rtime = Long.MIN_VALUE;
>>>>>    +      for (FileRef ref : datafiles.keySet()) {
>>>>>    +        Path path = ref.path();
>>>>>    +        FileSystem ns = fs.getFileSystemByPath(path);
>>>>>    +        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), tabletServer.getTableConfiguration(extent));
>>>>>    +        long maxTime = -1;
>>>>>    +        try {
>>>>>    +
>>>>>    +          while (reader.hasTop()) {
>>>>>    +            maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp());
>>>>>    +            reader.next();
>>>>>    +          }
>>>>>    +
>>>>>    +        } finally {
>>>>>    +          reader.close();
>>>>>    +        }
>>>>>    +
>>>>>    +        if (maxTime > rtime) {
>>>>>    +          time = TabletTime.LOGICAL_TIME_ID + "" + maxTime;
>>>>>    +          rtime = maxTime;
>>>>>    +        }
>>>>>    +      }
>>>>>    +    }
>>>>>    +    if (time == null && datafiles.isEmpty() && extent.equals(RootTable.OLD_EXTENT)) {
>>>>>    +      // recovery... old root tablet has no data, so time doesn't matter:
>>>>>    +      time = TabletTime.LOGICAL_TIME_ID + "" + Long.MIN_VALUE;
>>>>>    +    }
>>>>>    +
>>>>>    +    this.tabletServer = tabletServer;
>>>>>    +    this.logId = tabletServer.createLogId(extent);
>>>>>    +
>>>>>    +    this.timer = new TabletStatsKeeper();
>>>>>    +
>>>>>    +    setupDefaultSecurityLabels(extent);
>>>>>    +
>>>>>    +    tabletMemory = new TabletMemory();
>>>>>    +    tabletTime = TabletTime.getInstance(time);
>>>>>    +    persistedTime = tabletTime.getTime();
>>>>>    +
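>>>>>    +    // Watch for table property changes so the constraint checker and default
>>>>>    +    // security labels stay current.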
>>>>>    +    acuTableConf.addObserver(configObserver = new ConfigurationObserver() {
>>>>>    +
>>>>>    +      private void reloadConstraints() {
>>>>>    +        constraintChecker.set(new ConstraintChecker(getTableConfiguration()));
>>>>>    +      }
>>>>>    +
>>>>>    +      @Override
>>>>>    +      public void propertiesChanged() {
>>>>>    +        reloadConstraints();
>>>>>    +
>>>>>    +        try {
>>>>>    +          setupDefaultSecurityLabels(extent);
>>>>>    +        } catch (Exception e) {
>>>>>    +          log.error("Failed to reload default security labels for extent: " + extent.toString());
>>>>>    +        }
>>>>>    +      }
>>>>>    +
>>>>>    +      @Override
>>>>>    +      public void propertyChanged(String prop) {
>>>>>    +        if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey()))
>>>>>    +        else if (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) {
>>>>>    +          try {
>>>>>    +            log.info("Default security labels changed for extent: " + extent.toString());
>>>>>    +            setupDefaultSecurityLabels(extent);
>>>>>    +          } catch (Exception e) {
>>>>>    +            log.error("Failed to reload default security labels for extent: " + extent.toString());
>>>>>    +          }
>>>>>    +        }
>>>>>    +
>>>>>    +      }
>>>>>    +
>>>>>    +      @Override
>>>>>    +      public void sessionExpired() {
>>>>>    +        log.debug("Session expired, no longer updating per table props...");
>>>>>    +      }
>>>>>    +
>>>>>    +    });
>>>>>    +    // Force a load of any per-table properties
>>>>>    +    configObserver.propertiesChanged();
>>>>>    +
>>>>>    +    tabletResources.setTablet(this, acuTableConf);
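>>>>>    +    // Replay mutations from any write-ahead logs recorded for this tablet;
>>>>>    +    // count[1] tracks the largest system-assigned timestamp seen during the
>>>>>    +    // replay.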
>>>>>    +    if (!logEntries.isEmpty()) {
>>>>>    +      log.info("Starting Write-Ahead Log recovery for " + this.extent);
>>>>>    +      final long[] count = new long[2];
>>>>>    +      final CommitSession commitSession = tabletMemory.getCommitSession();
>>>>>    +      count[1] = Long.MIN_VALUE;
>>>>>    +      try {
>>>>>    +        Set<String> absPaths = new HashSet<String>();
>>>>>    +        for (FileRef ref : datafiles.keySet())
>>>>>    +          absPaths.add(ref.path().toString());
>>>>>    +
>>>>>    +        tabletServer.recover(this.tabletServer.getFileSystem(), this, logEntries, absPaths, new MutationReceiver() {
>>>>>    +          @Override
>>>>>    +          public void receive(Mutation m) {
>>>>>    +            // LogReader.printMutation(m);
>>>>>    +            Collection<ColumnUpdate> muts = m.getUpdates();
>>>>>    +            for (ColumnUpdate columnUpdate : muts) {
>>>>>    +              if (!columnUpdate.hasTimestamp()) {
>>>>>    +                // if it is not a user set timestamp, it must have been set
>>>>>    +                // by the system
>>>>>    +                count[1] = Math.max(count[1], columnUpdate.getTimestamp());
>>>>>    +              }
>>>>
>>>>
