accumulo-dev mailing list archives

From John Vines <vi...@apache.org>
Subject Re: [11/15] ACCUMULO-1940 do not expose a new tablet to the memory manager until after it is online
Date Sat, 30 Nov 2013 06:05:44 GMT
I like the git emails, so I know when people push, but having 23 emails of
diffs is fairly useless. Though, after seeing this one, it makes me wonder
if the diffs are just incorrect in the first place...


On Sat, Nov 30, 2013 at 12:15 AM, Josh Elser <josh.elser@gmail.com> wrote:

> Actually, I was kind of confused when I saw your commit*s* on this ticket.
> What did you actually do? You have two commits that do the same changes:
>
> 82477f08aa64e2a8a1cf7f6af0db5ce954801ac8 (in 1.4, 1.5 and 1.6)
> 9b6b9cf104ff332cffdd4900d8057557e64e0ec8 (only in 1.6)
>
> I would've only expected to see one email with a diff, followed by 2
> "merge" emails, e.g.
>
> ----------------------------------------------------------------------
>  .../tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java  | 2 --
>  1 file changed, 2 deletions(-)
> ----------------------------------------------------------------------
>
> Although, I will admit that dealing with 3 active branches is a big pain.
> However, I don't know of a better way to handle this that doesn't make Git
> super confused and thus limit our ability to answer questions like "where
> was a problem introduced" (git-bisect) and "where does this change exist"
> (without having multiple commits that perform the same changes).
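>
> As a rough sketch (branch names assumed), the workflow I have in mind is:
>
>   git checkout 1.4        # oldest branch the fix applies to
>   git commit ...          # the one real change: one diff email
>   git checkout 1.5
>   git merge 1.4           # merge commit: one "merge" email
>   git checkout 1.6
>   git merge 1.5           # merge commit: one "merge" email
>
> and "git branch --contains <sha1>" then answers "where does this change
> exist" directly.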
>
> On 11/29/13, 8:31 PM, Eric Newton wrote:
>
>> I changed one line of this file... git seems to be having a conniption.  I
>> find the volume of git traffic to be so useless that I ignore it.
>>
>> Anyone else?
>>
>> On Fri, Nov 29, 2013 at 1:24 PM, <ecn@apache.org> wrote:
>>
>>
>>> http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b6b9cf1/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>> ----------------------------------------------------------------------
>>> diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>> index ee3b243,0000000..fd76415
>>> mode 100644,000000..100644
>>> --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>> +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>> @@@ -1,3868 -1,0 +1,3866 @@@
>>>   +/*
>>>   + * Licensed to the Apache Software Foundation (ASF) under one or more
>>>   + * contributor license agreements.  See the NOTICE file distributed with
>>>   + * this work for additional information regarding copyright ownership.
>>>   + * The ASF licenses this file to You under the Apache License, Version 2.0
>>>   + * (the "License"); you may not use this file except in compliance with
>>>   + * the License.  You may obtain a copy of the License at
>>>   + *
>>>   + *     http://www.apache.org/licenses/LICENSE-2.0
>>>   + *
>>>   + * Unless required by applicable law or agreed to in writing, software
>>>   + * distributed under the License is distributed on an "AS IS" BASIS,
>>>   + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>   + * See the License for the specific language governing permissions and
>>>   + * limitations under the License.
>>>   + */
>>>   +package org.apache.accumulo.tserver;
>>>   +
>>>   +import java.io.ByteArrayInputStream;
>>>   +import java.io.DataInputStream;
>>>   +import java.io.FileNotFoundException;
>>>   +import java.io.IOException;
>>>   +import java.util.ArrayList;
>>>   +import java.util.Arrays;
>>>   +import java.util.Collection;
>>>   +import java.util.Collections;
>>>   +import java.util.Comparator;
>>>   +import java.util.EnumSet;
>>>   +import java.util.HashMap;
>>>   +import java.util.HashSet;
>>>   +import java.util.Iterator;
>>>   +import java.util.List;
>>>   +import java.util.Map;
>>>   +import java.util.Map.Entry;
>>>   +import java.util.PriorityQueue;
>>>   +import java.util.Set;
>>>   +import java.util.SortedMap;
>>>   +import java.util.TreeMap;
>>>   +import java.util.TreeSet;
>>>   +import java.util.concurrent.atomic.AtomicBoolean;
>>>   +import java.util.concurrent.atomic.AtomicLong;
>>>   +import java.util.concurrent.atomic.AtomicReference;
>>>   +import java.util.concurrent.locks.ReentrantLock;
>>>   +
>>>   +import org.apache.accumulo.core.Constants;
>>>   +import org.apache.accumulo.core.client.Connector;
>>>   +import org.apache.accumulo.core.client.IteratorSetting;
>>>   +import org.apache.accumulo.core.client.impl.ScannerImpl;
>>>   +import org.apache.accumulo.core.conf.AccumuloConfiguration;
>>>   +import org.apache.accumulo.core.conf.ConfigurationCopy;
>>>   +import org.apache.accumulo.core.conf.ConfigurationObserver;
>>>   +import org.apache.accumulo.core.conf.Property;
>>>   +import org.apache.accumulo.core.constraints.Violations;
>>>   +import org.apache.accumulo.core.data.ByteSequence;
>>>   +import org.apache.accumulo.core.data.Column;
>>>   +import org.apache.accumulo.core.data.ColumnUpdate;
>>>   +import org.apache.accumulo.core.data.Key;
>>>   +import org.apache.accumulo.core.data.KeyExtent;
>>>   +import org.apache.accumulo.core.data.KeyValue;
>>>   +import org.apache.accumulo.core.data.Mutation;
>>>   +import org.apache.accumulo.core.data.Range;
>>>   +import org.apache.accumulo.core.data.Value;
>>>   +import org.apache.accumulo.core.data.thrift.IterInfo;
>>>   +import org.apache.accumulo.core.data.thrift.MapFileInfo;
>>>   +import org.apache.accumulo.core.file.FileOperations;
>>>   +import org.apache.accumulo.core.file.FileSKVIterator;
>>>   +import org.apache.accumulo.core.iterators.IterationInterruptedException;
>>>   +import org.apache.accumulo.core.iterators.IteratorEnvironment;
>>>   +import org.apache.accumulo.core.iterators.IteratorUtil;
>>>   +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
>>>   +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
>>>   +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
>>>   +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
>>>   +import org.apache.accumulo.core.iterators.system.DeletingIterator;
>>>   +import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
>>>   +import org.apache.accumulo.core.iterators.system.MultiIterator;
>>>   +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
>>>   +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
>>>   +import org.apache.accumulo.core.iterators.system.StatsIterator;
>>>   +import org.apache.accumulo.core.iterators.system.VisibilityFilter;
>>>   +import org.apache.accumulo.core.master.thrift.TabletLoadState;
>>>   +import org.apache.accumulo.core.metadata.MetadataTable;
>>>   +import org.apache.accumulo.core.metadata.RootTable;
>>>   +import org.apache.accumulo.core.metadata.schema.DataFileValue;
>>>   +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
>>>   +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
>>>   +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
>>>   +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
>>>   +import org.apache.accumulo.core.security.Authorizations;
>>>   +import org.apache.accumulo.core.security.ColumnVisibility;
>>>   +import org.apache.accumulo.core.security.Credentials;
>>>   +import org.apache.accumulo.core.tabletserver.log.LogEntry;
>>>   +import org.apache.accumulo.core.util.CachedConfiguration;
>>>   +import org.apache.accumulo.core.util.LocalityGroupUtil;
>>>   +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
>>>   +import org.apache.accumulo.core.util.MapCounter;
>>>   +import org.apache.accumulo.core.util.Pair;
>>>   +import org.apache.accumulo.core.util.UtilWaitThread;
>>>   +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
>>>   +import org.apache.accumulo.server.ServerConstants;
>>>   +import org.apache.accumulo.server.client.HdfsZooInstance;
>>>   +import org.apache.accumulo.server.conf.TableConfiguration;
>>>   +import org.apache.accumulo.server.fs.FileRef;
>>>   +import org.apache.accumulo.server.fs.VolumeManager;
>>>   +import org.apache.accumulo.server.fs.VolumeManager.FileType;
>>>   +import org.apache.accumulo.server.fs.VolumeManagerImpl;
>>>   +import org.apache.accumulo.server.master.state.TServerInstance;
>>>   +import org.apache.accumulo.server.master.tableOps.CompactionIterators;
>>>   +import org.apache.accumulo.server.problems.ProblemReport;
>>>   +import org.apache.accumulo.server.problems.ProblemReports;
>>>   +import org.apache.accumulo.server.problems.ProblemType;
>>>   +import org.apache.accumulo.server.security.SystemCredentials;
>>>   +import org.apache.accumulo.server.tablets.TabletTime;
>>>   +import org.apache.accumulo.server.tablets.UniqueNameAllocator;
>>>   +import org.apache.accumulo.server.util.FileUtil;
>>>   +import org.apache.accumulo.server.util.MasterMetadataUtil;
>>>   +import org.apache.accumulo.server.util.MetadataTableUtil;
>>>   +import org.apache.accumulo.server.util.TabletOperations;
>>>   +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
>>>   +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
>>>   +import org.apache.accumulo.trace.instrument.Span;
>>>   +import org.apache.accumulo.trace.instrument.Trace;
>>>   +import org.apache.accumulo.tserver.Compactor.CompactionCanceledException;
>>>   +import org.apache.accumulo.tserver.Compactor.CompactionEnv;
>>>   +import org.apache.accumulo.tserver.FileManager.ScanFileManager;
>>>   +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
>>>   +import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv;
>>>   +import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
>>>   +import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
>>>   +import org.apache.accumulo.tserver.compaction.CompactionPlan;
>>>   +import org.apache.accumulo.tserver.compaction.CompactionStrategy;
>>>   +import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
>>>   +import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
>>>   +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
>>>   +import org.apache.accumulo.tserver.compaction.WriteParameters;
>>>   +import org.apache.accumulo.tserver.constraints.ConstraintChecker;
>>>   +import org.apache.accumulo.tserver.log.DfsLogger;
>>>   +import org.apache.accumulo.tserver.log.MutationReceiver;
>>>   +import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
>>>   +import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
>>>   +import org.apache.commons.codec.DecoderException;
>>>   +import org.apache.commons.codec.binary.Hex;
>>>   +import org.apache.hadoop.conf.Configuration;
>>>   +import org.apache.hadoop.fs.FileStatus;
>>>   +import org.apache.hadoop.fs.FileSystem;
>>>   +import org.apache.hadoop.fs.Path;
>>>   +import org.apache.hadoop.io.Text;
>>>   +import org.apache.log4j.Logger;
>>>   +import org.apache.zookeeper.KeeperException;
>>>   +import org.apache.zookeeper.KeeperException.NoNodeException;
>>>   +
>>>   +/*
>>>   + * We need to be able to have the master tell a tabletServer to
>>>   + * close this file, and the tablet server to handle all pending client reads
>>>   + * before closing
>>>   + *
>>>   + */
>>>   +
>>>   +/**
>>>   + *
>>>   + * this class just provides an interface to read from a MapFile mostly takes care of reporting start and end keys
>>>   + *
>>>   + * need this because a single row extent can have multiple columns this manages all the columns (each handled by a store) for a single row-extent
>>>   + *
>>>   + *
>>>   + */
>>>   +
>>>   +public class Tablet {
>>>   +
>>>   +  enum MinorCompactionReason {
>>>   +    USER, SYSTEM, CLOSE
>>>   +  }
>>>   +
>>>   +  public class CommitSession {
>>>   +
>>>   +    private int seq;
>>>   +    private InMemoryMap memTable;
>>>   +    private int commitsInProgress;
>>>   +    private long maxCommittedTime = Long.MIN_VALUE;
>>>   +
>>>   +    private CommitSession(int seq, InMemoryMap imm) {
>>>   +      this.seq = seq;
>>>   +      this.memTable = imm;
>>>   +      commitsInProgress = 0;
>>>   +    }
>>>   +
>>>   +    public int getWALogSeq() {
>>>   +      return seq;
>>>   +    }
>>>   +
>>>   +    private void decrementCommitsInProgress() {
>>>   +      if (commitsInProgress < 1)
>>>   +        throw new IllegalStateException("commitsInProgress = " +
>>> commitsInProgress);
>>>   +
>>>   +      commitsInProgress--;
>>>   +      if (commitsInProgress == 0)
>>>   +        Tablet.this.notifyAll();
>>>   +    }
>>>   +
>>>   +    private void incrementCommitsInProgress() {
>>>   +      if (commitsInProgress < 0)
>>>   +        throw new IllegalStateException("commitsInProgress = " +
>>> commitsInProgress);
>>>   +
>>>   +      commitsInProgress++;
>>>   +    }
>>>   +
>>>   +    private void waitForCommitsToFinish() {
>>>   +      while (commitsInProgress > 0) {
>>>   +        try {
>>>   +          Tablet.this.wait(50);
>>>   +        } catch (InterruptedException e) {
>>>   +          log.warn(e, e);
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +
>>>   +    public void abortCommit(List<Mutation> value) {
>>>   +      Tablet.this.abortCommit(this, value);
>>>   +    }
>>>   +
>>>   +    public void commit(List<Mutation> mutations) {
>>>   +      Tablet.this.commit(this, mutations);
>>>   +    }
>>>   +
>>>   +    public Tablet getTablet() {
>>>   +      return Tablet.this;
>>>   +    }
>>>   +
>>>   +    public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
>>>   +      return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
>>>   +    }
>>>   +
>>>   +    public void finishUpdatingLogsUsed() {
>>>   +      Tablet.this.finishUpdatingLogsUsed();
>>>   +    }
>>>   +
>>>   +    public int getLogId() {
>>>   +      return logId;
>>>   +    }
>>>   +
>>>   +    public KeyExtent getExtent() {
>>>   +      return extent;
>>>   +    }
>>>   +
>>>   +    private void updateMaxCommittedTime(long time) {
>>>   +      maxCommittedTime = Math.max(time, maxCommittedTime);
>>>   +    }
>>>   +
>>>   +    private long getMaxCommittedTime() {
>>>   +      if (maxCommittedTime == Long.MIN_VALUE)
>>>   +        throw new IllegalStateException("Tried to read max committed
>>> time when it was never set");
>>>   +      return maxCommittedTime;
>>>   +    }
>>>   +
>>>   +  }
>>>   +
>>>   +  private class TabletMemory {
>>>   +    private InMemoryMap memTable;
>>>   +    private InMemoryMap otherMemTable;
>>>   +    private InMemoryMap deletingMemTable;
>>>   +    private int nextSeq = 1;
>>>   +    private CommitSession commitSession;
>>>   +
>>>   +    TabletMemory() {
>>>   +      try {
>>>   +        memTable = new InMemoryMap(acuTableConf);
>>>   +      } catch (LocalityGroupConfigurationError e) {
>>>   +        throw new RuntimeException(e);
>>>   +      }
>>>   +      commitSession = new CommitSession(nextSeq, memTable);
>>>   +      nextSeq += 2;
>>>   +    }
>>>   +
>>>   +    InMemoryMap getMemTable() {
>>>   +      return memTable;
>>>   +    }
>>>   +
>>>   +    InMemoryMap getMinCMemTable() {
>>>   +      return otherMemTable;
>>>   +    }
>>>   +
>>>   +    CommitSession prepareForMinC() {
>>>   +      if (otherMemTable != null) {
>>>   +        throw new IllegalStateException();
>>>   +      }
>>>   +
>>>   +      if (deletingMemTable != null) {
>>>   +        throw new IllegalStateException();
>>>   +      }
>>>   +
>>>   +      otherMemTable = memTable;
>>>   +      try {
>>>   +        memTable = new InMemoryMap(acuTableConf);
>>>   +      } catch (LocalityGroupConfigurationError e) {
>>>   +        throw new RuntimeException(e);
>>>   +      }
>>>   +
>>>   +      CommitSession oldCommitSession = commitSession;
>>>   +      commitSession = new CommitSession(nextSeq, memTable);
>>>   +      nextSeq += 2;
>>>   +
>>>   +      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes());
>>>   +
>>>   +      return oldCommitSession;
>>>   +    }
>>>   +
>>>   +    void finishedMinC() {
>>>   +
>>>   +      if (otherMemTable == null) {
>>>   +        throw new IllegalStateException();
>>>   +      }
>>>   +
>>>   +      if (deletingMemTable != null) {
>>>   +        throw new IllegalStateException();
>>>   +      }
>>>   +
>>>   +      deletingMemTable = otherMemTable;
>>>   +
>>>   +      otherMemTable = null;
>>>   +      Tablet.this.notifyAll();
>>>   +    }
>>>   +
>>>   +    void finalizeMinC() {
>>>   +      try {
>>>   +        deletingMemTable.delete(15000);
>>>   +      } finally {
>>>   +        synchronized (Tablet.this) {
>>>   +          if (otherMemTable != null) {
>>>   +            throw new IllegalStateException();
>>>   +          }
>>>   +
>>>   +          if (deletingMemTable == null) {
>>>   +            throw new IllegalStateException();
>>>   +          }
>>>   +
>>>   +          deletingMemTable = null;
>>>   +
>>>   +          tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +
>>>   +    boolean memoryReservedForMinC() {
>>>   +      return otherMemTable != null || deletingMemTable != null;
>>>   +    }
>>>   +
>>>   +    void waitForMinC() {
>>>   +      while (otherMemTable != null || deletingMemTable != null) {
>>>   +        try {
>>>   +          Tablet.this.wait(50);
>>>   +        } catch (InterruptedException e) {
>>>   +          log.warn(e, e);
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +
>>>   +    void mutate(CommitSession cm, List<Mutation> mutations) {
>>>   +      cm.memTable.mutate(mutations);
>>>   +    }
>>>   +
>>>   +    void updateMemoryUsageStats() {
>>>   +      long other = 0;
>>>   +      if (otherMemTable != null)
>>>   +        other = otherMemTable.estimatedSizeInBytes();
>>>   +      else if (deletingMemTable != null)
>>>   +        other = deletingMemTable.estimatedSizeInBytes();
>>>   +
>>>   +      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
>>>   +    }
>>>   +
>>>   +    List<MemoryIterator> getIterators() {
>>>   +      List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
>>>   +      toReturn.add(memTable.skvIterator());
>>>   +      if (otherMemTable != null)
>>>   +        toReturn.add(otherMemTable.skvIterator());
>>>   +      return toReturn;
>>>   +    }
>>>   +
>>>   +    void returnIterators(List<MemoryIterator> iters) {
>>>   +      for (MemoryIterator iter : iters) {
>>>   +        iter.close();
>>>   +      }
>>>   +    }
>>>   +
>>>   +    public long getNumEntries() {
>>>   +      if (otherMemTable != null)
>>>   +        return memTable.getNumEntries() + otherMemTable.getNumEntries();
>>>   +      return memTable.getNumEntries();
>>>   +    }
>>>   +
>>>   +    CommitSession getCommitSession() {
>>>   +      return commitSession;
>>>   +    }
>>>   +  }
>>>   +
>>>   +  private TabletMemory tabletMemory;
>>>   +
>>>   +  private final TabletTime tabletTime;
>>>   +  private long persistedTime;
>>>   +  private final Object timeLock = new Object();
>>>   +
>>>   +  private final Path location; // absolute path of this tablets dir
>>>   +  private TServerInstance lastLocation;
>>>   +
>>>   +  private Configuration conf;
>>>   +  private VolumeManager fs;
>>>   +
>>>   +  private TableConfiguration acuTableConf;
>>>   +
>>>   +  private volatile boolean tableDirChecked = false;
>>>   +
>>>   +  private AtomicLong dataSourceDeletions = new AtomicLong(0);
>>>   +  private Set<ScanDataSource> activeScans = new HashSet<ScanDataSource>();
>>>   +
>>>   +  private volatile boolean closing = false;
>>>   +  private boolean closed = false;
>>>   +  private boolean closeComplete = false;
>>>   +
>>>   +  private long lastFlushID = -1;
>>>   +  private long lastCompactID = -1;
>>>   +
>>>   +  private KeyExtent extent;
>>>   +
>>>   +  private TabletResourceManager tabletResources;
>>>   +  final private DatafileManager datafileManager;
>>>   +  private volatile boolean majorCompactionInProgress = false;
>>>   +  private volatile boolean majorCompactionWaitingToStart = false;
>>>   +  private Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
>>>   +  private volatile boolean minorCompactionInProgress = false;
>>>   +  private volatile boolean minorCompactionWaitingToStart = false;
>>>   +
>>>   +  private boolean updatingFlushID = false;
>>>   +
>>>   +  private AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<ConstraintChecker>();
>>>   +
>>>   +  private final String tabletDirectory;
>>>   +
>>>   +  private int writesInProgress = 0;
>>>   +
>>>   +  private static final Logger log = Logger.getLogger(Tablet.class);
>>>   +  public TabletStatsKeeper timer;
>>>   +
>>>   +  private Rate queryRate = new Rate(0.2);
>>>   +  private long queryCount = 0;
>>>   +
>>>   +  private Rate queryByteRate = new Rate(0.2);
>>>   +  private long queryBytes = 0;
>>>   +
>>>   +  private Rate ingestRate = new Rate(0.2);
>>>   +  private long ingestCount = 0;
>>>   +
>>>   +  private Rate ingestByteRate = new Rate(0.2);
>>>   +  private long ingestBytes = 0;
>>>   +
>>>   +  private byte[] defaultSecurityLabel = new byte[0];
>>>   +
>>>   +  private long lastMinorCompactionFinishTime;
>>>   +  private long lastMapFileImportTime;
>>>   +
>>>   +  private volatile long numEntries;
>>>   +  private volatile long numEntriesInMemory;
>>>   +
>>>   +  // a count of the amount of data read by the iterators
>>>   +  private AtomicLong scannedCount = new AtomicLong(0);
>>>   +  private Rate scannedRate = new Rate(0.2);
>>>   +
>>>   +  private ConfigurationObserver configObserver;
>>>   +
>>>   +  private TabletServer tabletServer;
>>>   +
>>>   +  private final int logId;
>>>   +  // ensure we only have one reader/writer of our bulk file notes at a time
>>>   +  public final Object bulkFileImportLock = new Object();
>>>   +
>>>   +  public int getLogId() {
>>>   +    return logId;
>>>   +  }
>>>   +
>>>   +  public static class TabletClosedException extends RuntimeException {
>>>   +    public TabletClosedException(Exception e) {
>>>   +      super(e);
>>>   +    }
>>>   +
>>>   +    public TabletClosedException() {
>>>   +      super();
>>>   +    }
>>>   +
>>>   +    private static final long serialVersionUID = 1L;
>>>   +  }
>>>   +
>>>   +  FileRef getNextMapFilename(String prefix) throws IOException {
>>>   +    String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent));
>>>   +    checkTabletDir();
>>>   +    return new FileRef(location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension);
>>>   +  }
>>>   +
>>>   +  private void checkTabletDir() throws IOException {
>>>   +    if (!tableDirChecked) {
>>>   +      checkTabletDir(this.location);
>>>   +      tableDirChecked = true;
>>>   +    }
>>>   +  }
>>>   +
>>>   +  private void checkTabletDir(Path tabletDir) throws IOException {
>>>   +
>>>   +    FileStatus[] files = null;
>>>   +    try {
>>>   +      files = fs.listStatus(tabletDir);
>>>   +    } catch (FileNotFoundException ex) {
>>>   +      // ignored
>>>   +    }
>>>   +
>>>   +    if (files == null) {
>>>   +      if (tabletDir.getName().startsWith("c-"))
>>>   +        log.debug("Tablet " + extent + " had no dir, creating " +
>>> tabletDir); // its a clone dir...
>>>   +      else
>>>   +        log.warn("Tablet " + extent + " had no dir, creating " +
>>> tabletDir);
>>>   +
>>>   +      fs.mkdirs(tabletDir);
>>>   +    }
>>>   +  }
>>>   +
>>>   +  class DatafileManager {
>>>   +    // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles
>>>   +    final private Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
>>>   +
>>>   +    DatafileManager(SortedMap<FileRef,DataFileValue> datafileSizes) {
>>>   +      for (Entry<FileRef,DataFileValue> datafiles : datafileSizes.entrySet())
>>>   +        this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
>>>   +    }
>>>   +
>>>   +    FileRef mergingMinorCompactionFile = null;
>>>   +    Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>();
>>>   +    Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>();
>>>   +    MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>();
>>>   +    long nextScanReservationId = 0;
>>>   +    boolean reservationsBlocked = false;
>>>   +
>>>   +    Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
>>>   +
>>>   +    Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() {
>>>   +      synchronized (Tablet.this) {
>>>   +
>>>   +        while (reservationsBlocked) {
>>>   +          try {
>>>   +            Tablet.this.wait(50);
>>>   +          } catch (InterruptedException e) {
>>>   +            log.warn(e, e);
>>>   +          }
>>>   +        }
>>>   +
>>>   +        Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet());
>>>   +
>>>   +        long rid = nextScanReservationId++;
>>>   +
>>>   +        scanFileReservations.put(rid, absFilePaths);
>>>   +
>>>   +        Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
>>>   +
>>>   +        for (FileRef path : absFilePaths) {
>>>   +          fileScanReferenceCounts.increment(path, 1);
>>>   +          ret.put(path, datafileSizes.get(path));
>>>   +        }
>>>   +
>>>   +        return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
>>>   +      }
>>>   +    }
>>>   +
>>>   +    void returnFilesForScan(Long reservationId) {
>>>   +
>>>   +      final Set<FileRef> filesToDelete = new HashSet<FileRef>();
>>>   +
>>>   +      synchronized (Tablet.this) {
>>>   +        Set<FileRef> absFilePaths = scanFileReservations.remove(reservationId);
>>>   +
>>>   +        if (absFilePaths == null)
>>>   +          throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
>>>   +
>>>   +        boolean notify = false;
>>>   +        for (FileRef path : absFilePaths) {
>>>   +          long refCount = fileScanReferenceCounts.decrement(path, 1);
>>>   +          if (refCount == 0) {
>>>   +            if (filesToDeleteAfterScan.remove(path))
>>>   +              filesToDelete.add(path);
>>>   +            notify = true;
>>>   +          } else if (refCount < 0)
>>>   +            throw new IllegalStateException("Scan ref count for " +
>>> path
>>> + " is " + refCount);
>>>   +        }
>>>   +
>>>   +        if (notify)
>>>   +          Tablet.this.notifyAll();
>>>   +      }
>>>   +
>>>   +      if (filesToDelete.size() > 0) {
>>>   +        log.debug("Removing scan refs from metadata " + extent + " " +
>>> filesToDelete);
>>>   +        MetadataTableUtil.removeScanFiles(extent, filesToDelete,
>>> SystemCredentials.get(), tabletServer.getLock());
>>>   +      }
>>>   +    }
>>>   +
>>>   +    private void removeFilesAfterScan(Set<FileRef> scanFiles) {
>>>   +      if (scanFiles.size() == 0)
>>>   +        return;
>>>   +
>>>   +      Set<FileRef> filesToDelete = new HashSet<FileRef>();
>>>   +
>>>   +      synchronized (Tablet.this) {
>>>   +        for (FileRef path : scanFiles) {
>>>   +          if (fileScanReferenceCounts.get(path) == 0)
>>>   +            filesToDelete.add(path);
>>>   +          else
>>>   +            filesToDeleteAfterScan.add(path);
>>>   +        }
>>>   +      }
>>>   +
>>>   +      if (filesToDelete.size() > 0) {
>>>   +        log.debug("Removing scan refs from metadata " + extent + " " +
>>> filesToDelete);
>>>   +        MetadataTableUtil.removeScanFiles(extent, filesToDelete,
>>> SystemCredentials.get(), tabletServer.getLock());
>>>   +      }
>>>   +    }
>>>   +
>>>   +    private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
>>>   +      long startTime = System.currentTimeMillis();
>>>   +      TreeSet<FileRef> inUse = new TreeSet<FileRef>();
>>>   +
>>>   +      Span waitForScans = Trace.start("waitForScans");
>>>   +      try {
>>>   +        synchronized (Tablet.this) {
>>>   +          if (blockNewScans) {
>>>   +            if (reservationsBlocked)
>>>   +              throw new IllegalStateException();
>>>   +
>>>   +            reservationsBlocked = true;
>>>   +          }
>>>   +
>>>   +          for (FileRef path : pathsToWaitFor) {
>>>   +            while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
>>>   +              try {
>>>   +                Tablet.this.wait(100);
>>>   +              } catch (InterruptedException e) {
>>>   +                log.warn(e, e);
>>>   +              }
>>>   +            }
>>>   +          }
>>>   +
>>>   +          for (FileRef path : pathsToWaitFor) {
>>>   +            if (fileScanReferenceCounts.get(path) > 0)
>>>   +              inUse.add(path);
>>>   +          }
>>>   +
>>>   +          if (blockNewScans) {
>>>   +            reservationsBlocked = false;
>>>   +            Tablet.this.notifyAll();
>>>   +          }
>>>   +
>>>   +        }
>>>   +      } finally {
>>>   +        waitForScans.stop();
>>>   +      }
>>>   +      return inUse;
>>>   +    }
>>>   +
>>>   +    public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException {
>>>   +
>>>   +      String bulkDir = null;
>>>   +
>>>   +      Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
>>>   +      for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet())
>>>   +        paths.put(entry.getKey(), entry.getValue());
>>>   +
>>>   +      for (FileRef tpath : paths.keySet()) {
>>>   +
>>>   +        boolean inTheRightDirectory = false;
>>>   +        Path parent = tpath.path().getParent().getParent();
>>>   +        for (String tablesDir : ServerConstants.getTablesDirs()) {
>>>   +          if (parent.equals(new Path(tablesDir, extent.getTableId().toString()))) {
>>>   +            inTheRightDirectory = true;
>>>   +            break;
>>>   +          }
>>>   +        }
>>>   +        if (!inTheRightDirectory) {
>>>   +          throw new IOException("Data file " + tpath + " not in table
>>> dirs");
>>>   +        }
>>>   +
>>>   +        if (bulkDir == null)
>>>   +          bulkDir = tpath.path().getParent().toString();
>>>   +        else if (!bulkDir.equals(tpath.path().getParent().toString()))
>>>   +          throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);
>>>   +
>>>   +      }
>>>   +
>>>   +      if (extent.isRootTablet()) {
>>>   +        throw new IllegalArgumentException("Can not import files to
>>> root
>>> tablet");
>>>   +      }
>>>   +
>>>   +      synchronized (bulkFileImportLock) {
>>>   +        Credentials creds = SystemCredentials.get();
>>>   +        Connector conn;
>>>   +        try {
>>>   +          conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken());
>>>   +        } catch (Exception ex) {
>>>   +          throw new IOException(ex);
>>>   +        }
>>>   +        // Remove any bulk files we've previously loaded and compacted away
>>>   +        List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);
>>>   +
>>>   +        for (FileRef file : files)
>>>   +          if (paths.keySet().remove(file.path()))
>>>   +            log.debug("Ignoring request to re-import a file already
>>> imported: " + extent + ": " + file);
>>>   +
>>>   +        if (paths.size() > 0) {
>>>   +          long bulkTime = Long.MIN_VALUE;
>>>   +          if (setTime) {
>>>   +            for (DataFileValue dfv : paths.values()) {
>>>   +              long nextTime = tabletTime.getAndUpdateTime();
>>>   +              if (nextTime < bulkTime)
>>>   +                throw new IllegalStateException("Time went backwards
>>> unexpectedly " + nextTime + " " + bulkTime);
>>>   +              bulkTime = nextTime;
>>>   +              dfv.setTime(bulkTime);
>>>   +            }
>>>   +          }
>>>   +
>>>   +          synchronized (timeLock) {
>>>   +            if (bulkTime > persistedTime)
>>>   +              persistedTime = bulkTime;
>>>   +
>>>   +            MetadataTableUtil.updateTabletDataFile(tid, extent, paths, tabletTime.getMetadataValue(persistedTime), creds, tabletServer.getLock());
>>>   +          }
>>>   +        }
>>>   +      }
>>>   +
>>>   +      synchronized (Tablet.this) {
>>>   +        for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
>>>   +          if (datafileSizes.containsKey(tpath.getKey())) {
>>>   +            log.error("Adding file that is already in set " +
>>> tpath.getKey());
>>>   +          }
>>>   +          datafileSizes.put(tpath.getKey(), tpath.getValue());
>>>   +
>>>   +        }
>>>   +
>>>   +        tabletResources.importedMapFiles();
>>>   +
>>>   +        computeNumEntries();
>>>   +      }
>>>   +
>>>   +      for (FileRef tpath : paths.keySet()) {
>>>   +        log.log(TLevel.TABLET_HIST, extent + " import " + tpath + " "
>>> +
>>> paths.get(tpath));
>>>   +      }
>>>   +    }
>>>   +
>>>   +    FileRef reserveMergingMinorCompactionFile() {
>>>   +      if (mergingMinorCompactionFile != null)
>>>   +        throw new IllegalStateException("Tried to reserve merging
>>> minor
>>> compaction file when already reserved  : " + mergingMinorCompactionFile);
>>>   +
>>>   +      if (extent.isRootTablet())
>>>   +        return null;
>>>   +
>>>   +      int maxFiles = acuTableConf.getMaxFilesPerTablet();
>>>   +
>>>   +      // when a major compaction is running and we are at max files, write out
>>>   +      // one extra file... want to avoid the case where major compaction is
>>>   +      // compacting everything except for the largest file, and therefore the
>>>   +      // largest file is returned for merging.. the following check mostly
>>>   +      // avoids this case, except for the case where major compactions fail or
>>>   +      // are canceled
>>>   +      if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
>>>   +        return null;
>>>   +
>>>   +      if (datafileSizes.size() >= maxFiles) {
>>>   +        // find the smallest file
>>>   +
>>>   +        long min = Long.MAX_VALUE;
>>>   +        FileRef minName = null;
>>>   +
>>>   +        for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
>>>   +          if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
>>>   +            min = entry.getValue().getSize();
>>>   +            minName = entry.getKey();
>>>   +          }
>>>   +        }
>>>   +
>>>   +        if (minName == null)
>>>   +          return null;
>>>   +
>>>   +        mergingMinorCompactionFile = minName;
>>>   +        return minName;
>>>   +      }
>>>   +
>>>   +      return null;
>>>   +    }
>>>   +
>>>   +    void unreserveMergingMinorCompactionFile(FileRef file) {
>>>   +      if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
>>>   +          || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
>>>   +        throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
>>>   +
>>>   +      mergingMinorCompactionFile = null;
>>>   +    }
>>>   +
>>>   +    void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
>>>   +        throws IOException {
>>>   +
>>>   +      IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
>>>   +      if (extent.isRootTablet()) {
>>>   +        try {
>>>   +          if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
>>>   +            throw new IllegalStateException();
>>>   +          }
>>>   +        } catch (Exception e) {
>>>   +          throw new IllegalStateException("Can not bring major
>>> compaction online, lock not held", e);
>>>   +        }
>>>   +      }
>>>   +
>>>   +      // rename before putting in metadata table, so files in metadata table should
>>>   +      // always exist
>>>   +      do {
>>>   +        try {
>>>   +          if (dfv.getNumEntries() == 0) {
>>>   +            fs.deleteRecursively(tmpDatafile.path());
>>>   +          } else {
>>>   +            if (fs.exists(newDatafile.path())) {
>>>   +              log.warn("Target map file already exist " +
>>> newDatafile);
>>>   +              fs.deleteRecursively(newDatafile.path());
>>>   +            }
>>>   +
>>>   +            if (!fs.rename(tmpDatafile.path(), newDatafile.path())) {
>>>   +              throw new IOException("rename fails");
>>>   +            }
>>>   +          }
>>>   +          break;
>>>   +        } catch (IOException ioe) {
>>>   +          log.warn("Tablet " + extent + " failed to rename " +
>>> newDatafile + " after MinC, will retry in 60 secs...", ioe);
>>>   +          UtilWaitThread.sleep(60 * 1000);
>>>   +        }
>>>   +      } while (true);
>>>   +
>>>   +      long t1, t2;
>>>   +
>>>   +      // the code below always assumes merged files are in use by scans... this must be done
>>>   +      // because the in memory list of files is not updated until after the metadata table
>>>   +      // therefore the file is available to scans until memory is updated, but want to ensure
>>>   +      // the file is not available for garbage collection... if memory were updated
>>>   +      // before this point (like major compactions do), then the following code could wait
>>>   +      // for scans to finish like major compactions do.... used to wait for scans to finish
>>>   +      // here, but that was incorrect because a scan could start after waiting but before
>>>   +      // memory was updated... assuming the file is always in use by scans leads to
>>>   +      // one unneeded metadata update when it was not actually in use
>>>   +      Set<FileRef> filesInUseByScans = Collections.emptySet();
>>>   +      if (absMergeFile != null)
>>>   +        filesInUseByScans = Collections.singleton(absMergeFile);
>>>   +
>>>   +      // very important to write delete entries outside of log lock, because
>>>   +      // this !METADATA write does not go up... it goes sideways or to itself
>>>   +      if (absMergeFile != null)
>>>   +        MetadataTableUtil.addDeleteEntries(extent, Collections.singleton(absMergeFile), SystemCredentials.get());
>>>   +
>>>   +      Set<String> unusedWalLogs = beginClearingUnusedLogs();
>>>   +      try {
>>>   +        // the order of writing to !METADATA and walog is important in the face of machine/process failures
>>>   +        // need to write to !METADATA before writing to walog, when things are done in the reverse order
>>>   +        // data could be lost... the minor compaction start event should be written before the following metadata
>>>   +        // write is made
>>>   +
>>>   +        synchronized (timeLock) {
>>>   +          if (commitSession.getMaxCommittedTime() > persistedTime)
>>>   +            persistedTime = commitSession.getMaxCommittedTime();
>>>   +
>>>   +          String time = tabletTime.getMetadataValue(persistedTime);
>>>   +          MasterMetadataUtil.updateTabletDataFile(extent, newDatafile, absMergeFile, dfv, time, SystemCredentials.get(), filesInUseByScans,
>>>   +              tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
>>>   +        }
>>>   +
>>>   +      } finally {
>>>   +        finishClearingUnusedLogs();
>>>   +      }
>>>   +
>>>   +      do {
>>>   +        try {
>>>   +          // the purpose of making this update use the new commit session, instead of the old one passed in,
>>>   +          // is because the new one will reference the logs used by current memory...
>>>   +
>>>   +          tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
>>>   +          break;
>>>   +        } catch (IOException e) {
>>>   +          log.error("Failed to write to write-ahead log " +
>>> e.getMessage() + " will retry", e);
>>>   +          UtilWaitThread.sleep(1 * 1000);
>>>   +        }
>>>   +      } while (true);
>>>   +
>>>   +      synchronized (Tablet.this) {
>>>   +        lastLocation = null;
>>>   +
>>>   +        t1 = System.currentTimeMillis();
>>>   +        if (datafileSizes.containsKey(newDatafile)) {
>>>   +          log.error("Adding file that is already in set " +
>>> newDatafile);
>>>   +        }
>>>   +
>>>   +        if (dfv.getNumEntries() > 0) {
>>>   +          datafileSizes.put(newDatafile, dfv);
>>>   +        }
>>>   +
>>>   +        if (absMergeFile != null) {
>>>   +          datafileSizes.remove(absMergeFile);
>>>   +        }
>>>   +
>>>   +        unreserveMergingMinorCompactionFile(absMergeFile);
>>>   +
>>>   +        dataSourceDeletions.incrementAndGet();
>>>   +        tabletMemory.finishedMinC();
>>>   +
>>>   +        lastFlushID = flushId;
>>>   +
>>>   +        computeNumEntries();
>>>   +        t2 = System.currentTimeMillis();
>>>   +      }
>>>   +
>>>   +      // must do this after list of files in memory is updated above
>>>   +      removeFilesAfterScan(filesInUseByScans);
>>>   +
>>>   +      if (absMergeFile != null)
>>>   +        log.log(TLevel.TABLET_HIST, extent + " MinC [" + absMergeFile
>>> +
>>> ",memory] -> " + newDatafile);
>>>   +      else
>>>   +        log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " +
>>> newDatafile);
>>>   +      log.debug(String.format("MinC finish lock %.2f secs %s", (t2 -
>>> t1)
>>> / 1000.0, getExtent().toString()));
>>>   +      if (dfv.getSize() > acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) {
>>>   +        log.debug(String.format("Minor Compaction wrote out file larger than split threshold.  split threshold = %,d  file size = %,d",
>>>   +            acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), dfv.getSize()));
>>>   +      }
>>>   +
>>>   +    }
>>>   +
>>>   +    public void reserveMajorCompactingFiles(Collection<FileRef> files) {
>>>   +      if (majorCompactingFiles.size() != 0)
>>>   +        throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
>>>   +
>>>   +      if (mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile))
>>>   +        throw new IllegalStateException("Major compaction tried to reserve file in use by minor compaction " + mergingMinorCompactionFile);
>>>   +
>>>   +      majorCompactingFiles.addAll(files);
>>>   +    }
>>>   +
>>>   +    public void clearMajorCompactingFile() {
>>>   +      majorCompactingFiles.clear();
>>>   +    }
>>>   +
>>>   +    void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
>>>   +        throws IOException {
>>>   +      long t1, t2;
>>>   +
>>>   +      if (!extent.isRootTablet()) {
>>>   +
>>>   +        if (fs.exists(newDatafile.path())) {
>>>   +          log.error("Target map file already exist " + newDatafile,
>>> new
>>> Exception());
>>>   +          throw new IllegalStateException("Target map file already
>>> exist
>>> " + newDatafile);
>>>   +        }
>>>   +
>>>   +        // rename before putting in metadata table, so files in metadata table should
>>>   +        // always exist
>>>   +        if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
>>>   +          log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false");
>>>   +
>>>   +        if (dfv.getNumEntries() == 0) {
>>>   +          fs.deleteRecursively(newDatafile.path());
>>>   +        }
>>>   +      }
>>>   +
>>>   +      TServerInstance lastLocation = null;
>>>   +      synchronized (Tablet.this) {
>>>   +
>>>   +        t1 = System.currentTimeMillis();
>>>   +
>>>   +        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
>>>   +
>>>   +        dataSourceDeletions.incrementAndGet();
>>>   +
>>>   +        if (extent.isRootTablet()) {
>>>   +
>>>   +          waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
>>>   +
>>>   +          try {
>>>   +            if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
>>>   +              throw new IllegalStateException();
>>>   +            }
>>>   +          } catch (Exception e) {
>>>   +            throw new IllegalStateException("Can not bring major
>>> compaction online, lock not held", e);
>>>   +          }
>>>   +
>>>   +          // mark files as ready for deletion, but
>>>   +          // do not delete them until we successfully
>>>   +          // rename the compacted map file, in case
>>>   +          // the system goes down
>>>   +
>>>   +          String compactName = newDatafile.path().getName();
>>>   +
>>>   +          for (FileRef ref : oldDatafiles) {
>>>   +            Path path = ref.path();
>>>   +            fs.rename(path, new Path(location + "/delete+" + compactName + "+" + path.getName()));
>>>   +          }
>>>   +
>>>   +          if (fs.exists(newDatafile.path())) {
>>>   +            log.error("Target map file already exist " + newDatafile,
>>> new Exception());
>>>   +            throw new IllegalStateException("Target map file already
>>> exist " + newDatafile);
>>>   +          }
>>>   +
>>>   +          if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
>>>   +            log.warn("Rename of " + tmpDatafile + " to " +
>>> newDatafile +
>>> " returned false");
>>>   +
>>>   +          // start deleting files, if we do not finish they will be cleaned
>>>   +          // up later
>>>   +          for (FileRef ref : oldDatafiles) {
>>>   +            Path path = ref.path();
>>>   +            Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName());
>>>   +            if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) || !fs.moveToTrash(deleteFile))
>>>   +              fs.deleteRecursively(deleteFile);
>>>   +          }
>>>   +        }
>>>   +
>>>   +        // atomically remove old files and add new file
>>>   +        for (FileRef oldDatafile : oldDatafiles) {
>>>   +          if (!datafileSizes.containsKey(oldDatafile)) {
>>>   +            log.error("file does not exist in set " + oldDatafile);
>>>   +          }
>>>   +          datafileSizes.remove(oldDatafile);
>>>   +          majorCompactingFiles.remove(oldDatafile);
>>>   +        }
>>>   +
>>>   +        if (datafileSizes.containsKey(newDatafile)) {
>>>   +          log.error("Adding file that is already in set " +
>>> newDatafile);
>>>   +        }
>>>   +
>>>   +        if (dfv.getNumEntries() > 0) {
>>>   +          datafileSizes.put(newDatafile, dfv);
>>>   +        }
>>>   +
>>>   +        // could be used by a follow on compaction in a multipass compaction
>>>   +        majorCompactingFiles.add(newDatafile);
>>>   +
>>>   +        computeNumEntries();
>>>   +
>>>   +        lastLocation = Tablet.this.lastLocation;
>>>   +        Tablet.this.lastLocation = null;
>>>   +
>>>   +        if (compactionId != null)
>>>   +          lastCompactID = compactionId;
>>>   +
>>>   +        t2 = System.currentTimeMillis();
>>>   +      }
>>>   +
>>>   +      if (!extent.isRootTablet()) {
>>>   +        Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
>>>   +        if (filesInUseByScans.size() > 0)
>>>   +          log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
>>>   +        MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(),
>>>   +            tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock());
>>>   +        removeFilesAfterScan(filesInUseByScans);
>>>   +      }
>>>   +
>>>   +      log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1)
>>> /
>>> 1000.0));
>>>   +      log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + "
>>> --> " + newDatafile);
>>>   +    }
>>>   +
>>>   +    public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
>>>   +      synchronized (Tablet.this) {
>>>   +        TreeMap<FileRef,DataFileValue> copy = new TreeMap<FileRef,DataFileValue>(datafileSizes);
>>>   +        return Collections.unmodifiableSortedMap(copy);
>>>   +      }
>>>   +    }
>>>   +
>>>   +    public Set<FileRef> getFiles() {
>>>   +      synchronized (Tablet.this) {
>>>   +        HashSet<FileRef> files = new HashSet<FileRef>(datafileSizes.keySet());
>>>   +        return Collections.unmodifiableSet(files);
>>>   +      }
>>>   +    }
>>>   +
>>>   +  }
>>>   +
>>>   +  public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<Key,Value> tabletsKeyValues)
>>>   +      throws IOException {
>>>   +    this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), tabletsKeyValues);
>>>   +    splitCreationTime = 0;
>>>   +  }
>>>   +
>>>   +  public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<FileRef,DataFileValue> datafiles, String time,
>>>   +      long initFlushID, long initCompactID) throws IOException {
>>>   +    this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), datafiles, time, initFlushID, initCompactID);
>>>   +    splitCreationTime = System.currentTimeMillis();
>>>   +  }
>>>   +
>>>   +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
>>>   +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>>   +    this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), tabletsKeyValues);
>>>   +  }
>>>   +
>>>   +  static private final List<LogEntry> EMPTY = Collections.emptyList();
>>>   +
>>>   +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
>>>   +      SortedMap<FileRef,DataFileValue> datafiles, String time, long initFlushID, long initCompactID) throws IOException {
>>>   +    this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), EMPTY, datafiles, time, null, new HashSet<FileRef>(), initFlushID, initCompactID);
>>>   +  }
>>>   +
>>>   +  private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>>   +    SortedMap<Key,Value> entries;
>>>   +
>>>   +    if (extent.isRootTablet()) {
>>>   +      return null;
>>>   +    } else {
>>>   +      entries = new TreeMap<Key,Value>();
>>>   +      Text rowName = extent.getMetadataEntry();
>>>   +      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +        if (entry.getKey().compareRow(rowName) == 0 && TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
>>>   +          entries.put(new Key(entry.getKey()), new Value(entry.getValue()));
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +
>>>   +    // log.debug("extent : "+extent+"   entries : "+entries);
>>>   +
>>>   +    if (entries.size() == 1)
>>>   +      return entries.values().iterator().next().toString();
>>>   +    return null;
>>>   +  }
>>>   +
>>>   +  private static SortedMap<FileRef,DataFileValue> lookupDatafiles(AccumuloConfiguration conf, VolumeManager fs, KeyExtent extent,
>>>   +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>>   +
>>>   +    TreeMap<FileRef,DataFileValue> datafiles = new TreeMap<FileRef,DataFileValue>();
>>>   +
>>>   +    if (extent.isRootTablet()) { // the meta0 tablet
>>>   +      Path location = new Path(MetadataTableUtil.getRootTabletDir());
>>>   +      // cleanUpFiles() has special handling for delete. files
>>>   +      FileStatus[] files = fs.listStatus(location);
>>>   +      Collection<String> goodPaths = cleanUpFiles(fs, files, true);
>>>   +      for (String good : goodPaths) {
>>>   +        Path path = new Path(good);
>>>   +        String filename = path.getName();
>>>   +        FileRef ref = new FileRef(location.toString() + "/" + filename, path);
>>>   +        DataFileValue dfv = new DataFileValue(0, 0);
>>>   +        datafiles.put(ref, dfv);
>>>   +      }
>>>   +    } else {
>>>   +
>>>   +      Text rowName = extent.getMetadataEntry();
>>>   +
>>>   +      String tableId = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
>>>   +      ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SystemCredentials.get(), tableId, Authorizations.EMPTY);
>>>   +
>>>   +      // Commented out because when no data file is present, each tablet will scan through metadata table and return nothing
>>>   +      // reduced batch size to improve performance
>>>   +      // changed here after endKeys were implemented from 10 to 1000
>>>   +      mdScanner.setBatchSize(1000);
>>>   +
>>>   +      // leave these in, again, now using endKey for safety
>>>   +      mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
>>>   +
>>>   +      mdScanner.setRange(new Range(rowName));
>>>   +
>>>   +      for (Entry<Key,Value> entry : mdScanner) {
>>>   +
>>>   +        if (entry.getKey().compareRow(rowName) != 0) {
>>>   +          break;
>>>   +        }
>>>   +
>>>   +        FileRef ref = new FileRef(entry.getKey().getColumnQualifier().toString(), fs.getFullPath(entry.getKey()));
>>>   +        datafiles.put(ref, new DataFileValue(entry.getValue().get()));
>>>   +      }
>>>   +    }
>>>   +    return datafiles;
>>>   +  }
>>>   +
>>>   +  private static List<LogEntry> lookupLogEntries(KeyExtent ke, SortedMap<Key,Value> tabletsKeyValues) {
>>>   +    List<LogEntry> logEntries = new ArrayList<LogEntry>();
>>>   +
>>>   +    if (ke.isMeta()) {
>>>   +      try {
>>>   +        logEntries = MetadataTableUtil.getLogEntries(SystemCredentials.get(), ke);
>>>   +      } catch (Exception ex) {
>>>   +        throw new RuntimeException("Unable to read tablet log entries", ex);
>>>   +      }
>>>   +    } else {
>>>   +      log.debug("Looking at metadata " + tabletsKeyValues);
>>>   +      Text row = ke.getMetadataEntry();
>>>   +      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +        Key key = entry.getKey();
>>>   +        if (key.getRow().equals(row)) {
>>>   +          if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
>>>   +            logEntries.add(LogEntry.fromKeyValue(key, entry.getValue()));
>>>   +          }
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +
>>>   +    log.debug("got " + logEntries + " for logs for " + ke);
>>>   +    return logEntries;
>>>   +  }
>>>   +
>>>   +  private static Set<FileRef> lookupScanFiles(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) throws IOException {
>>>   +    HashSet<FileRef> scanFiles = new HashSet<FileRef>();
>>>   +
>>>   +    Text row = extent.getMetadataEntry();
>>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +      Key key = entry.getKey();
>>>   +      if (key.getRow().equals(row) && key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) {
>>>   +        String meta = key.getColumnQualifier().toString();
>>>   +        Path path = fs.getFullPath(extent.getTableId().toString(), meta);
>>>   +        scanFiles.add(new FileRef(meta, path));
>>>   +      }
>>>   +    }
>>>   +
>>>   +    return scanFiles;
>>>   +  }
>>>   +
>>>   +  private static long lookupFlushID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>>   +    Text row = extent.getMetadataEntry();
>>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +      Key key = entry.getKey();
>>>   +      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
>>>   +        return Long.parseLong(entry.getValue().toString());
>>>   +    }
>>>   +
>>>   +    return -1;
>>>   +  }
>>>   +
>>>   +  private static long lookupCompactID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>>   +    Text row = extent.getMetadataEntry();
>>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +      Key key = entry.getKey();
>>>   +      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
>>>   +        return Long.parseLong(entry.getValue().toString());
>>>   +    }
>>>   +
>>>   +    return -1;
>>>   +  }
>>>   +
>>>   +  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, VolumeManager fs,
>>>   +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>>   +    this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(), fs,
>>>   +        extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent, tabletsKeyValues),
>>>   +        lookupScanFiles(extent, tabletsKeyValues, fs), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues));
>>>   +  }
>>>   +
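>>>   +  // Finds the server that last hosted this tablet (the last-location column
>>>   +  // family); returns null when no previous location is known.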
>>>   +  private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
>>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +      if (entry.getKey().getColumnFamily().compareTo(TabletsSection.LastLocationColumnFamily.NAME) == 0) {
>>>   +        return new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
>>>   +      }
>>>   +    }
>>>   +    return null;
>>>   +  }
>>>   +
>>>   +  /**
>>>   +   * yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time
>>>   +   */
>>>   +  private Tablet(final TabletServer tabletServer, final Text location, final KeyExtent extent, final TabletResourceManager trm, final Configuration conf,
>>>   +      final VolumeManager fs, final List<LogEntry> logEntries, final SortedMap<FileRef,DataFileValue> datafiles, String time,
>>>   +      final TServerInstance lastLocation, Set<FileRef> scanFiles, long initFlushID, long initCompactID) throws IOException {
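>>>   +    // a location containing ':' is already a fully qualified URI; anything
>>>   +    // else is a path relative to the table directory on some volume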
>>>   +    Path locationPath;
>>>   +    if (location.find(":") >= 0) {
>>>   +      locationPath = new Path(location.toString());
>>>   +    } else {
>>>   +      locationPath = fs.getFullPath(FileType.TABLE, extent.getTableId().toString() + location.toString());
>>>   +    }
>>>   +    FileSystem fsForPath = fs.getFileSystemByPath(locationPath);
>>>   +    this.location = locationPath.makeQualified(fsForPath.getUri(), fsForPath.getWorkingDirectory());
>>>   +    this.lastLocation = lastLocation;
>>>   +    this.tabletDirectory = location.toString();
>>>   +    this.conf = conf;
>>>   +    this.acuTableConf = tabletServer.getTableConfiguration(extent);
>>>   +
>>>   +    this.fs = fs;
>>>   +    this.extent = extent;
>>>   +    this.tabletResources = trm;
>>>   +
>>>   +    this.lastFlushID = initFlushID;
>>>   +    this.lastCompactID = initCompactID;
>>>   +
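>>>   +    // the root tablet has no higher-level metadata table to record its time,
>>>   +    // so reconstruct logical time from the maximum timestamp in its datafiles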
>>>   +    if (extent.isRootTablet()) {
>>>   +      long rtime = Long.MIN_VALUE;
>>>   +      for (FileRef ref : datafiles.keySet()) {
>>>   +        Path path = ref.path();
>>>   +        FileSystem ns = fs.getFileSystemByPath(path);
>>>   +        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), tabletServer.getTableConfiguration(extent));
>>>   +        long maxTime = -1;
>>>   +        try {
>>>   +
>>>   +          while (reader.hasTop()) {
>>>   +            maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp());
>>>   +            reader.next();
>>>   +          }
>>>   +
>>>   +        } finally {
>>>   +          reader.close();
>>>   +        }
>>>   +
>>>   +        if (maxTime > rtime) {
>>>   +          time = TabletTime.LOGICAL_TIME_ID + "" + maxTime;
>>>   +          rtime = maxTime;
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +    if (time == null && datafiles.isEmpty() && extent.equals(RootTable.OLD_EXTENT)) {
>>>   +      // recovery... old root tablet has no data, so time doesn't matter:
>>>   +      time = TabletTime.LOGICAL_TIME_ID + "" + Long.MIN_VALUE;
>>>   +    }
>>>   +
>>>   +    this.tabletServer = tabletServer;
>>>   +    this.logId = tabletServer.createLogId(extent);
>>>   +
>>>   +    this.timer = new TabletStatsKeeper();
>>>   +
>>>   +    setupDefaultSecurityLabels(extent);
>>>   +
>>>   +    tabletMemory = new TabletMemory();
>>>   +    tabletTime = TabletTime.getInstance(time);
>>>   +    persistedTime = tabletTime.getTime();
>>>   +
>>>   +    acuTableConf.addObserver(configObserver = new ConfigurationObserver() {
>>>   +
>>>   +      private void reloadConstraints() {
>>>   +        constraintChecker.set(new ConstraintChecker(getTableConfiguration()));
>>>   +      }
>>>   +
>>>   +      @Override
>>>   +      public void propertiesChanged() {
>>>   +        reloadConstraints();
>>>   +
>>>   +        try {
>>>   +          setupDefaultSecurityLabels(extent);
>>>   +        } catch (Exception e) {
>>>   +          log.error("Failed to reload default security labels for
>>> extent: " + extent.toString());
>>>   +        }
>>>   +      }
>>>   +
>>>   +      @Override
>>>   +      public void propertyChanged(String prop) {
>>>   +        if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey()))
>>>   +          reloadConstraints();
>>>   +        else if (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) {
>>>   +          try {
>>>   +            log.info("Default security labels changed for extent: " +
>>> extent.toString());
>>>   +            setupDefaultSecurityLabels(extent);
>>>   +          } catch (Exception e) {
>>>   +            log.error("Failed to reload default security labels for
>>> extent: " + extent.toString());
>>>   +          }
>>>   +        }
>>>   +
>>>   +      }
>>>   +
>>>   +      @Override
>>>   +      public void sessionExpired() {
>>>   +        log.debug("Session expired, no longer updating per table
>>> props...");
>>>   +      }
>>>   +
>>>   +    });
>>>   +    // Force a load of any per-table properties
>>>   +    configObserver.propertiesChanged();
>>>   +
>>>   +    tabletResources.setTablet(this, acuTableConf);
>>>   +    if (!logEntries.isEmpty()) {
>>>   +      log.info("Starting Write-Ahead Log recovery for " +
>>> this.extent);
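>>>   +      // count[0]: number of mutations replayed from the logs; count[1]: the
>>>   +      // largest system-assigned timestamp seen, used below to advance
>>>   +      // tabletTime past anything already written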
>>>   +      final long[] count = new long[2];
>>>   +      final CommitSession commitSession = tabletMemory.getCommitSession();
>>>   +      count[1] = Long.MIN_VALUE;
>>>   +      try {
>>>   +        Set<String> absPaths = new HashSet<String>();
>>>   +        for (FileRef ref : datafiles.keySet())
>>>   +          absPaths.add(ref.path().toString());
>>>   +
>>>   +        tabletServer.recover(this.tabletServer.getFileSystem(), this, logEntries, absPaths, new MutationReceiver() {
>>>   +          @Override
>>>   +          public void receive(Mutation m) {
>>>   +            // LogReader.printMutation(m);
>>>   +            Collection<ColumnUpdate> muts = m.getUpdates();
>>>   +            for (ColumnUpdate columnUpdate : muts) {
>>>   +              if (!columnUpdate.hasTimestamp()) {
>>>   +                // if it is not a user set timestamp, it must have been set
>>>   +                // by the system
>>>   +                count[1] = Math.max(count[1], columnUpdate.getTimestamp());
>>>   +              }
>>>   +            }
>>>   +            tabletMemory.mutate(commitSession, Collections.singletonList(m));
>>>   +            count[0]++;
>>>   +          }
>>>   +        });
>>>   +
>>>   +        if (count[1] != Long.MIN_VALUE) {
>>>   +          tabletTime.useMaxTimeFromWALog(count[1]);
>>>   +        }
>>>   +        commitSession.updateMaxCommittedTime(tabletTime.getTime());
>>>   +
>>> -         tabletMemory.updateMemoryUsageStats();
>>> -
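>>>   +        // nothing was replayed, so the old log entries reference no live
>>>   +        // data and can be dropped from the metadata table right away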
>>>   +        if (count[0] == 0) {
>>>   +          MetadataTableUtil.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock());
>>>   +          logEntries.clear();
>>>   +        }
>>>   +
>>>   +      } catch (Throwable t) {
>>>   +        if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE)) {
>>>   +          log.warn("Error recovering from log files: ", t);
>>>   +        } else {
>>>   +          throw new RuntimeException(t);
>>>   +        }
>>>   +      }
>>>   +      // make some closed references that represent the recovered logs
>>>   +      currentLogs = new HashSet<DfsLogger>();
>>>   +      for (LogEntry logEntry : logEntries) {
>>>   +        for (String log : logEntry.logSet) {
>>>   +          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log));
>>>   +        }
>>>   +      }
>>>   +
>>>   +      log.info("Write-Ahead Log recovery complete for " +
>>> this.extent +
>>> " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries()
>>>   +          + " entries created)");
>>>   +    }
>>>   +
>>>   +    String contextName = acuTableConf.get(Property.TABLE_CLASSPATH);
>>>   +    if (contextName != null && !contextName.equals("")) {
>>>   +      // initialize context classloader, instead of possibly waiting for it to initialize for a scan
>>>   +      // TODO this could hang, causing other tablets to fail to load - ACCUMULO-1292
>>>   +      AccumuloVFSClassLoader.getContextManager().getClassLoader(contextName);
>>>   +    }
>>>   +
>>>   +    // do this last after tablet is completely setup because it
>>>   +    // could cause major compaction to start
>>>   +    datafileManager = new DatafileManager(datafiles);
>>>   +
>>>   +    computeNumEntries();
>>>   +
>>>   +    datafileManager.removeFilesAfterScan(scanFiles);
>>>   +
>>>   +    // look for hints of a failure on the previous tablet server
>>>   +    if (!logEntries.isEmpty() || needsMajorCompaction(MajorCompactionReason.NORMAL)) {
>>>   +      // look for any temp files hanging around
>>>   +      removeOldTemporaryFiles();
>>>   +    }
>>>   +
>>>   +    log.log(TLevel.TABLET_HIST, extent + " opened");
>>>   +  }
>>>   +
>>>   +  private void removeOldTemporaryFiles() {
>>>   +    // remove any temporary files created by a previous tablet server
>>>   +    try {
>>>   +      for (FileStatus tmp : fs.globStatus(new Path(location, "*_tmp"))) {
>>>   +        try {
>>>   +          log.debug("Removing old temp file " + tmp.getPath());
>>>   +          fs.delete(tmp.getPath());
>>>   +        } catch (IOException ex) {
>>>   +          log.error("Unable to remove old temp file " + tmp.getPath()
>>> +
>>> ": " + ex);
>>>   +        }
>>>   +      }
>>>   +    } catch (IOException ex) {
>>>   +      log.error("Error scanning for old temp files in " + location);
>>>   +    }
>>>   +  }
>>>   +
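>>>   +  // Metadata tablets never apply a default visibility; user tablets parse
>>>   +  // the table's default scan-time visibility, falling back to an empty
>>>   +  // label if the configured expression cannot be parsed.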
>>>   +  private void setupDefaultSecurityLabels(KeyExtent extent) {
>>>   +    if (extent.isMeta()) {
>>>   +      defaultSecurityLabel = new byte[0];
>>>   +    } else {
>>>   +      try {
>>>   +        ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
>>>   +        this.defaultSecurityLabel = cv.getExpression();
>>>   +      } catch (Exception e) {
>>>   +        log.error(e, e);
>>>   +        this.defaultSecurityLabel = new byte[0];
>>>   +      }
>>>   +    }
>>>   +  }
>>>   +
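>>>   +  // A "delete+<compacted>+<original>" name marks a file that an interrupted
>>>   +  // root tablet compaction scheduled for deletion: if the compacted file
>>>   +  // exists the deletion is completed here, otherwise the original name is
>>>   +  // restored and the file is kept.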
>>>   +  private static Collection<String> cleanUpFiles(VolumeManager fs, FileStatus[] files, boolean deleteTmp) throws IOException {
>>>   +    /*
>>>   +     * called in constructor and before major compactions
>>>   +     */
>>>   +    Collection<String> goodFiles = new ArrayList<String>(files.length);
>>>   +
>>>   +    for (FileStatus file : files) {
>>>   +
>>>   +      String path = file.getPath().toString();
>>>   +      String filename = file.getPath().getName();
>>>   +
>>>   +      // check for incomplete major compaction, this should only occur
>>>   +      // for root tablet
>>>   +      if (filename.startsWith("delete+")) {
>>>   +        String expectedCompactedFile = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1];
>>>   +        if (fs.exists(new Path(expectedCompactedFile))) {
>>>   +          // compaction finished, but did not finish deleting compacted files.. so delete it
>>>   +          if (!fs.deleteRecursively(file.getPath()))
>>>   +            log.warn("Delete of file: " + file.getPath().toString() +
>>> "
>>> return false");
>>>   +          continue;
>>>   +        }
>>>   +        // compaction did not finish, so put files back
>>>   +
>>>   +        // reset path and filename for rest of loop
>>>   +        filename = filename.split("\\+", 3)[2];
>>>   +        path = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename;
>>>   +
>>>   +        if (!fs.rename(file.getPath(), new Path(path)))
>>>   +          log.warn("Rename of " + file.getPath().toString() + " to " +
>>> path + " returned false");
>>>   +      }
>>>   +
>>>   +      if (filename.endsWith("_tmp")) {
>>>   +        if (deleteTmp) {
>>>   +          log.warn("cleaning up old tmp file: " + path);
>>>   +          if (!fs.deleteRecursively(file.getPath()))
>>>   +            log.warn("Delete of tmp file: " +
>>> file.getPath().toString()
>>> + " return false");
>>>   +
>>>   +        }
>>>   +        continue;
>>>   +      }
>>>   +
>>>   +      if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) {
>>>   +        log.error("unknown file in tablet" + path);
>>>   +        continue;
>>>   +      }
>>>   +
>>>   +      goodFiles.add(path);
>>>   +    }
>>>   +
>>>   +    return goodFiles;
>>>   +  }
>>>   +
>>>   +  public static class KVEntry extends KeyValue {
>>>   +    public KVEntry(Key k, Value v) {
>>>   +      super(new Key(k), Arrays.copyOf(v.get(), v.get().length));
>>>   +    }
>>>   +
>>>   +    @Override
>>
>>
