accumulo-dev mailing list archives

From: Eric Newton <eric.new...@gmail.com>
Subject: Re: [11/15] ACCUMULO-1940 do not expose a new tablet to the memory manager until after it is online
Date: Sat, 30 Nov 2013 01:31:04 GMT
I changed one line of this file... git seems to be having a conniption.  I
find the volume of git traffic to be so useless that I ignore it.

Anyone else?

On Fri, Nov 29, 2013 at 1:24 PM, <ecn@apache.org> wrote:

>
> http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b6b9cf1/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
> ----------------------------------------------------------------------
> diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
> index ee3b243,0000000..fd76415
> mode 100644,000000..100644
> --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
> +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
> @@@ -1,3868 -1,0 +1,3866 @@@
>  +/*
>  + * Licensed to the Apache Software Foundation (ASF) under one or more
>  + * contributor license agreements.  See the NOTICE file distributed with
>  + * this work for additional information regarding copyright ownership.
>  + * The ASF licenses this file to You under the Apache License, Version 2.0
>  + * (the "License"); you may not use this file except in compliance with
>  + * the License.  You may obtain a copy of the License at
>  + *
>  + *     http://www.apache.org/licenses/LICENSE-2.0
>  + *
>  + * Unless required by applicable law or agreed to in writing, software
>  + * distributed under the License is distributed on an "AS IS" BASIS,
>  + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  + * See the License for the specific language governing permissions and
>  + * limitations under the License.
>  + */
>  +package org.apache.accumulo.tserver;
>  +
>  +import java.io.ByteArrayInputStream;
>  +import java.io.DataInputStream;
>  +import java.io.FileNotFoundException;
>  +import java.io.IOException;
>  +import java.util.ArrayList;
>  +import java.util.Arrays;
>  +import java.util.Collection;
>  +import java.util.Collections;
>  +import java.util.Comparator;
>  +import java.util.EnumSet;
>  +import java.util.HashMap;
>  +import java.util.HashSet;
>  +import java.util.Iterator;
>  +import java.util.List;
>  +import java.util.Map;
>  +import java.util.Map.Entry;
>  +import java.util.PriorityQueue;
>  +import java.util.Set;
>  +import java.util.SortedMap;
>  +import java.util.TreeMap;
>  +import java.util.TreeSet;
>  +import java.util.concurrent.atomic.AtomicBoolean;
>  +import java.util.concurrent.atomic.AtomicLong;
>  +import java.util.concurrent.atomic.AtomicReference;
>  +import java.util.concurrent.locks.ReentrantLock;
>  +
>  +import org.apache.accumulo.core.Constants;
>  +import org.apache.accumulo.core.client.Connector;
>  +import org.apache.accumulo.core.client.IteratorSetting;
>  +import org.apache.accumulo.core.client.impl.ScannerImpl;
>  +import org.apache.accumulo.core.conf.AccumuloConfiguration;
>  +import org.apache.accumulo.core.conf.ConfigurationCopy;
>  +import org.apache.accumulo.core.conf.ConfigurationObserver;
>  +import org.apache.accumulo.core.conf.Property;
>  +import org.apache.accumulo.core.constraints.Violations;
>  +import org.apache.accumulo.core.data.ByteSequence;
>  +import org.apache.accumulo.core.data.Column;
>  +import org.apache.accumulo.core.data.ColumnUpdate;
>  +import org.apache.accumulo.core.data.Key;
>  +import org.apache.accumulo.core.data.KeyExtent;
>  +import org.apache.accumulo.core.data.KeyValue;
>  +import org.apache.accumulo.core.data.Mutation;
>  +import org.apache.accumulo.core.data.Range;
>  +import org.apache.accumulo.core.data.Value;
>  +import org.apache.accumulo.core.data.thrift.IterInfo;
>  +import org.apache.accumulo.core.data.thrift.MapFileInfo;
>  +import org.apache.accumulo.core.file.FileOperations;
>  +import org.apache.accumulo.core.file.FileSKVIterator;
>  +import org.apache.accumulo.core.iterators.IterationInterruptedException;
>  +import org.apache.accumulo.core.iterators.IteratorEnvironment;
>  +import org.apache.accumulo.core.iterators.IteratorUtil;
>  +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
>  +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
>  +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
>  +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
>  +import org.apache.accumulo.core.iterators.system.DeletingIterator;
>  +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
>  +import org.apache.accumulo.core.iterators.system.MultiIterator;
>  +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
>  +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
>  +import org.apache.accumulo.core.iterators.system.StatsIterator;
>  +import org.apache.accumulo.core.iterators.system.VisibilityFilter;
>  +import org.apache.accumulo.core.master.thrift.TabletLoadState;
>  +import org.apache.accumulo.core.metadata.MetadataTable;
>  +import org.apache.accumulo.core.metadata.RootTable;
>  +import org.apache.accumulo.core.metadata.schema.DataFileValue;
>  +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
>  +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
>  +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
>  +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
>  +import org.apache.accumulo.core.security.Authorizations;
>  +import org.apache.accumulo.core.security.ColumnVisibility;
>  +import org.apache.accumulo.core.security.Credentials;
>  +import org.apache.accumulo.core.tabletserver.log.LogEntry;
>  +import org.apache.accumulo.core.util.CachedConfiguration;
>  +import org.apache.accumulo.core.util.LocalityGroupUtil;
>  +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
>  +import org.apache.accumulo.core.util.MapCounter;
>  +import org.apache.accumulo.core.util.Pair;
>  +import org.apache.accumulo.core.util.UtilWaitThread;
>  +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
>  +import org.apache.accumulo.server.ServerConstants;
>  +import org.apache.accumulo.server.client.HdfsZooInstance;
>  +import org.apache.accumulo.server.conf.TableConfiguration;
>  +import org.apache.accumulo.server.fs.FileRef;
>  +import org.apache.accumulo.server.fs.VolumeManager;
>  +import org.apache.accumulo.server.fs.VolumeManager.FileType;
>  +import org.apache.accumulo.server.fs.VolumeManagerImpl;
>  +import org.apache.accumulo.server.master.state.TServerInstance;
>  +import org.apache.accumulo.server.master.tableOps.CompactionIterators;
>  +import org.apache.accumulo.server.problems.ProblemReport;
>  +import org.apache.accumulo.server.problems.ProblemReports;
>  +import org.apache.accumulo.server.problems.ProblemType;
>  +import org.apache.accumulo.server.security.SystemCredentials;
>  +import org.apache.accumulo.server.tablets.TabletTime;
>  +import org.apache.accumulo.server.tablets.UniqueNameAllocator;
>  +import org.apache.accumulo.server.util.FileUtil;
>  +import org.apache.accumulo.server.util.MasterMetadataUtil;
>  +import org.apache.accumulo.server.util.MetadataTableUtil;
>  +import org.apache.accumulo.server.util.TabletOperations;
>  +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
>  +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
>  +import org.apache.accumulo.trace.instrument.Span;
>  +import org.apache.accumulo.trace.instrument.Trace;
>  +import org.apache.accumulo.tserver.Compactor.CompactionCanceledException;
>  +import org.apache.accumulo.tserver.Compactor.CompactionEnv;
>  +import org.apache.accumulo.tserver.FileManager.ScanFileManager;
>  +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
>  +import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv;
>  +import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
>  +import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
>  +import org.apache.accumulo.tserver.compaction.CompactionPlan;
>  +import org.apache.accumulo.tserver.compaction.CompactionStrategy;
>  +import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
>  +import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
>  +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
>  +import org.apache.accumulo.tserver.compaction.WriteParameters;
>  +import org.apache.accumulo.tserver.constraints.ConstraintChecker;
>  +import org.apache.accumulo.tserver.log.DfsLogger;
>  +import org.apache.accumulo.tserver.log.MutationReceiver;
>  +import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
>  +import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
>  +import org.apache.commons.codec.DecoderException;
>  +import org.apache.commons.codec.binary.Hex;
>  +import org.apache.hadoop.conf.Configuration;
>  +import org.apache.hadoop.fs.FileStatus;
>  +import org.apache.hadoop.fs.FileSystem;
>  +import org.apache.hadoop.fs.Path;
>  +import org.apache.hadoop.io.Text;
>  +import org.apache.log4j.Logger;
>  +import org.apache.zookeeper.KeeperException;
>  +import org.apache.zookeeper.KeeperException.NoNodeException;
>  +
>  +/*
>  + * We need to be able to have the master tell a tabletServer to
>  + * close this file, and the tablet server to handle all pending client reads
>  + * before closing
>  + *
>  + */
>  +
>  +/**
>  + *
>  + * this class just provides an interface to read from a MapFile; mostly takes care of reporting start and end keys
>  + *
>  + * need this because a single row extent can have multiple columns; this manages all the columns (each handled by a store) for a single row-extent
>  + *
>  + *
>  + */
>  +
>  +public class Tablet {
>  +
>  +  enum MinorCompactionReason {
>  +    USER, SYSTEM, CLOSE
>  +  }
>  +
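>  +  // A CommitSession ties a write-ahead log sequence number to the in-memory
>  +  // map that mutations are applied to, and counts in-flight commits against
>  +  // that map so a minor compaction can wait for them to drain before flushing.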
>  +  public class CommitSession {
>  +
>  +    private int seq;
>  +    private InMemoryMap memTable;
>  +    private int commitsInProgress;
>  +    private long maxCommittedTime = Long.MIN_VALUE;
>  +
>  +    private CommitSession(int seq, InMemoryMap imm) {
>  +      this.seq = seq;
>  +      this.memTable = imm;
>  +      commitsInProgress = 0;
>  +    }
>  +
>  +    public int getWALogSeq() {
>  +      return seq;
>  +    }
>  +
>  +    private void decrementCommitsInProgress() {
>  +      if (commitsInProgress < 1)
>  +        throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
>  +
>  +      commitsInProgress--;
>  +      if (commitsInProgress == 0)
>  +        Tablet.this.notifyAll();
>  +    }
>  +
>  +    private void incrementCommitsInProgress() {
>  +      if (commitsInProgress < 0)
>  +        throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
>  +
>  +      commitsInProgress++;
>  +    }
>  +
>  +    private void waitForCommitsToFinish() {
>  +      while (commitsInProgress > 0) {
>  +        try {
>  +          Tablet.this.wait(50);
>  +        } catch (InterruptedException e) {
>  +          log.warn(e, e);
>  +        }
>  +      }
>  +    }
>  +
>  +    public void abortCommit(List<Mutation> value) {
>  +      Tablet.this.abortCommit(this, value);
>  +    }
>  +
>  +    public void commit(List<Mutation> mutations) {
>  +      Tablet.this.commit(this, mutations);
>  +    }
>  +
>  +    public Tablet getTablet() {
>  +      return Tablet.this;
>  +    }
>  +
>  +    public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
>  +      return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
>  +    }
>  +
>  +    public void finishUpdatingLogsUsed() {
>  +      Tablet.this.finishUpdatingLogsUsed();
>  +    }
>  +
>  +    public int getLogId() {
>  +      return logId;
>  +    }
>  +
>  +    public KeyExtent getExtent() {
>  +      return extent;
>  +    }
>  +
>  +    private void updateMaxCommittedTime(long time) {
>  +      maxCommittedTime = Math.max(time, maxCommittedTime);
>  +    }
>  +
>  +    private long getMaxCommittedTime() {
>  +      if (maxCommittedTime == Long.MIN_VALUE)
>  +        throw new IllegalStateException("Tried to read max committed time when it was never set");
>  +      return maxCommittedTime;
>  +    }
>  +
>  +  }
>  +
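>  +  // TabletMemory rotates between an active map (memTable), a map being minor
>  +  // compacted (otherMemTable), and a map awaiting deletion (deletingMemTable),
>  +  // so ingest can continue while the previous map is flushed to a file.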
>  +  private class TabletMemory {
>  +    private InMemoryMap memTable;
>  +    private InMemoryMap otherMemTable;
>  +    private InMemoryMap deletingMemTable;
>  +    private int nextSeq = 1;
>  +    private CommitSession commitSession;
>  +
>  +    TabletMemory() {
>  +      try {
>  +        memTable = new InMemoryMap(acuTableConf);
>  +      } catch (LocalityGroupConfigurationError e) {
>  +        throw new RuntimeException(e);
>  +      }
>  +      commitSession = new CommitSession(nextSeq, memTable);
>  +      nextSeq += 2;
>  +    }
>  +
>  +    InMemoryMap getMemTable() {
>  +      return memTable;
>  +    }
>  +
>  +    InMemoryMap getMinCMemTable() {
>  +      return otherMemTable;
>  +    }
>  +
>  +    CommitSession prepareForMinC() {
>  +      if (otherMemTable != null) {
>  +        throw new IllegalStateException();
>  +      }
>  +
>  +      if (deletingMemTable != null) {
>  +        throw new IllegalStateException();
>  +      }
>  +
>  +      otherMemTable = memTable;
>  +      try {
>  +        memTable = new InMemoryMap(acuTableConf);
>  +      } catch (LocalityGroupConfigurationError e) {
>  +        throw new RuntimeException(e);
>  +      }
>  +
>  +      CommitSession oldCommitSession = commitSession;
>  +      commitSession = new CommitSession(nextSeq, memTable);
>  +      nextSeq += 2;
>  +
>  +      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes());
>  +
>  +      return oldCommitSession;
>  +    }
>  +
>  +    void finishedMinC() {
>  +
>  +      if (otherMemTable == null) {
>  +        throw new IllegalStateException();
>  +      }
>  +
>  +      if (deletingMemTable != null) {
>  +        throw new IllegalStateException();
>  +      }
>  +
>  +      deletingMemTable = otherMemTable;
>  +
>  +      otherMemTable = null;
>  +      Tablet.this.notifyAll();
>  +    }
>  +
>  +    void finalizeMinC() {
>  +      try {
>  +        deletingMemTable.delete(15000);
>  +      } finally {
>  +        synchronized (Tablet.this) {
>  +          if (otherMemTable != null) {
>  +            throw new IllegalStateException();
>  +          }
>  +
>  +          if (deletingMemTable == null) {
>  +            throw new IllegalStateException();
>  +          }
>  +
>  +          deletingMemTable = null;
>  +
>  +          tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
>  +        }
>  +      }
>  +    }
>  +
>  +    boolean memoryReservedForMinC() {
>  +      return otherMemTable != null || deletingMemTable != null;
>  +    }
>  +
>  +    void waitForMinC() {
>  +      while (otherMemTable != null || deletingMemTable != null) {
>  +        try {
>  +          Tablet.this.wait(50);
>  +        } catch (InterruptedException e) {
>  +          log.warn(e, e);
>  +        }
>  +      }
>  +    }
>  +
>  +    void mutate(CommitSession cm, List<Mutation> mutations) {
>  +      cm.memTable.mutate(mutations);
>  +    }
>  +
>  +    void updateMemoryUsageStats() {
>  +      long other = 0;
>  +      if (otherMemTable != null)
>  +        other = otherMemTable.estimatedSizeInBytes();
>  +      else if (deletingMemTable != null)
>  +        other = deletingMemTable.estimatedSizeInBytes();
>  +
>  +      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
>  +    }
>  +
>  +    List<MemoryIterator> getIterators() {
>  +      List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
>  +      toReturn.add(memTable.skvIterator());
>  +      if (otherMemTable != null)
>  +        toReturn.add(otherMemTable.skvIterator());
>  +      return toReturn;
>  +    }
>  +
>  +    void returnIterators(List<MemoryIterator> iters) {
>  +      for (MemoryIterator iter : iters) {
>  +        iter.close();
>  +      }
>  +    }
>  +
>  +    public long getNumEntries() {
>  +      if (otherMemTable != null)
>  +        return memTable.getNumEntries() + otherMemTable.getNumEntries();
>  +      return memTable.getNumEntries();
>  +    }
>  +
>  +    CommitSession getCommitSession() {
>  +      return commitSession;
>  +    }
>  +  }
>  +
>  +  private TabletMemory tabletMemory;
>  +
>  +  private final TabletTime tabletTime;
>  +  private long persistedTime;
>  +  private final Object timeLock = new Object();
>  +
>  +  private final Path location; // absolute path of this tablets dir
>  +  private TServerInstance lastLocation;
>  +
>  +  private Configuration conf;
>  +  private VolumeManager fs;
>  +
>  +  private TableConfiguration acuTableConf;
>  +
>  +  private volatile boolean tableDirChecked = false;
>  +
>  +  private AtomicLong dataSourceDeletions = new AtomicLong(0);
>  +  private Set<ScanDataSource> activeScans = new HashSet<ScanDataSource>();
>  +
>  +  private volatile boolean closing = false;
>  +  private boolean closed = false;
>  +  private boolean closeComplete = false;
>  +
>  +  private long lastFlushID = -1;
>  +  private long lastCompactID = -1;
>  +
>  +  private KeyExtent extent;
>  +
>  +  private TabletResourceManager tabletResources;
>  +  final private DatafileManager datafileManager;
>  +  private volatile boolean majorCompactionInProgress = false;
>  +  private volatile boolean majorCompactionWaitingToStart = false;
>  +  private Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
>  +  private volatile boolean minorCompactionInProgress = false;
>  +  private volatile boolean minorCompactionWaitingToStart = false;
>  +
>  +  private boolean updatingFlushID = false;
>  +
>  +  private AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<ConstraintChecker>();
>  +
>  +  private final String tabletDirectory;
>  +
>  +  private int writesInProgress = 0;
>  +
>  +  private static final Logger log = Logger.getLogger(Tablet.class);
>  +  public TabletStatsKeeper timer;
>  +
>  +  private Rate queryRate = new Rate(0.2);
>  +  private long queryCount = 0;
>  +
>  +  private Rate queryByteRate = new Rate(0.2);
>  +  private long queryBytes = 0;
>  +
>  +  private Rate ingestRate = new Rate(0.2);
>  +  private long ingestCount = 0;
>  +
>  +  private Rate ingestByteRate = new Rate(0.2);
>  +  private long ingestBytes = 0;
>  +
>  +  private byte[] defaultSecurityLabel = new byte[0];
>  +
>  +  private long lastMinorCompactionFinishTime;
>  +  private long lastMapFileImportTime;
>  +
>  +  private volatile long numEntries;
>  +  private volatile long numEntriesInMemory;
>  +
>  +  // a count of the amount of data read by the iterators
>  +  private AtomicLong scannedCount = new AtomicLong(0);
>  +  private Rate scannedRate = new Rate(0.2);
>  +
>  +  private ConfigurationObserver configObserver;
>  +
>  +  private TabletServer tabletServer;
>  +
>  +  private final int logId;
>  +  // ensure we only have one reader/writer of our bulk file notes at a time
>  +  public final Object bulkFileImportLock = new Object();
>  +
>  +  public int getLogId() {
>  +    return logId;
>  +  }
>  +
>  +  public static class TabletClosedException extends RuntimeException {
>  +    public TabletClosedException(Exception e) {
>  +      super(e);
>  +    }
>  +
>  +    public TabletClosedException() {
>  +      super();
>  +    }
>  +
>  +    private static final long serialVersionUID = 1L;
>  +  }
>  +
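>  +  // Allocates a new, unique data file name under this tablet's directory,
>  +  // creating the directory on first use.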
>  +  FileRef getNextMapFilename(String prefix) throws IOException {
>  +    String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent));
>  +    checkTabletDir();
>  +    return new FileRef(location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension);
>  +  }
>  +
>  +  private void checkTabletDir() throws IOException {
>  +    if (!tableDirChecked) {
>  +      checkTabletDir(this.location);
>  +      tableDirChecked = true;
>  +    }
>  +  }
>  +
>  +  private void checkTabletDir(Path tabletDir) throws IOException {
>  +
>  +    FileStatus[] files = null;
>  +    try {
>  +      files = fs.listStatus(tabletDir);
>  +    } catch (FileNotFoundException ex) {
>  +      // ignored
>  +    }
>  +
>  +    if (files == null) {
>  +      if (tabletDir.getName().startsWith("c-"))
>  +        log.debug("Tablet " + extent + " had no dir, creating " + tabletDir); // it's a clone dir...
>  +      else
>  +        log.warn("Tablet " + extent + " had no dir, creating " + tabletDir);
>  +
>  +      fs.mkdirs(tabletDir);
>  +    }
>  +  }
>  +
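>  +  // DatafileManager tracks this tablet's data files and their sizes,
>  +  // reference counts files reserved by active scans, and applies the metadata
>  +  // updates that bring minor and major compaction results online.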
>  +  class DatafileManager {
>  +    // access to datafileSizes needs to be synchronized: see CompactionRunner#getNumFiles
>  +    final private Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
>  +
>  +    DatafileManager(SortedMap<FileRef,DataFileValue> datafileSizes) {
>  +      for (Entry<FileRef,DataFileValue> datafiles : datafileSizes.entrySet())
>  +        this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
>  +    }
>  +
>  +    FileRef mergingMinorCompactionFile = null;
>  +    Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>();
>  +    Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>();
>  +    MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>();
>  +    long nextScanReservationId = 0;
>  +    boolean reservationsBlocked = false;
>  +
>  +    Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
>  +
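>  +    // Reserve the current set of files for a scan: record them under a new
>  +    // reservation id and bump each file's reference count so the files are
>  +    // not deleted out from under the scan.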
>  +    Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() {
>  +      synchronized (Tablet.this) {
>  +
>  +        while (reservationsBlocked) {
>  +          try {
>  +            Tablet.this.wait(50);
>  +          } catch (InterruptedException e) {
>  +            log.warn(e, e);
>  +          }
>  +        }
>  +
>  +        Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet());
>  +
>  +        long rid = nextScanReservationId++;
>  +
>  +        scanFileReservations.put(rid, absFilePaths);
>  +
>  +        Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
>  +
>  +        for (FileRef path : absFilePaths) {
>  +          fileScanReferenceCounts.increment(path, 1);
>  +          ret.put(path, datafileSizes.get(path));
>  +        }
>  +
>  +        return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
>  +      }
>  +    }
>  +
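>  +    // Release a scan reservation: decrement reference counts and remove any
>  +    // files whose deletion was deferred until all scans against them finished.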
>  +    void returnFilesForScan(Long reservationId) {
>  +
>  +      final Set<FileRef> filesToDelete = new HashSet<FileRef>();
>  +
>  +      synchronized (Tablet.this) {
>  +        Set<FileRef> absFilePaths = scanFileReservations.remove(reservationId);
>  +
>  +        if (absFilePaths == null)
>  +          throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
>  +
>  +        boolean notify = false;
>  +        for (FileRef path : absFilePaths) {
>  +          long refCount = fileScanReferenceCounts.decrement(path, 1);
>  +          if (refCount == 0) {
>  +            if (filesToDeleteAfterScan.remove(path))
>  +              filesToDelete.add(path);
>  +            notify = true;
>  +          } else if (refCount < 0)
>  +            throw new IllegalStateException("Scan ref count for " + path + " is " + refCount);
>  +        }
>  +
>  +        if (notify)
>  +          Tablet.this.notifyAll();
>  +      }
>  +
>  +      if (filesToDelete.size() > 0) {
>  +        log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete);
>  +        MetadataTableUtil.removeScanFiles(extent, filesToDelete, SystemCredentials.get(), tabletServer.getLock());
>  +      }
>  +    }
>  +
>  +    private void removeFilesAfterScan(Set<FileRef> scanFiles) {
>  +      if (scanFiles.size() == 0)
>  +        return;
>  +
>  +      Set<FileRef> filesToDelete = new HashSet<FileRef>();
>  +
>  +      synchronized (Tablet.this) {
>  +        for (FileRef path : scanFiles) {
>  +          if (fileScanReferenceCounts.get(path) == 0)
>  +            filesToDelete.add(path);
>  +          else
>  +            filesToDeleteAfterScan.add(path);
>  +        }
>  +      }
>  +
>  +      if (filesToDelete.size() > 0) {
>  +        log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete);
>  +        MetadataTableUtil.removeScanFiles(extent, filesToDelete, SystemCredentials.get(), tabletServer.getLock());
>  +      }
>  +    }
>  +
>  +    private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
>  +      long startTime = System.currentTimeMillis();
>  +      TreeSet<FileRef> inUse = new TreeSet<FileRef>();
>  +
>  +      Span waitForScans = Trace.start("waitForScans");
>  +      try {
>  +        synchronized (Tablet.this) {
>  +          if (blockNewScans) {
>  +            if (reservationsBlocked)
>  +              throw new IllegalStateException();
>  +
>  +            reservationsBlocked = true;
>  +          }
>  +
>  +          for (FileRef path : pathsToWaitFor) {
>  +            while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
>  +              try {
>  +                Tablet.this.wait(100);
>  +              } catch (InterruptedException e) {
>  +                log.warn(e, e);
>  +              }
>  +            }
>  +          }
>  +
>  +          for (FileRef path : pathsToWaitFor) {
>  +            if (fileScanReferenceCounts.get(path) > 0)
>  +              inUse.add(path);
>  +          }
>  +
>  +          if (blockNewScans) {
>  +            reservationsBlocked = false;
>  +            Tablet.this.notifyAll();
>  +          }
>  +
>  +        }
>  +      } finally {
>  +        waitForScans.stop();
>  +      }
>  +      return inUse;
>  +    }
>  +
>  +    public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException {
>  +
>  +      String bulkDir = null;
>  +
>  +      Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
>  +      for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet())
>  +        paths.put(entry.getKey(), entry.getValue());
>  +
>  +      for (FileRef tpath : paths.keySet()) {
>  +
>  +        boolean inTheRightDirectory = false;
>  +        Path parent = tpath.path().getParent().getParent();
>  +        for (String tablesDir : ServerConstants.getTablesDirs()) {
>  +          if (parent.equals(new Path(tablesDir, extent.getTableId().toString()))) {
>  +            inTheRightDirectory = true;
>  +            break;
>  +          }
>  +        }
>  +        if (!inTheRightDirectory) {
>  +          throw new IOException("Data file " + tpath + " not in table dirs");
>  +        }
>  +
>  +        if (bulkDir == null)
>  +          bulkDir = tpath.path().getParent().toString();
>  +        else if (!bulkDir.equals(tpath.path().getParent().toString()))
>  +          throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);
>  +
>  +      }
>  +
>  +      if (extent.isRootTablet()) {
>  +        throw new IllegalArgumentException("Can not import files to root tablet");
>  +      }
>  +
>  +      synchronized (bulkFileImportLock) {
>  +        Credentials creds = SystemCredentials.get();
>  +        Connector conn;
>  +        try {
>  +          conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken());
>  +        } catch (Exception ex) {
>  +          throw new IOException(ex);
>  +        }
>  +        // Remove any bulk files we've previously loaded and compacted away
>  +        List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);
>  +
>  +        for (FileRef file : files)
>  +          if (paths.keySet().remove(file.path()))
>  +            log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file);
>  +
>  +        if (paths.size() > 0) {
>  +          long bulkTime = Long.MIN_VALUE;
>  +          if (setTime) {
>  +            for (DataFileValue dfv : paths.values()) {
>  +              long nextTime = tabletTime.getAndUpdateTime();
>  +              if (nextTime < bulkTime)
>  +                throw new IllegalStateException("Time went backwards unexpectedly " + nextTime + " " + bulkTime);
>  +              bulkTime = nextTime;
>  +              dfv.setTime(bulkTime);
>  +            }
>  +          }
>  +
>  +          synchronized (timeLock) {
>  +            if (bulkTime > persistedTime)
>  +              persistedTime = bulkTime;
>  +
>  +            MetadataTableUtil.updateTabletDataFile(tid, extent, paths, tabletTime.getMetadataValue(persistedTime), creds, tabletServer.getLock());
>  +          }
>  +        }
>  +      }
>  +
>  +      synchronized (Tablet.this) {
>  +        for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
>  +          if (datafileSizes.containsKey(tpath.getKey())) {
>  +            log.error("Adding file that is already in set " + tpath.getKey());
>  +          }
>  +          datafileSizes.put(tpath.getKey(), tpath.getValue());
>  +
>  +        }
>  +
>  +        tabletResources.importedMapFiles();
>  +
>  +        computeNumEntries();
>  +      }
>  +
>  +      for (FileRef tpath : paths.keySet()) {
>  +        log.log(TLevel.TABLET_HIST, extent + " import " + tpath + " " + paths.get(tpath));
>  +      }
>  +    }
>  +
>  +    FileRef reserveMergingMinorCompactionFile() {
>  +      if (mergingMinorCompactionFile != null)
>  +        throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved: " + mergingMinorCompactionFile);
>  +
>  +      if (extent.isRootTablet())
>  +        return null;
>  +
>  +      int maxFiles = acuTableConf.getMaxFilesPerTablet();
>  +
>  +      // when a major compaction is running and we are at max files, write out
>  +      // one extra file... want to avoid the case where major compaction is
>  +      // compacting everything except for the largest file, and therefore the
>  +      // largest file is returned for merging.. the following check mostly
>  +      // avoids this case, except for the case where major compactions fail or
>  +      // are canceled
>  +      if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
>  +        return null;
>  +
>  +      if (datafileSizes.size() >= maxFiles) {
>  +        // find the smallest file
>  +
>  +        long min = Long.MAX_VALUE;
>  +        FileRef minName = null;
>  +
>  +        for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
>  +          if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
>  +            min = entry.getValue().getSize();
>  +            minName = entry.getKey();
>  +          }
>  +        }
>  +
>  +        if (minName == null)
>  +          return null;
>  +
>  +        mergingMinorCompactionFile = minName;
>  +        return minName;
>  +      }
>  +
>  +      return null;
>  +    }
>  +
>  +    void unreserveMergingMinorCompactionFile(FileRef file) {
>  +      if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
>  +          || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
>  +        throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
>  +
>  +      mergingMinorCompactionFile = null;
>  +    }
>  +
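>  +    // Called when a minor compaction finishes: rename the temporary file into
>  +    // place, record the new file (and any merged-away file) in the metadata
>  +    // table and write-ahead log, then atomically swap the in-memory file list.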
>  +    void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
>  +        throws IOException {
>  +
>  +      IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
>  +      if (extent.isRootTablet()) {
>  +        try {
>  +          if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
>  +            throw new IllegalStateException();
>  +          }
>  +        } catch (Exception e) {
>  +          throw new IllegalStateException("Can not bring minor compaction online, lock not held", e);
>  +        }
>  +      }
>  +
>  +      // rename before putting in metadata table, so files in metadata table should
>  +      // always exist
>  +      do {
>  +        try {
>  +          if (dfv.getNumEntries() == 0) {
>  +            fs.deleteRecursively(tmpDatafile.path());
>  +          } else {
>  +            if (fs.exists(newDatafile.path())) {
>  +              log.warn("Target map file already exists " + newDatafile);
>  +              fs.deleteRecursively(newDatafile.path());
>  +            }
>  +
>  +            if (!fs.rename(tmpDatafile.path(), newDatafile.path())) {
>  +              throw new IOException("rename fails");
>  +            }
>  +          }
>  +          break;
>  +        } catch (IOException ioe) {
>  +          log.warn("Tablet " + extent + " failed to rename " + newDatafile + " after MinC, will retry in 60 secs...", ioe);
>  +          UtilWaitThread.sleep(60 * 1000);
>  +        }
>  +      } while (true);
>  +
>  +      long t1, t2;
>  +
>  +      // the code below always assumes merged files are in use by scans... this must be done
>  +      // because the in memory list of files is not updated until after the metadata table
>  +      // therefore the file is available to scans until memory is updated, but want to ensure
>  +      // the file is not available for garbage collection... if memory were updated
>  +      // before this point (like major compactions do), then the following code could wait
>  +      // for scans to finish like major compactions do.... used to wait for scans to finish
>  +      // here, but that was incorrect because a scan could start after waiting but before
>  +      // memory was updated... assuming the file is always in use by scans leads to
>  +      // one unneeded metadata update when it was not actually in use
>  +      Set<FileRef> filesInUseByScans = Collections.emptySet();
>  +      if (absMergeFile != null)
>  +        filesInUseByScans = Collections.singleton(absMergeFile);
>  +
>  +      // very important to write delete entries outside of log lock, because
>  +      // this !METADATA write does not go up... it goes sideways or to itself
>  +      if (absMergeFile != null)
>  +        MetadataTableUtil.addDeleteEntries(extent, Collections.singleton(absMergeFile), SystemCredentials.get());
>  +
>  +      Set<String> unusedWalLogs = beginClearingUnusedLogs();
>  +      try {
>  +        // the order of writing to !METADATA and walog is important in the face of machine/process failures
>  +        // need to write to !METADATA before writing to walog, when things are done in the reverse order
>  +        // data could be lost... the minor compaction start event should be written before the following metadata
>  +        // write is made
>  +
>  +        synchronized (timeLock) {
>  +          if (commitSession.getMaxCommittedTime() > persistedTime)
>  +            persistedTime = commitSession.getMaxCommittedTime();
>  +
>  +          String time = tabletTime.getMetadataValue(persistedTime);
>  +          MasterMetadataUtil.updateTabletDataFile(extent, newDatafile, absMergeFile, dfv, time, SystemCredentials.get(), filesInUseByScans,
>  +              tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
>  +        }
>  +
>  +      } finally {
>  +        finishClearingUnusedLogs();
>  +      }
>  +
>  +      do {
>  +        try {
>  +          // the purpose of making this update use the new commit session, instead of the old one passed in,
>  +          // is because the new one will reference the logs used by current memory...
>  +
>  +          tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
>  +          break;
>  +        } catch (IOException e) {
>  +          log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e);
>  +          UtilWaitThread.sleep(1 * 1000);
>  +        }
>  +      } while (true);
>  +
>  +      synchronized (Tablet.this) {
>  +        lastLocation = null;
>  +
>  +        t1 = System.currentTimeMillis();
>  +        if (datafileSizes.containsKey(newDatafile)) {
>  +          log.error("Adding file that is already in set " + newDatafile);
>  +        }
>  +
>  +        if (dfv.getNumEntries() > 0) {
>  +          datafileSizes.put(newDatafile, dfv);
>  +        }
>  +
>  +        if (absMergeFile != null) {
>  +          datafileSizes.remove(absMergeFile);
>  +        }
>  +
>  +        unreserveMergingMinorCompactionFile(absMergeFile);
>  +
>  +        dataSourceDeletions.incrementAndGet();
>  +        tabletMemory.finishedMinC();
>  +
>  +        lastFlushID = flushId;
>  +
>  +        computeNumEntries();
>  +        t2 = System.currentTimeMillis();
>  +      }
>  +
>  +      // must do this after list of files in memory is updated above
>  +      removeFilesAfterScan(filesInUseByScans);
>  +
>  +      if (absMergeFile != null)
>  +        log.log(TLevel.TABLET_HIST, extent + " MinC [" + absMergeFile + ",memory] -> " + newDatafile);
>  +      else
>  +        log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " + newDatafile);
>  +      log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, getExtent().toString()));
>  +      if (dfv.getSize() > acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) {
>  +        log.debug(String.format("Minor Compaction wrote out file larger than split threshold.  split threshold = %,d  file size = %,d",
>  +            acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), dfv.getSize()));
>  +      }
>  +
>  +    }
>  +
>  +    public void reserveMajorCompactingFiles(Collection<FileRef> files) {
>  +      if (majorCompactingFiles.size() != 0)
>  +        throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
>  +
>  +      if (mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile))
>  +        throw new IllegalStateException("Major compaction tried to reserve file in use by minor compaction " + mergingMinorCompactionFile);
>  +
>  +      majorCompactingFiles.addAll(files);
>  +    }
>  +
>  +    public void clearMajorCompactingFile() {
>  +      majorCompactingFiles.clear();
>  +    }
>  +
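>  +    // Called when a major compaction finishes: rename the compacted file into
>  +    // place (with extra care for the root tablet, whose files are not tracked
>  +    // in the metadata table), then atomically replace the old files with the
>  +    // new one in the in-memory file list.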
>  +    void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
>  +        throws IOException {
>  +      long t1, t2;
>  +
>  +      if (!extent.isRootTablet()) {
>  +
>  +        if (fs.exists(newDatafile.path())) {
>  +          log.error("Target map file already exists " + newDatafile, new Exception());
>  +          throw new IllegalStateException("Target map file already exists " + newDatafile);
>  +        }
>  +
>  +        // rename before putting in metadata table, so files in metadata table should
>  +        // always exist
>  +        if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
>  +          log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false");
>  +
>  +        if (dfv.getNumEntries() == 0) {
>  +          fs.deleteRecursively(newDatafile.path());
>  +        }
>  +      }
>  +
>  +      TServerInstance lastLocation = null;
>  +      synchronized (Tablet.this) {
>  +
>  +        t1 = System.currentTimeMillis();
>  +
>  +        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
>  +
>  +        dataSourceDeletions.incrementAndGet();
>  +
>  +        if (extent.isRootTablet()) {
>  +
>  +          waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
>  +
>  +          try {
>  +            if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
>  +              throw new IllegalStateException();
>  +            }
>  +          } catch (Exception e) {
>  +            throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
>  +          }
>  +
>  +          // mark files as ready for deletion, but
>  +          // do not delete them until we successfully
>  +          // rename the compacted map file, in case
>  +          // the system goes down
>  +
>  +          String compactName = newDatafile.path().getName();
>  +
>  +          for (FileRef ref : oldDatafiles) {
>  +            Path path = ref.path();
>  +            fs.rename(path, new Path(location + "/delete+" + compactName + "+" + path.getName()));
>  +          }
>  +
>  +          if (fs.exists(newDatafile.path())) {
>  +            log.error("Target map file already exists " + newDatafile, new Exception());
>  +            throw new IllegalStateException("Target map file already exists " + newDatafile);
>  +          }
>  +
>  +          if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
>  +            log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false");
>  +
>  +          // start deleting files, if we do not finish they will be cleaned
>  +          // up later
>  +          for (FileRef ref : oldDatafiles) {
>  +            Path path = ref.path();
>  +            Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName());
>  +            if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || !fs.moveToTrash(deleteFile))
>  +              fs.deleteRecursively(deleteFile);
>  +          }
>  +        }
>  +
>  +        // atomically remove old files and add new file
>  +        for (FileRef oldDatafile : oldDatafiles) {
>  +          if (!datafileSizes.containsKey(oldDatafile)) {
>  +            log.error("file does not exist in set " + oldDatafile);
>  +          }
>  +          datafileSizes.remove(oldDatafile);
>  +          majorCompactingFiles.remove(oldDatafile);
>  +        }
>  +
>  +        if (datafileSizes.containsKey(newDatafile)) {
>  +          log.error("Adding file that is already in set " + newDatafile);
>  +        }
>  +
>  +        if (dfv.getNumEntries() > 0) {
>  +          datafileSizes.put(newDatafile, dfv);
>  +        }
>  +
>  +        // could be used by a follow on compaction in a multipass compaction
>  +        majorCompactingFiles.add(newDatafile);
>  +
>  +        computeNumEntries();
>  +
>  +        lastLocation = Tablet.this.lastLocation;
>  +        Tablet.this.lastLocation = null;
>  +
>  +        if (compactionId != null)
>  +          lastCompactID = compactionId;
>  +
>  +        t2 = System.currentTimeMillis();
>  +      }
>  +
>  +      if (!extent.isRootTablet()) {
>  +        Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
>  +        if (filesInUseByScans.size() > 0)
>  +          log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
>  +        MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(),
>  +            tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock());
>  +        removeFilesAfterScan(filesInUseByScans);
>  +      }
>  +
>  +      log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0));
>  +      log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " --> " + newDatafile);
>  +    }
>  +
>  +    public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
>  +      synchronized (Tablet.this) {
>  +        TreeMap<FileRef,DataFileValue> copy = new TreeMap<FileRef,DataFileValue>(datafileSizes);
>  +        return Collections.unmodifiableSortedMap(copy);
>  +      }
>  +    }
>  +
>  +    public Set<FileRef> getFiles() {
>  +      synchronized (Tablet.this) {
>  +        HashSet<FileRef> files = new HashSet<FileRef>(datafileSizes.keySet());
>  +        return Collections.unmodifiableSet(files);
>  +      }
>  +    }
>  +
>  +  }
>  +
>  +  public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<Key,Value> tabletsKeyValues)
>  +      throws IOException {
>  +    this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), tabletsKeyValues);
>  +    splitCreationTime = 0;
>  +  }
>  +
>  +  public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<FileRef,DataFileValue> datafiles, String time,
>  +      long initFlushID, long initCompactID) throws IOException {
>  +    this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), datafiles, time, initFlushID, initCompactID);
>  +    splitCreationTime = System.currentTimeMillis();
>  +  }
>  +
>  +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
>  +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>  +    this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), tabletsKeyValues);
>  +  }
>  +
>  +  static private final List<LogEntry> EMPTY = Collections.emptyList();
>  +
>  +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
>  +      SortedMap<FileRef,DataFileValue> datafiles, String time, long initFlushID, long initCompactID) throws IOException {
>  +    this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), EMPTY, datafiles, time, null, new HashSet<FileRef>(), initFlushID, initCompactID);
>  +  }
>  +
>  +  private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>  +    SortedMap<Key,Value> entries;
>  +
>  +    if (extent.isRootTablet()) {
>  +      return null;
>  +    } else {
>  +      entries = new TreeMap<Key,Value>();
>  +      Text rowName = extent.getMetadataEntry();
>  +      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>  +        if (entry.getKey().compareRow(rowName) == 0 && TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
>  +          entries.put(new Key(entry.getKey()), new Value(entry.getValue()));
>  +        }
>  +      }
>  +    }
>  +
>  +    // log.debug("extent : "+extent+"   entries : "+entries);
>  +
>  +    if (entries.size() == 1)
>  +      return entries.values().iterator().next().toString();
>  +    return null;
>  +  }
>  +
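>  +  // Builds the tablet's initial file set: the root tablet lists its directory
>  +  // directly, since its files are not in the metadata table; other tablets
>  +  // scan their metadata row for data file entries.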
>  +  private static SortedMap<FileRef,DataFileValue> lookupDatafiles(AccumuloConfiguration conf, VolumeManager fs, KeyExtent extent,
>  +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>  +
>  +    TreeMap<FileRef,DataFileValue> datafiles = new TreeMap<FileRef,DataFileValue>();
>  +
>  +    if (extent.isRootTablet()) { // the meta0 tablet
>  +      Path location = new Path(MetadataTableUtil.getRootTabletDir());
>  +      // cleanUpFiles() has special handling for delete. files
>  +      FileStatus[] files = fs.listStatus(location);
>  +      Collection<String> goodPaths = cleanUpFiles(fs, files, true);
>  +      for (String good : goodPaths) {
>  +        Path path = new Path(good);
>  +        String filename = path.getName();
>  +        FileRef ref = new FileRef(location.toString() + "/" + filename, path);
>  +        DataFileValue dfv = new DataFileValue(0, 0);
>  +        datafiles.put(ref, dfv);
>  +      }
>  +    } else {
>  +
>  +      Text rowName = extent.getMetadataEntry();
>  +
>  +      String tableId = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
>  +      ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SystemCredentials.get(), tableId, Authorizations.EMPTY);
>  +
>  +      // Commented out because when no data file is present, each tablet will scan through metadata table and return nothing
>  +      // reduced batch size to improve performance
>  +      // changed here after endKeys were implemented from 10 to 1000
>  +      mdScanner.setBatchSize(1000);
>  +
>  +      // leave these in, again, now using endKey for safety
>  +      mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
>  +
>  +      mdScanner.setRange(new Range(rowName));
>  +
>  +      for (Entry<Key,Value> entry : mdScanner) {
>  +
>  +        if (entry.getKey().compareRow(rowName) != 0) {
>  +          break;
>  +        }
>  +
>  +        FileRef ref = new FileRef(entry.getKey().getColumnQualifier().toString(), fs.getFullPath(entry.getKey()));
>  +        datafiles.put(ref, new DataFileValue(entry.getValue().get()));
>  +      }
>  +    }
>  +    return datafiles;
>  +  }
>  +
>  +  private static List<LogEntry> lookupLogEntries(KeyExtent ke, SortedMap<Key,Value> tabletsKeyValues) {
>  +    List<LogEntry> logEntries = new ArrayList<LogEntry>();
>  +
>  +    if (ke.isMeta()) {
>  +      try {
>  +        logEntries = MetadataTableUtil.getLogEntries(SystemCredentials.get(), ke);
>  +      } catch (Exception ex) {
>  +        throw new RuntimeException("Unable to read tablet log entries", ex);
>  +      }
>  +    } else {
>  +      log.debug("Looking at metadata " + tabletsKeyValues);
>  +      Text row = ke.getMetadataEntry();
>  +      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>  +        Key key = entry.getKey();
>  +        if (key.getRow().equals(row)) {
>  +          if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
>  +            logEntries.add(LogEntry.fromKeyValue(key, entry.getValue()));
>  +          }
>  +        }
>  +      }
>  +    }
>  +
>  +    log.debug("got " + logEntries + " for logs for " + ke);
>  +    return logEntries;
>  +  }
>  +
>  +  private static Set<FileRef> lookupScanFiles(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) throws IOException {
>  +    HashSet<FileRef> scanFiles = new HashSet<FileRef>();
>  +
>  +    Text row = extent.getMetadataEntry();
>  +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>  +      Key key = entry.getKey();
>  +      if (key.getRow().equals(row) && key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) {
>  +        String meta = key.getColumnQualifier().toString();
>  +        Path path = fs.getFullPath(extent.getTableId().toString(), meta);
>  +        scanFiles.add(new FileRef(meta, path));
>  +      }
>  +    }
>  +
>  +    return scanFiles;
>  +  }
>  +
>  +  private static long lookupFlushID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>  +    Text row = extent.getMetadataEntry();
>  +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>  +      Key key = entry.getKey();
>  +      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
>  +        return Long.parseLong(entry.getValue().toString());
>  +    }
>  +
>  +    return -1;
>  +  }
>  +
>  +  private static long lookupCompactID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>  +    Text row = extent.getMetadataEntry();
>  +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>  +      Key key = entry.getKey();
>  +      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
>  +        return Long.parseLong(entry.getValue().toString());
>  +    }
>  +
>  +    return -1;
>  +  }
>  +
>  +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, VolumeManager fs,
>  +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>  +    this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(), fs,
>  +        extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent, tabletsKeyValues),
>  +        lookupScanFiles(extent, tabletsKeyValues, fs), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues));
>  +  }
>  +
>  +  private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>  +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>  +      if (entry.getKey().getColumnFamily().compareTo(TabletsSection.LastLocationColumnFamily.NAME) == 0) {
>  +        return new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
>  +      }
>  +    }
>  +    return null;
>  +  }
>  +
>  +  /**
>  +   * yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time
>  +   */
>  +  private Tablet(final TabletServer tabletServer, final Text location, final KeyExtent extent, final TabletResourceManager trm, final Configuration conf,
>  +      final VolumeManager fs, final List<LogEntry> logEntries, final SortedMap<FileRef,DataFileValue> datafiles, String time,
>  +      final TServerInstance lastLocation, Set<FileRef> scanFiles, long initFlushID, long initCompactID) throws IOException {
>  +    Path locationPath;
>  +    if (location.find(":") >= 0) {
>  +      locationPath = new Path(location.toString());
>  +    } else {
>  +      locationPath = fs.getFullPath(FileType.TABLE, extent.getTableId().toString() + location.toString());
>  +    }
>  +    FileSystem fsForPath = fs.getFileSystemByPath(locationPath);
>  +    this.location = locationPath.makeQualified(fsForPath.getUri(), fsForPath.getWorkingDirectory());
>  +    this.lastLocation = lastLocation;
>  +    this.tabletDirectory = location.toString();
>  +    this.conf = conf;
>  +    this.acuTableConf = tabletServer.getTableConfiguration(extent);
>  +
>  +    this.fs = fs;
>  +    this.extent = extent;
>  +    this.tabletResources = trm;
>  +
>  +    this.lastFlushID = initFlushID;
>  +    this.lastCompactID = initCompactID;
>  +
>  +    if (extent.isRootTablet()) {
>  +      long rtime = Long.MIN_VALUE;
>  +      for (FileRef ref : datafiles.keySet()) {
>  +        Path path = ref.path();
>  +        FileSystem ns = fs.getFileSystemByPath(path);
>  +        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), tabletServer.getTableConfiguration(extent));
>  +        long maxTime = -1;
>  +        try {
>  +
>  +          while (reader.hasTop()) {
>  +            maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp());
>  +            reader.next();
>  +          }
>  +
>  +        } finally {
>  +          reader.close();
>  +        }
>  +
>  +        if (maxTime > rtime) {
>  +          time = TabletTime.LOGICAL_TIME_ID + "" + maxTime;
>  +          rtime = maxTime;
>  +        }
>  +      }
>  +    }
>  +    if (time == null && datafiles.isEmpty() && extent.equals(RootTable.OLD_EXTENT)) {
>  +      // recovery... old root tablet has no data, so time doesn't matter:
>  +      time = TabletTime.LOGICAL_TIME_ID + "" + Long.MIN_VALUE;
>  +    }
>  +
>  +    this.tabletServer = tabletServer;
>  +    this.logId = tabletServer.createLogId(extent);
>  +
>  +    this.timer = new TabletStatsKeeper();
>  +
>  +    setupDefaultSecurityLabels(extent);
>  +
>  +    tabletMemory = new TabletMemory();
>  +    tabletTime = TabletTime.getInstance(time);
>  +    persistedTime = tabletTime.getTime();
>  +
>  +    acuTableConf.addObserver(configObserver = new ConfigurationObserver() {
>  +
>  +      private void reloadConstraints() {
>  +        constraintChecker.set(new ConstraintChecker(getTableConfiguration()));
>  +      }
>  +
>  +      @Override
>  +      public void propertiesChanged() {
>  +        reloadConstraints();
>  +
>  +        try {
>  +          setupDefaultSecurityLabels(extent);
>  +        } catch (Exception e) {
>  +          log.error("Failed to reload default security labels for extent: " + extent.toString());
>  +        }
>  +      }
>  +
>  +      @Override
>  +      public void propertyChanged(String prop) {
>  +        if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey()))
>  +          reloadConstraints();
>  +        else if
> (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) {
>  +          try {
>  +            log.info("Default security labels changed for extent: " +
> extent.toString());
>  +            setupDefaultSecurityLabels(extent);
>  +          } catch (Exception e) {
>  +            log.error("Failed to reload default security labels for
> extent: " + extent.toString());
>  +          }
>  +        }
>  +
>  +      }
>  +
>  +      @Override
>  +      public void sessionExpired() {
>  +        log.debug("Session expired, no longer updating per table
> props...");
>  +      }
>  +
>  +    });
>  +    // Force a load of any per-table properties
>  +    configObserver.propertiesChanged();
>  +
>  +    tabletResources.setTablet(this, acuTableConf);
>  +    if (!logEntries.isEmpty()) {
>  +      log.info("Starting Write-Ahead Log recovery for " + this.extent);
>  +      final long[] count = new long[2];
>  +      final CommitSession commitSession =
> tabletMemory.getCommitSession();
>  +      count[1] = Long.MIN_VALUE;
>  +      try {
>  +        Set<String> absPaths = new HashSet<String>();
>  +        for (FileRef ref : datafiles.keySet())
>  +          absPaths.add(ref.path().toString());
>  +
>  +        tabletServer.recover(this.tabletServer.getFileSystem(), this,
> logEntries, absPaths, new MutationReceiver() {
>  +          @Override
>  +          public void receive(Mutation m) {
>  +            // LogReader.printMutation(m);
>  +            Collection<ColumnUpdate> muts = m.getUpdates();
>  +            for (ColumnUpdate columnUpdate : muts) {
>  +              if (!columnUpdate.hasTimestamp()) {
>  +                // if it is not a user-set timestamp, it must have been
> set
>  +                // by the system
>  +                count[1] = Math.max(count[1],
> columnUpdate.getTimestamp());
>  +              }
>  +            }
>  +            tabletMemory.mutate(commitSession,
> Collections.singletonList(m));
>  +            count[0]++;
>  +          }
>  +        });
>  +
>  +        if (count[1] != Long.MIN_VALUE) {
>  +          tabletTime.useMaxTimeFromWALog(count[1]);
>  +        }
>  +        commitSession.updateMaxCommittedTime(tabletTime.getTime());
>  +
> -         tabletMemory.updateMemoryUsageStats();
> -
>  +        if (count[0] == 0) {
>  +          MetadataTableUtil.removeUnusedWALEntries(extent, logEntries,
> tabletServer.getLock());
>  +          logEntries.clear();
>  +        }
>  +
>  +      } catch (Throwable t) {
>  +        if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE)) {
>  +          log.warn("Error recovering from log files: ", t);
>  +        } else {
>  +          throw new RuntimeException(t);
>  +        }
>  +      }
>  +      // make some closed references that represent the recovered logs
>  +      currentLogs = new HashSet<DfsLogger>();
>  +      for (LogEntry logEntry : logEntries) {
>  +        for (String log : logEntry.logSet) {
>  +          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(),
> log));
>  +        }
>  +      }
>  +
>  +      log.info("Write-Ahead Log recovery complete for " + this.extent +
> " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries()
>  +          + " entries created)");
>  +    }
>  +
>  +    String contextName = acuTableConf.get(Property.TABLE_CLASSPATH);
>  +    if (contextName != null && !contextName.equals("")) {
>  +      // initialize context classloader, instead of possibly waiting for
> it to initialize for a scan
>  +      // TODO this could hang, causing other tablets to fail to load -
> ACCUMULO-1292
>  +
>  AccumuloVFSClassLoader.getContextManager().getClassLoader(contextName);
>  +    }
>  +
>  +    // do this last after tablet is completely setup because it
>  +    // could cause major compaction to start
>  +    datafileManager = new DatafileManager(datafiles);
>  +
>  +    computeNumEntries();
>  +
>  +    datafileManager.removeFilesAfterScan(scanFiles);
>  +
>  +    // look for hints of a failure on the previous tablet server
>  +    if (!logEntries.isEmpty() ||
> needsMajorCompaction(MajorCompactionReason.NORMAL)) {
>  +      // look for any temp files hanging around
>  +      removeOldTemporaryFiles();
>  +    }
>  +
>  +    log.log(TLevel.TABLET_HIST, extent + " opened");
>  +  }
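
A side note on the recovery callback above: the final long[] count is the
usual pre-Java-8 idiom for mutating local state from inside an anonymous
inner class, since captured locals must be (effectively) final. A minimal
standalone sketch of the same idiom, with illustrative names not taken from
the Accumulo code:

    import java.util.Arrays;
    import java.util.List;

    public class CallbackCounter {
      interface Receiver {
        void receive(long timestamp);
      }

      static void replay(List<Long> timestamps, Receiver r) {
        for (long ts : timestamps)
          r.receive(ts);
      }

      public static void main(String[] args) {
        // slot 0 counts mutations, slot 1 tracks the max system timestamp,
        // mirroring count[0]/count[1] in the WAL recovery above
        final long[] count = new long[2];
        count[1] = Long.MIN_VALUE;

        replay(Arrays.asList(5L, 42L, 17L), new Receiver() {
          @Override
          public void receive(long timestamp) {
            count[1] = Math.max(count[1], timestamp);
            count[0]++;
          }
        });

        System.out.println(count[0] + " mutations, max time " + count[1]);
      }
    }
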
>  +
>  +  private void removeOldTemporaryFiles() {
>  +    // remove any temporary files created by a previous tablet server
>  +    try {
>  +      for (FileStatus tmp : fs.globStatus(new Path(location, "*_tmp"))) {
>  +        try {
>  +          log.debug("Removing old temp file " + tmp.getPath());
>  +          fs.delete(tmp.getPath());
>  +        } catch (IOException ex) {
>  +          log.error("Unable to remove old temp file " + tmp.getPath() +
> ": " + ex);
>  +        }
>  +      }
>  +    } catch (IOException ex) {
>  +      log.error("Error scanning for old temp files in " + location);
>  +    }
>  +  }
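
For readers skimming removeOldTemporaryFiles(): it globs the tablet
directory for files matching *_tmp (partial output of an interrupted
compaction) and deletes each one, logging rather than failing on errors. A
rough JDK-only equivalent against a local directory, standing in for the
VolumeManager calls:

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class TmpFileCleaner {
      public static void main(String[] args) throws IOException {
        Path tabletDir = Paths.get(args[0]);
        // match files left behind by a previous server, e.g. F0001.rf_tmp
        try (DirectoryStream<Path> tmps = Files.newDirectoryStream(tabletDir, "*_tmp")) {
          for (Path tmp : tmps) {
            try {
              System.out.println("Removing old temp file " + tmp);
              Files.delete(tmp);
            } catch (IOException ex) {
              System.err.println("Unable to remove old temp file " + tmp + ": " + ex);
            }
          }
        }
      }
    }
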
>  +
>  +  private void setupDefaultSecurityLabels(KeyExtent extent) {
>  +    if (extent.isMeta()) {
>  +      defaultSecurityLabel = new byte[0];
>  +    } else {
>  +      try {
>  +        ColumnVisibility cv = new
> ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
>  +        this.defaultSecurityLabel = cv.getExpression();
>  +      } catch (Exception e) {
>  +        log.error(e, e);
>  +        this.defaultSecurityLabel = new byte[0];
>  +      }
>  +    }
>  +  }
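
The default security label is just the parsed form of the table's default
scan-time visibility property; a bad expression falls back to the empty
label, and metadata tablets always use the empty label. Assuming I have the
public API right, the parse step in isolation looks like:

    import org.apache.accumulo.core.security.ColumnVisibility;

    public class DefaultLabelDemo {
      public static void main(String[] args) {
        byte[] defaultSecurityLabel;
        // "ADMIN|(AUDIT&LEGAL)" stands in for the configured property value
        try {
          ColumnVisibility cv = new ColumnVisibility("ADMIN|(AUDIT&LEGAL)");
          defaultSecurityLabel = cv.getExpression();
        } catch (Exception e) {
          // same fallback as the tablet code: an unparsable default means no label
          defaultSecurityLabel = new byte[0];
        }
        System.out.println(new String(defaultSecurityLabel));
      }
    }
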
>  +
>  +  private static Collection<String> cleanUpFiles(VolumeManager fs,
> FileStatus[] files, boolean deleteTmp) throws IOException {
>  +    /*
>  +     * called in constructor and before major compactions
>  +     */
>  +    Collection<String> goodFiles = new ArrayList<String>(files.length);
>  +
>  +    for (FileStatus file : files) {
>  +
>  +      String path = file.getPath().toString();
>  +      String filename = file.getPath().getName();
>  +
>  +      // check for incomplete major compaction, this should only occur
>  +      // for root tablet
>  +      if (filename.startsWith("delete+")) {
>  +        String expectedCompactedFile = path.substring(0,
> path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1];
>  +        if (fs.exists(new Path(expectedCompactedFile))) {
>  +          // compaction finished, but did not finish deleting compacted
> files... so delete it
>  +          if (!fs.deleteRecursively(file.getPath()))
>  +            log.warn("Delete of file: " + file.getPath().toString() + "
> return false");
>  +          continue;
>  +        }
>  +        // compaction did not finish, so put files back
>  +
>  +        // reset path and filename for rest of loop
>  +        filename = filename.split("\\+", 3)[2];
>  +        path = path.substring(0, path.lastIndexOf("/delete+")) + "/" +
> filename;
>  +
>  +        if (!fs.rename(file.getPath(), new Path(path)))
>  +          log.warn("Rename of " + file.getPath().toString() + " to " +
> path + " returned false");
>  +      }
>  +
>  +      if (filename.endsWith("_tmp")) {
>  +        if (deleteTmp) {
>  +          log.warn("cleaning up old tmp file: " + path);
>  +          if (!fs.deleteRecursively(file.getPath()))
>  +            log.warn("Delete of tmp file: " + file.getPath().toString()
> + " return false");
>  +
>  +        }
>  +        continue;
>  +      }
>  +
>  +      if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") &&
> !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) {
>  +        log.error("unknown file in tablet" + path);
>  +        continue;
>  +      }
>  +
>  +      goodFiles.add(path);
>  +    }
>  +
>  +    return goodFiles;
>  +  }
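
cleanUpFiles() encodes a small crash-recovery protocol in the filename
itself: a marker named delete+<compacted>+<original> means a (root tablet)
major compaction was mid-delete when the server died. If the compacted file
exists, the marker is rolled forward (deleted); otherwise it is rolled back
(renamed to the original name). A toy version of just the name arithmetic,
with an invented path:

    public class DeleteMarkerDemo {
      public static void main(String[] args) {
        // marker left by an incomplete major compaction (path is made up)
        String filename = "delete+A0002.rf+A0001.rf";
        String path = "/accumulo/tables/!0/root_tablet/" + filename;

        // the file the compaction was producing
        String expectedCompactedFile =
            path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1];

        // the original name to restore if the compaction did not finish
        String original = filename.split("\\+", 3)[2];
        String restored = path.substring(0, path.lastIndexOf("/delete+")) + "/" + original;

        System.out.println("compacted: " + expectedCompactedFile);
        System.out.println("restore to: " + restored);
      }
    }
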
>  +
>  +  public static class KVEntry extends KeyValue {
>  +    public KVEntry(Key k, Value v) {
>  +      super(new Key(k), Arrays.copyOf(v.get(), v.get().length));
>  +    }
>  +
>  +    @Override
>  +    public String toString() {
>  +      return key.toString() + "=" + getValue();
>  +    }
>  +
>  +    int numBytes() {
>  +      return key.getSize() + getValue().get().length;
>  +    }
>  +
>  +    int estimateMemoryUsed() {
>  +      return key.getSize() + getValue().get().length + (9 * 32); //
> overhead is 32 per object
>  +    }
>  +  }
>  +
>  +  private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi,
> List<Range> ranges, HashSet<Column> columnSet, ArrayList<KVEntry> results,
>  +      long maxResultsSize) throws IOException {
>  +
>  +    LookupResult lookupResult = new LookupResult();
>  +
>  +    boolean exceededMemoryUsage = false;
>  +    boolean tabletClosed = false;
>  +
>  +    Set<ByteSequence> cfset = null;
>  +    if (columnSet.size() > 0)
>  +      cfset = LocalityGroupUtil.families(columnSet);
>  +
>  +    for (Range range : ranges) {
>  +
>  +      if (exceededMemoryUsage || tabletClosed) {
>  +        lookupResult.unfinishedRanges.add(range);
>  +        continue;
>  +      }
>  +
>  +      int entriesAdded = 0;
>  +
>  +      try {
>  +        if (cfset != null)
>  +          mmfi.seek(range, cfset, true);
>  +        else
>  +          mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
>  +
>  +        while (mmfi.hasTop()) {
>  +          Key key = mmfi.getTopKey();
>  +
>  +          KVEntry kve = new KVEntry(key, mmfi.getTopValue());
>  +          results.add(kve);
>  +          entriesAdded++;
>  +          lookupResult.bytesAdded += kve.estimateMemoryUsed();
>  +          lookupResult.dataSize += kve.numBytes();
>  +
>  +          exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
>  +
>  +          if (exceededMemoryUsage) {
>  +            addUnfinishedRange(lookupResult, range, key, false);
>  +            break;
>  +          }
>  +
>  +          mmfi.next();
>  +        }
>  +
>  +      } catch (TooManyFilesException tmfe) {
>  +        // treat this as a closed tablet, and let the client retry
>  +        log.warn("Tablet " + getExtent() + " has too many files, batch
> lookup can not run");
>  +        handleTabletClosedDuringScan(results, lookupResult,
> exceededMemoryUsage, range, entriesAdded);
>  +        tabletClosed = true;
>  +      } catch (IOException ioe) {
>  +        if (shutdownInProgress()) {
>  +          // assume HDFS shutdown hook caused this exception
>  +          log.debug("IOException while shutdown in progress ", ioe);
>  +          handleTabletClosedDuringScan(results, lookupResult,
> exceededMemoryUsage, range, entriesAdded);
>  +          tabletClosed = true;
>  +        } else {
>  +          throw ioe;
>  +        }
>  +      } catch (IterationInterruptedException iie) {
>  +        if (isClosed()) {
>  +          handleTabletClosedDuringScan(results, lookupResult,
> exceededMemoryUsage, range, entriesAdded);
>  +          tabletClosed = true;
>  +        } else {
>  +          throw iie;
>  +        }
>  +      } catch (TabletClosedException tce) {
>  +        handleTabletClosedDuringScan(results, lookupResult,
> exceededMemoryUsage, range, entriesAdded);
>  +        tabletClosed = true;
>  +      }
>  +
>  +    }
>  +
>  +    return lookupResult;
>  +  }
>  +
>  +  private void handleTabletClosedDuringScan(ArrayList<KVEntry> results,
> LookupResult lookupResult, boolean exceededMemoryUsage, Range range, int
> entriesAdded) {
>  +    if (exceededMemoryUsage)
>  +      throw new IllegalStateException("tablet should not exceed memory
> usage or close, not both");
>  +
>  +    if (entriesAdded > 0)
>  +      addUnfinishedRange(lookupResult, range, results.get(results.size()
> - 1).key, false);
>  +    else
>  +      lookupResult.unfinishedRanges.add(range);
>  +
>  +    lookupResult.closed = true;
>  +  }
>  +
>  +  private void addUnfinishedRange(LookupResult lookupResult, Range
> range, Key key, boolean inclusiveStartKey) {
>  +    if (range.getEndKey() == null || key.compareTo(range.getEndKey()) <
> 0) {
>  +      Range nlur = new Range(new Key(key), inclusiveStartKey,
> range.getEndKey(), range.isEndKeyInclusive());
>  +      lookupResult.unfinishedRanges.add(nlur);
>  +    }
>  +  }
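
addUnfinishedRange() is what lets a client resume a lookup that stopped
early (memory limit hit or tablet closed): the remainder of a partially
scanned range is re-expressed as a new Range starting just past the last
key handed back. A minimal sketch of that construction with the public
Range/Key API (row values invented):

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;
    import org.apache.hadoop.io.Text;

    public class UnfinishedRangeDemo {
      public static void main(String[] args) {
        Range original = new Range(new Key(new Text("a")), true, new Key(new Text("z")), true);
        // pretend the scan stopped after returning this key
        Key lastReturned = new Key(new Text("m"));

        if (original.getEndKey() == null || lastReturned.compareTo(original.getEndKey()) < 0) {
          // non-inclusive start, so the last returned key is not fetched twice
          Range unfinished = new Range(new Key(lastReturned), false,
              original.getEndKey(), original.isEndKeyInclusive());
          System.out.println("resume with " + unfinished);
        }
      }
    }
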
>  +
>  +  public static interface KVReceiver {
>  +    void receive(List<KVEntry> matches) throws IOException;
>  +  }
>  +
>  +  class LookupResult {
>  +    List<Range> unfinishedRanges = new ArrayList<Range>();
>  +    long bytesAdded = 0;
>  +    long dataSize = 0;
>  +    boolean closed = false;
>  +  }
>  +
>  +  public LookupResult lookup(List<Range> ranges, HashSet<Column>
> columns, Authorizations authorizations, ArrayList<KVEntry> results, long
> maxResultSize,
>  +      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
> AtomicBoolean interruptFlag) throws IOException {
>  +
>  +    if (ranges.size() == 0) {
>  +      return new LookupResult();
>  +    }
>  +
>  +    ranges = Range.mergeOverlapping(ranges);
>  +    Collections.sort(ranges);
>  +
>  +    Range tabletRange = extent.toDataRange();
>  +    for (Range range : ranges) {
>  +      // do a test to see if this range falls within the tablet, if it
> does not
>  +      // then clip will throw an exception
>  +      tabletRange.clip(range);
>  +    }
>  +
>  +    ScanDataSource dataSource = new ScanDataSource(authorizations,
> this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag);
>  +
>  +    LookupResult result = null;
>  +
>  +    try {
>  +      SortedKeyValueIterator<Key,Value> iter = new
> SourceSwitchingIterator(dataSource);
>  +      result = lookup(iter, ranges, columns, results, maxResultSize);
>  +      return result;
>  +    } catch (IOException ioe) {
>  +      dataSource.close(true);
>  +      throw ioe;
>  +    } finally {
>  +      // code is in the finally block because we always want
>  +      // to return map files, even when an exception is thrown
>  +      dataSource.close(false);
>  +
>  +      synchronized (this) {
>  +        queryCount += results.size();
>  +        if (result != null)
>  +          queryBytes += result.dataSize;
>  +      }
>  +    }
>  +  }
>  +
>  +  private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range
> range, int num, Set<Column> columns) throws IOException {
>  +
>  +    // log.info("In nextBatch..");
>  +
>  +    List<KVEntry> results = new ArrayList<KVEntry>();
>  +    Key key = null;
>  +
>  +    Value value;
>  +    long resultSize = 0L;
>  +    long resultBytes = 0L;
>  +
>  +    long maxResultsSize =
> acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
>  +
>  +    if (columns.size() == 0) {
>  +      iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
>  +    } else {
>  +      iter.seek(range, LocalityGroupUtil.families(columns), true);
>  +    }
>  +
>  +    Key continueKey = null;
>  +    boolean skipContinueKey = false;
>  +
>  +    boolean endOfTabletReached = false;
>  +    while (iter.hasTop()) {
>  +
>  +      value = iter.getTopValue();
>  +      key = iter.getTopKey();
>  +
>  +      KVEntry kvEntry = new KVEntry(key, value); // copies key and value
>  +      results.add(kvEntry);
>  +      resultSize += kvEntry.estimateMemoryUsed();
>  +      resultBytes += kvEntry.numBytes();
>  +
>  +      if (resultSize >= maxResultsSize || results.size() >= num) {
>  +        continueKey = new Key(key);
>  +        skipContinueKey = true;
>  +        break;
>  +      }
>  +
>  +      iter.next();
>  +    }
>  +
>  +    if (!iter.hasTop()) {
>  +      endOfTabletReached = true;
>  +    }
>  +
>  +    Batch retBatch = new Batch();
>  +    retBatch.numBytes = resultBytes;
>  +
>  +    if (!endOfTabletReached) {
>  +      retBatch.continueKey = continueKey;
>  +      retBatch.skipContinueKey = skipContinueKey;
>  +    } else {
>  +      retBatch.continueKey = null;
>  +    }
>  +
>  +    if (endOfTabletReached && results.size() == 0)
>  +      retBatch.results = null;
>  +    else
>  +      retBatch.results = results;
>  +
>  +    return retBatch;
>  +  }
>  +
>  +  /**
>  +   * Determine if a JVM shutdown is in progress.
>  +   *
>  +   */
>  +  private boolean shutdownInProgress() {
>  +    try {
>  +      Runtime.getRuntime().removeShutdownHook(new Thread(new Runnable() {
>  +        @Override
>  +        public void run() {}
>  +      }));
>  +    } catch (IllegalStateException ise) {
>  +      return true;
>  +    }
>  +
>  +    return false;
>  +  }
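
shutdownInProgress() leans on a documented Runtime quirk:
removeShutdownHook() throws IllegalStateException once the JVM has begun
shutting down, even for a hook that was never registered. The probe stands
alone as:

    public class ShutdownProbe {
      static boolean shutdownInProgress() {
        try {
          // removing a never-registered hook is a harmless no-op... unless
          // shutdown has started, in which case Runtime throws
          Runtime.getRuntime().removeShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {}
          }));
        } catch (IllegalStateException ise) {
          return true;
        }
        return false;
      }

      public static void main(String[] args) {
        System.out.println("shutting down? " + shutdownInProgress());
      }
    }

The cost is allocating a throwaway Thread per call, which is acceptable on
the exceptional IOException path where the tablet code uses it.
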
>  +
>  +  private class Batch {
>  +    public boolean skipContinueKey;
>  +    public List<KVEntry> results;
>  +    public Key continueKey;
>  +    public long numBytes;
>  +  }
>  +
>  +  Scanner createScanner(Range range, int num, Set<Column> columns,
> Authorizations authorizations, List<IterInfo> ssiList,
> Map<String,Map<String,String>> ssio,
>  +      boolean isolated, AtomicBoolean interruptFlag) {
>  +    // do a test to see if this range falls within the tablet, if it
> does not
>  +    // then clip will throw an exception
>  +    extent.toDataRange().clip(range);
>  +
>  +    ScanOptions opts = new ScanOptions(num, authorizations,
> this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated);
>  +    return new Scanner(range, opts);
>  +  }
>  +
>  +  class ScanBatch {
>  +    boolean more;
>  +    List<KVEntry> results;
>  +
>  +    ScanBatch(List<KVEntry> results, boolean more) {
>  +      this.results = results;
>  +      this.more = more;
>  +    }
>  +  }
>  +
>  +  class Scanner {
>  +
>  +    private ScanOptions options;
>  +    private Range range;
>  +    private SortedKeyValueIterator<Key,Value> isolatedIter;
>  +    private ScanDataSource isolatedDataSource;
>  +    private boolean sawException = false;
>  +    private boolean scanClosed = false;
>  +
>  +    Scanner(Range range, ScanOptions options) {
>  +      this.range = range;
>  +      this.options = options;
>  +    }
>  +
>  +    synchronized ScanBatch read() throws IOException,
> TabletClosedException {
>  +
>  +      if (sawException)
>  +        throw new IllegalStateException("Tried to use scanner after
> exception occurred.");
>  +
>  +      if (scanClosed)
>  +        throw new IllegalStateException("Tried to use scanner after it
> was closed.");
>  +
>  +      Batch results = null;
>  +
>  +      ScanDataSource dataSource;
>  +
>  +      if (options.isolated) {
>  +        if (isolatedDataSource == null)
>  +          isolatedDataSource = new ScanDataSource(options);
>  +        dataSource = isolatedDataSource;
>  +      } else {
>  +        dataSource = new ScanDataSource(options);
>  +      }
>  +
>  +      try {
>  +
>  +        SortedKeyValueIterator<Key,Value> iter;
>  +
>  +        if (options.isolated) {
>  +          if (isolatedIter == null)
>  +            isolatedIter = new SourceSwitchingIterator(dataSource, true);
>  +          else
>  +            isolatedDataSource.fileManager.reattach();
>  +          iter = isolatedIter;
>  +        } else {
>  +          iter = new SourceSwitchingIterator(dataSource, false);
>  +        }
>  +
>  +        results = nextBatch(iter, range, options.num, options.columnSet);
>  +
>  +        if (results.results == null) {
>  +          range = null;
>  +          return new ScanBatch(new ArrayList<Tablet.KVEntry>(), false);
>  +        } else if (results.continueKey == null) {
>  +          return new ScanBatch(results.results, false);
>  +        } else {
>  +          range = new Range(results.continueKey,
> !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive());
>  +          return new ScanBatch(results.results, true);
>  +        }
>  +
>  +      } catch (IterationInterruptedException iie) {
>  +        sawException = true;
>  +        if (isClosed())
>  +          throw new TabletClosedException(iie);
>  +        else
>  +          throw iie;
>  +      } catch (IOException ioe) {
>  +        if (shutdownInProgress()) {
>  +          log.debug("IOException while shutdown in progress ", ioe);
>  +          throw new TabletClosedException(ioe); // assume IOException
> was caused by execution of HDFS shutdown hook
>  +        }
>  +
>  +        sawException = true;
>  +        dataSource.close(true);
>  +        throw ioe;
>  +      } catch (RuntimeException re) {
>  +        sawException = true;
>  +        throw re;
>  +      } finally {
>  +        // code is in the finally block because we always want
>  +        // to return map files, even when an exception is thrown
>  +        if (!options.isolated)
>  +          dataSource.close(false);
>  +        else if (dataSource.fileManager != null)
>  +          dataSource.fileManager.detach();
>  +
>  +        synchronized (Tablet.this) {
>  +          if (results != null && results.results != null) {
>  +            long more = results.results.size();
>  +            queryCount += more;
>  +            queryBytes += results.numBytes;
>  +          }
>  +        }
>  +      }
>  +    }
>  +
>  +    // close and read are synchronized because close cannot be called on the
> data source while it is in use
>  +    // this could lead to the case where file iterators that are in use
> by a thread are returned
>  +    // to the pool... this would be bad
>  +    void close() {
>  +      options.interruptFlag.set(true);
>  +      synchronized (this) {
>  +        scanClosed = true;
>  +        if (isolatedDataSource != null)
>  +          isolatedDataSource.close(false);
>  +      }
>  +    }
>  +  }
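
Taken together, read()'s return values define the loop a caller is expected
to drive: keep calling while more is true, and the Scanner advances its own
range via continueKey/skipContinueKey. A sketch of such a driver, assuming
a Scanner obtained from createScanner() above (ScannerDriver itself is
invented and would need to live in the same package, since Scanner and
ScanBatch are package-private):

    package org.apache.accumulo.tserver;

    import java.io.IOException;

    class ScannerDriver {
      static long drain(Tablet.Scanner scanner) throws IOException {
        long total = 0;
        try {
          while (true) {
            Tablet.ScanBatch batch = scanner.read();
            total += batch.results.size(); // hand results to the client here
            if (!batch.more)
              break; // read() returned the last batch for the range
          }
        } finally {
          scanner.close(); // sets the interrupt flag, closes isolated sources
        }
        return total;
      }
    }
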
>  +
>  +  static class ScanOptions {
>  +
>  +    // scan options
>  +    Authorizations authorizations;
>  +    byte[] defaultLabels;
>  +    Set<Column> columnSet;
>  +    List<IterInfo> ssiList;
>  +    Map<String,Map<String,String>> ssio;
>  +    AtomicBoolean interruptFlag;
>  +    int num;
>  +    boolean isolated;
>  +
>  +    ScanOptions(int num, Authorizations authorizations, byte[]
> defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList,
>  +        Map<String,Map<String,String>> ssio, AtomicBoolean
> interruptFlag, boolean isolated) {
>  +      this.num = num;
>  +      this.authorizations = authorizations;
>  +      this.defaultLabels = defaultLabels;
>  +      this.columnSet = columnSet;
>  +      this.ssiList = ssiList;
>  +      this.ssio = ssio;
>  +      this.interruptFlag = interruptFlag;
>  +      this.isolated = isolated;
>  +    }
>  +
>  +  }
>  +
>  +  class ScanDataSource implements DataSource {
>  +
>  +    // data source state
>  +    private ScanFileManager fileManager;
>  +    private SortedKeyValueIterator<Key,Value> iter;
>  +    private long expectedDeletionCount;
>  +    private List<MemoryIterator> memIters = null;
>  +    private long fileReservationId;
>  +    private AtomicBoolean interruptFlag;
>  +    private StatsIterator statsIterator;
>  +
>  +    ScanOptions options;
>  +
>  +    ScanDataSource(Authorizations authorizations, byte[] defaultLabels,
> HashSet<Column> columnSet, List<IterInfo> ssiList,
> Map<String,Map<String,String>> ssio,
>  +        AtomicBoolean interruptFlag) {
>  +      expectedDeletionCount = dataSourceDeletions.get();
>  +      this.options = new ScanOptions(-1, authorizations, defaultLabels,
> columnSet, ssiList, ssio, interruptFlag, false);
>  +      this.interruptFlag = interruptFlag;
>  +    }
>  +
>  +    ScanDataSource(ScanOptions options) {
>  +      expectedDeletionCount = dataSourceDeletions.get();
>  +      this.options = options;
>  +      this.interruptFlag = options.interruptFlag;
>  +    }
>  +
>  +    @Override
>  +    public DataSource getNewDataSource() {
>  +      if (!isCurrent()) {
>  +        // log.debug("Switching data sources during a scan");
>  +        if (memIters != null) {
>  +          tabletMemory.returnIterators(memIters);
>  +          memIters = null;
>  +          datafileManager.returnFilesForScan(fileReservationId);
>  +          fileReservationId = -1;
>  +        }
>  +
>  +        if (fileManager != null)
>  +          fileManager.releaseOpenFiles(false);
>  +
>  +        expectedDeletionCount = dataSourceDeletions.get();
>  +        iter = null;
>  +
>  +        return this;
>  +      } else
>  +        return this;
>  +    }
>  +
>  +    @Override
>  +    public boolean isCurrent() {
>  +      return expectedDeletionCount == dataSourceDeletions.get();
>  +    }
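
The expectedDeletionCount snapshot is a classic generation counter:
dataSourceDeletions is bumped when files are deleted out from under scans,
and any data source whose snapshot no longer matches knows its iterators
are stale and must be rebuilt. The pattern in isolation, with invented
names:

    import java.util.concurrent.atomic.AtomicLong;

    public class GenerationDemo {
      // bumped whenever the underlying files change (cf. dataSourceDeletions)
      static final AtomicLong deletions = new AtomicLong();

      static class Source {
        private long expected = deletions.get(); // snapshot at creation

        boolean isCurrent() {
          return expected == deletions.get();
        }

        void refresh() {
          // release stale iterators here, then take a fresh snapshot
          expected = deletions.get();
        }
      }

      public static void main(String[] args) {
        Source s = new Source();
        System.out.println(s.isCurrent()); // true
        deletions.incrementAndGet();       // e.g. a compaction removed a file
        System.out.println(s.isCurrent()); // false
        s.refresh();
        System.out.println(s.isCurrent()); // true again
      }
    }
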
>  +
>  +    @Override
>  +    public SortedKeyValueIterator<Key,Value> iterator() throws
> IOException {
>  +      if (iter == null)
>  +        iter = createIterator();
>  +      return iter;
>  +    }
>  +
>  +    private SortedKeyValueIterator<Key,Value> createIterator() throws
> IOException {
>  +
>  +      Map<FileRef,DataFileValue> files;
>  +
>  +      synchronized (Tablet.this) {
>  +
>  +        if (memIters != null)
>  +          throw new IllegalStateException("Tried to create new scan
> iterator w/o releasing memory");
>  +
>  +        if (Tablet.this.closed)
>  +          throw new TabletClosedException();
>  +
>  +        if (interruptFlag.get())
>  +          throw new IterationInterruptedException(extent.toString() + "
> " + interruptFlag.hashCode());
>  +
>  +        // only acquire the file manager when we know the tablet is open
>  +        if (fileManager == null) {
>  +          fileManager = tabletResources.newScanFileManager();
>  +          activeScans.add(this);
>  +        }
>  +
>  +        if (fileManager.getNumOpenFiles() != 0)
>  +          throw new IllegalStateException("Tried to create new scan
> iterator w/o releasing files");
>  +
>  +        // set this before trying to get iterators in case
>  +        // getIterators() throws an exception
>  +        expectedDeletionCount = dataSourceDeletions.get();
>  +
>  +        memIters = tabletMemory.getIterators();
>  +        Pair<Long,Map<FileRef,DataFileValue>> reservation =
> datafileManager.reserveFilesForScan();
>  +        fileReservationId = reservation.getFirst();
>  +        files = reservation.getSecond();
>  +      }
>  +
>  +      Collection<InterruptibleIterator> mapfiles =
> fileManager.openFiles(files, options.isolated);
>  +
>  +      List<SortedKeyValueIterator<Key,Value>> iters = new
> ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() +
> memIters.size());
>  +
>  +      iters.addAll(mapfiles);
>  +      iters.addAll(memIters);
>  +
>  +      for (SortedKeyValueIterator<Key,Value> skvi : iters)
>  +        ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
>  +
>  +      MultiIterator multiIter = new MultiIterator(iters, extent);
>  +
>  +      TabletIteratorEnvironment iterEnv = new
> TabletIteratorEnvironment(IteratorScope.scan, acuTableConf, fileManager,
> files);
>  +
>  +      statsIterator = new StatsIterator(multiIter,
> TabletServer.seekCount, scannedCount);
>  +
>  +      DeletingIterator delIter = new DeletingIterator(statsIterator,
> false);
>  +
>  +      ColumnFamilySkippingIterator cfsi = new
> ColumnFamilySkippingIterator(delIter);
>  +
>  +      ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi,
> options.columnSet);
>  +
>  +      VisibilityFilter visFilter = new VisibilityFilter(colFilter,
> options.authorizations, options.defaultLabels);
>  +
>  +      return iterEnv.getTopLevelIterator(IteratorUtil
>  +          .loadIterators(IteratorScope.scan, visFilter, extent,
> acuTableConf, options.ssiList, options.ssio, iterEnv));
>  +    }
>  +
>  +    private void close(boolean sawErrors) {
>  +
>  +      if (memIters != null) {
>  +        tabletMemory.returnIterators(memIters);
>  +        memIters = null;
>  +        datafileManager.returnFilesForScan(fileReservationId);
>  +        fileReservationId = -1;
>  +      }
>  +
>  +      synchronized (Tablet.this) {
>  +        activeScans.remove(this);
>  +        if (activeScans.size() == 0)
>  +          Tablet.this.notifyAll();
>  +      }
>  +
>  +      if (fileManager != null) {
>  +        fileManager.releaseOpenFiles(sawErrors);
>  +        fileManager = null;
>  +      }
>  +
>  +      if (statsIterator != null) {
>  +        statsIterator.report();
>  +      }
>  +
>  +    }
>  +
>  +    public void interrupt() {
>  +      interruptFlag.set(true);
>  +    }
>  +
>  +    @Override
>  +    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
>  +      throw new UnsupportedOperationException();
>  +    }
>  +
>  +  }
>  +
>  +  private DataFileValue minorCompact(Configuration conf, VolumeManager
> fs, InMemoryMap memTable, FileRef tmpDatafile, FileRef newDatafile, FileRef
> mergeFile,
>  +      boolean hasQueueTime, long queued, CommitSession commitSession,
> long flushId, MinorCompactionReason mincReason) {
>  +    boolean failed = false;
>  +    long start = System.currentTimeMillis();
>  +    timer.incrementStatusMinor();
>  +
>  +    long count = 0;
>  +
>  +    try {
>  +      Span span = Trace.start("write");
>  +      CompactionStats stats;
>  +      try {
>  +        count = memTable.getNumEntries();
>  +
>  +        DataFileValue dfv = null;
>  +        if (mergeFile != null)
>  +          dfv = datafileManager.getDatafileSizes().get(mergeFile);
>  +
>  +        MinorCompactor compactor = new MinorCompactor(conf, fs,
> memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, mincReason);
>  +        stats = compactor.call();
>  +      } finally {
>  +        span.stop();
>  +      }
>  +      span = Trace.start("bringOnline");
>  +      try {
>  +        datafileManager.bringMinorCompactionOnline(tmpDatafile,
> newDatafile, mergeFile, new DataFileValue(stats.getFileSize(),
> stats.getEntriesWritten()),
>  +            commitSession, flushId);
>  +      } finally {
>  +        span.stop();
>  +      }
>  +      return new DataFileValue(stats.getFileSize(),
> stats.getEntriesWritten());
>  +    } catch (Exception E) {
>  +      failed = true;
>  +      throw new RuntimeException(E);
>  +    } catch (Error E) {
>  +      // Weird errors like "OutOfMemoryError" when trying to create the
> thread for the compaction
>  +      failed = true;
>  +      throw new RuntimeException(E);
>  +    } finally {
>  +      try {
>  +        tabletMemory.finalizeMinC();
>  +      } catch (Throwable t) {
>  +        log.error("Failed to free tablet memory", t);
>  +      }
>  +
>  +      if (!failed) {
>  +        lastMinorCompactionFinishTime = System.currentTimeMillis();
>  +      }
>  +      if (tabletServer.mincMetrics.isEnabled())
>  +        tabletServer.mincMetrics.add(TabletServerMinCMetrics.minc,
> (lastMinorCompactionFinishTime - start));
>  +      if (hasQueueTime) {
>  +        timer.updateTime(Operation.MINOR, queued, start, count, failed);
>  +        if (tabletServer.mincMetrics.isEnabled())
>  +          tabletServer.mincMetrics.add(TabletServerMinCMetrics.queue,
> (start - queued));
>  +      } else
>  +        timer.updateTime(Operation.MINOR, start, count, failed);
>  +    }
>  +  }
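
minorCompact() keeps a strict span-per-phase discipline: each Trace span is
stopped in a finally block so a failed write or bring-online still closes
its span. The same shape with a plain timer standing in for the tracing
API:

    public class SpanPattern {
      static class Span {
        private final String name;
        private final long start = System.currentTimeMillis();

        Span(String name) {
          this.name = name;
        }

        void stop() {
          System.out.println(name + " took " + (System.currentTimeMillis() - start) + " ms");
        }
      }

      public static void main(String[] args) throws InterruptedException {
        Span span = new Span("write");
        try {
          Thread.sleep(10); // stand-in for the MinorCompactor call
        } finally {
          span.stop();      // runs even if the compaction throws
        }
        span = new Span("bringOnline");
        try {
          Thread.sleep(5);  // stand-in for bringMinorCompactionOnline
        } finally {
          span.stop();
        }
      }
    }
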
>  +
>  +  private class MinorCompactionTask implements Runnable {
>  +
>  +    private long queued;
>  +    private CommitSession commitSession;
>  +    private DataFileValue stats;
>  +    private FileRef mergeFile;
>  +    private long flushId;
>  +    private MinorCompactionReason mincReason;
>  +
>  +    MinorCompactionTask(FileRef mergeFile, CommitSession commitSession,
> long flushId, MinorCompactionReason mincReason) {
>  +      queued = System.currentTimeMillis();
>  +      minorCompactionWaitingToStart = true;
>  +      this.commitSession = commitSession;
>  +      this.mergeFile = mergeFile;
>  +      this.flushId = flushId;
>  +      this.mincReason = mincReason;
>  +    }
>  +
>  +    @Override
>  +    public void run() {
>  +      minorCompactionWaitingToStart = false;
>  +      minorCompactionInProgress = true;
>  +      Span minorCompaction = Trace.on("minorCompaction");
>  +      try {
>  +        FileRef newMapfileLocation = getNextMapFilename(mergeFile ==
> null ? "F" : "M");
>  +        FileRef tmpFileRef = new FileRef(newMapfileLocation.path() +
> "_tmp");
>  +        Span span = Trace.start("waitForCommits");
>  +        synchronized (Tablet.this) {
>  +          commitSession.waitForCommitsToFinish();
>  +        }
>  +        span.stop();
>  +        span = Trace.start("start");
>  +        while (true) {
>  +          try {
>  +            // the purpose of the minor compaction start event is to
> keep track of the filename: in the case
>  +            // where the metadata table write for the minor compaction
> finishes and the process dies before
>  +            // writing the minor compaction finish event, the start
> event+filename in the metadata table will
>  +            // prevent recovery of duplicate data. the minor
> compaction start event can be written at any time
>  +            // before the metadata write for the minor compaction
>  +            tabletServer.minorCompactionStarted(commitSession,
> commitSession.getWALogSeq() + 1, newMapfileLocation.path().toString());
>  +            break;
>  +          } catch (IOException e) {
>  +            log.warn("Failed to write to write ahead log " +
> e.getMessage(), e);
>  +          }
>  +        }
>  +        span.stop();
>  +        span = Trace.start("compact");
>  +        this.stats = minorCompact(conf, fs,
> tabletMemory.getMinCMemTable(), tmpFileRef, newMapfileLocation, mergeFile,
> true, queued, commitSession, flushId,
>  +            mincReason);
>  +        span.stop();
>  +
>  +        if (needsSplit()) {
>  +          tabletServer.executeSplit(Tablet.this);
>  +        } else {
>  +          initiateMajorCompaction(MajorCompactionReason.NORMAL);
>  +        }
>  +      } catch (Throwable t) {
>  +        log.error("Unknown error during minor compaction for extent: " +
> getExtent(), t);
>  +        throw new RuntimeException(t);
>  +      } finally {
>  +        minorCompactionInProgress = false;
>  +        minorCompaction.data("extent", extent.toString());
>  +        minorCompaction.data("numEntries",
> Long.toString(this.stats.getNumEntries()));
>  +        minorCompaction.data("size",
> Long.toString(this.stats.getSize()));
>  +        minorCompaction.stop();
>  +      }
>  +    }
>  +  }
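
Note the while (true) around minorCompactionStarted(): the start event must
reach the write-ahead log before the compaction proceeds, so an IOException
is logged and the write retried indefinitely rather than surfaced. Stripped
of context the retry shape is the following (the tablet code retries
immediately; any backoff would be an addition):

    import java.io.IOException;

    public class RetryForever {
      interface WalWrite {
        void attempt() throws IOException;
      }

      // keep trying until the write-ahead log accepts the record
      static void writeUntilSuccess(WalWrite w) {
        while (true) {
          try {
            w.attempt();
            break;
          } catch (IOException e) {
            System.err.println("Failed to write to write ahead log " + e.getMessage());
          }
        }
      }

      public static void main(String[] args) {
        final int[] tries = new int[1];
        writeUntilSuccess(new WalWrite() {
          @Override
          public void attempt() throws IOException {
            if (++tries[0] < 3)
              throw new IOException("transient");
          }
        });
        System.out.println("succeeded after " + tries[0] + " attempts");
      }
    }
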
>  +
>  +  private synchronized MinorCompactionTask prepareForMinC(long flushId,
> MinorCompactionReason mincReason) {
>  +    CommitSession oldCommitSession = tabletMemory.prepareForMinC();
>  +    otherLogs = currentLogs;
>  +    currentLogs = new HashSet<DfsLogger>();
>  +
>  +    FileRef mergeFile =
> datafileManager.reserveMergingMinorCompactionFile();
>  +
>  +    return new MinorCompactionTask(mergeFile, oldCommitSession, flushId,
> mincReason);
>  +
>  +  }
>  +
>  +  void flush(long tableFlushID) {
>  +    boolean updateMetadata = false;
>  +    boolean initiateMinor = false;
>  +
>  +    try {
>  +
>  +      synchronized (this) {
>  +
>  +        // only want one thing at a time to update flush ID to ensure
> that metadata table and tablet in memory state are consistent
>  +        if (updatingFlushID)
>  +          return;
>  +
>  +        if (lastFlushID >= tableFlushID)
>  +          return;
>  +
>  +        if (closing || closed || tabletMemory.memoryReservedForMinC())
>  +          return;
>  +
>  +        if (tabletMemory.getMemTable().getNumEntries() == 0) {
>  +          lastFlushID = tableFlushID;
>  +          updatingFlushID = true;
>  +          updateMetadata = true;
>  +        } else
>  +          initiateMinor = true;
>  +      }
>  +
>  +      if (updateMetadata) {
>  +        Credentials creds = SystemCredentials.get();
>  +        // if multiple threads were allowed to update this outside of a
> sync block, then it would be
>  +        // a race condition
>  +        MetadataTableUtil.updateTabletFlushID(extent, tableFlushID,
> creds, tabletServer.getLock());
>  +      } else if (initiateMinor)
>  +        initiateMinorCompaction(tableFlushID,
> MinorCompactionReason.USER);
>  +
>  +    } finally {
>  +      if (updateMetadata) {
>  +        synchronized (this) {
>  +          updatingFlushID = false;
>  +          this.notifyAll();
>  +        }
>  +      }
>  +    }
>  +
>  +  }
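
flush() shows how the tablet does metadata I/O outside its lock while still
serializing callers: the updatingFlushID guard is set under
synchronization, the slow metadata write happens unlocked, and the guard is
cleared (with waiters notified) in a finally. The skeleton, with the
Accumulo calls replaced by a stub:

    public class GuardedUpdate {
      private boolean updating = false;
      private long lastId = -1;

      void update(long newId) {
        synchronized (this) {
          if (updating || newId <= lastId)
            return;          // another thread is writing, or nothing to do
          lastId = newId;
          updating = true;
        }
        try {
          slowMetadataWrite(newId); // stub for MetadataTableUtil.updateTabletFlushID
        } finally {
          synchronized (this) {
            updating = false;
            notifyAll();            // wake threads waiting on the flush id
          }
        }
      }

      private void slowMetadataWrite(long id) {
        System.out.println("wrote flush id " + id);
      }

      public static void main(String[] args) {
        new GuardedUpdate().update(1);
      }
    }
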
>  +
>  +  boolean initiateMinorCompaction(MinorCompactionReason mincReason) {
>  +    if (isClosed()) {
>  +      // don't bother trying to get flush id if closed... could be
> closed after this check but that is ok... just trying to cut down on
> unneeded log messages....
>  +      return false;
>  +    }
>  +
>  +    // get the flush id before the new memmap is made available for write
>  +    long flushId;
>  +    try {
>  +      flushId = getFlushID();
>  +    } catch (NoNodeException e) {
>  +      log.info("Asked to initiate MinC when there was no flush id " +
> getExtent() + " " + e.getMessage());
>  +      return false;
>  +    }
>  +    return initiateMinorCompaction(flushId, mincReason);
>  +  }
>  +
>  +  boolean minorCompactNow(MinorCompactionReason mincReason) {
>  +    long flushId;
>  +    try {
>  +      flushId = getFlushID();
>  +    } catch (NoNodeException e) {
>  +      log.info("Asked to initiate MinC when there was no flush id " +
> getExtent() + " " + e.getMessage());
>  +      return false;
>  +    }
>  +    MinorCompactionTask mct = createMinorCompactionTask(flushId,
> mincReason);
>  +    if (mct == null)
>  +      return false;
>  +    mct.run();
>  +    return true;
>  +  }
>  +
>  +  boolean initiateMinorCompaction(long flushId, MinorCompactionReason
> mincReason) {
>  +    MinorCompactionTask mct = createMinorCompactionTask(flushId,
> mincReason);
>  +    if (mct == null)
>  +      return false;
>  +    tabletResources.executeMinorCompaction(mct);
>  +    return true;
>  +  }
>  +
>  +  private MinorCompactionTask createMinorCompactionTask(long flushId,
> MinorCompactionReason mincReason) {
>  +    MinorCompactionTask mct;
>  +    long t1, t2;
>  +
>  +    StringBuilder logMessage = null;
>  +
>  +    try {
>  +      synchronized (this) {
>  +        t1 = System.currentTimeMillis();
>  +
>  +        if (closing || closed || majorCompactionWaitingToStart ||
> tabletMemory.memoryReservedForMinC() ||
> tabletMemory.getMemTable().getNumEntries() == 0
>  +            || updatingFlushID) {
>  +
>  +          logMessage = new StringBuilder();
>  +
>  +          logMessage.append(extent.toString());
>  +          logMessage.append(" closing " + closing);
>  +          logMessage.append(" closed " + closed);
>  +          logMessage.append(" majorCompactionWaitingToStart " +
> majorCompactionWaitingToStart);
>  +          if (tabletMemory != null)
>  +            logMessage.append(" tabletMemory.memoryReservedForMinC() " +
> tabletMemory.memoryReservedForMinC());
>  +          if (tabletMemory != null && tabletMemory.getMemTable() != null)
>  +            logMessage.append(
>
> <TRUNCATED>
>
