accumulo-dev mailing list archives

From Josh Elser <josh.el...@gmail.com>
Subject Re: [11/15] ACCUMULO-1940 do not expose a new tablet to the memory manager until after it is online
Date Sat, 30 Nov 2013 05:15:18 GMT
Actually, I was kind of confused when I saw your commit*s* on this 
ticket. What did you actually do? You have two commits that make the 
same changes:

82477f08aa64e2a8a1cf7f6af0db5ce954801ac8 (in 1.4, 1.5 and 1.6)
9b6b9cf104ff332cffdd4900d8057557e64e0ec8 (only in 1.6)

I would've expected to see only one email with a diff, followed by 2 
"merge" emails, e.g.

----------------------------------------------------------------------
  .../tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java  | 2 --
  1 file changed, 2 deletions(-)
----------------------------------------------------------------------
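
FWIW, the merge-forward workflow that produces that pattern (one 
commit on the oldest affected branch, then merged up) looks roughly 
like the sketch below; the branch names are ours, but the commit 
message is just illustrative:

   # commit the fix once, on the oldest branch that needs it
   git checkout 1.4
   git commit -am "ACCUMULO-1940 do not expose a new tablet ..."

   # merge it forward so every later branch shares that one commit
   git checkout 1.5 && git merge 1.4
   git checkout 1.6 && git merge 1.5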

Although, I will admit that dealing with 3 active branches is a big 
pain. However, I don't know of a better way to handle this: one that 
doesn't leave Git confused, that keeps us able to answer questions 
like "where was a problem introduced" (git-bisect) and "where does 
this change exist", and that avoids multiple commits that make the 
same changes.
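
For example, with a single merged-forward commit, both questions have 
direct answers (the hash is just the one from this thread, shortened):

   # "where does this change exist"
   git branch --contains 82477f08

   # "where was a problem introduced"
   git bisect start <bad-commit> <good-commit>

With duplicate commits under different hashes, neither answer can be 
trusted across branches.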

On 11/29/13, 8:31 PM, Eric Newton wrote:
> I changed one line of this file... git seems to be having a conniption.  I
> find the volume of git traffic to be so useless that I ignore it.
>
> Anyone else?
>
> On Fri, Nov 29, 2013 at 1:24 PM, <ecn@apache.org> wrote:
>
>>
>> http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b6b9cf1/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>> ----------------------------------------------------------------------
>> diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>> index ee3b243,0000000..fd76415
>> mode 100644,000000..100644
>> --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>> +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>> @@@ -1,3868 -1,0 +1,3866 @@@
>>   +/*
>>   + * Licensed to the Apache Software Foundation (ASF) under one or more
>>   + * contributor license agreements.  See the NOTICE file distributed with
>>   + * this work for additional information regarding copyright ownership.
>>   + * The ASF licenses this file to You under the Apache License, Version 2.0
>>   + * (the "License"); you may not use this file except in compliance with
>>   + * the License.  You may obtain a copy of the License at
>>   + *
>>   + *     http://www.apache.org/licenses/LICENSE-2.0
>>   + *
>>   + * Unless required by applicable law or agreed to in writing, software
>>   + * distributed under the License is distributed on an "AS IS" BASIS,
>>   + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>   + * See the License for the specific language governing permissions and
>>   + * limitations under the License.
>>   + */
>>   +package org.apache.accumulo.tserver;
>>   +
>>   +import java.io.ByteArrayInputStream;
>>   +import java.io.DataInputStream;
>>   +import java.io.FileNotFoundException;
>>   +import java.io.IOException;
>>   +import java.util.ArrayList;
>>   +import java.util.Arrays;
>>   +import java.util.Collection;
>>   +import java.util.Collections;
>>   +import java.util.Comparator;
>>   +import java.util.EnumSet;
>>   +import java.util.HashMap;
>>   +import java.util.HashSet;
>>   +import java.util.Iterator;
>>   +import java.util.List;
>>   +import java.util.Map;
>>   +import java.util.Map.Entry;
>>   +import java.util.PriorityQueue;
>>   +import java.util.Set;
>>   +import java.util.SortedMap;
>>   +import java.util.TreeMap;
>>   +import java.util.TreeSet;
>>   +import java.util.concurrent.atomic.AtomicBoolean;
>>   +import java.util.concurrent.atomic.AtomicLong;
>>   +import java.util.concurrent.atomic.AtomicReference;
>>   +import java.util.concurrent.locks.ReentrantLock;
>>   +
>>   +import org.apache.accumulo.core.Constants;
>>   +import org.apache.accumulo.core.client.Connector;
>>   +import org.apache.accumulo.core.client.IteratorSetting;
>>   +import org.apache.accumulo.core.client.impl.ScannerImpl;
>>   +import org.apache.accumulo.core.conf.AccumuloConfiguration;
>>   +import org.apache.accumulo.core.conf.ConfigurationCopy;
>>   +import org.apache.accumulo.core.conf.ConfigurationObserver;
>>   +import org.apache.accumulo.core.conf.Property;
>>   +import org.apache.accumulo.core.constraints.Violations;
>>   +import org.apache.accumulo.core.data.ByteSequence;
>>   +import org.apache.accumulo.core.data.Column;
>>   +import org.apache.accumulo.core.data.ColumnUpdate;
>>   +import org.apache.accumulo.core.data.Key;
>>   +import org.apache.accumulo.core.data.KeyExtent;
>>   +import org.apache.accumulo.core.data.KeyValue;
>>   +import org.apache.accumulo.core.data.Mutation;
>>   +import org.apache.accumulo.core.data.Range;
>>   +import org.apache.accumulo.core.data.Value;
>>   +import org.apache.accumulo.core.data.thrift.IterInfo;
>>   +import org.apache.accumulo.core.data.thrift.MapFileInfo;
>>   +import org.apache.accumulo.core.file.FileOperations;
>>   +import org.apache.accumulo.core.file.FileSKVIterator;
>>   +import org.apache.accumulo.core.iterators.IterationInterruptedException;
>>   +import org.apache.accumulo.core.iterators.IteratorEnvironment;
>>   +import org.apache.accumulo.core.iterators.IteratorUtil;
>>   +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
>>   +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
>>   +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
>>   +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
>>   +import org.apache.accumulo.core.iterators.system.DeletingIterator;
>>   +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
>>   +import org.apache.accumulo.core.iterators.system.MultiIterator;
>>   +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
>>   +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
>>   +import org.apache.accumulo.core.iterators.system.StatsIterator;
>>   +import org.apache.accumulo.core.iterators.system.VisibilityFilter;
>>   +import org.apache.accumulo.core.master.thrift.TabletLoadState;
>>   +import org.apache.accumulo.core.metadata.MetadataTable;
>>   +import org.apache.accumulo.core.metadata.RootTable;
>>   +import org.apache.accumulo.core.metadata.schema.DataFileValue;
>>   +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
>>   +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
>>   +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
>>   +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
>>   +import org.apache.accumulo.core.security.Authorizations;
>>   +import org.apache.accumulo.core.security.ColumnVisibility;
>>   +import org.apache.accumulo.core.security.Credentials;
>>   +import org.apache.accumulo.core.tabletserver.log.LogEntry;
>>   +import org.apache.accumulo.core.util.CachedConfiguration;
>>   +import org.apache.accumulo.core.util.LocalityGroupUtil;
>>   +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
>>   +import org.apache.accumulo.core.util.MapCounter;
>>   +import org.apache.accumulo.core.util.Pair;
>>   +import org.apache.accumulo.core.util.UtilWaitThread;
>>   +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
>>   +import org.apache.accumulo.server.ServerConstants;
>>   +import org.apache.accumulo.server.client.HdfsZooInstance;
>>   +import org.apache.accumulo.server.conf.TableConfiguration;
>>   +import org.apache.accumulo.server.fs.FileRef;
>>   +import org.apache.accumulo.server.fs.VolumeManager;
>>   +import org.apache.accumulo.server.fs.VolumeManager.FileType;
>>   +import org.apache.accumulo.server.fs.VolumeManagerImpl;
>>   +import org.apache.accumulo.server.master.state.TServerInstance;
>>   +import org.apache.accumulo.server.master.tableOps.CompactionIterators;
>>   +import org.apache.accumulo.server.problems.ProblemReport;
>>   +import org.apache.accumulo.server.problems.ProblemReports;
>>   +import org.apache.accumulo.server.problems.ProblemType;
>>   +import org.apache.accumulo.server.security.SystemCredentials;
>>   +import org.apache.accumulo.server.tablets.TabletTime;
>>   +import org.apache.accumulo.server.tablets.UniqueNameAllocator;
>>   +import org.apache.accumulo.server.util.FileUtil;
>>   +import org.apache.accumulo.server.util.MasterMetadataUtil;
>>   +import org.apache.accumulo.server.util.MetadataTableUtil;
>>   +import org.apache.accumulo.server.util.TabletOperations;
>>   +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
>>   +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
>>   +import org.apache.accumulo.trace.instrument.Span;
>>   +import org.apache.accumulo.trace.instrument.Trace;
>>   +import org.apache.accumulo.tserver.Compactor.CompactionCanceledException;
>>   +import org.apache.accumulo.tserver.Compactor.CompactionEnv;
>>   +import org.apache.accumulo.tserver.FileManager.ScanFileManager;
>>   +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
>>   +import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv;
>>   +import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
>>   +import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
>>   +import org.apache.accumulo.tserver.compaction.CompactionPlan;
>>   +import org.apache.accumulo.tserver.compaction.CompactionStrategy;
>>   +import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
>>   +import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
>>   +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
>>   +import org.apache.accumulo.tserver.compaction.WriteParameters;
>>   +import org.apache.accumulo.tserver.constraints.ConstraintChecker;
>>   +import org.apache.accumulo.tserver.log.DfsLogger;
>>   +import org.apache.accumulo.tserver.log.MutationReceiver;
>>   +import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
>>   +import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
>>   +import org.apache.commons.codec.DecoderException;
>>   +import org.apache.commons.codec.binary.Hex;
>>   +import org.apache.hadoop.conf.Configuration;
>>   +import org.apache.hadoop.fs.FileStatus;
>>   +import org.apache.hadoop.fs.FileSystem;
>>   +import org.apache.hadoop.fs.Path;
>>   +import org.apache.hadoop.io.Text;
>>   +import org.apache.log4j.Logger;
>>   +import org.apache.zookeeper.KeeperException;
>>   +import org.apache.zookeeper.KeeperException.NoNodeException;
>>   +
>>   +/*
>>   + * We need to be able to have the master tell a tabletServer to
>>   + * close this file, and the tablet server to handle all pending client reads
>>   + * before closing
>>   + *
>>   + */
>>   +
>>   +/**
>>   + *
>>   + * this class just provides an interface to read from a MapFile mostly takes care of reporting start and end keys
>>   + *
>>   + * need this because a single row extent can have multiple columns this manages all the columns (each handled by a store) for a single row-extent
>>   + *
>>   + *
>>   + */
>>   +
>>   +public class Tablet {
>>   +
>>   +  enum MinorCompactionReason {
>>   +    USER, SYSTEM, CLOSE
>>   +  }
>>   +
>>   +  public class CommitSession {
>>   +
>>   +    private int seq;
>>   +    private InMemoryMap memTable;
>>   +    private int commitsInProgress;
>>   +    private long maxCommittedTime = Long.MIN_VALUE;
>>   +
>>   +    private CommitSession(int seq, InMemoryMap imm) {
>>   +      this.seq = seq;
>>   +      this.memTable = imm;
>>   +      commitsInProgress = 0;
>>   +    }
>>   +
>>   +    public int getWALogSeq() {
>>   +      return seq;
>>   +    }
>>   +
>>   +    private void decrementCommitsInProgress() {
>>   +      if (commitsInProgress < 1)
>>   +        throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
>>   +
>>   +      commitsInProgress--;
>>   +      if (commitsInProgress == 0)
>>   +        Tablet.this.notifyAll();
>>   +    }
>>   +
>>   +    private void incrementCommitsInProgress() {
>>   +      if (commitsInProgress < 0)
>>   +        throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
>>   +
>>   +      commitsInProgress++;
>>   +    }
>>   +
>>   +    private void waitForCommitsToFinish() {
>>   +      while (commitsInProgress > 0) {
>>   +        try {
>>   +          Tablet.this.wait(50);
>>   +        } catch (InterruptedException e) {
>>   +          log.warn(e, e);
>>   +        }
>>   +      }
>>   +    }
>>   +
>>   +    public void abortCommit(List<Mutation> value) {
>>   +      Tablet.this.abortCommit(this, value);
>>   +    }
>>   +
>>   +    public void commit(List<Mutation> mutations) {
>>   +      Tablet.this.commit(this, mutations);
>>   +    }
>>   +
>>   +    public Tablet getTablet() {
>>   +      return Tablet.this;
>>   +    }
>>   +
>>   +    public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
>>   +      return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
>>   +    }
>>   +
>>   +    public void finishUpdatingLogsUsed() {
>>   +      Tablet.this.finishUpdatingLogsUsed();
>>   +    }
>>   +
>>   +    public int getLogId() {
>>   +      return logId;
>>   +    }
>>   +
>>   +    public KeyExtent getExtent() {
>>   +      return extent;
>>   +    }
>>   +
>>   +    private void updateMaxCommittedTime(long time) {
>>   +      maxCommittedTime = Math.max(time, maxCommittedTime);
>>   +    }
>>   +
>>   +    private long getMaxCommittedTime() {
>>   +      if (maxCommittedTime == Long.MIN_VALUE)
>>   +        throw new IllegalStateException("Tried to read max committed
>> time when it was never set");
>>   +      return maxCommittedTime;
>>   +    }
>>   +
>>   +  }
>>   +
>>   +  private class TabletMemory {
>>   +    private InMemoryMap memTable;
>>   +    private InMemoryMap otherMemTable;
>>   +    private InMemoryMap deletingMemTable;
>>   +    private int nextSeq = 1;
>>   +    private CommitSession commitSession;
>>   +
>>   +    TabletMemory() {
>>   +      try {
>>   +        memTable = new InMemoryMap(acuTableConf);
>>   +      } catch (LocalityGroupConfigurationError e) {
>>   +        throw new RuntimeException(e);
>>   +      }
>>   +      commitSession = new CommitSession(nextSeq, memTable);
>>   +      nextSeq += 2;
>>   +    }
>>   +
>>   +    InMemoryMap getMemTable() {
>>   +      return memTable;
>>   +    }
>>   +
>>   +    InMemoryMap getMinCMemTable() {
>>   +      return otherMemTable;
>>   +    }
>>   +
>>   +    CommitSession prepareForMinC() {
>>   +      if (otherMemTable != null) {
>>   +        throw new IllegalStateException();
>>   +      }
>>   +
>>   +      if (deletingMemTable != null) {
>>   +        throw new IllegalStateException();
>>   +      }
>>   +
>>   +      otherMemTable = memTable;
>>   +      try {
>>   +        memTable = new InMemoryMap(acuTableConf);
>>   +      } catch (LocalityGroupConfigurationError e) {
>>   +        throw new RuntimeException(e);
>>   +      }
>>   +
>>   +      CommitSession oldCommitSession = commitSession;
>>   +      commitSession = new CommitSession(nextSeq, memTable);
>>   +      nextSeq += 2;
>>   +
>>   +      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes());
>>   +
>>   +      return oldCommitSession;
>>   +    }
>>   +
>>   +    void finishedMinC() {
>>   +
>>   +      if (otherMemTable == null) {
>>   +        throw new IllegalStateException();
>>   +      }
>>   +
>>   +      if (deletingMemTable != null) {
>>   +        throw new IllegalStateException();
>>   +      }
>>   +
>>   +      deletingMemTable = otherMemTable;
>>   +
>>   +      otherMemTable = null;
>>   +      Tablet.this.notifyAll();
>>   +    }
>>   +
>>   +    void finalizeMinC() {
>>   +      try {
>>   +        deletingMemTable.delete(15000);
>>   +      } finally {
>>   +        synchronized (Tablet.this) {
>>   +          if (otherMemTable != null) {
>>   +            throw new IllegalStateException();
>>   +          }
>>   +
>>   +          if (deletingMemTable == null) {
>>   +            throw new IllegalStateException();
>>   +          }
>>   +
>>   +          deletingMemTable = null;
>>   +
>>   +          tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
>>   +        }
>>   +      }
>>   +    }
>>   +
>>   +    boolean memoryReservedForMinC() {
>>   +      return otherMemTable != null || deletingMemTable != null;
>>   +    }
>>   +
>>   +    void waitForMinC() {
>>   +      while (otherMemTable != null || deletingMemTable != null) {
>>   +        try {
>>   +          Tablet.this.wait(50);
>>   +        } catch (InterruptedException e) {
>>   +          log.warn(e, e);
>>   +        }
>>   +      }
>>   +    }
>>   +
>>   +    void mutate(CommitSession cm, List<Mutation> mutations) {
>>   +      cm.memTable.mutate(mutations);
>>   +    }
>>   +
>>   +    void updateMemoryUsageStats() {
>>   +      long other = 0;
>>   +      if (otherMemTable != null)
>>   +        other = otherMemTable.estimatedSizeInBytes();
>>   +      else if (deletingMemTable != null)
>>   +        other = deletingMemTable.estimatedSizeInBytes();
>>   +
>>   +      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
>>   +    }
>>   +
>>   +    List<MemoryIterator> getIterators() {
>>   +      List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
>>   +      toReturn.add(memTable.skvIterator());
>>   +      if (otherMemTable != null)
>>   +        toReturn.add(otherMemTable.skvIterator());
>>   +      return toReturn;
>>   +    }
>>   +
>>   +    void returnIterators(List<MemoryIterator> iters) {
>>   +      for (MemoryIterator iter : iters) {
>>   +        iter.close();
>>   +      }
>>   +    }
>>   +
>>   +    public long getNumEntries() {
>>   +      if (otherMemTable != null)
>>   +        return memTable.getNumEntries() + otherMemTable.getNumEntries();
>>   +      return memTable.getNumEntries();
>>   +    }
>>   +
>>   +    CommitSession getCommitSession() {
>>   +      return commitSession;
>>   +    }
>>   +  }
>>   +
>>   +  private TabletMemory tabletMemory;
>>   +
>>   +  private final TabletTime tabletTime;
>>   +  private long persistedTime;
>>   +  private final Object timeLock = new Object();
>>   +
>>   +  private final Path location; // absolute path of this tablets dir
>>   +  private TServerInstance lastLocation;
>>   +
>>   +  private Configuration conf;
>>   +  private VolumeManager fs;
>>   +
>>   +  private TableConfiguration acuTableConf;
>>   +
>>   +  private volatile boolean tableDirChecked = false;
>>   +
>>   +  private AtomicLong dataSourceDeletions = new AtomicLong(0);
>>   +  private Set<ScanDataSource> activeScans = new HashSet<ScanDataSource>();
>>   +
>>   +  private volatile boolean closing = false;
>>   +  private boolean closed = false;
>>   +  private boolean closeComplete = false;
>>   +
>>   +  private long lastFlushID = -1;
>>   +  private long lastCompactID = -1;
>>   +
>>   +  private KeyExtent extent;
>>   +
>>   +  private TabletResourceManager tabletResources;
>>   +  final private DatafileManager datafileManager;
>>   +  private volatile boolean majorCompactionInProgress = false;
>>   +  private volatile boolean majorCompactionWaitingToStart = false;
>>   +  private Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
>>   +  private volatile boolean minorCompactionInProgress = false;
>>   +  private volatile boolean minorCompactionWaitingToStart = false;
>>   +
>>   +  private boolean updatingFlushID = false;
>>   +
>>   +  private AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<ConstraintChecker>();
>>   +
>>   +  private final String tabletDirectory;
>>   +
>>   +  private int writesInProgress = 0;
>>   +
>>   +  private static final Logger log = Logger.getLogger(Tablet.class);
>>   +  public TabletStatsKeeper timer;
>>   +
>>   +  private Rate queryRate = new Rate(0.2);
>>   +  private long queryCount = 0;
>>   +
>>   +  private Rate queryByteRate = new Rate(0.2);
>>   +  private long queryBytes = 0;
>>   +
>>   +  private Rate ingestRate = new Rate(0.2);
>>   +  private long ingestCount = 0;
>>   +
>>   +  private Rate ingestByteRate = new Rate(0.2);
>>   +  private long ingestBytes = 0;
>>   +
>>   +  private byte[] defaultSecurityLabel = new byte[0];
>>   +
>>   +  private long lastMinorCompactionFinishTime;
>>   +  private long lastMapFileImportTime;
>>   +
>>   +  private volatile long numEntries;
>>   +  private volatile long numEntriesInMemory;
>>   +
>>   +  // a count of the amount of data read by the iterators
>>   +  private AtomicLong scannedCount = new AtomicLong(0);
>>   +  private Rate scannedRate = new Rate(0.2);
>>   +
>>   +  private ConfigurationObserver configObserver;
>>   +
>>   +  private TabletServer tabletServer;
>>   +
>>   +  private final int logId;
>>   +  // ensure we only have one reader/writer of our bulk file notes at a time
>>   +  public final Object bulkFileImportLock = new Object();
>>   +
>>   +  public int getLogId() {
>>   +    return logId;
>>   +  }
>>   +
>>   +  public static class TabletClosedException extends RuntimeException {
>>   +    public TabletClosedException(Exception e) {
>>   +      super(e);
>>   +    }
>>   +
>>   +    public TabletClosedException() {
>>   +      super();
>>   +    }
>>   +
>>   +    private static final long serialVersionUID = 1L;
>>   +  }
>>   +
>>   +  FileRef getNextMapFilename(String prefix) throws IOException {
>>   +    String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent));
>>   +    checkTabletDir();
>>   +    return new FileRef(location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension);
>>   +  }
>>   +
>>   +  private void checkTabletDir() throws IOException {
>>   +    if (!tableDirChecked) {
>>   +      checkTabletDir(this.location);
>>   +      tableDirChecked = true;
>>   +    }
>>   +  }
>>   +
>>   +  private void checkTabletDir(Path tabletDir) throws IOException {
>>   +
>>   +    FileStatus[] files = null;
>>   +    try {
>>   +      files = fs.listStatus(tabletDir);
>>   +    } catch (FileNotFoundException ex) {
>>   +      // ignored
>>   +    }
>>   +
>>   +    if (files == null) {
>>   +      if (tabletDir.getName().startsWith("c-"))
>>   +        log.debug("Tablet " + extent + " had no dir, creating " +
>> tabletDir); // its a clone dir...
>>   +      else
>>   +        log.warn("Tablet " + extent + " had no dir, creating " +
>> tabletDir);
>>   +
>>   +      fs.mkdirs(tabletDir);
>>   +    }
>>   +  }
>>   +
>>   +  class DatafileManager {
>>   +    // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles
>>   +    final private Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
>>   +
>>   +    DatafileManager(SortedMap<FileRef,DataFileValue> datafileSizes) {
>>   +      for (Entry<FileRef,DataFileValue> datafiles : datafileSizes.entrySet())
>>   +        this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
>>   +    }
>>   +
>>   +    FileRef mergingMinorCompactionFile = null;
>>   +    Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>();
>>   +    Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>();
>>   +    MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>();
>>   +    long nextScanReservationId = 0;
>>   +    boolean reservationsBlocked = false;
>>   +
>>   +    Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
>>   +
>>   +    Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() {
>>   +      synchronized (Tablet.this) {
>>   +
>>   +        while (reservationsBlocked) {
>>   +          try {
>>   +            Tablet.this.wait(50);
>>   +          } catch (InterruptedException e) {
>>   +            log.warn(e, e);
>>   +          }
>>   +        }
>>   +
>>   +        Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet());
>>   +
>>   +        long rid = nextScanReservationId++;
>>   +
>>   +        scanFileReservations.put(rid, absFilePaths);
>>   +
>>   +        Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
>>   +
>>   +        for (FileRef path : absFilePaths) {
>>   +          fileScanReferenceCounts.increment(path, 1);
>>   +          ret.put(path, datafileSizes.get(path));
>>   +        }
>>   +
>>   +        return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
>>   +      }
>>   +    }
>>   +
>>   +    void returnFilesForScan(Long reservationId) {
>>   +
>>   +      final Set<FileRef> filesToDelete = new HashSet<FileRef>();
>>   +
>>   +      synchronized (Tablet.this) {
>>   +        Set<FileRef> absFilePaths = scanFileReservations.remove(reservationId);
>>   +
>>   +        if (absFilePaths == null)
>>   +          throw new IllegalArgumentException("Unknown scan reservation
>> id " + reservationId);
>>   +
>>   +        boolean notify = false;
>>   +        for (FileRef path : absFilePaths) {
>>   +          long refCount = fileScanReferenceCounts.decrement(path, 1);
>>   +          if (refCount == 0) {
>>   +            if (filesToDeleteAfterScan.remove(path))
>>   +              filesToDelete.add(path);
>>   +            notify = true;
>>   +          } else if (refCount < 0)
>>   +            throw new IllegalStateException("Scan ref count for " + path
>> + " is " + refCount);
>>   +        }
>>   +
>>   +        if (notify)
>>   +          Tablet.this.notifyAll();
>>   +      }
>>   +
>>   +      if (filesToDelete.size() > 0) {
>>   +        log.debug("Removing scan refs from metadata " + extent + " " +
>> filesToDelete);
>>   +        MetadataTableUtil.removeScanFiles(extent, filesToDelete,
>> SystemCredentials.get(), tabletServer.getLock());
>>   +      }
>>   +    }
>>   +
>>   +    private void removeFilesAfterScan(Set<FileRef> scanFiles) {
>>   +      if (scanFiles.size() == 0)
>>   +        return;
>>   +
>>   +      Set<FileRef> filesToDelete = new HashSet<FileRef>();
>>   +
>>   +      synchronized (Tablet.this) {
>>   +        for (FileRef path : scanFiles) {
>>   +          if (fileScanReferenceCounts.get(path) == 0)
>>   +            filesToDelete.add(path);
>>   +          else
>>   +            filesToDeleteAfterScan.add(path);
>>   +        }
>>   +      }
>>   +
>>   +      if (filesToDelete.size() > 0) {
>>   +        log.debug("Removing scan refs from metadata " + extent + " " +
>> filesToDelete);
>>   +        MetadataTableUtil.removeScanFiles(extent, filesToDelete,
>> SystemCredentials.get(), tabletServer.getLock());
>>   +      }
>>   +    }
>>   +
>>   +    private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
>>   +      long startTime = System.currentTimeMillis();
>>   +      TreeSet<FileRef> inUse = new TreeSet<FileRef>();
>>   +
>>   +      Span waitForScans = Trace.start("waitForScans");
>>   +      try {
>>   +        synchronized (Tablet.this) {
>>   +          if (blockNewScans) {
>>   +            if (reservationsBlocked)
>>   +              throw new IllegalStateException();
>>   +
>>   +            reservationsBlocked = true;
>>   +          }
>>   +
>>   +          for (FileRef path : pathsToWaitFor) {
>>   +            while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
>>   +              try {
>>   +                Tablet.this.wait(100);
>>   +              } catch (InterruptedException e) {
>>   +                log.warn(e, e);
>>   +              }
>>   +            }
>>   +          }
>>   +
>>   +          for (FileRef path : pathsToWaitFor) {
>>   +            if (fileScanReferenceCounts.get(path) > 0)
>>   +              inUse.add(path);
>>   +          }
>>   +
>>   +          if (blockNewScans) {
>>   +            reservationsBlocked = false;
>>   +            Tablet.this.notifyAll();
>>   +          }
>>   +
>>   +        }
>>   +      } finally {
>>   +        waitForScans.stop();
>>   +      }
>>   +      return inUse;
>>   +    }
>>   +
>>   +    public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException {
>>   +
>>   +      String bulkDir = null;
>>   +
>>   +      Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
>>   +      for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet())
>>   +        paths.put(entry.getKey(), entry.getValue());
>>   +
>>   +      for (FileRef tpath : paths.keySet()) {
>>   +
>>   +        boolean inTheRightDirectory = false;
>>   +        Path parent = tpath.path().getParent().getParent();
>>   +        for (String tablesDir : ServerConstants.getTablesDirs()) {
>>   +          if (parent.equals(new Path(tablesDir, extent.getTableId().toString()))) {
>>   +            inTheRightDirectory = true;
>>   +            break;
>>   +          }
>>   +        }
>>   +        if (!inTheRightDirectory) {
>>   +          throw new IOException("Data file " + tpath + " not in table
>> dirs");
>>   +        }
>>   +
>>   +        if (bulkDir == null)
>>   +          bulkDir = tpath.path().getParent().toString();
>>   +        else if (!bulkDir.equals(tpath.path().getParent().toString()))
>>   +          throw new IllegalArgumentException("bulk files in different
>> dirs " + bulkDir + " " + tpath);
>>   +
>>   +      }
>>   +
>>   +      if (extent.isRootTablet()) {
>>   +        throw new IllegalArgumentException("Can not import files to root
>> tablet");
>>   +      }
>>   +
>>   +      synchronized (bulkFileImportLock) {
>>   +        Credentials creds = SystemCredentials.get();
>>   +        Connector conn;
>>   +        try {
>>   +          conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken());
>>   +        } catch (Exception ex) {
>>   +          throw new IOException(ex);
>>   +        }
>>   +        // Remove any bulk files we've previously loaded and compacted away
>>   +        List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);
>>   +
>>   +        for (FileRef file : files)
>>   +          if (paths.keySet().remove(file.path()))
>>   +            log.debug("Ignoring request to re-import a file already
>> imported: " + extent + ": " + file);
>>   +
>>   +        if (paths.size() > 0) {
>>   +          long bulkTime = Long.MIN_VALUE;
>>   +          if (setTime) {
>>   +            for (DataFileValue dfv : paths.values()) {
>>   +              long nextTime = tabletTime.getAndUpdateTime();
>>   +              if (nextTime < bulkTime)
>>   +                throw new IllegalStateException("Time went backwards
>> unexpectedly " + nextTime + " " + bulkTime);
>>   +              bulkTime = nextTime;
>>   +              dfv.setTime(bulkTime);
>>   +            }
>>   +          }
>>   +
>>   +          synchronized (timeLock) {
>>   +            if (bulkTime > persistedTime)
>>   +              persistedTime = bulkTime;
>>   +
>>   +            MetadataTableUtil.updateTabletDataFile(tid, extent, paths, tabletTime.getMetadataValue(persistedTime), creds, tabletServer.getLock());
>>   +          }
>>   +        }
>>   +      }
>>   +
>>   +      synchronized (Tablet.this) {
>>   +        for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
>>   +          if (datafileSizes.containsKey(tpath.getKey())) {
>>   +            log.error("Adding file that is already in set " +
>> tpath.getKey());
>>   +          }
>>   +          datafileSizes.put(tpath.getKey(), tpath.getValue());
>>   +
>>   +        }
>>   +
>>   +        tabletResources.importedMapFiles();
>>   +
>>   +        computeNumEntries();
>>   +      }
>>   +
>>   +      for (FileRef tpath : paths.keySet()) {
>>   +        log.log(TLevel.TABLET_HIST, extent + " import " + tpath + " " +
>> paths.get(tpath));
>>   +      }
>>   +    }
>>   +
>>   +    FileRef reserveMergingMinorCompactionFile() {
>>   +      if (mergingMinorCompactionFile != null)
>>   +        throw new IllegalStateException("Tried to reserve merging minor
>> compaction file when already reserved  : " + mergingMinorCompactionFile);
>>   +
>>   +      if (extent.isRootTablet())
>>   +        return null;
>>   +
>>   +      int maxFiles = acuTableConf.getMaxFilesPerTablet();
>>   +
>>   +      // when a major compaction is running and we are at max files, write out
>>   +      // one extra file... want to avoid the case where major compaction is
>>   +      // compacting everything except for the largest file, and therefore the
>>   +      // largest file is returned for merging.. the following check mostly
>>   +      // avoids this case, except for the case where major compactions fail or
>>   +      // are canceled
>>   +      if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
>>   +        return null;
>>   +
>>   +      if (datafileSizes.size() >= maxFiles) {
>>   +        // find the smallest file
>>   +
>>   +        long min = Long.MAX_VALUE;
>>   +        FileRef minName = null;
>>   +
>>   +        for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
>>   +          if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
>>   +            min = entry.getValue().getSize();
>>   +            minName = entry.getKey();
>>   +          }
>>   +        }
>>   +
>>   +        if (minName == null)
>>   +          return null;
>>   +
>>   +        mergingMinorCompactionFile = minName;
>>   +        return minName;
>>   +      }
>>   +
>>   +      return null;
>>   +    }
>>   +
>>   +    void unreserveMergingMinorCompactionFile(FileRef file) {
>>   +      if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
>>   +          || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
>>   +        throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
>>   +
>>   +      mergingMinorCompactionFile = null;
>>   +    }
>>   +
>>   +    void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
>>   +        throws IOException {
>>   +
>>   +      IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
>>   +      if (extent.isRootTablet()) {
>>   +        try {
>>   +          if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
>>   +            throw new IllegalStateException();
>>   +          }
>>   +        } catch (Exception e) {
>>   +          throw new IllegalStateException("Can not bring major
>> compaction online, lock not held", e);
>>   +        }
>>   +      }
>>   +
>>   +      // rename before putting in metadata table, so files in metadata table should
>>   +      // always exist
>>   +      do {
>>   +        try {
>>   +          if (dfv.getNumEntries() == 0) {
>>   +            fs.deleteRecursively(tmpDatafile.path());
>>   +          } else {
>>   +            if (fs.exists(newDatafile.path())) {
>>   +              log.warn("Target map file already exist " + newDatafile);
>>   +              fs.deleteRecursively(newDatafile.path());
>>   +            }
>>   +
>>   +            if (!fs.rename(tmpDatafile.path(), newDatafile.path())) {
>>   +              throw new IOException("rename fails");
>>   +            }
>>   +          }
>>   +          break;
>>   +        } catch (IOException ioe) {
>>   +          log.warn("Tablet " + extent + " failed to rename " +
>> newDatafile + " after MinC, will retry in 60 secs...", ioe);
>>   +          UtilWaitThread.sleep(60 * 1000);
>>   +        }
>>   +      } while (true);
>>   +
>>   +      long t1, t2;
>>   +
>>   +      // the code below always assumes merged files are in use by scans... this must be done
>>   +      // because the in memory list of files is not updated until after the metadata table
>>   +      // therefore the file is available to scans until memory is updated, but want to ensure
>>   +      // the file is not available for garbage collection... if memory were updated
>>   +      // before this point (like major compactions do), then the following code could wait
>>   +      // for scans to finish like major compactions do.... used to wait for scans to finish
>>   +      // here, but that was incorrect because a scan could start after waiting but before
>>   +      // memory was updated... assuming the file is always in use by scans leads to
>>   +      // one unneeded metadata update when it was not actually in use
>>   +      Set<FileRef> filesInUseByScans = Collections.emptySet();
>>   +      if (absMergeFile != null)
>>   +        filesInUseByScans = Collections.singleton(absMergeFile);
>>   +
>>   +      // very important to write delete entries outside of log lock, because
>>   +      // this !METADATA write does not go up... it goes sideways or to itself
>>   +      if (absMergeFile != null)
>>   +        MetadataTableUtil.addDeleteEntries(extent, Collections.singleton(absMergeFile), SystemCredentials.get());
>>   +
>>   +      Set<String> unusedWalLogs = beginClearingUnusedLogs();
>>   +      try {
>>   +        // the order of writing to !METADATA and walog is important in the face of machine/process failures
>>   +        // need to write to !METADATA before writing to walog, when things are done in the reverse order
>>   +        // data could be lost... the minor compaction start event should be written before the following metadata
>>   +        // write is made
>>   +
>>   +        synchronized (timeLock) {
>>   +          if (commitSession.getMaxCommittedTime() > persistedTime)
>>   +            persistedTime = commitSession.getMaxCommittedTime();
>>   +
>>   +          String time = tabletTime.getMetadataValue(persistedTime);
>>   +          MasterMetadataUtil.updateTabletDataFile(extent, newDatafile, absMergeFile, dfv, time, SystemCredentials.get(), filesInUseByScans,
>>   +              tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
>>   +        }
>>   +
>>   +      } finally {
>>   +        finishClearingUnusedLogs();
>>   +      }
>>   +
>>   +      do {
>>   +        try {
>>   +          // the purpose of making this update use the new commit session, instead of the old one passed in,
>>   +          // is because the new one will reference the logs used by current memory...
>>   +
>>   +          tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
>>   +          break;
>>   +        } catch (IOException e) {
>>   +          log.error("Failed to write to write-ahead log " +
>> e.getMessage() + " will retry", e);
>>   +          UtilWaitThread.sleep(1 * 1000);
>>   +        }
>>   +      } while (true);
>>   +
>>   +      synchronized (Tablet.this) {
>>   +        lastLocation = null;
>>   +
>>   +        t1 = System.currentTimeMillis();
>>   +        if (datafileSizes.containsKey(newDatafile)) {
>>   +          log.error("Adding file that is already in set " + newDatafile);
>>   +        }
>>   +
>>   +        if (dfv.getNumEntries() > 0) {
>>   +          datafileSizes.put(newDatafile, dfv);
>>   +        }
>>   +
>>   +        if (absMergeFile != null) {
>>   +          datafileSizes.remove(absMergeFile);
>>   +        }
>>   +
>>   +        unreserveMergingMinorCompactionFile(absMergeFile);
>>   +
>>   +        dataSourceDeletions.incrementAndGet();
>>   +        tabletMemory.finishedMinC();
>>   +
>>   +        lastFlushID = flushId;
>>   +
>>   +        computeNumEntries();
>>   +        t2 = System.currentTimeMillis();
>>   +      }
>>   +
>>   +      // must do this after list of files in memory is updated above
>>   +      removeFilesAfterScan(filesInUseByScans);
>>   +
>>   +      if (absMergeFile != null)
>>   +        log.log(TLevel.TABLET_HIST, extent + " MinC [" + absMergeFile +
>> ",memory] -> " + newDatafile);
>>   +      else
>>   +        log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " +
>> newDatafile);
>>   +      log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1)
>> / 1000.0, getExtent().toString()));
>>   +      if (dfv.getSize() >
>> acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) {
>>   +        log.debug(String.format("Minor Compaction wrote out file larger
>> than split threshold.  split threshold = %,d  file size = %,d",
>>   +
>>   acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD),
>> dfv.getSize()));
>>   +      }
>>   +
>>   +    }
>>   +
>>   +    public void reserveMajorCompactingFiles(Collection<FileRef> files) {
>>   +      if (majorCompactingFiles.size() != 0)
>>   +        throw new IllegalStateException("Major compacting files not
>> empty " + majorCompactingFiles);
>>   +
>>   +      if (mergingMinorCompactionFile != null &&
>> files.contains(mergingMinorCompactionFile))
>>   +        throw new IllegalStateException("Major compaction tried to
>> resrve file in use by minor compaction " + mergingMinorCompactionFile);
>>   +
>>   +      majorCompactingFiles.addAll(files);
>>   +    }
>>   +
>>   +    public void clearMajorCompactingFile() {
>>   +      majorCompactingFiles.clear();
>>   +    }
>>   +
>>   +    void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
>>   +        throws IOException {
>>   +      long t1, t2;
>>   +
>>   +      if (!extent.isRootTablet()) {
>>   +
>>   +        if (fs.exists(newDatafile.path())) {
>>   +          log.error("Target map file already exist " + newDatafile, new
>> Exception());
>>   +          throw new IllegalStateException("Target map file already exist
>> " + newDatafile);
>>   +        }
>>   +
>>   +        // rename before putting in metadata table, so files in metadata table should
>>   +        // always exist
>>   +        if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
>>   +          log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false");
>>   +
>>   +        if (dfv.getNumEntries() == 0) {
>>   +          fs.deleteRecursively(newDatafile.path());
>>   +        }
>>   +      }
>>   +
>>   +      TServerInstance lastLocation = null;
>>   +      synchronized (Tablet.this) {
>>   +
>>   +        t1 = System.currentTimeMillis();
>>   +
>>   +        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
>>   +
>>   +        dataSourceDeletions.incrementAndGet();
>>   +
>>   +        if (extent.isRootTablet()) {
>>   +
>>   +          waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
>>   +
>>   +          try {
>>   +            if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
>>   +              throw new IllegalStateException();
>>   +            }
>>   +          } catch (Exception e) {
>>   +            throw new IllegalStateException("Can not bring major
>> compaction online, lock not held", e);
>>   +          }
>>   +
>>   +          // mark files as ready for deletion, but
>>   +          // do not delete them until we successfully
>>   +          // rename the compacted map file, in case
>>   +          // the system goes down
>>   +
>>   +          String compactName = newDatafile.path().getName();
>>   +
>>   +          for (FileRef ref : oldDatafiles) {
>>   +            Path path = ref.path();
>>   +            fs.rename(path, new Path(location + "/delete+" + compactName + "+" + path.getName()));
>>   +          }
>>   +
>>   +          if (fs.exists(newDatafile.path())) {
>>   +            log.error("Target map file already exist " + newDatafile,
>> new Exception());
>>   +            throw new IllegalStateException("Target map file already
>> exist " + newDatafile);
>>   +          }
>>   +
>>   +          if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
>>   +            log.warn("Rename of " + tmpDatafile + " to " + newDatafile +
>> " returned false");
>>   +
>>   +          // start deleting files, if we do not finish they will be cleaned
>>   +          // up later
>>   +          for (FileRef ref : oldDatafiles) {
>>   +            Path path = ref.path();
>>   +            Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName());
>>   +            if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || !fs.moveToTrash(deleteFile))
>>   +              fs.deleteRecursively(deleteFile);
>>   +          }
>>   +        }
>>   +
>>   +        // atomically remove old files and add new file
>>   +        for (FileRef oldDatafile : oldDatafiles) {
>>   +          if (!datafileSizes.containsKey(oldDatafile)) {
>>   +            log.error("file does not exist in set " + oldDatafile);
>>   +          }
>>   +          datafileSizes.remove(oldDatafile);
>>   +          majorCompactingFiles.remove(oldDatafile);
>>   +        }
>>   +
>>   +        if (datafileSizes.containsKey(newDatafile)) {
>>   +          log.error("Adding file that is already in set " + newDatafile);
>>   +        }
>>   +
>>   +        if (dfv.getNumEntries() > 0) {
>>   +          datafileSizes.put(newDatafile, dfv);
>>   +        }
>>   +
>>   +        // could be used by a follow on compaction in a multipass compaction
>>   +        majorCompactingFiles.add(newDatafile);
>>   +
>>   +        computeNumEntries();
>>   +
>>   +        lastLocation = Tablet.this.lastLocation;
>>   +        Tablet.this.lastLocation = null;
>>   +
>>   +        if (compactionId != null)
>>   +          lastCompactID = compactionId;
>>   +
>>   +        t2 = System.currentTimeMillis();
>>   +      }
>>   +
>>   +      if (!extent.isRootTablet()) {
>>   +        Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
>>   +        if (filesInUseByScans.size() > 0)
>>   +          log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
>>   +        MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(),
>>   +            tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock());
>>   +        removeFilesAfterScan(filesInUseByScans);
>>   +      }
>>   +
>>   +      log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) /
>> 1000.0));
>>   +      log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + "
>> --> " + newDatafile);
>>   +    }
>>   +
>>   +    public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
>>   +      synchronized (Tablet.this) {
>>   +        TreeMap<FileRef,DataFileValue> copy = new TreeMap<FileRef,DataFileValue>(datafileSizes);
>>   +        return Collections.unmodifiableSortedMap(copy);
>>   +      }
>>   +    }
>>   +
>>   +    public Set<FileRef> getFiles() {
>>   +      synchronized (Tablet.this) {
>>   +        HashSet<FileRef> files = new HashSet<FileRef>(datafileSizes.keySet());
>>   +        return Collections.unmodifiableSet(files);
>>   +      }
>>   +    }
>>   +
>>   +  }
>>   +
>>   +  public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<Key,Value> tabletsKeyValues)
>>   +      throws IOException {
>>   +    this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), tabletsKeyValues);
>>   +    splitCreationTime = 0;
>>   +  }
>>   +
>>   +  public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<FileRef,DataFileValue> datafiles, String time,
>>   +      long initFlushID, long initCompactID) throws IOException {
>>   +    this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), datafiles, time, initFlushID, initCompactID);
>>   +    splitCreationTime = System.currentTimeMillis();
>>   +  }
>>   +
>>   +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
>>   +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>   +    this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), tabletsKeyValues);
>>   +  }
>>   +
>>   +  static private final List<LogEntry> EMPTY = Collections.emptyList();
>>   +
>>   +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
>>   +      SortedMap<FileRef,DataFileValue> datafiles, String time, long initFlushID, long initCompactID) throws IOException {
>>   +    this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), EMPTY, datafiles, time, null, new HashSet<FileRef>(), initFlushID, initCompactID);
>>   +  }
>>   +
>>   +  private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>   +    SortedMap<Key,Value> entries;
>>   +
>>   +    if (extent.isRootTablet()) {
>>   +      return null;
>>   +    } else {
>>   +      entries = new TreeMap<Key,Value>();
>>   +      Text rowName = extent.getMetadataEntry();
>>   +      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>   +        if (entry.getKey().compareRow(rowName) == 0 && TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
>>   +          entries.put(new Key(entry.getKey()), new Value(entry.getValue()));
>>   +        }
>>   +      }
>>   +    }
>>   +
>>   +    // log.debug("extent : "+extent+"   entries : "+entries);
>>   +
>>   +    if (entries.size() == 1)
>>   +      return entries.values().iterator().next().toString();
>>   +    return null;
>>   +  }
>>   +
>>   +  private static SortedMap<FileRef,DataFileValue> lookupDatafiles(AccumuloConfiguration conf, VolumeManager fs, KeyExtent extent,
>>   +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>   +
>>   +    TreeMap<FileRef,DataFileValue> datafiles = new TreeMap<FileRef,DataFileValue>();
>>   +
>>   +    if (extent.isRootTablet()) { // the meta0 tablet
>>   +      Path location = new Path(MetadataTableUtil.getRootTabletDir());
>>   +      // cleanUpFiles() has special handling for delete. files
>>   +      FileStatus[] files = fs.listStatus(location);
>>   +      Collection<String> goodPaths = cleanUpFiles(fs, files, true);
>>   +      for (String good : goodPaths) {
>>   +        Path path = new Path(good);
>>   +        String filename = path.getName();
>>   +        FileRef ref = new FileRef(location.toString() + "/" + filename, path);
>>   +        DataFileValue dfv = new DataFileValue(0, 0);
>>   +        datafiles.put(ref, dfv);
>>   +      }
>>   +    } else {
>>   +
>>   +      Text rowName = extent.getMetadataEntry();
>>   +
>>   +      String tableId = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
>>   +      ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SystemCredentials.get(), tableId, Authorizations.EMPTY);
>>   +
>>   +      // Commented out because when no data file is present, each tablet will scan through metadata table and return nothing
>>   +      // reduced batch size to improve performance
>>   +      // changed here after endKeys were implemented from 10 to 1000
>>   +      mdScanner.setBatchSize(1000);
>>   +
>>   +      // leave these in, again, now using endKey for safety
>>   +      mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
>>   +
>>   +      mdScanner.setRange(new Range(rowName));
>>   +
>>   +      for (Entry<Key,Value> entry : mdScanner) {
>>   +
>>   +        if (entry.getKey().compareRow(rowName) != 0) {
>>   +          break;
>>   +        }
>>   +
>>   +        FileRef ref = new FileRef(entry.getKey().getColumnQualifier().toString(), fs.getFullPath(entry.getKey()));
>>   +        datafiles.put(ref, new DataFileValue(entry.getValue().get()));
>>   +      }
>>   +    }
>>   +    return datafiles;
>>   +  }
>>   +
>>   +  private static List<LogEntry> lookupLogEntries(KeyExtent ke, SortedMap<Key,Value> tabletsKeyValues) {
>>   +    List<LogEntry> logEntries = new ArrayList<LogEntry>();
>>   +
>>   +    if (ke.isMeta()) {
>>   +      try {
>>   +        logEntries = MetadataTableUtil.getLogEntries(SystemCredentials.get(), ke);
>>   +      } catch (Exception ex) {
>>   +        throw new RuntimeException("Unable to read tablet log entries", ex);
>>   +      }
>>   +    } else {
>>   +      log.debug("Looking at metadata " + tabletsKeyValues);
>>   +      Text row = ke.getMetadataEntry();
>>   +      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>   +        Key key = entry.getKey();
>>   +        if (key.getRow().equals(row)) {
>>   +          if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
>>   +            logEntries.add(LogEntry.fromKeyValue(key, entry.getValue()));
>>   +          }
>>   +        }
>>   +      }
>>   +    }
>>   +
>>   +    log.debug("got " + logEntries + " for logs for " + ke);
>>   +    return logEntries;
>>   +  }
>>   +
>>   +  private static Set<FileRef> lookupScanFiles(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) throws IOException {
>>   +    HashSet<FileRef> scanFiles = new HashSet<FileRef>();
>>   +
>>   +    Text row = extent.getMetadataEntry();
>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>   +      Key key = entry.getKey();
>>   +      if (key.getRow().equals(row) && key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) {
>>   +        String meta = key.getColumnQualifier().toString();
>>   +        Path path = fs.getFullPath(extent.getTableId().toString(), meta);
>>   +        scanFiles.add(new FileRef(meta, path));
>>   +      }
>>   +    }
>>   +
>>   +    return scanFiles;
>>   +  }
>>   +
>>   +  private static long lookupFlushID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>   +    Text row = extent.getMetadataEntry();
>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>   +      Key key = entry.getKey();
>>   +      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
>>   +        return Long.parseLong(entry.getValue().toString());
>>   +    }
>>   +
>>   +    return -1;
>>   +  }
>>   +
>>   +  private static long lookupCompactID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>   +    Text row = extent.getMetadataEntry();
>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>   +      Key key = entry.getKey();
>>   +      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
>>   +        return Long.parseLong(entry.getValue().toString());
>>   +    }
>>   +
>>   +    return -1;
>>   +  }
>>   +
>>   +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, VolumeManager fs,
>>   +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>   +    this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(), fs,
>>   +        extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent, tabletsKeyValues),
>>   +        lookupScanFiles(extent, tabletsKeyValues, fs), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues));
>>   +  }
>>   +
>>   +  private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>   +      if (entry.getKey().getColumnFamily().compareTo(TabletsSection.LastLocationColumnFamily.NAME) == 0) {
>>   +        return new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
>>   +      }
>>   +    }
>>   +    return null;
>>   +  }
>>   +
>>   +  /**
>>   +   * yet another constructor - this one allows us to avoid costly lookups into the
>>   +   * Metadata table if we already know the files we need, such as at split time
>>   +   */
>>   +  private Tablet(final TabletServer tabletServer, final Text location,
>> final KeyExtent extent, final TabletResourceManager trm, final
>> Configuration conf,
>>   +      final VolumeManager fs, final List<LogEntry> logEntries, final
>> SortedMap<FileRef,DataFileValue> datafiles, String time,
>>   +      final TServerInstance lastLocation, Set<FileRef> scanFiles, long
>> initFlushID, long initCompactID) throws IOException {
>>   +    Path locationPath;
>>   +    if (location.find(":") >= 0) {
>>   +      locationPath = new Path(location.toString());
>>   +    } else {
>>   +      locationPath = fs.getFullPath(FileType.TABLE,
>> extent.getTableId().toString() + location.toString());
>>   +    }
>>   +    FileSystem fsForPath = fs.getFileSystemByPath(locationPath);
>>   +    this.location = locationPath.makeQualified(fsForPath.getUri(),
>> fsForPath.getWorkingDirectory());
>>   +    this.lastLocation = lastLocation;
>>   +    this.tabletDirectory = location.toString();
>>   +    this.conf = conf;
>>   +    this.acuTableConf = tabletServer.getTableConfiguration(extent);
>>   +
>>   +    this.fs = fs;
>>   +    this.extent = extent;
>>   +    this.tabletResources = trm;
>>   +
>>   +    this.lastFlushID = initFlushID;
>>   +    this.lastCompactID = initCompactID;
>>   +
>>   +    if (extent.isRootTablet()) {
>>   +      long rtime = Long.MIN_VALUE;
>>   +      for (FileRef ref : datafiles.keySet()) {
>>   +        Path path = ref.path();
>>   +        FileSystem ns = fs.getFileSystemByPath(path);
>>   +        FileSKVIterator reader =
>> FileOperations.getInstance().openReader(path.toString(), true, ns,
>> ns.getConf(), tabletServer.getTableConfiguration(extent));
>>   +        long maxTime = -1;
>>   +        try {
>>   +
>>   +          while (reader.hasTop()) {
>>   +            maxTime = Math.max(maxTime,
>> reader.getTopKey().getTimestamp());
>>   +            reader.next();
>>   +          }
>>   +
>>   +        } finally {
>>   +          reader.close();
>>   +        }
>>   +
>>   +        if (maxTime > rtime) {
>>   +          time = TabletTime.LOGICAL_TIME_ID + "" + maxTime;
>>   +          rtime = maxTime;
>>   +        }
>>   +      }
>>   +    }
>>   +    if (time == null && datafiles.isEmpty() &&
>> extent.equals(RootTable.OLD_EXTENT)) {
>>   +      // recovery... old root tablet has no data, so time doesn't matter:
>>   +      time = TabletTime.LOGICAL_TIME_ID + "" + Long.MIN_VALUE;
>>   +    }
>>   +
>>   +    this.tabletServer = tabletServer;
>>   +    this.logId = tabletServer.createLogId(extent);
>>   +
>>   +    this.timer = new TabletStatsKeeper();
>>   +
>>   +    setupDefaultSecurityLabels(extent);
>>   +
>>   +    tabletMemory = new TabletMemory();
>>   +    tabletTime = TabletTime.getInstance(time);
>>   +    persistedTime = tabletTime.getTime();
>>   +
>>   +    acuTableConf.addObserver(configObserver = new
>> ConfigurationObserver() {
>>   +
>>   +      private void reloadConstraints() {
>>   +        constraintChecker.set(new
>> ConstraintChecker(getTableConfiguration()));
>>   +      }
>>   +
>>   +      @Override
>>   +      public void propertiesChanged() {
>>   +        reloadConstraints();
>>   +
>>   +        try {
>>   +          setupDefaultSecurityLabels(extent);
>>   +        } catch (Exception e) {
>>   +          log.error("Failed to reload default security labels for
>> extent: " + extent.toString());
>>   +        }
>>   +      }
>>   +
>>   +      @Override
>>   +      public void propertyChanged(String prop) {
>>   +        if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey()))
>>   +          reloadConstraints();
>>   +        else if
>> (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) {
>>   +          try {
>>   +            log.info("Default security labels changed for extent: " +
>> extent.toString());
>>   +            setupDefaultSecurityLabels(extent);
>>   +          } catch (Exception e) {
>>   +            log.error("Failed to reload default security labels for
>> extent: " + extent.toString());
>>   +          }
>>   +        }
>>   +
>>   +      }
>>   +
>>   +      @Override
>>   +      public void sessionExpired() {
>>   +        log.debug("Session expired, no longer updating per table
>> props...");
>>   +      }
>>   +
>>   +    });
>>   +    // Force a load of any per-table properties
>>   +    configObserver.propertiesChanged();
>>   +
>>   +    tabletResources.setTablet(this, acuTableConf);
>>   +    if (!logEntries.isEmpty()) {
>>   +      log.info("Starting Write-Ahead Log recovery for " + this.extent);
>>   +      final long[] count = new long[2];
>>   +      final CommitSession commitSession =
>> tabletMemory.getCommitSession();
>>   +      count[1] = Long.MIN_VALUE;
>>   +      try {
>>   +        Set<String> absPaths = new HashSet<String>();
>>   +        for (FileRef ref : datafiles.keySet())
>>   +          absPaths.add(ref.path().toString());
>>   +
>>   +        tabletServer.recover(this.tabletServer.getFileSystem(), this,
>> logEntries, absPaths, new MutationReceiver() {
>>   +          @Override
>>   +          public void receive(Mutation m) {
>>   +            // LogReader.printMutation(m);
>>   +            Collection<ColumnUpdate> muts = m.getUpdates();
>>   +            for (ColumnUpdate columnUpdate : muts) {
>>   +              if (!columnUpdate.hasTimestamp()) {
>>   +                // if it is not a user-set timestamp, it must have been set by the system
>>   +                count[1] = Math.max(count[1],
>> columnUpdate.getTimestamp());
>>   +              }
>>   +            }
>>   +            tabletMemory.mutate(commitSession,
>> Collections.singletonList(m));
>>   +            count[0]++;
>>   +          }
>>   +        });
>>   +
>>   +        if (count[1] != Long.MIN_VALUE) {
>>   +          tabletTime.useMaxTimeFromWALog(count[1]);
>>   +        }
>>   +        commitSession.updateMaxCommittedTime(tabletTime.getTime());
>>   +
>> -         tabletMemory.updateMemoryUsageStats();
>> -
>>   +        if (count[0] == 0) {
>>   +          MetadataTableUtil.removeUnusedWALEntries(extent, logEntries,
>> tabletServer.getLock());
>>   +          logEntries.clear();
>>   +        }
>>   +
>>   +      } catch (Throwable t) {
>>   +        if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE)) {
>>   +          log.warn("Error recovering from log files: ", t);
>>   +        } else {
>>   +          throw new RuntimeException(t);
>>   +        }
>>   +      }
>>   +      // make some closed references that represent the recovered logs
>>   +      currentLogs = new HashSet<DfsLogger>();
>>   +      for (LogEntry logEntry : logEntries) {
>>   +        for (String log : logEntry.logSet) {
>>   +          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(),
>> log));
>>   +        }
>>   +      }
>>   +
>>   +      log.info("Write-Ahead Log recovery complete for " + this.extent +
>> " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries()
>>   +          + " entries created)");
>>   +    }
>>   +
>>   +    String contextName = acuTableConf.get(Property.TABLE_CLASSPATH);
>>   +    if (contextName != null && !contextName.equals("")) {
>>   +      // initialize context classloader, instead of possibly waiting for
>> it to initialize for a scan
>>   +      // TODO this could hang, causing other tablets to fail to load - ACCUMULO-1292
>>   +
>>   AccumuloVFSClassLoader.getContextManager().getClassLoader(contextName);
>>   +    }
>>   +
>>   +    // do this last after tablet is completely setup because it
>>   +    // could cause major compaction to start
>>   +    datafileManager = new DatafileManager(datafiles);
>>   +
>>   +    computeNumEntries();
>>   +
>>   +    datafileManager.removeFilesAfterScan(scanFiles);
>>   +
>>   +    // look for hints of a failure on the previous tablet server
>>   +    if (!logEntries.isEmpty() ||
>> needsMajorCompaction(MajorCompactionReason.NORMAL)) {
>>   +      // look for any temp files hanging around
>>   +      removeOldTemporaryFiles();
>>   +    }
>>   +
>>   +    log.log(TLevel.TABLET_HIST, extent + " opened");
>>   +  }
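
One detail worth calling out in the recovery block above: the final long[]
count array exists because a pre-Java-8 anonymous class (the MutationReceiver)
can only capture final locals, so a final array is used as a pair of mutable
slots. A minimal stand-alone illustration of the idiom:

   public class CaptureIdiom {
     interface Receiver { void receive(long timestamp); }

     public static void main(String[] args) {
       final long[] count = new long[2]; // [0] = mutations seen, [1] = max timestamp
       count[1] = Long.MIN_VALUE;
       Receiver r = new Receiver() {
         @Override
         public void receive(long ts) {
           count[1] = Math.max(count[1], ts); // the callback mutates the slots
           count[0]++;
         }
       };
       for (long ts : new long[] {3L, 42L, 17L})
         r.receive(ts);
       System.out.println(count[0] + " received, max ts " + count[1]); // 3 received, max ts 42
     }
   }
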
>>   +
>>   +  private void removeOldTemporaryFiles() {
>>   +    // remove any temporary files created by a previous tablet server
>>   +    try {
>>   +      for (FileStatus tmp : fs.globStatus(new Path(location, "*_tmp"))) {
>>   +        try {
>>   +          log.debug("Removing old temp file " + tmp.getPath());
>>   +          fs.delete(tmp.getPath());
>>   +        } catch (IOException ex) {
>>   +          log.error("Unable to remove old temp file " + tmp.getPath() +
>> ": " + ex);
>>   +        }
>>   +      }
>>   +    } catch (IOException ex) {
>>   +      log.error("Error scanning for old temp files in " + location);
>>   +    }
>>   +  }
>>   +
>>   +  private void setupDefaultSecurityLabels(KeyExtent extent) {
>>   +    if (extent.isMeta()) {
>>   +      defaultSecurityLabel = new byte[0];
>>   +    } else {
>>   +      try {
>>   +        ColumnVisibility cv = new
>> ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
>>   +        this.defaultSecurityLabel = cv.getExpression();
>>   +      } catch (Exception e) {
>>   +        log.error(e, e);
>>   +        this.defaultSecurityLabel = new byte[0];
>>   +      }
>>   +    }
>>   +  }
>>   +
>>   +  private static Collection<String> cleanUpFiles(VolumeManager fs,
>> FileStatus[] files, boolean deleteTmp) throws IOException {
>>   +    /*
>>   +     * called in constructor and before major compactions
>>   +     */
>>   +    Collection<String> goodFiles = new ArrayList<String>(files.length);
>>   +
>>   +    for (FileStatus file : files) {
>>   +
>>   +      String path = file.getPath().toString();
>>   +      String filename = file.getPath().getName();
>>   +
>>   +      // check for incomplete major compaction, this should only occur
>>   +      // for root tablet
>>   +      if (filename.startsWith("delete+")) {
>>   +        String expectedCompactedFile = path.substring(0,
>> path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1];
>>   +        if (fs.exists(new Path(expectedCompactedFile))) {
>>   +          // compaction finished, but did not finish deleting compacted files, so delete it
>>   +          if (!fs.deleteRecursively(file.getPath()))
>>   +            log.warn("Delete of file: " + file.getPath().toString() + "
>> return false");
>>   +          continue;
>>   +        }
>>   +        // compaction did not finish, so put files back
>>   +
>>   +        // reset path and filename for rest of loop
>>   +        filename = filename.split("\\+", 3)[2];
>>   +        path = path.substring(0, path.lastIndexOf("/delete+")) + "/" +
>> filename;
>>   +
>>   +        if (!fs.rename(file.getPath(), new Path(path)))
>>   +          log.warn("Rename of " + file.getPath().toString() + " to " +
>> path + " returned false");
>>   +      }
>>   +
>>   +      if (filename.endsWith("_tmp")) {
>>   +        if (deleteTmp) {
>>   +          log.warn("cleaning up old tmp file: " + path);
>>   +          if (!fs.deleteRecursively(file.getPath()))
>>   +            log.warn("Delete of tmp file: " + file.getPath().toString()
>> + " return false");
>>   +
>>   +        }
>>   +        continue;
>>   +      }
>>   +
>>   +      if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") &&
>> !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) {
>>   +        log.error("unknown file in tablet" + path);
>>   +        continue;
>>   +      }
>>   +
>>   +      goodFiles.add(path);
>>   +    }
>>   +
>>   +    return goodFiles;
>>   +  }
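
The "delete+" handling in cleanUpFiles encodes compaction-recovery state in
the file name itself: "delete+<compacted>+<original>" marks an original file
that was renamed aside while being compacted into <compacted>. A worked
example of the parsing above (file names invented):

   public class DeleteMarkerDemo {
     public static void main(String[] args) {
       String filename = "delete+A00002.rf+A00001.rf";
       String compacted = filename.split("\\+")[1];   // if this exists, finish deleting the marker
       String original = filename.split("\\+", 3)[2]; // otherwise rename the marker back to this
       System.out.println(compacted + " / " + original); // A00002.rf / A00001.rf
     }
   }
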
>>   +
>>   +  public static class KVEntry extends KeyValue {
>>   +    public KVEntry(Key k, Value v) {
>>   +      super(new Key(k), Arrays.copyOf(v.get(), v.get().length));
>>   +    }
>>   +
>>   +    @Override
>>   +    public String toString() {
>>   +      return key.toString() + "=" + getValue();
>>   +    }
>>   +
>>   +    int numBytes() {
>>   +      return key.getSize() + getValue().get().length;
>>   +    }
>>   +
>>   +    int estimateMemoryUsed() {
>>   +      return key.getSize() + getValue().get().length + (9 * 32); // overhead: ~32 bytes for each of 9 objects
>>   +    }
>>   +  }
>>   +
>>   +  private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi,
>> List<Range> ranges, HashSet<Column> columnSet, ArrayList<KVEntry> results,
>>   +      long maxResultsSize) throws IOException {
>>   +
>>   +    LookupResult lookupResult = new LookupResult();
>>   +
>>   +    boolean exceededMemoryUsage = false;
>>   +    boolean tabletClosed = false;
>>   +
>>   +    Set<ByteSequence> cfset = null;
>>   +    if (columnSet.size() > 0)
>>   +      cfset = LocalityGroupUtil.families(columnSet);
>>   +
>>   +    for (Range range : ranges) {
>>   +
>>   +      if (exceededMemoryUsage || tabletClosed) {
>>   +        lookupResult.unfinishedRanges.add(range);
>>   +        continue;
>>   +      }
>>   +
>>   +      int entriesAdded = 0;
>>   +
>>   +      try {
>>   +        if (cfset != null)
>>   +          mmfi.seek(range, cfset, true);
>>   +        else
>>   +          mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
>>   +
>>   +        while (mmfi.hasTop()) {
>>   +          Key key = mmfi.getTopKey();
>>   +
>>   +          KVEntry kve = new KVEntry(key, mmfi.getTopValue());
>>   +          results.add(kve);
>>   +          entriesAdded++;
>>   +          lookupResult.bytesAdded += kve.estimateMemoryUsed();
>>   +          lookupResult.dataSize += kve.numBytes();
>>   +
>>   +          exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
>>   +
>>   +          if (exceededMemoryUsage) {
>>   +            addUnfinishedRange(lookupResult, range, key, false);
>>   +            break;
>>   +          }
>>   +
>>   +          mmfi.next();
>>   +        }
>>   +
>>   +      } catch (TooManyFilesException tmfe) {
>>   +        // treat this as a closed tablet, and let the client retry
>>   +        log.warn("Tablet " + getExtent() + " has too many files, batch
>> lookup can not run");
>>   +        handleTabletClosedDuringScan(results, lookupResult,
>> exceededMemoryUsage, range, entriesAdded);
>>   +        tabletClosed = true;
>>   +      } catch (IOException ioe) {
>>   +        if (shutdownInProgress()) {
>>   +          // assume HDFS shutdown hook caused this exception
>>   +          log.debug("IOException while shutdown in progress ", ioe);
>>   +          handleTabletClosedDuringScan(results, lookupResult,
>> exceededMemoryUsage, range, entriesAdded);
>>   +          tabletClosed = true;
>>   +        } else {
>>   +          throw ioe;
>>   +        }
>>   +      } catch (IterationInterruptedException iie) {
>>   +        if (isClosed()) {
>>   +          handleTabletClosedDuringScan(results, lookupResult,
>> exceededMemoryUsage, range, entriesAdded);
>>   +          tabletClosed = true;
>>   +        } else {
>>   +          throw iie;
>>   +        }
>>   +      } catch (TabletClosedException tce) {
>>   +        handleTabletClosedDuringScan(results, lookupResult,
>> exceededMemoryUsage, range, entriesAdded);
>>   +        tabletClosed = true;
>>   +      }
>>   +
>>   +    }
>>   +
>>   +    return lookupResult;
>>   +  }
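
For context, here is how a caller is expected to consume unfinishedRanges:
whatever did not fit in the result buffer, or was cut off by the tablet
closing, simply gets re-issued. This loop is hypothetical (the tablet
reference and process() are not part of the patch):

   List<Range> pending = ranges;
   while (!pending.isEmpty()) {
     ArrayList<KVEntry> results = new ArrayList<KVEntry>();
     LookupResult lr = tablet.lookup(pending, columns, authorizations, results,
         maxResultSize, ssiList, ssio, interruptFlag);
     process(results);              // consume whatever fit in this pass
     // if lr.closed, a real client would re-resolve tablet locations first
     pending = lr.unfinishedRanges; // retry only what was cut off
   }
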
>>   +
>>   +  private void handleTabletClosedDuringScan(ArrayList<KVEntry> results,
>> LookupResult lookupResult, boolean exceededMemoryUsage, Range range, int
>> entriesAdded) {
>>   +    if (exceededMemoryUsage)
>>   +      throw new IllegalStateException("tablet should not exceed memory
>> usage or close, not both");
>>   +
>>   +    if (entriesAdded > 0)
>>   +      addUnfinishedRange(lookupResult, range, results.get(results.size()
>> - 1).key, false);
>>   +    else
>>   +      lookupResult.unfinishedRanges.add(range);
>>   +
>>   +    lookupResult.closed = true;
>>   +  }
>>   +
>>   +  private void addUnfinishedRange(LookupResult lookupResult, Range
>> range, Key key, boolean inclusiveStartKey) {
>>   +    if (range.getEndKey() == null || key.compareTo(range.getEndKey()) <
>> 0) {
>>   +      Range nlur = new Range(new Key(key), inclusiveStartKey,
>> range.getEndKey(), range.isEndKeyInclusive());
>>   +      lookupResult.unfinishedRanges.add(nlur);
>>   +    }
>>   +  }
>>   +
>>   +  public static interface KVReceiver {
>>   +    void receive(List<KVEntry> matches) throws IOException;
>>   +  }
>>   +
>>   +  class LookupResult {
>>   +    List<Range> unfinishedRanges = new ArrayList<Range>();
>>   +    long bytesAdded = 0;
>>   +    long dataSize = 0;
>>   +    boolean closed = false;
>>   +  }
>>   +
>>   +  public LookupResult lookup(List<Range> ranges, HashSet<Column>
>> columns, Authorizations authorizations, ArrayList<KVEntry> results, long
>> maxResultSize,
>>   +      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
>> AtomicBoolean interruptFlag) throws IOException {
>>   +
>>   +    if (ranges.size() == 0) {
>>   +      return new LookupResult();
>>   +    }
>>   +
>>   +    ranges = Range.mergeOverlapping(ranges);
>>   +    Collections.sort(ranges);
>>   +
>>   +    Range tabletRange = extent.toDataRange();
>>   +    for (Range range : ranges) {
>>   +      // do a test to see if this range falls within the tablet, if it
>> does not
>>   +      // then clip will throw an exception
>>   +      tabletRange.clip(range);
>>   +    }
>>   +
>>   +    ScanDataSource dataSource = new ScanDataSource(authorizations,
>> this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag);
>>   +
>>   +    LookupResult result = null;
>>   +
>>   +    try {
>>   +      SortedKeyValueIterator<Key,Value> iter = new
>> SourceSwitchingIterator(dataSource);
>>   +      result = lookup(iter, ranges, columns, results, maxResultSize);
>>   +      return result;
>>   +    } catch (IOException ioe) {
>>   +      dataSource.close(true);
>>   +      throw ioe;
>>   +    } finally {
>>   +      // code in finally block because always want
>>   +      // to return mapfiles, even when exception is thrown
>>   +      dataSource.close(false);
>>   +
>>   +      synchronized (this) {
>>   +        queryCount += results.size();
>>   +        if (result != null)
>>   +          queryBytes += result.dataSize;
>>   +      }
>>   +    }
>>   +  }
>>   +
>>   +  private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range
>> range, int num, Set<Column> columns) throws IOException {
>>   +
>>   +    // log.info("In nextBatch..");
>>   +
>>   +    List<KVEntry> results = new ArrayList<KVEntry>();
>>   +    Key key = null;
>>   +
>>   +    Value value;
>>   +    long resultSize = 0L;
>>   +    long resultBytes = 0L;
>>   +
>>   +    long maxResultsSize =
>> acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
>>   +
>>   +    if (columns.size() == 0) {
>>   +      iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
>>   +    } else {
>>   +      iter.seek(range, LocalityGroupUtil.families(columns), true);
>>   +    }
>>   +
>>   +    Key continueKey = null;
>>   +    boolean skipContinueKey = false;
>>   +
>>   +    boolean endOfTabletReached = false;
>>   +    while (iter.hasTop()) {
>>   +
>>   +      value = iter.getTopValue();
>>   +      key = iter.getTopKey();
>>   +
>>   +      KVEntry kvEntry = new KVEntry(key, value); // copies key and value
>>   +      results.add(kvEntry);
>>   +      resultSize += kvEntry.estimateMemoryUsed();
>>   +      resultBytes += kvEntry.numBytes();
>>   +
>>   +      if (resultSize >= maxResultsSize || results.size() >= num) {
>>   +        continueKey = new Key(key);
>>   +        skipContinueKey = true;
>>   +        break;
>>   +      }
>>   +
>>   +      iter.next();
>>   +    }
>>   +
>>   +    if (!iter.hasTop()) {
>>   +      endOfTabletReached = true;
>>   +    }
>>   +
>>   +    Batch retBatch = new Batch();
>>   +    retBatch.numBytes = resultBytes;
>>   +
>>   +    if (!endOfTabletReached) {
>>   +      retBatch.continueKey = continueKey;
>>   +      retBatch.skipContinueKey = skipContinueKey;
>>   +    } else {
>>   +      retBatch.continueKey = null;
>>   +    }
>>   +
>>   +    if (endOfTabletReached && results.size() == 0)
>>   +      retBatch.results = null;
>>   +    else
>>   +      retBatch.results = results;
>>   +
>>   +    return retBatch;
>>   +  }
>>   +
>>   +  /**
>>   +   * Determine if a JVM shutdown is in progress.
>>   +   *
>>   +   */
>>   +  private boolean shutdownInProgress() {
>>   +    try {
>>   +      Runtime.getRuntime().removeShutdownHook(new Thread(new Runnable() {
>>   +        @Override
>>   +        public void run() {}
>>   +      }));
>>   +    } catch (IllegalStateException ise) {
>>   +      return true;
>>   +    }
>>   +
>>   +    return false;
>>   +  }
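
The trick above works because Runtime.removeShutdownHook() is a no-op
(returning false) for a hook that was never registered, but throws
IllegalStateException once the JVM has begun shutting down. A stand-alone
demo:

   public class ShutdownProbe {
     static boolean shutdownInProgress() {
       try {
         // no-op normally; throws IllegalStateException during shutdown
         Runtime.getRuntime().removeShutdownHook(new Thread(new Runnable() {
           @Override
           public void run() {}
         }));
       } catch (IllegalStateException ise) {
         return true;
       }
       return false;
     }

     public static void main(String[] args) {
       System.out.println("shutdown in progress? " + shutdownInProgress()); // false here
     }
   }
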
>>   +
>>   +  private class Batch {
>>   +    public boolean skipContinueKey;
>>   +    public List<KVEntry> results;
>>   +    public Key continueKey;
>>   +    public long numBytes;
>>   +  }
>>   +
>>   +  Scanner createScanner(Range range, int num, Set<Column> columns,
>> Authorizations authorizations, List<IterInfo> ssiList,
>> Map<String,Map<String,String>> ssio,
>>   +      boolean isolated, AtomicBoolean interruptFlag) {
>>   +    // do a test to see if this range falls within the tablet, if it
>> does not
>>   +    // then clip will throw an exception
>>   +    extent.toDataRange().clip(range);
>>   +
>>   +    ScanOptions opts = new ScanOptions(num, authorizations,
>> this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated);
>>   +    return new Scanner(range, opts);
>>   +  }
>>   +
>>   +  class ScanBatch {
>>   +    boolean more;
>>   +    List<KVEntry> results;
>>   +
>>   +    ScanBatch(List<KVEntry> results, boolean more) {
>>   +      this.results = results;
>>   +      this.more = more;
>>   +    }
>>   +  }
>>   +
>>   +  class Scanner {
>>   +
>>   +    private ScanOptions options;
>>   +    private Range range;
>>   +    private SortedKeyValueIterator<Key,Value> isolatedIter;
>>   +    private ScanDataSource isolatedDataSource;
>>   +    private boolean sawException = false;
>>   +    private boolean scanClosed = false;
>>   +
>>   +    Scanner(Range range, ScanOptions options) {
>>   +      this.range = range;
>>   +      this.options = options;
>>   +    }
>>   +
>>   +    synchronized ScanBatch read() throws IOException,
>> TabletClosedException {
>>   +
>>   +      if (sawException)
>>   +        throw new IllegalStateException("Tried to use scanner after
>> exception occurred.");
>>   +
>>   +      if (scanClosed)
>>   +        throw new IllegalStateException("Tried to use scanner after it
>> was closed.");
>>   +
>>   +      Batch results = null;
>>   +
>>   +      ScanDataSource dataSource;
>>   +
>>   +      if (options.isolated) {
>>   +        if (isolatedDataSource == null)
>>   +          isolatedDataSource = new ScanDataSource(options);
>>   +        dataSource = isolatedDataSource;
>>   +      } else {
>>   +        dataSource = new ScanDataSource(options);
>>   +      }
>>   +
>>   +      try {
>>   +
>>   +        SortedKeyValueIterator<Key,Value> iter;
>>   +
>>   +        if (options.isolated) {
>>   +          if (isolatedIter == null)
>>   +            isolatedIter = new SourceSwitchingIterator(dataSource, true);
>>   +          else
>>   +            isolatedDataSource.fileManager.reattach();
>>   +          iter = isolatedIter;
>>   +        } else {
>>   +          iter = new SourceSwitchingIterator(dataSource, false);
>>   +        }
>>   +
>>   +        results = nextBatch(iter, range, options.num, options.columnSet);
>>   +
>>   +        if (results.results == null) {
>>   +          range = null;
>>   +          return new ScanBatch(new ArrayList<Tablet.KVEntry>(), false);
>>   +        } else if (results.continueKey == null) {
>>   +          return new ScanBatch(results.results, false);
>>   +        } else {
>>   +          range = new Range(results.continueKey,
>> !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive());
>>   +          return new ScanBatch(results.results, true);
>>   +        }
>>   +
>>   +      } catch (IterationInterruptedException iie) {
>>   +        sawException = true;
>>   +        if (isClosed())
>>   +          throw new TabletClosedException(iie);
>>   +        else
>>   +          throw iie;
>>   +      } catch (IOException ioe) {
>>   +        if (shutdownInProgress()) {
>>   +          log.debug("IOException while shutdown in progress ", ioe);
>>   +          throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook
>>   +        }
>>   +
>>   +        sawException = true;
>>   +        dataSource.close(true);
>>   +        throw ioe;
>>   +      } catch (RuntimeException re) {
>>   +        sawException = true;
>>   +        throw re;
>>   +      } finally {
>>   +        // code in finally block because always want
>>   +        // to return mapfiles, even when exception is thrown
>>   +        if (!options.isolated)
>>   +          dataSource.close(false);
>>   +        else if (dataSource.fileManager != null)
>>   +          dataSource.fileManager.detach();
>>   +
>>   +        synchronized (Tablet.this) {
>>   +          if (results != null && results.results != null) {
>>   +            long more = results.results.size();
>>   +            queryCount += more;
>>   +            queryBytes += results.numBytes;
>>   +          }
>>   +        }
>>   +      }
>>   +    }
>>   +
>>   +    // close and read are synchronized because close cannot be called on the
>>   +    // data source while it is in use. That could lead to file iterators that
>>   +    // are in use by a thread being returned to the pool... this would be bad
>>   +    void close() {
>>   +      options.interruptFlag.set(true);
>>   +      synchronized (this) {
>>   +        scanClosed = true;
>>   +        if (isolatedDataSource != null)
>>   +          isolatedDataSource.close(false);
>>   +      }
>>   +    }
>>   +  }
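
Usage-wise, read() hands back batches until more goes false, and close() sets
the interrupt flag so any in-flight read bails out. A hypothetical driver loop
(the tablet reference and consume() are assumptions, not from the patch):

   Scanner scanner = tablet.createScanner(range, 1000, columns, authorizations,
       ssiList, ssio, false /* isolated */, new AtomicBoolean(false));
   try {
     ScanBatch batch;
     do {
       batch = scanner.read(); // throws TabletClosedException if the tablet goes away
       consume(batch.results);
     } while (batch.more);
   } finally {
     scanner.close();
   }
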
>>   +
>>   +  static class ScanOptions {
>>   +
>>   +    // scan options
>>   +    Authorizations authorizations;
>>   +    byte[] defaultLabels;
>>   +    Set<Column> columnSet;
>>   +    List<IterInfo> ssiList;
>>   +    Map<String,Map<String,String>> ssio;
>>   +    AtomicBoolean interruptFlag;
>>   +    int num;
>>   +    boolean isolated;
>>   +
>>   +    ScanOptions(int num, Authorizations authorizations, byte[]
>> defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList,
>>   +        Map<String,Map<String,String>> ssio, AtomicBoolean
>> interruptFlag, boolean isolated) {
>>   +      this.num = num;
>>   +      this.authorizations = authorizations;
>>   +      this.defaultLabels = defaultLabels;
>>   +      this.columnSet = columnSet;
>>   +      this.ssiList = ssiList;
>>   +      this.ssio = ssio;
>>   +      this.interruptFlag = interruptFlag;
>>   +      this.isolated = isolated;
>>   +    }
>>   +
>>   +  }
>>   +
>>   +  class ScanDataSource implements DataSource {
>>   +
>>   +    // data source state
>>   +    private ScanFileManager fileManager;
>>   +    private SortedKeyValueIterator<Key,Value> iter;
>>   +    private long expectedDeletionCount;
>>   +    private List<MemoryIterator> memIters = null;
>>   +    private long fileReservationId;
>>   +    private AtomicBoolean interruptFlag;
>>   +    private StatsIterator statsIterator;
>>   +
>>   +    ScanOptions options;
>>   +
>>   +    ScanDataSource(Authorizations authorizations, byte[] defaultLabels,
>> HashSet<Column> columnSet, List<IterInfo> ssiList,
>> Map<String,Map<String,String>> ssio,
>>   +        AtomicBoolean interruptFlag) {
>>   +      expectedDeletionCount = dataSourceDeletions.get();
>>   +      this.options = new ScanOptions(-1, authorizations, defaultLabels,
>> columnSet, ssiList, ssio, interruptFlag, false);
>>   +      this.interruptFlag = interruptFlag;
>>   +    }
>>   +
>>   +    ScanDataSource(ScanOptions options) {
>>   +      expectedDeletionCount = dataSourceDeletions.get();
>>   +      this.options = options;
>>   +      this.interruptFlag = options.interruptFlag;
>>   +    }
>>   +
>>   +    @Override
>>   +    public DataSource getNewDataSource() {
>>   +      if (!isCurrent()) {
>>   +        // log.debug("Switching data sources during a scan");
>>   +        if (memIters != null) {
>>   +          tabletMemory.returnIterators(memIters);
>>   +          memIters = null;
>>   +          datafileManager.returnFilesForScan(fileReservationId);
>>   +          fileReservationId = -1;
>>   +        }
>>   +
>>   +        if (fileManager != null)
>>   +          fileManager.releaseOpenFiles(false);
>>   +
>>   +        expectedDeletionCount = dataSourceDeletions.get();
>>   +        iter = null;
>>   +
>>   +        return this;
>>   +      } else
>>   +        return this;
>>   +    }
>>   +
>>   +    @Override
>>   +    public boolean isCurrent() {
>>   +      return expectedDeletionCount == dataSourceDeletions.get();
>>   +    }
>>   +
>>   +    @Override
>>   +    public SortedKeyValueIterator<Key,Value> iterator() throws
>> IOException {
>>   +      if (iter == null)
>>   +        iter = createIterator();
>>   +      return iter;
>>   +    }
>>   +
>>   +    private SortedKeyValueIterator<Key,Value> createIterator() throws
>> IOException {
>>   +
>>   +      Map<FileRef,DataFileValue> files;
>>   +
>>   +      synchronized (Tablet.this) {
>>   +
>>   +        if (memIters != null)
>>   +          throw new IllegalStateException("Tried to create new scan
>> iterator w/o releasing memory");
>>   +
>>   +        if (Tablet.this.closed)
>>   +          throw new TabletClosedException();
>>   +
>>   +        if (interruptFlag.get())
>>   +          throw new IterationInterruptedException(extent.toString() + "
>> " + interruptFlag.hashCode());
>>   +
>>   +        // only acquire the file manager when we know the tablet is open
>>   +        if (fileManager == null) {
>>   +          fileManager = tabletResources.newScanFileManager();
>>   +          activeScans.add(this);
>>   +        }
>>   +
>>   +        if (fileManager.getNumOpenFiles() != 0)
>>   +          throw new IllegalStateException("Tried to create new scan
>> iterator w/o releasing files");
>>   +
>>   +        // set this before trying to get iterators in case
>>   +        // getIterators() throws an exception
>>   +        expectedDeletionCount = dataSourceDeletions.get();
>>   +
>>   +        memIters = tabletMemory.getIterators();
>>   +        Pair<Long,Map<FileRef,DataFileValue>> reservation =
>> datafileManager.reserveFilesForScan();
>>   +        fileReservationId = reservation.getFirst();
>>   +        files = reservation.getSecond();
>>   +      }
>>   +
>>   +      Collection<InterruptibleIterator> mapfiles =
>> fileManager.openFiles(files, options.isolated);
>>   +
>>   +      List<SortedKeyValueIterator<Key,Value>> iters = new
>> ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() +
>> memIters.size());
>>   +
>>   +      iters.addAll(mapfiles);
>>   +      iters.addAll(memIters);
>>   +
>>   +      for (SortedKeyValueIterator<Key,Value> skvi : iters)
>>   +        ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
>>   +
>>   +      MultiIterator multiIter = new MultiIterator(iters, extent);
>>   +
>>   +      TabletIteratorEnvironment iterEnv = new
>> TabletIteratorEnvironment(IteratorScope.scan, acuTableConf, fileManager,
>> files);
>>   +
>>   +      statsIterator = new StatsIterator(multiIter,
>> TabletServer.seekCount, scannedCount);
>>   +
>>   +      DeletingIterator delIter = new DeletingIterator(statsIterator,
>> false);
>>   +
>>   +      ColumnFamilySkippingIterator cfsi = new
>> ColumnFamilySkippingIterator(delIter);
>>   +
>>   +      ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi,
>> options.columnSet);
>>   +
>>   +      VisibilityFilter visFilter = new VisibilityFilter(colFilter,
>> options.authorizations, options.defaultLabels);
>>   +
>>   +      return iterEnv.getTopLevelIterator(IteratorUtil
>>   +          .loadIterators(IteratorScope.scan, visFilter, extent,
>> acuTableConf, options.ssiList, options.ssio, iterEnv));
>>   +    }
>>   +
>>   +    private void close(boolean sawErrors) {
>>   +
>>   +      if (memIters != null) {
>>   +        tabletMemory.returnIterators(memIters);
>>   +        memIters = null;
>>   +        datafileManager.returnFilesForScan(fileReservationId);
>>   +        fileReservationId = -1;
>>   +      }
>>   +
>>   +      synchronized (Tablet.this) {
>>   +        activeScans.remove(this);
>>   +        if (activeScans.size() == 0)
>>   +          Tablet.this.notifyAll();
>>   +      }
>>   +
>>   +      if (fileManager != null) {
>>   +        fileManager.releaseOpenFiles(sawErrors);
>>   +        fileManager = null;
>>   +      }
>>   +
>>   +      if (statsIterator != null) {
>>   +        statsIterator.report();
>>   +      }
>>   +
>>   +    }
>>   +
>>   +    public void interrupt() {
>>   +      interruptFlag.set(true);
>>   +    }
>>   +
>>   +    @Override
>>   +    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
>>   +      throw new UnsupportedOperationException();
>>   +    }
>>   +
>>   +  }
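
ScanDataSource's staleness check is a generation-counter pattern: it snapshots
dataSourceDeletions at creation, and isCurrent() just compares the snapshot to
the live counter. A minimal sketch of that pattern in isolation (the class
name is invented):

   import java.util.concurrent.atomic.AtomicLong;

   final class GenerationGuard {
     private final AtomicLong deletions; // shared counter, bumped on every file deletion
     private long expected;

     GenerationGuard(AtomicLong deletions) {
       this.deletions = deletions;
       this.expected = deletions.get(); // snapshot at creation
     }

     boolean isCurrent() {
       return expected == deletions.get();
     }

     void refresh() { // like getNewDataSource(): release resources, then re-snapshot
       expected = deletions.get();
     }
   }
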
>>   +
>>   +  private DataFileValue minorCompact(Configuration conf, VolumeManager
>> fs, InMemoryMap memTable, FileRef tmpDatafile, FileRef newDatafile, FileRef
>> mergeFile,
>>   +      boolean hasQueueTime, long queued, CommitSession commitSession,
>> long flushId, MinorCompactionReason mincReason) {
>>   +    boolean failed = false;
>>   +    long start = System.currentTimeMillis();
>>   +    timer.incrementStatusMinor();
>>   +
>>   +    long count = 0;
>>   +
>>   +    try {
>>   +      Span span = Trace.start("write");
>>   +      CompactionStats stats;
>>   +      try {
>>   +        count = memTable.getNumEntries();
>>   +
>>   +        DataFileValue dfv = null;
>>   +        if (mergeFile != null)
>>   +          dfv = datafileManager.getDatafileSizes().get(mergeFile);
>>   +
>>   +        MinorCompactor compactor = new MinorCompactor(conf, fs,
>> memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, mincReason);
>>   +        stats = compactor.call();
>>   +      } finally {
>>   +        span.stop();
>>   +      }
>>   +      span = Trace.start("bringOnline");
>>   +      try {
>>   +        datafileManager.bringMinorCompactionOnline(tmpDatafile,
>> newDatafile, mergeFile, new DataFileValue(stats.getFileSize(),
>> stats.getEntriesWritten()),
>>   +            commitSession, flushId);
>>   +      } finally {
>>   +        span.stop();
>>   +      }
>>   +      return new DataFileValue(stats.getFileSize(),
>> stats.getEntriesWritten());
>>   +    } catch (Exception E) {
>>   +      failed = true;
>>   +      throw new RuntimeException(E);
>>   +    } catch (Error E) {
>>   +      // Weird errors like "OutOfMemoryError" when trying to create the thread for the compaction
>>   +      failed = true;
>>   +      throw new RuntimeException(E);
>>   +    } finally {
>>   +      try {
>>   +        tabletMemory.finalizeMinC();
>>   +      } catch (Throwable t) {
>>   +        log.error("Failed to free tablet memory", t);
>>   +      }
>>   +
>>   +      if (!failed) {
>>   +        lastMinorCompactionFinishTime = System.currentTimeMillis();
>>   +      }
>>   +      if (tabletServer.mincMetrics.isEnabled())
>>   +        tabletServer.mincMetrics.add(TabletServerMinCMetrics.minc,
>> (lastMinorCompactionFinishTime - start));
>>   +      if (hasQueueTime) {
>>   +        timer.updateTime(Operation.MINOR, queued, start, count, failed);
>>   +        if (tabletServer.mincMetrics.isEnabled())
>>   +          tabletServer.mincMetrics.add(TabletServerMinCMetrics.queue,
>> (start - queued));
>>   +      } else
>>   +        timer.updateTime(Operation.MINOR, start, count, failed);
>>   +    }
>>   +  }
>>   +
>>   +  private class MinorCompactionTask implements Runnable {
>>   +
>>   +    private long queued;
>>   +    private CommitSession commitSession;
>>   +    private DataFileValue stats;
>>   +    private FileRef mergeFile;
>>   +    private long flushId;
>>   +    private MinorCompactionReason mincReason;
>>   +
>>   +    MinorCompactionTask(FileRef mergeFile, CommitSession commitSession,
>> long flushId, MinorCompactionReason mincReason) {
>>   +      queued = System.currentTimeMillis();
>>   +      minorCompactionWaitingToStart = true;
>>   +      this.commitSession = commitSession;
>>   +      this.mergeFile = mergeFile;
>>   +      this.flushId = flushId;
>>   +      this.mincReason = mincReason;
>>   +    }
>>   +
>>   +    @Override
>>   +    public void run() {
>>   +      minorCompactionWaitingToStart = false;
>>   +      minorCompactionInProgress = true;
>>   +      Span minorCompaction = Trace.on("minorCompaction");
>>   +      try {
>>   +        FileRef newMapfileLocation = getNextMapFilename(mergeFile ==
>> null ? "F" : "M");
>>   +        FileRef tmpFileRef = new FileRef(newMapfileLocation.path() +
>> "_tmp");
>>   +        Span span = Trace.start("waitForCommits");
>>   +        synchronized (Tablet.this) {
>>   +          commitSession.waitForCommitsToFinish();
>>   +        }
>>   +        span.stop();
>>   +        span = Trace.start("start");
>>   +        while (true) {
>>   +          try {
>>   +            // the purpose of the minor compaction start event is to keep track of the
>>   +            // filename... in the case where the metadata table write for the minor
>>   +            // compaction finishes and the process dies before writing the minor
>>   +            // compaction finish event, the start event+filename in the metadata table
>>   +            // will prevent recovery of duplicate data... the minor compaction start
>>   +            // event could be written at any time before the metadata write for the
>>   +            // minor compaction
>>   +            tabletServer.minorCompactionStarted(commitSession,
>> commitSession.getWALogSeq() + 1, newMapfileLocation.path().toString());
>>   +            break;
>>   +          } catch (IOException e) {
>>   +            log.warn("Failed to write to write ahead log " +
>> e.getMessage(), e);
>>   +          }
>>   +        }
>>   +        span.stop();
>>   +        span = Trace.start("compact");
>>   +        this.stats = minorCompact(conf, fs,
>> tabletMemory.getMinCMemTable(), tmpFileRef, newMapfileLocation, mergeFile,
>> true, queued, commitSession, flushId,
>>   +            mincReason);
>>   +        span.stop();
>>   +
>>   +        if (needsSplit()) {
>>   +          tabletServer.executeSplit(Tablet.this);
>>   +        } else {
>>   +          initiateMajorCompaction(MajorCompactionReason.NORMAL);
>>   +        }
>>   +      } catch (Throwable t) {
>>   +        log.error("Unknown error during minor compaction for extent: " +
>> getExtent(), t);
>>   +        throw new RuntimeException(t);
>>   +      } finally {
>>   +        minorCompactionInProgress = false;
>>   +        minorCompaction.data("extent", extent.toString());
>>   +        minorCompaction.data("numEntries",
>> Long.toString(this.stats.getNumEntries()));
>>   +        minorCompaction.data("size",
>> Long.toString(this.stats.getSize()));
>>   +        minorCompaction.stop();
>>   +      }
>>   +    }
>>   +  }
>>   +
>>   +  private synchronized MinorCompactionTask prepareForMinC(long flushId,
>> MinorCompactionReason mincReason) {
>>   +    CommitSession oldCommitSession = tabletMemory.prepareForMinC();
>>   +    otherLogs = currentLogs;
>>   +    currentLogs = new HashSet<DfsLogger>();
>>   +
>>   +    FileRef mergeFile =
>> datafileManager.reserveMergingMinorCompactionFile();
>>   +
>>   +    return new MinorCompactionTask(mergeFile, oldCommitSession, flushId,
>> mincReason);
>>   +
>>   +  }
>>   +
>>   +  void flush(long tableFlushID) {
>>   +    boolean updateMetadata = false;
>>   +    boolean initiateMinor = false;
>>   +
>>   +    try {
>>   +
>>   +      synchronized (this) {
>>   +
>>   +        // only want one thing at a time to update the flush ID, to ensure that the
>>   +        // metadata table and the tablet's in-memory state stay consistent
>>   +        if (updatingFlushID)
>>   +          return;
>>   +
>>   +        if (lastFlushID >= tableFlushID)
>>   +          return;
>>   +
>>   +        if (closing || closed || tabletMemory.memoryReservedForMinC())
>>   +          return;
>>   +
>>   +        if (tabletMemory.getMemTable().getNumEntries() == 0) {
>>   +          lastFlushID = tableFlushID;
>>   +          updatingFlushID = true;
>>   +          updateMetadata = true;
>>   +        } else
>>   +          initiateMinor = true;
>>   +      }
>>   +
>>   +      if (updateMetadata) {
>>   +        Credentials creds = SystemCredentials.get();
>>   +        // if multiple threads were allowed to update this outside of a sync block,
>>   +        // then it would be a race condition
>>   +        MetadataTableUtil.updateTabletFlushID(extent, tableFlushID,
>> creds, tabletServer.getLock());
>>   +      } else if (initiateMinor)
>>   +        initiateMinorCompaction(tableFlushID,
>> MinorCompactionReason.USER);
>>   +
>>   +    } finally {
>>   +      if (updateMetadata) {
>>   +        synchronized (this) {
>>   +          updatingFlushID = false;
>>   +          this.notifyAll();
>>   +        }
>>   +      }
>>   +    }
>>   +
>>   +  }
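
The shape of flush() above is a single-flight guard: test and set
updatingFlushID under the lock, do the slow metadata write outside it, then
clear the flag and wake waiters. Sketched with a hypothetical persistFlushId()
standing in for the MetadataTableUtil call:

   synchronized (this) {
     if (updatingFlushID)
       return;         // another thread is already persisting the flush id
     updatingFlushID = true;
   }
   try {
     persistFlushId(); // slow I/O happens outside the lock
   } finally {
     synchronized (this) {
       updatingFlushID = false;
       notifyAll();    // wake threads waiting for the update to finish
     }
   }
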
>>   +
>>   +  boolean initiateMinorCompaction(MinorCompactionReason mincReason) {
>>   +    if (isClosed()) {
>>   +      // don't bother trying to get the flush id if closed... it could be closed after
>>   +      // this check, but that is ok... just trying to cut down on unneeded log messages
>>   +      return false;
>>   +    }
>>   +
>>   +    // get the flush id before the new memmap is made available for write
>>   +    long flushId;
>>   +    try {
>>   +      flushId = getFlushID();
>>   +    } catch (NoNodeException e) {
>>   +      log.info("Asked to initiate MinC when there was no flush id " +
>> getExtent() + " " + e.getMessage());
>>   +      return false;
>>   +    }
>>   +    return initiateMinorCompaction(flushId, mincReason);
>>   +  }
>>   +
>>   +  boolean minorCompactNow(MinorCompactionReason mincReason) {
>>   +    long flushId;
>>   +    try {
>>   +      flushId = getFlushID();
>>   +    } catch (NoNodeException e) {
>>   +      log.info("Asked to initiate MinC when there was no flush id " +
>> getExtent() + " " + e.getMessage());
>>   +      return false;
>>   +    }
>>   +    MinorCompactionTask mct = createMinorCompactionTask(flushId,
>> mincReason);
>>   +    if (mct == null)
>>   +      return false;
>>   +    mct.run();
>>   +    return true;
>>   +  }
>>   +
>>   +  boolean initiateMinorCompaction(long flushId, MinorCompactionReason
>> mincReason) {
>>   +    MinorCompactionTask mct = createMinorCompactionTask(flushId,
>> mincReason);
>>   +    if (mct == null)
>>   +      return false;
>>   +    tabletResources.executeMinorCompaction(mct);
>>   +    return true;
>>   +  }
>>   +
>>   +  private MinorCompactionTask createMinorCompactionTask(long flushId,
>> MinorCompactionReason mincReason) {
>>   +    MinorCompactionTask mct;
>>   +    long t1, t2;
>>   +
>>   +    StringBuilder logMessage = null;
>>   +
>>   +    try {
>>   +      synchronized (this) {
>>   +        t1 = System.currentTimeMillis();
>>   +
>>   +        if (closing || closed || majorCompactionWaitingToStart ||
>> tabletMemory.memoryReservedForMinC() ||
>> tabletMemory.getMemTable().getNumEntries() == 0
>>   +            || updatingFlushID) {
>>   +
>>   +          logMessage = new StringBuilder();
>>   +
>>   +          logMessage.append(extent.toString());
>>   +          logMessage.append(" closing " + closing);
>>   +          logMessage.append(" closed " + closed);
>>   +          logMessage.append(" majorCompactionWaitingToStart " +
>> majorCompactionWaitingToStart);
>>   +          if (tabletMemory != null)
>>   +            logMessage.append(" tabletMemory.memoryReservedForMinC() " +
>> tabletMemory.memoryReservedForMinC());
>>   +          if (tabletMemory != null && tabletMemory.getMemTable() != null)
>>   +            logMessage.append(
>>
>> <TRUNCATED>
>>
>
