accumulo-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eric Newton <eric.new...@gmail.com>
Subject Re: [11/15] ACCUMULO-1940 do not expose a new tablet to the memory manager until after it is online
Date Sat, 30 Nov 2013 22:06:43 GMT
There was a merge "conflict" in 1.5 or 1.6. There was an extra line of
whitespace, or a line missing.

It is annoying to maintain 1.4, 1.5, and the largely unnecessary 1.6 (just
use master).

However, I think this chore comes with software maturity and a larger user
base.




On Sat, Nov 30, 2013 at 12:15 AM, Josh Elser <josh.elser@gmail.com> wrote:

> Actually, I was kind of confused when I saw your commit*s* on this ticket.
> What did you actually do? You have two commits that do the same changes:
>
> 82477f08aa64e2a8a1cf7f6af0db5ce954801ac8 (in 1.4, 1.5 and 1.6)
> 9b6b9cf104ff332cffdd4900d8057557e64e0ec8 (only in 1.6)\
>
> I would've only expected to see one email with a diff, followed by 2
> "merge" emails, e.g.
>
> ----------------------------------------------------------------------
>  .../tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java  | 2 --
>  1 file changed, 2 deletions(-)
> ----------------------------------------------------------------------
>
> Although, I will admit that dealing with 3 active branches is a big pain.
> However, I don't know of a better way to handle this in a way that doesn't
> make Git super confused and thus limit us in being able to answer questions
> like "where was a problem introduced" (git-bisect) and "where does this
> change exist" (and not having multiple commits that perform the same
> changes).
>
> On 11/29/13, 8:31 PM, Eric Newton wrote:
>
>> I changed one line of this file... git seems to be having a conniption.  I
>> find the volume of git traffic to be so useless that I ignore it.
>>
>> Anyone else?
>>
>>
>>
>>
>> On Fri, Nov 29, 2013 at 1:24 PM, <ecn@apache.org> wrote:
>>
>>
>>> http://git-wip-us.apache.org/repos/asf/accumulo/blob/
>>> 9b6b9cf1/server/tserver/src/main/java/org/apache/accumulo/
>>> tserver/Tablet.java
>>> ----------------------------------------------------------------------
>>> diff --cc
>>> server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
>>> index ee3b243,0000000..fd76415
>>> mode 100644,000000..100644
>>> --- a/server/tserver/src/main/java/org/apache/accumulo/
>>> tserver/Tablet.java
>>> +++ b/server/tserver/src/main/java/org/apache/accumulo/
>>> tserver/Tablet.java
>>> @@@ -1,3868 -1,0 +1,3866 @@@
>>>   +/*
>>>   + * Licensed to the Apache Software Foundation (ASF) under one or more
>>>   + * contributor license agreements.  See the NOTICE file distributed
>>> with
>>>   + * this work for additional information regarding copyright ownership.
>>>   + * The ASF licenses this file to You under the Apache License, Version
>>> 2.0
>>>   + * (the "License"); you may not use this file except in compliance
>>> with
>>>   + * the License.  You may obtain a copy of the License at
>>>   + *
>>>   + *     http://www.apache.org/licenses/LICENSE-2.0
>>>   + *
>>>   + * Unless required by applicable law or agreed to in writing, software
>>>   + * distributed under the License is distributed on an "AS IS" BASIS,
>>>   + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>> implied.
>>>   + * See the License for the specific language governing permissions and
>>>   + * limitations under the License.
>>>   + */
>>>   +package org.apache.accumulo.tserver;
>>>   +
>>>   +import java.io.ByteArrayInputStream;
>>>   +import java.io.DataInputStream;
>>>   +import java.io.FileNotFoundException;
>>>   +import java.io.IOException;
>>>   +import java.util.ArrayList;
>>>   +import java.util.Arrays;
>>>   +import java.util.Collection;
>>>   +import java.util.Collections;
>>>   +import java.util.Comparator;
>>>   +import java.util.EnumSet;
>>>   +import java.util.HashMap;
>>>   +import java.util.HashSet;
>>>   +import java.util.Iterator;
>>>   +import java.util.List;
>>>   +import java.util.Map;
>>>   +import java.util.Map.Entry;
>>>   +import java.util.PriorityQueue;
>>>   +import java.util.Set;
>>>   +import java.util.SortedMap;
>>>   +import java.util.TreeMap;
>>>   +import java.util.TreeSet;
>>>   +import java.util.concurrent.atomic.AtomicBoolean;
>>>   +import java.util.concurrent.atomic.AtomicLong;
>>>   +import java.util.concurrent.atomic.AtomicReference;
>>>   +import java.util.concurrent.locks.ReentrantLock;
>>>   +
>>>   +import org.apache.accumulo.core.Constants;
>>>   +import org.apache.accumulo.core.client.Connector;
>>>   +import org.apache.accumulo.core.client.IteratorSetting;
>>>   +import org.apache.accumulo.core.client.impl.ScannerImpl;
>>>   +import org.apache.accumulo.core.conf.AccumuloConfiguration;
>>>   +import org.apache.accumulo.core.conf.ConfigurationCopy;
>>>   +import org.apache.accumulo.core.conf.ConfigurationObserver;
>>>   +import org.apache.accumulo.core.conf.Property;
>>>   +import org.apache.accumulo.core.constraints.Violations;
>>>   +import org.apache.accumulo.core.data.ByteSequence;
>>>   +import org.apache.accumulo.core.data.Column;
>>>   +import org.apache.accumulo.core.data.ColumnUpdate;
>>>   +import org.apache.accumulo.core.data.Key;
>>>   +import org.apache.accumulo.core.data.KeyExtent;
>>>   +import org.apache.accumulo.core.data.KeyValue;
>>>   +import org.apache.accumulo.core.data.Mutation;
>>>   +import org.apache.accumulo.core.data.Range;
>>>   +import org.apache.accumulo.core.data.Value;
>>>   +import org.apache.accumulo.core.data.thrift.IterInfo;
>>>   +import org.apache.accumulo.core.data.thrift.MapFileInfo;
>>>   +import org.apache.accumulo.core.file.FileOperations;
>>>   +import org.apache.accumulo.core.file.FileSKVIterator;
>>>   +import org.apache.accumulo.core.iterators.
>>> IterationInterruptedException;
>>>   +import org.apache.accumulo.core.iterators.IteratorEnvironment;
>>>   +import org.apache.accumulo.core.iterators.IteratorUtil;
>>>   +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
>>>   +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
>>>   +import
>>> org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
>>>   +import org.apache.accumulo.core.iterators.system.
>>> ColumnQualifierFilter;
>>>   +import org.apache.accumulo.core.iterators.system.DeletingIterator;
>>>   +import org.apache.accumulo.core.iterators.system.
>>> InterruptibleIterator;
>>>   +import org.apache.accumulo.core.iterators.system.MultiIterator;
>>>   +import org.apache.accumulo.core.iterators.system.
>>> SourceSwitchingIterator;
>>>   +import
>>> org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.
>>> DataSource;
>>>   +import org.apache.accumulo.core.iterators.system.StatsIterator;
>>>   +import org.apache.accumulo.core.iterators.system.VisibilityFilter;
>>>   +import org.apache.accumulo.core.master.thrift.TabletLoadState;
>>>   +import org.apache.accumulo.core.metadata.MetadataTable;
>>>   +import org.apache.accumulo.core.metadata.RootTable;
>>>   +import org.apache.accumulo.core.metadata.schema.DataFileValue;
>>>   +import
>>> org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
>>>   +import
>>> org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.
>>> DataFileColumnFamily;
>>>   +import
>>> org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.
>>> LogColumnFamily;
>>>   +import
>>> org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.
>>> ScanFileColumnFamily;
>>>   +import org.apache.accumulo.core.security.Authorizations;
>>>   +import org.apache.accumulo.core.security.ColumnVisibility;
>>>   +import org.apache.accumulo.core.security.Credentials;
>>>   +import org.apache.accumulo.core.tabletserver.log.LogEntry;
>>>   +import org.apache.accumulo.core.util.CachedConfiguration;
>>>   +import org.apache.accumulo.core.util.LocalityGroupUtil;
>>>   +import
>>> org.apache.accumulo.core.util.LocalityGroupUtil.
>>> LocalityGroupConfigurationError;
>>>   +import org.apache.accumulo.core.util.MapCounter;
>>>   +import org.apache.accumulo.core.util.Pair;
>>>   +import org.apache.accumulo.core.util.UtilWaitThread;
>>>   +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
>>>   +import org.apache.accumulo.server.ServerConstants;
>>>   +import org.apache.accumulo.server.client.HdfsZooInstance;
>>>   +import org.apache.accumulo.server.conf.TableConfiguration;
>>>   +import org.apache.accumulo.server.fs.FileRef;
>>>   +import org.apache.accumulo.server.fs.VolumeManager;
>>>   +import org.apache.accumulo.server.fs.VolumeManager.FileType;
>>>   +import org.apache.accumulo.server.fs.VolumeManagerImpl;
>>>   +import org.apache.accumulo.server.master.state.TServerInstance;
>>>   +import org.apache.accumulo.server.master.tableOps.
>>> CompactionIterators;
>>>   +import org.apache.accumulo.server.problems.ProblemReport;
>>>   +import org.apache.accumulo.server.problems.ProblemReports;
>>>   +import org.apache.accumulo.server.problems.ProblemType;
>>>   +import org.apache.accumulo.server.security.SystemCredentials;
>>>   +import org.apache.accumulo.server.tablets.TabletTime;
>>>   +import org.apache.accumulo.server.tablets.UniqueNameAllocator;
>>>   +import org.apache.accumulo.server.util.FileUtil;
>>>   +import org.apache.accumulo.server.util.MasterMetadataUtil;
>>>   +import org.apache.accumulo.server.util.MetadataTableUtil;
>>>   +import org.apache.accumulo.server.util.TabletOperations;
>>>   +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
>>>   +import org.apache.accumulo.start.classloader.vfs.
>>> AccumuloVFSClassLoader;
>>>   +import org.apache.accumulo.trace.instrument.Span;
>>>   +import org.apache.accumulo.trace.instrument.Trace;
>>>   +import org.apache.accumulo.tserver.Compactor.
>>> CompactionCanceledException;
>>>   +import org.apache.accumulo.tserver.Compactor.CompactionEnv;
>>>   +import org.apache.accumulo.tserver.FileManager.ScanFileManager;
>>>   +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
>>>   +import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv;
>>>   +import
>>> org.apache.accumulo.tserver.TabletServerResourceManager.
>>> TabletResourceManager;
>>>   +import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
>>>   +import org.apache.accumulo.tserver.compaction.CompactionPlan;
>>>   +import org.apache.accumulo.tserver.compaction.CompactionStrategy;
>>>   +import org.apache.accumulo.tserver.compaction.
>>> DefaultCompactionStrategy;
>>>   +import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
>>>   +import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
>>>   +import org.apache.accumulo.tserver.compaction.WriteParameters;
>>>   +import org.apache.accumulo.tserver.constraints.ConstraintChecker;
>>>   +import org.apache.accumulo.tserver.log.DfsLogger;
>>>   +import org.apache.accumulo.tserver.log.MutationReceiver;
>>>   +import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
>>>   +import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
>>>   +import org.apache.commons.codec.DecoderException;
>>>   +import org.apache.commons.codec.binary.Hex;
>>>   +import org.apache.hadoop.conf.Configuration;
>>>   +import org.apache.hadoop.fs.FileStatus;
>>>   +import org.apache.hadoop.fs.FileSystem;
>>>   +import org.apache.hadoop.fs.Path;
>>>   +import org.apache.hadoop.io.Text;
>>>   +import org.apache.log4j.Logger;
>>>   +import org.apache.zookeeper.KeeperException;
>>>   +import org.apache.zookeeper.KeeperException.NoNodeException;
>>>   +
>>>   +/*
>>>   + * We need to be able to have the master tell a tabletServer to
>>>   + * close this file, and the tablet server to handle all pending client
>>> reads
>>>   + * before closing
>>>   + *
>>>   + */
>>>   +
>>>   +/**
>>>   + *
>>>   + * this class just provides an interface to read from a MapFile mostly
>>> takes care of reporting start and end keys
>>>   + *
>>>   + * need this because a single row extent can have multiple columns
>>> this
>>> manages all the columns (each handled by a store) for a single row-extent
>>>   + *
>>>   + *
>>>   + */
>>>   +
>>>   +public class Tablet {
>>>   +
>>>   +  enum MinorCompactionReason {
>>>   +    USER, SYSTEM, CLOSE
>>>   +  }
>>>   +
>>>   +  public class CommitSession {
>>>   +
>>>   +    private int seq;
>>>   +    private InMemoryMap memTable;
>>>   +    private int commitsInProgress;
>>>   +    private long maxCommittedTime = Long.MIN_VALUE;
>>>   +
>>>   +    private CommitSession(int seq, InMemoryMap imm) {
>>>   +      this.seq = seq;
>>>   +      this.memTable = imm;
>>>   +      commitsInProgress = 0;
>>>   +    }
>>>   +
>>>   +    public int getWALogSeq() {
>>>   +      return seq;
>>>   +    }
>>>   +
>>>   +    private void decrementCommitsInProgress() {
>>>   +      if (commitsInProgress < 1)
>>>   +        throw new IllegalStateException("commitsInProgress = " +
>>> commitsInProgress);
>>>   +
>>>   +      commitsInProgress--;
>>>   +      if (commitsInProgress == 0)
>>>   +        Tablet.this.notifyAll();
>>>   +    }
>>>   +
>>>   +    private void incrementCommitsInProgress() {
>>>   +      if (commitsInProgress < 0)
>>>   +        throw new IllegalStateException("commitsInProgress = " +
>>> commitsInProgress);
>>>   +
>>>   +      commitsInProgress++;
>>>   +    }
>>>   +
>>>   +    private void waitForCommitsToFinish() {
>>>   +      while (commitsInProgress > 0) {
>>>   +        try {
>>>   +          Tablet.this.wait(50);
>>>   +        } catch (InterruptedException e) {
>>>   +          log.warn(e, e);
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +
>>>   +    public void abortCommit(List<Mutation> value) {
>>>   +      Tablet.this.abortCommit(this, value);
>>>   +    }
>>>   +
>>>   +    public void commit(List<Mutation> mutations) {
>>>   +      Tablet.this.commit(this, mutations);
>>>   +    }
>>>   +
>>>   +    public Tablet getTablet() {
>>>   +      return Tablet.this;
>>>   +    }
>>>   +
>>>   +    public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy,
>>> boolean mincFinish) {
>>>   +      return Tablet.this.beginUpdatingLogsUsed(memTable, copy,
>>> mincFinish);
>>>   +    }
>>>   +
>>>   +    public void finishUpdatingLogsUsed() {
>>>   +      Tablet.this.finishUpdatingLogsUsed();
>>>   +    }
>>>   +
>>>   +    public int getLogId() {
>>>   +      return logId;
>>>   +    }
>>>   +
>>>   +    public KeyExtent getExtent() {
>>>   +      return extent;
>>>   +    }
>>>   +
>>>   +    private void updateMaxCommittedTime(long time) {
>>>   +      maxCommittedTime = Math.max(time, maxCommittedTime);
>>>   +    }
>>>   +
>>>   +    private long getMaxCommittedTime() {
>>>   +      if (maxCommittedTime == Long.MIN_VALUE)
>>>   +        throw new IllegalStateException("Tried to read max committed
>>> time when it was never set");
>>>   +      return maxCommittedTime;
>>>   +    }
>>>   +
>>>   +  }
>>>   +
>>>   +  private class TabletMemory {
>>>   +    private InMemoryMap memTable;
>>>   +    private InMemoryMap otherMemTable;
>>>   +    private InMemoryMap deletingMemTable;
>>>   +    private int nextSeq = 1;
>>>   +    private CommitSession commitSession;
>>>   +
>>>   +    TabletMemory() {
>>>   +      try {
>>>   +        memTable = new InMemoryMap(acuTableConf);
>>>   +      } catch (LocalityGroupConfigurationError e) {
>>>   +        throw new RuntimeException(e);
>>>   +      }
>>>   +      commitSession = new CommitSession(nextSeq, memTable);
>>>   +      nextSeq += 2;
>>>   +    }
>>>   +
>>>   +    InMemoryMap getMemTable() {
>>>   +      return memTable;
>>>   +    }
>>>   +
>>>   +    InMemoryMap getMinCMemTable() {
>>>   +      return otherMemTable;
>>>   +    }
>>>   +
>>>   +    CommitSession prepareForMinC() {
>>>   +      if (otherMemTable != null) {
>>>   +        throw new IllegalStateException();
>>>   +      }
>>>   +
>>>   +      if (deletingMemTable != null) {
>>>   +        throw new IllegalStateException();
>>>   +      }
>>>   +
>>>   +      otherMemTable = memTable;
>>>   +      try {
>>>   +        memTable = new InMemoryMap(acuTableConf);
>>>   +      } catch (LocalityGroupConfigurationError e) {
>>>   +        throw new RuntimeException(e);
>>>   +      }
>>>   +
>>>   +      CommitSession oldCommitSession = commitSession;
>>>   +      commitSession = new CommitSession(nextSeq, memTable);
>>>   +      nextSeq += 2;
>>>   +
>>>   +
>>>   tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(
>>> ),
>>> otherMemTable.estimatedSizeInBytes());
>>>   +
>>>   +      return oldCommitSession;
>>>   +    }
>>>   +
>>>   +    void finishedMinC() {
>>>   +
>>>   +      if (otherMemTable == null) {
>>>   +        throw new IllegalStateException();
>>>   +      }
>>>   +
>>>   +      if (deletingMemTable != null) {
>>>   +        throw new IllegalStateException();
>>>   +      }
>>>   +
>>>   +      deletingMemTable = otherMemTable;
>>>   +
>>>   +      otherMemTable = null;
>>>   +      Tablet.this.notifyAll();
>>>   +    }
>>>   +
>>>   +    void finalizeMinC() {
>>>   +      try {
>>>   +        deletingMemTable.delete(15000);
>>>   +      } finally {
>>>   +        synchronized (Tablet.this) {
>>>   +          if (otherMemTable != null) {
>>>   +            throw new IllegalStateException();
>>>   +          }
>>>   +
>>>   +          if (deletingMemTable == null) {
>>>   +            throw new IllegalStateException();
>>>   +          }
>>>   +
>>>   +          deletingMemTable = null;
>>>   +
>>>   +
>>>   tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(),
>>> 0);
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +
>>>   +    boolean memoryReservedForMinC() {
>>>   +      return otherMemTable != null || deletingMemTable != null;
>>>   +    }
>>>   +
>>>   +    void waitForMinC() {
>>>   +      while (otherMemTable != null || deletingMemTable != null) {
>>>   +        try {
>>>   +          Tablet.this.wait(50);
>>>   +        } catch (InterruptedException e) {
>>>   +          log.warn(e, e);
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +
>>>   +    void mutate(CommitSession cm, List<Mutation> mutations) {
>>>   +      cm.memTable.mutate(mutations);
>>>   +    }
>>>   +
>>>   +    void updateMemoryUsageStats() {
>>>   +      long other = 0;
>>>   +      if (otherMemTable != null)
>>>   +        other = otherMemTable.estimatedSizeInBytes();
>>>   +      else if (deletingMemTable != null)
>>>   +        other = deletingMemTable.estimatedSizeInBytes();
>>>   +
>>>   +
>>>   tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(
>>> ),
>>> other);
>>>   +    }
>>>   +
>>>   +    List<MemoryIterator> getIterators() {
>>>   +      List<MemoryIterator> toReturn = new
>>> ArrayList<MemoryIterator>(2);
>>>   +      toReturn.add(memTable.skvIterator());
>>>   +      if (otherMemTable != null)
>>>   +        toReturn.add(otherMemTable.skvIterator());
>>>   +      return toReturn;
>>>   +    }
>>>   +
>>>   +    void returnIterators(List<MemoryIterator> iters) {
>>>   +      for (MemoryIterator iter : iters) {
>>>   +        iter.close();
>>>   +      }
>>>   +    }
>>>   +
>>>   +    public long getNumEntries() {
>>>   +      if (otherMemTable != null)
>>>   +        return memTable.getNumEntries() +
>>> otherMemTable.getNumEntries();
>>>   +      return memTable.getNumEntries();
>>>   +    }
>>>   +
>>>   +    CommitSession getCommitSession() {
>>>   +      return commitSession;
>>>   +    }
>>>   +  }
>>>   +
>>>   +  private TabletMemory tabletMemory;
>>>   +
>>>   +  private final TabletTime tabletTime;
>>>   +  private long persistedTime;
>>>   +  private final Object timeLock = new Object();
>>>   +
>>>   +  private final Path location; // absolute path of this tablets dir
>>>   +  private TServerInstance lastLocation;
>>>   +
>>>   +  private Configuration conf;
>>>   +  private VolumeManager fs;
>>>   +
>>>   +  private TableConfiguration acuTableConf;
>>>   +
>>>   +  private volatile boolean tableDirChecked = false;
>>>   +
>>>   +  private AtomicLong dataSourceDeletions = new AtomicLong(0);
>>>   +  private Set<ScanDataSource> activeScans = new
>>> HashSet<ScanDataSource>();
>>>   +
>>>   +  private volatile boolean closing = false;
>>>   +  private boolean closed = false;
>>>   +  private boolean closeComplete = false;
>>>   +
>>>   +  private long lastFlushID = -1;
>>>   +  private long lastCompactID = -1;
>>>   +
>>>   +  private KeyExtent extent;
>>>   +
>>>   +  private TabletResourceManager tabletResources;
>>>   +  final private DatafileManager datafileManager;
>>>   +  private volatile boolean majorCompactionInProgress = false;
>>>   +  private volatile boolean majorCompactionWaitingToStart = false;
>>>   +  private Set<MajorCompactionReason> majorCompactionQueued =
>>> Collections.synchronizedSet(EnumSet.noneOf(
>>> MajorCompactionReason.class));
>>>   +  private volatile boolean minorCompactionInProgress = false;
>>>   +  private volatile boolean minorCompactionWaitingToStart = false;
>>>   +
>>>   +  private boolean updatingFlushID = false;
>>>   +
>>>   +  private AtomicReference<ConstraintChecker> constraintChecker = new
>>> AtomicReference<ConstraintChecker>();
>>>   +
>>>   +  private final String tabletDirectory;
>>>   +
>>>   +  private int writesInProgress = 0;
>>>   +
>>>   +  private static final Logger log = Logger.getLogger(Tablet.class);
>>>   +  public TabletStatsKeeper timer;
>>>   +
>>>   +  private Rate queryRate = new Rate(0.2);
>>>   +  private long queryCount = 0;
>>>   +
>>>   +  private Rate queryByteRate = new Rate(0.2);
>>>   +  private long queryBytes = 0;
>>>   +
>>>   +  private Rate ingestRate = new Rate(0.2);
>>>   +  private long ingestCount = 0;
>>>   +
>>>   +  private Rate ingestByteRate = new Rate(0.2);
>>>   +  private long ingestBytes = 0;
>>>   +
>>>   +  private byte[] defaultSecurityLabel = new byte[0];
>>>   +
>>>   +  private long lastMinorCompactionFinishTime;
>>>   +  private long lastMapFileImportTime;
>>>   +
>>>   +  private volatile long numEntries;
>>>   +  private volatile long numEntriesInMemory;
>>>   +
>>>   +  // a count of the amount of data read by the iterators
>>>   +  private AtomicLong scannedCount = new AtomicLong(0);
>>>   +  private Rate scannedRate = new Rate(0.2);
>>>   +
>>>   +  private ConfigurationObserver configObserver;
>>>   +
>>>   +  private TabletServer tabletServer;
>>>   +
>>>   +  private final int logId;
>>>   +  // ensure we only have one reader/writer of our bulk file notes at
>>> at
>>> time
>>>   +  public final Object bulkFileImportLock = new Object();
>>>   +
>>>   +  public int getLogId() {
>>>   +    return logId;
>>>   +  }
>>>   +
>>>   +  public static class TabletClosedException extends RuntimeException {
>>>   +    public TabletClosedException(Exception e) {
>>>   +      super(e);
>>>   +    }
>>>   +
>>>   +    public TabletClosedException() {
>>>   +      super();
>>>   +    }
>>>   +
>>>   +    private static final long serialVersionUID = 1L;
>>>   +  }
>>>   +
>>>   +  FileRef getNextMapFilename(String prefix) throws IOException {
>>>   +    String extension =
>>> FileOperations.getNewFileExtension(tabletServer.
>>> getTableConfiguration(extent));
>>>   +    checkTabletDir();
>>>   +    return new FileRef(location.toString() + "/" + prefix +
>>> UniqueNameAllocator.getInstance().getNextName() + "." + extension);
>>>   +  }
>>>   +
>>>   +  private void checkTabletDir() throws IOException {
>>>   +    if (!tableDirChecked) {
>>>   +      checkTabletDir(this.location);
>>>   +      tableDirChecked = true;
>>>   +    }
>>>   +  }
>>>   +
>>>   +  private void checkTabletDir(Path tabletDir) throws IOException {
>>>   +
>>>   +    FileStatus[] files = null;
>>>   +    try {
>>>   +      files = fs.listStatus(tabletDir);
>>>   +    } catch (FileNotFoundException ex) {
>>>   +      // ignored
>>>   +    }
>>>   +
>>>   +    if (files == null) {
>>>   +      if (tabletDir.getName().startsWith("c-"))
>>>   +        log.debug("Tablet " + extent + " had no dir, creating " +
>>> tabletDir); // its a clone dir...
>>>   +      else
>>>   +        log.warn("Tablet " + extent + " had no dir, creating " +
>>> tabletDir);
>>>   +
>>>   +      fs.mkdirs(tabletDir);
>>>   +    }
>>>   +  }
>>>   +
>>>   +  class DatafileManager {
>>>   +    // access to datafilesizes needs to be synchronized: see
>>> CompactionRunner#getNumFiles
>>>   +    final private Map<FileRef,DataFileValue> datafileSizes =
>>> Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
>>>   +
>>>   +    DatafileManager(SortedMap<FileRef,DataFileValue> datafileSizes) {
>>>   +      for (Entry<FileRef,DataFileValue> datafiles :
>>> datafileSizes.entrySet())
>>>   +        this.datafileSizes.put(datafiles.getKey(),
>>> datafiles.getValue());
>>>   +    }
>>>   +
>>>   +    FileRef mergingMinorCompactionFile = null;
>>>   +    Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>();
>>>   +    Map<Long,Set<FileRef>> scanFileReservations = new
>>> HashMap<Long,Set<FileRef>>();
>>>   +    MapCounter<FileRef> fileScanReferenceCounts = new
>>> MapCounter<FileRef>();
>>>   +    long nextScanReservationId = 0;
>>>   +    boolean reservationsBlocked = false;
>>>   +
>>>   +    Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
>>>   +
>>>   +    Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() {
>>>   +      synchronized (Tablet.this) {
>>>   +
>>>   +        while (reservationsBlocked) {
>>>   +          try {
>>>   +            Tablet.this.wait(50);
>>>   +          } catch (InterruptedException e) {
>>>   +            log.warn(e, e);
>>>   +          }
>>>   +        }
>>>   +
>>>   +        Set<FileRef> absFilePaths = new
>>> HashSet<FileRef>(datafileSizes.keySet());
>>>   +
>>>   +        long rid = nextScanReservationId++;
>>>   +
>>>   +        scanFileReservations.put(rid, absFilePaths);
>>>   +
>>>   +        Map<FileRef,DataFileValue> ret = new
>>> HashMap<FileRef,DataFileValue>();
>>>   +
>>>   +        for (FileRef path : absFilePaths) {
>>>   +          fileScanReferenceCounts.increment(path, 1);
>>>   +          ret.put(path, datafileSizes.get(path));
>>>   +        }
>>>   +
>>>   +        return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
>>>   +      }
>>>   +    }
>>>   +
>>>   +    void returnFilesForScan(Long reservationId) {
>>>   +
>>>   +      final Set<FileRef> filesToDelete = new HashSet<FileRef>();
>>>   +
>>>   +      synchronized (Tablet.this) {
>>>   +        Set<FileRef> absFilePaths =
>>> scanFileReservations.remove(reservationId);
>>>   +
>>>   +        if (absFilePaths == null)
>>>   +          throw new IllegalArgumentException("Unknown scan
>>> reservation
>>> id " + reservationId);
>>>   +
>>>   +        boolean notify = false;
>>>   +        for (FileRef path : absFilePaths) {
>>>   +          long refCount = fileScanReferenceCounts.decrement(path, 1);
>>>   +          if (refCount == 0) {
>>>   +            if (filesToDeleteAfterScan.remove(path))
>>>   +              filesToDelete.add(path);
>>>   +            notify = true;
>>>   +          } else if (refCount < 0)
>>>   +            throw new IllegalStateException("Scan ref count for " +
>>> path
>>> + " is " + refCount);
>>>   +        }
>>>   +
>>>   +        if (notify)
>>>   +          Tablet.this.notifyAll();
>>>   +      }
>>>   +
>>>   +      if (filesToDelete.size() > 0) {
>>>   +        log.debug("Removing scan refs from metadata " + extent + " " +
>>> filesToDelete);
>>>   +        MetadataTableUtil.removeScanFiles(extent, filesToDelete,
>>> SystemCredentials.get(), tabletServer.getLock());
>>>   +      }
>>>   +    }
>>>   +
>>>   +    private void removeFilesAfterScan(Set<FileRef> scanFiles) {
>>>   +      if (scanFiles.size() == 0)
>>>   +        return;
>>>   +
>>>   +      Set<FileRef> filesToDelete = new HashSet<FileRef>();
>>>   +
>>>   +      synchronized (Tablet.this) {
>>>   +        for (FileRef path : scanFiles) {
>>>   +          if (fileScanReferenceCounts.get(path) == 0)
>>>   +            filesToDelete.add(path);
>>>   +          else
>>>   +            filesToDeleteAfterScan.add(path);
>>>   +        }
>>>   +      }
>>>   +
>>>   +      if (filesToDelete.size() > 0) {
>>>   +        log.debug("Removing scan refs from metadata " + extent + " " +
>>> filesToDelete);
>>>   +        MetadataTableUtil.removeScanFiles(extent, filesToDelete,
>>> SystemCredentials.get(), tabletServer.getLock());
>>>   +      }
>>>   +    }
>>>   +
>>>   +    private TreeSet<FileRef> waitForScansToFinish(Set<FileRef>
>>> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
>>>   +      long startTime = System.currentTimeMillis();
>>>   +      TreeSet<FileRef> inUse = new TreeSet<FileRef>();
>>>   +
>>>   +      Span waitForScans = Trace.start("waitForScans");
>>>   +      try {
>>>   +        synchronized (Tablet.this) {
>>>   +          if (blockNewScans) {
>>>   +            if (reservationsBlocked)
>>>   +              throw new IllegalStateException();
>>>   +
>>>   +            reservationsBlocked = true;
>>>   +          }
>>>   +
>>>   +          for (FileRef path : pathsToWaitFor) {
>>>   +            while (fileScanReferenceCounts.get(path) > 0 &&
>>> System.currentTimeMillis() - startTime < maxWaitTime) {
>>>   +              try {
>>>   +                Tablet.this.wait(100);
>>>   +              } catch (InterruptedException e) {
>>>   +                log.warn(e, e);
>>>   +              }
>>>   +            }
>>>   +          }
>>>   +
>>>   +          for (FileRef path : pathsToWaitFor) {
>>>   +            if (fileScanReferenceCounts.get(path) > 0)
>>>   +              inUse.add(path);
>>>   +          }
>>>   +
>>>   +          if (blockNewScans) {
>>>   +            reservationsBlocked = false;
>>>   +            Tablet.this.notifyAll();
>>>   +          }
>>>   +
>>>   +        }
>>>   +      } finally {
>>>   +        waitForScans.stop();
>>>   +      }
>>>   +      return inUse;
>>>   +    }
>>>   +
>>>   +    public void importMapFiles(long tid, Map<FileRef,DataFileValue>
>>> pathsString, boolean setTime) throws IOException {
>>>   +
>>>   +      String bulkDir = null;
>>>   +
>>>   +      Map<FileRef,DataFileValue> paths = new
>>> HashMap<FileRef,DataFileValue>();
>>>   +      for (Entry<FileRef,DataFileValue> entry :
>>> pathsString.entrySet())
>>>   +        paths.put(entry.getKey(), entry.getValue());
>>>   +
>>>   +      for (FileRef tpath : paths.keySet()) {
>>>   +
>>>   +        boolean inTheRightDirectory = false;
>>>   +        Path parent = tpath.path().getParent().getParent();
>>>   +        for (String tablesDir : ServerConstants.getTablesDirs()) {
>>>   +          if (parent.equals(new Path(tablesDir,
>>> extent.getTableId().toString()))) {
>>>   +            inTheRightDirectory = true;
>>>   +            break;
>>>   +          }
>>>   +        }
>>>   +        if (!inTheRightDirectory) {
>>>   +          throw new IOException("Data file " + tpath + " not in table
>>> dirs");
>>>   +        }
>>>   +
>>>   +        if (bulkDir == null)
>>>   +          bulkDir = tpath.path().getParent().toString();
>>>   +        else if (!bulkDir.equals(tpath.path().
>>> getParent().toString()))
>>>   +          throw new IllegalArgumentException("bulk files in different
>>> dirs " + bulkDir + " " + tpath);
>>>   +
>>>   +      }
>>>   +
>>>   +      if (extent.isRootTablet()) {
>>>   +        throw new IllegalArgumentException("Can not import files to
>>> root
>>> tablet");
>>>   +      }
>>>   +
>>>   +      synchronized (bulkFileImportLock) {
>>>   +        Credentials creds = SystemCredentials.get();
>>>   +        Connector conn;
>>>   +        try {
>>>   +          conn =
>>> HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(),
>>> creds.getToken());
>>>   +        } catch (Exception ex) {
>>>   +          throw new IOException(ex);
>>>   +        }
>>>   +        // Remove any bulk files we've previously loaded and compacted
>>> away
>>>   +        List<FileRef> files = MetadataTableUtil.
>>> getBulkFilesLoaded(conn,
>>> extent, tid);
>>>   +
>>>   +        for (FileRef file : files)
>>>   +          if (paths.keySet().remove(file.path()))
>>>   +            log.debug("Ignoring request to re-import a file already
>>> imported: " + extent + ": " + file);
>>>   +
>>>   +        if (paths.size() > 0) {
>>>   +          long bulkTime = Long.MIN_VALUE;
>>>   +          if (setTime) {
>>>   +            for (DataFileValue dfv : paths.values()) {
>>>   +              long nextTime = tabletTime.getAndUpdateTime();
>>>   +              if (nextTime < bulkTime)
>>>   +                throw new IllegalStateException("Time went backwards
>>> unexpectedly " + nextTime + " " + bulkTime);
>>>   +              bulkTime = nextTime;
>>>   +              dfv.setTime(bulkTime);
>>>   +            }
>>>   +          }
>>>   +
>>>   +          synchronized (timeLock) {
>>>   +            if (bulkTime > persistedTime)
>>>   +              persistedTime = bulkTime;
>>>   +
>>>   +            MetadataTableUtil.updateTabletDataFile(tid, extent,
>>> paths,
>>> tabletTime.getMetadataValue(persistedTime), creds,
>>> tabletServer.getLock());
>>>   +          }
>>>   +        }
>>>   +      }
>>>   +
>>>   +      synchronized (Tablet.this) {
>>>   +        for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
>>>   +          if (datafileSizes.containsKey(tpath.getKey())) {
>>>   +            log.error("Adding file that is already in set " +
>>> tpath.getKey());
>>>   +          }
>>>   +          datafileSizes.put(tpath.getKey(), tpath.getValue());
>>>   +
>>>   +        }
>>>   +
>>>   +        tabletResources.importedMapFiles();
>>>   +
>>>   +        computeNumEntries();
>>>   +      }
>>>   +
>>>   +      for (FileRef tpath : paths.keySet()) {
>>>   +        log.log(TLevel.TABLET_HIST, extent + " import " + tpath + " "
>>> +
>>> paths.get(tpath));
>>>   +      }
>>>   +    }
>>>   +
>>>   +    FileRef reserveMergingMinorCompactionFile() {
>>>   +      if (mergingMinorCompactionFile != null)
>>>   +        throw new IllegalStateException("Tried to reserve merging
>>> minor
>>> compaction file when already reserved  : " + mergingMinorCompactionFile);
>>>   +
>>>   +      if (extent.isRootTablet())
>>>   +        return null;
>>>   +
>>>   +      int maxFiles = acuTableConf.getMaxFilesPerTablet();
>>>   +
>>>   +      // when a major compaction is running and we are at max files,
>>> write out
>>>   +      // one extra file... want to avoid the case where major
>>> compaction
>>> is
>>>   +      // compacting everything except for the largest file, and
>>> therefore the
>>>   +      // largest file is returned for merging.. the following check
>>> mostly
>>>   +      // avoids this case, except for the case where major compactions
>>> fail or
>>>   +      // are canceled
>>>   +      if (majorCompactingFiles.size() > 0 && datafileSizes.size() ==
>>> maxFiles)
>>>   +        return null;
>>>   +
>>>   +      if (datafileSizes.size() >= maxFiles) {
>>>   +        // find the smallest file
>>>   +
>>>   +        long min = Long.MAX_VALUE;
>>>   +        FileRef minName = null;
>>>   +
>>>   +        for (Entry<FileRef,DataFileValue> entry :
>>> datafileSizes.entrySet()) {
>>>   +          if (entry.getValue().getSize() < min &&
>>> !majorCompactingFiles.contains(entry.getKey())) {
>>>   +            min = entry.getValue().getSize();
>>>   +            minName = entry.getKey();
>>>   +          }
>>>   +        }
>>>   +
>>>   +        if (minName == null)
>>>   +          return null;
>>>   +
>>>   +        mergingMinorCompactionFile = minName;
>>>   +        return minName;
>>>   +      }
>>>   +
>>>   +      return null;
>>>   +    }
>>>   +
>>>   +    void unreserveMergingMinorCompactionFile(FileRef file) {
>>>   +      if ((file == null && mergingMinorCompactionFile != null) ||
>>> (file
>>> != null && mergingMinorCompactionFile == null)
>>>   +          || (file != null && mergingMinorCompactionFile != null &&
>>> !file.equals(mergingMinorCompactionFile)))
>>>   +        throw new IllegalStateException("Disagreement " + file + " "
>>> +
>>> mergingMinorCompactionFile);
>>>   +
>>>   +      mergingMinorCompactionFile = null;
>>>   +    }
>>>   +
>>>   +    void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef
>>> newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession
>>> commitSession, long flushId)
>>>   +        throws IOException {
>>>   +
>>>   +      IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
>>>   +      if (extent.isRootTablet()) {
>>>   +        try {
>>>   +          if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
>>>   +            throw new IllegalStateException();
>>>   +          }
>>>   +        } catch (Exception e) {
>>>   +          throw new IllegalStateException("Can not bring major
>>> compaction online, lock not held", e);
>>>   +        }
>>>   +      }
>>>   +
>>>   +      // rename before putting in metadata table, so files in metadata
>>> table should
>>>   +      // always exist
>>>   +      do {
>>>   +        try {
>>>   +          if (dfv.getNumEntries() == 0) {
>>>   +            fs.deleteRecursively(tmpDatafile.path());
>>>   +          } else {
>>>   +            if (fs.exists(newDatafile.path())) {
>>>   +              log.warn("Target map file already exist " +
>>> newDatafile);
>>>   +              fs.deleteRecursively(newDatafile.path());
>>>   +            }
>>>   +
>>>   +            if (!fs.rename(tmpDatafile.path(), newDatafile.path())) {
>>>   +              throw new IOException("rename fails");
>>>   +            }
>>>   +          }
>>>   +          break;
>>>   +        } catch (IOException ioe) {
>>>   +          log.warn("Tablet " + extent + " failed to rename " +
>>> newDatafile + " after MinC, will retry in 60 secs...", ioe);
>>>   +          UtilWaitThread.sleep(60 * 1000);
>>>   +        }
>>>   +      } while (true);
>>>   +
>>>   +      long t1, t2;
>>>   +
>>>   +      // the code below always assumes merged files are in use by
>>> scans... this must be done
>>>   +      // because the in memory list of files is not updated until
>>> after
>>> the metadata table
>>>   +      // therefore the file is available to scans until memory is
>>> updated, but want to ensure
>>>   +      // the file is not available for garbage collection... if memory
>>> were updated
>>>   +      // before this point (like major compactions do), then the
>>> following code could wait
>>>   +      // for scans to finish like major compactions do.... used to
>>> wait
>>> for scans to finish
>>>   +      // here, but that was incorrect because a scan could start after
>>> waiting but before
>>>   +      // memory was updated... assuming the file is always in use by
>>> scans leads to
>>>   +      // one uneeded metadata update when it was not actually in use
>>>   +      Set<FileRef> filesInUseByScans = Collections.emptySet();
>>>   +      if (absMergeFile != null)
>>>   +        filesInUseByScans = Collections.singleton(absMergeFile);
>>>   +
>>>   +      // very important to write delete entries outside of log lock,
>>> because
>>>   +      // this !METADATA write does not go up... it goes sideways or to
>>> itself
>>>   +      if (absMergeFile != null)
>>>   +        MetadataTableUtil.addDeleteEntries(extent,
>>> Collections.singleton(absMergeFile), SystemCredentials.get());
>>>   +
>>>   +      Set<String> unusedWalLogs = beginClearingUnusedLogs();
>>>   +      try {
>>>   +        // the order of writing to !METADATA and walog is important in
>>> the face of machine/process failures
>>>   +        // need to write to !METADATA before writing to walog, when
>>> things are done in the reverse order
>>>   +        // data could be lost... the minor compaction start even
>>> should
>>> be written before the following metadata
>>>   +        // write is made
>>>   +
>>>   +        synchronized (timeLock) {
>>>   +          if (commitSession.getMaxCommittedTime() > persistedTime)
>>>   +            persistedTime = commitSession.getMaxCommittedTime();
>>>   +
>>>   +          String time = tabletTime.getMetadataValue(persistedTime);
>>>   +          MasterMetadataUtil.updateTabletDataFile(extent,
>>> newDatafile,
>>> absMergeFile, dfv, time, SystemCredentials.get(), filesInUseByScans,
>>>   +              tabletServer.getClientAddressString(),
>>> tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
>>>   +        }
>>>   +
>>>   +      } finally {
>>>   +        finishClearingUnusedLogs();
>>>   +      }
>>>   +
>>>   +      do {
>>>   +        try {
>>>   +          // the purpose of making this update use the new commit
>>> session, instead of the old one passed in,
>>>   +          // is because the new one will reference the logs used by
>>> current memory...
>>>   +
>>>   +
>>>   tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(),
>>> newDatafile.toString(), commitSession.getWALogSeq() + 2);
>>>   +          break;
>>>   +        } catch (IOException e) {
>>>   +          log.error("Failed to write to write-ahead log " +
>>> e.getMessage() + " will retry", e);
>>>   +          UtilWaitThread.sleep(1 * 1000);
>>>   +        }
>>>   +      } while (true);
>>>   +
>>>   +      synchronized (Tablet.this) {
>>>   +        lastLocation = null;
>>>   +
>>>   +        t1 = System.currentTimeMillis();
>>>   +        if (datafileSizes.containsKey(newDatafile)) {
>>>   +          log.error("Adding file that is already in set " +
>>> newDatafile);
>>>   +        }
>>>   +
>>>   +        if (dfv.getNumEntries() > 0) {
>>>   +          datafileSizes.put(newDatafile, dfv);
>>>   +        }
>>>   +
>>>   +        if (absMergeFile != null) {
>>>   +          datafileSizes.remove(absMergeFile);
>>>   +        }
>>>   +
>>>   +        unreserveMergingMinorCompactionFile(absMergeFile);
>>>   +
>>>   +        dataSourceDeletions.incrementAndGet();
>>>   +        tabletMemory.finishedMinC();
>>>   +
>>>   +        lastFlushID = flushId;
>>>   +
>>>   +        computeNumEntries();
>>>   +        t2 = System.currentTimeMillis();
>>>   +      }
>>>   +
>>>   +      // must do this after list of files in memory is updated above
>>>   +      removeFilesAfterScan(filesInUseByScans);
>>>   +
>>>   +      if (absMergeFile != null)
>>>   +        log.log(TLevel.TABLET_HIST, extent + " MinC [" + absMergeFile
>>> +
>>> ",memory] -> " + newDatafile);
>>>   +      else
>>>   +        log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " +
>>> newDatafile);
>>>   +      log.debug(String.format("MinC finish lock %.2f secs %s", (t2 -
>>> t1)
>>> / 1000.0, getExtent().toString()));
>>>   +      if (dfv.getSize() >
>>> acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) {
>>>   +        log.debug(String.format("Minor Compaction wrote out file
>>> larger
>>> than split threshold.  split threshold = %,d  file size = %,d",
>>>   +
>>>   acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD),
>>> dfv.getSize()));
>>>   +      }
>>>   +
>>>   +    }
>>>   +
>>>   +    public void reserveMajorCompactingFiles(Collection<FileRef>
>>> files) {
>>>   +      if (majorCompactingFiles.size() != 0)
>>>   +        throw new IllegalStateException("Major compacting files not
>>> empty " + majorCompactingFiles);
>>>   +
>>>   +      if (mergingMinorCompactionFile != null &&
>>> files.contains(mergingMinorCompactionFile))
>>>   +        throw new IllegalStateException("Major compaction tried to
>>> resrve file in use by minor compaction " + mergingMinorCompactionFile);
>>>   +
>>>   +      majorCompactingFiles.addAll(files);
>>>   +    }
>>>   +
>>>   +    public void clearMajorCompactingFile() {
>>>   +      majorCompactingFiles.clear();
>>>   +    }
>>>   +
>>>   +    void bringMajorCompactionOnline(Set<FileRef> oldDatafiles,
>>> FileRef
>>> tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
>>>   +        throws IOException {
>>>   +      long t1, t2;
>>>   +
>>>   +      if (!extent.isRootTablet()) {
>>>   +
>>>   +        if (fs.exists(newDatafile.path())) {
>>>   +          log.error("Target map file already exist " + newDatafile,
>>> new
>>> Exception());
>>>   +          throw new IllegalStateException("Target map file already
>>> exist
>>> " + newDatafile);
>>>   +        }
>>>   +
>>>   +        // rename before putting in metadata table, so files in
>>> metadata
>>> table should
>>>   +        // always exist
>>>   +        if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
>>>   +          log.warn("Rename of " + tmpDatafile + " to " + newDatafile
>>> + "
>>> returned false");
>>>   +
>>>   +        if (dfv.getNumEntries() == 0) {
>>>   +          fs.deleteRecursively(newDatafile.path());
>>>   +        }
>>>   +      }
>>>   +
>>>   +      TServerInstance lastLocation = null;
>>>   +      synchronized (Tablet.this) {
>>>   +
>>>   +        t1 = System.currentTimeMillis();
>>>   +
>>>   +        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
>>>   +
>>>   +        dataSourceDeletions.incrementAndGet();
>>>   +
>>>   +        if (extent.isRootTablet()) {
>>>   +
>>>   +          waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
>>>   +
>>>   +          try {
>>>   +            if (!zoo.isLockHeld(tabletServer.getLock().getLockID()))
>>> {
>>>   +              throw new IllegalStateException();
>>>   +            }
>>>   +          } catch (Exception e) {
>>>   +            throw new IllegalStateException("Can not bring major
>>> compaction online, lock not held", e);
>>>   +          }
>>>   +
>>>   +          // mark files as ready for deletion, but
>>>   +          // do not delete them until we successfully
>>>   +          // rename the compacted map file, in case
>>>   +          // the system goes down
>>>   +
>>>   +          String compactName = newDatafile.path().getName();
>>>   +
>>>   +          for (FileRef ref : oldDatafiles) {
>>>   +            Path path = ref.path();
>>>   +            fs.rename(path, new Path(location + "/delete+" +
>>> compactName
>>> + "+" + path.getName()));
>>>   +          }
>>>   +
>>>   +          if (fs.exists(newDatafile.path())) {
>>>   +            log.error("Target map file already exist " + newDatafile,
>>> new Exception());
>>>   +            throw new IllegalStateException("Target map file already
>>> exist " + newDatafile);
>>>   +          }
>>>   +
>>>   +          if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
>>>   +            log.warn("Rename of " + tmpDatafile + " to " +
>>> newDatafile +
>>> " returned false");
>>>   +
>>>   +          // start deleting files, if we do not finish they will be
>>> cleaned
>>>   +          // up later
>>>   +          for (FileRef ref : oldDatafiles) {
>>>   +            Path path = ref.path();
>>>   +            Path deleteFile = new Path(location + "/delete+" +
>>> compactName + "+" + path.getName());
>>>   +            if (acuTableConf.getBoolean(Property.GC_TRASH_IGNORE) ||
>>> !fs.moveToTrash(deleteFile))
>>>   +              fs.deleteRecursively(deleteFile);
>>>   +          }
>>>   +        }
>>>   +
>>>   +        // atomically remove old files and add new file
>>>   +        for (FileRef oldDatafile : oldDatafiles) {
>>>   +          if (!datafileSizes.containsKey(oldDatafile)) {
>>>   +            log.error("file does not exist in set " + oldDatafile);
>>>   +          }
>>>   +          datafileSizes.remove(oldDatafile);
>>>   +          majorCompactingFiles.remove(oldDatafile);
>>>   +        }
>>>   +
>>>   +        if (datafileSizes.containsKey(newDatafile)) {
>>>   +          log.error("Adding file that is already in set " +
>>> newDatafile);
>>>   +        }
>>>   +
>>>   +        if (dfv.getNumEntries() > 0) {
>>>   +          datafileSizes.put(newDatafile, dfv);
>>>   +        }
>>>   +
>>>   +        // could be used by a follow on compaction in a multipass
>>> compaction
>>>   +        majorCompactingFiles.add(newDatafile);
>>>   +
>>>   +        computeNumEntries();
>>>   +
>>>   +        lastLocation = Tablet.this.lastLocation;
>>>   +        Tablet.this.lastLocation = null;
>>>   +
>>>   +        if (compactionId != null)
>>>   +          lastCompactID = compactionId;
>>>   +
>>>   +        t2 = System.currentTimeMillis();
>>>   +      }
>>>   +
>>>   +      if (!extent.isRootTablet()) {
>>>   +        Set<FileRef> filesInUseByScans =
>>> waitForScansToFinish(oldDatafiles, false, 10000);
>>>   +        if (filesInUseByScans.size() > 0)
>>>   +          log.debug("Adding scan refs to metadata " + extent + " " +
>>> filesInUseByScans);
>>>   +        MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles,
>>> filesInUseByScans, newDatafile, compactionId, dfv,
>>> SystemCredentials.get(),
>>>   +            tabletServer.getClientAddressString(), lastLocation,
>>> tabletServer.getLock());
>>>   +        removeFilesAfterScan(filesInUseByScans);
>>>   +      }
>>>   +
>>>   +      log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1)
>>> /
>>> 1000.0));
>>>   +      log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + "
>>> --> " + newDatafile);
>>>   +    }
>>>   +
>>>   +    public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
>>>   +      synchronized (Tablet.this) {
>>>   +        TreeMap<FileRef,DataFileValue> copy = new
>>> TreeMap<FileRef,DataFileValue>(datafileSizes);
>>>   +        return Collections.unmodifiableSortedMap(copy);
>>>   +      }
>>>   +    }
>>>   +
>>>   +    public Set<FileRef> getFiles() {
>>>   +      synchronized (Tablet.this) {
>>>   +        HashSet<FileRef> files = new
>>> HashSet<FileRef>(datafileSizes.keySet());
>>>   +        return Collections.unmodifiableSet(files);
>>>   +      }
>>>   +    }
>>>   +
>>>   +  }
>>>   +
>>>   +  public Tablet(TabletServer tabletServer, Text location, KeyExtent
>>> extent, TabletResourceManager trm, SortedMap<Key,Value> tabletsKeyValues)
>>>   +      throws IOException {
>>>   +    this(tabletServer, location, extent, trm,
>>> CachedConfiguration.getInstance(), tabletsKeyValues);
>>>   +    splitCreationTime = 0;
>>>   +  }
>>>   +
>>>   +  public Tablet(TabletServer tabletServer, Text location, KeyExtent
>>> extent, TabletResourceManager trm, SortedMap<FileRef,DataFileValue>
>>> datafiles, String time,
>>>   +      long initFlushID, long initCompactID) throws IOException {
>>>   +    this(tabletServer, location, extent, trm,
>>> CachedConfiguration.getInstance(), datafiles, time, initFlushID,
>>> initCompactID);
>>>   +    splitCreationTime = System.currentTimeMillis();
>>>   +  }
>>>   +
>>>   +  private Tablet(TabletServer tabletServer, Text location, KeyExtent
>>> extent, TabletResourceManager trm, Configuration conf,
>>>   +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>>   +    this(tabletServer, location, extent, trm, conf,
>>> VolumeManagerImpl.get(), tabletsKeyValues);
>>>   +  }
>>>   +
>>>   +  static private final List<LogEntry> EMPTY = Collections.emptyList();
>>>   +
>>>   +  private Tablet(TabletServer tabletServer, Text location, KeyExtent
>>> extent, TabletResourceManager trm, Configuration conf,
>>>   +      SortedMap<FileRef,DataFileValue> datafiles, String time, long
>>> initFlushID, long initCompactID) throws IOException {
>>>   +    this(tabletServer, location, extent, trm, conf,
>>> VolumeManagerImpl.get(), EMPTY, datafiles, time, null, new
>>> HashSet<FileRef>(), initFlushID, initCompactID);
>>>   +  }
>>>   +
>>>   +  private static String lookupTime(AccumuloConfiguration conf,
>>> KeyExtent
>>> extent, SortedMap<Key,Value> tabletsKeyValues) {
>>>   +    SortedMap<Key,Value> entries;
>>>   +
>>>   +    if (extent.isRootTablet()) {
>>>   +      return null;
>>>   +    } else {
>>>   +      entries = new TreeMap<Key,Value>();
>>>   +      Text rowName = extent.getMetadataEntry();
>>>   +      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +        if (entry.getKey().compareRow(rowName) == 0 &&
>>> TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey()))
>>> {
>>>   +          entries.put(new Key(entry.getKey()), new
>>> Value(entry.getValue()));
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +
>>>   +    // log.debug("extent : "+extent+"   entries : "+entries);
>>>   +
>>>   +    if (entries.size() == 1)
>>>   +      return entries.values().iterator().next().toString();
>>>   +    return null;
>>>   +  }
>>>   +
>>>   +  private static SortedMap<FileRef,DataFileValue>
>>> lookupDatafiles(AccumuloConfiguration conf, VolumeManager fs, KeyExtent
>>> extent,
>>>   +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>>   +
>>>   +    TreeMap<FileRef,DataFileValue> datafiles = new
>>> TreeMap<FileRef,DataFileValue>();
>>>   +
>>>   +    if (extent.isRootTablet()) { // the meta0 tablet
>>>   +      Path location = new Path(MetadataTableUtil.getRootTabletDir());
>>>   +      // cleanUpFiles() has special handling for delete. files
>>>   +      FileStatus[] files = fs.listStatus(location);
>>>   +      Collection<String> goodPaths = cleanUpFiles(fs, files, true);
>>>   +      for (String good : goodPaths) {
>>>   +        Path path = new Path(good);
>>>   +        String filename = path.getName();
>>>   +        FileRef ref = new FileRef(location.toString() + "/" +
>>> filename,
>>> path);
>>>   +        DataFileValue dfv = new DataFileValue(0, 0);
>>>   +        datafiles.put(ref, dfv);
>>>   +      }
>>>   +    } else {
>>>   +
>>>   +      Text rowName = extent.getMetadataEntry();
>>>   +
>>>   +      String tableId = extent.isMeta() ? RootTable.ID :
>>> MetadataTable.ID;
>>>   +      ScannerImpl mdScanner = new
>>> ScannerImpl(HdfsZooInstance.getInstance(), SystemCredentials.get(),
>>> tableId, Authorizations.EMPTY);
>>>   +
>>>   +      // Commented out because when no data file is present, each
>>> tablet
>>> will scan through metadata table and return nothing
>>>   +      // reduced batch size to improve performance
>>>   +      // changed here after endKeys were implemented from 10 to 1000
>>>   +      mdScanner.setBatchSize(1000);
>>>   +
>>>   +      // leave these in, again, now using endKey for safety
>>>   +      mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
>>>   +
>>>   +      mdScanner.setRange(new Range(rowName));
>>>   +
>>>   +      for (Entry<Key,Value> entry : mdScanner) {
>>>   +
>>>   +        if (entry.getKey().compareRow(rowName) != 0) {
>>>   +          break;
>>>   +        }
>>>   +
>>>   +        FileRef ref = new
>>> FileRef(entry.getKey().getColumnQualifier().toString(),
>>> fs.getFullPath(entry.getKey()));
>>>   +        datafiles.put(ref, new DataFileValue(entry.getValue()
>>> .get()));
>>>   +      }
>>>   +    }
>>>   +    return datafiles;
>>>   +  }
>>>   +
>>>   +  private static List<LogEntry> lookupLogEntries(KeyExtent ke,
>>> SortedMap<Key,Value> tabletsKeyValues) {
>>>   +    List<LogEntry> logEntries = new ArrayList<LogEntry>();
>>>   +
>>>   +    if (ke.isMeta()) {
>>>   +      try {
>>>   +        logEntries =
>>> MetadataTableUtil.getLogEntries(SystemCredentials.get(), ke);
>>>   +      } catch (Exception ex) {
>>>   +        throw new RuntimeException("Unable to read tablet log
>>> entries",
>>> ex);
>>>   +      }
>>>   +    } else {
>>>   +      log.debug("Looking at metadata " + tabletsKeyValues);
>>>   +      Text row = ke.getMetadataEntry();
>>>   +      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +        Key key = entry.getKey();
>>>   +        if (key.getRow().equals(row)) {
>>>   +          if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
>>>   +            logEntries.add(LogEntry.fromKeyValue(key,
>>> entry.getValue()));
>>>   +          }
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +
>>>   +    log.debug("got " + logEntries + " for logs for " + ke);
>>>   +    return logEntries;
>>>   +  }
>>>   +
>>>   +  private static Set<FileRef> lookupScanFiles(KeyExtent extent,
>>> SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) throws
>>> IOException
>>> {
>>>   +    HashSet<FileRef> scanFiles = new HashSet<FileRef>();
>>>   +
>>>   +    Text row = extent.getMetadataEntry();
>>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +      Key key = entry.getKey();
>>>   +      if (key.getRow().equals(row) &&
>>> key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) {
>>>   +        String meta = key.getColumnQualifier().toString();
>>>   +        Path path = fs.getFullPath(extent.getTableId().toString(),
>>> meta);
>>>   +        scanFiles.add(new FileRef(meta, path));
>>>   +      }
>>>   +    }
>>>   +
>>>   +    return scanFiles;
>>>   +  }
>>>   +
>>>   +  private static long lookupFlushID(KeyExtent extent,
>>> SortedMap<Key,Value> tabletsKeyValues) {
>>>   +    Text row = extent.getMetadataEntry();
>>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +      Key key = entry.getKey();
>>>   +      if (key.getRow().equals(row) &&
>>> TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key.
>>> getColumnFamily(),
>>> key.getColumnQualifier()))
>>>   +        return Long.parseLong(entry.getValue().toString());
>>>   +    }
>>>   +
>>>   +    return -1;
>>>   +  }
>>>   +
>>>   +  private static long lookupCompactID(KeyExtent extent,
>>> SortedMap<Key,Value> tabletsKeyValues) {
>>>   +    Text row = extent.getMetadataEntry();
>>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +      Key key = entry.getKey();
>>>   +      if (key.getRow().equals(row) &&
>>> TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.
>>> getColumnFamily(),
>>> key.getColumnQualifier()))
>>>   +        return Long.parseLong(entry.getValue().toString());
>>>   +    }
>>>   +
>>>   +    return -1;
>>>   +  }
>>>   +
>>>   +  private Tablet(TabletServer tabletServer, Text location, KeyExtent
>>> extent, TabletResourceManager trm, Configuration conf, VolumeManager fs,
>>>   +      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
>>>   +    this(tabletServer, location, extent, trm, conf, fs,
>>> lookupLogEntries(extent, tabletsKeyValues),
>>> lookupDatafiles(tabletServer.getSystemConfiguration(), fs,
>>>   +        extent, tabletsKeyValues),
>>> lookupTime(tabletServer.getSystemConfiguration(), extent,
>>> tabletsKeyValues), lookupLastServer(extent, tabletsKeyValues),
>>>   +        lookupScanFiles(extent, tabletsKeyValues, fs),
>>> lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent,
>>> tabletsKeyValues));
>>>   +  }
>>>   +
>>>   +  private static TServerInstance lookupLastServer(KeyExtent extent,
>>> SortedMap<Key,Value> tabletsKeyValues) {
>>>   +    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
>>>   +      if (entry.getKey().getColumnFamily().compareTo(
>>> TabletsSection.LastLocationColumnFamily.NAME) == 0) {
>>>   +        return new TServerInstance(entry.getValue(),
>>> entry.getKey().getColumnQualifier());
>>>   +      }
>>>   +    }
>>>   +    return null;
>>>   +  }
>>>   +
>>>   +  /**
>>>   +   * yet another constructor - this one allows us to avoid costly
>>> lookups into the Metadata table if we already know the files we need - as
>>> at split time
>>>   +   */
>>>   +  private Tablet(final TabletServer tabletServer, final Text location,
>>> final KeyExtent extent, final TabletResourceManager trm, final
>>> Configuration conf,
>>>   +      final VolumeManager fs, final List<LogEntry> logEntries, final
>>> SortedMap<FileRef,DataFileValue> datafiles, String time,
>>>   +      final TServerInstance lastLocation, Set<FileRef> scanFiles, long
>>> initFlushID, long initCompactID) throws IOException {
>>>   +    Path locationPath;
>>>   +    if (location.find(":") >= 0) {
>>>   +      locationPath = new Path(location.toString());
>>>   +    } else {
>>>   +      locationPath = fs.getFullPath(FileType.TABLE,
>>> extent.getTableId().toString() + location.toString());
>>>   +    }
>>>   +    FileSystem fsForPath = fs.getFileSystemByPath(locationPath);
>>>   +    this.location = locationPath.makeQualified(fsForPath.getUri(),
>>> fsForPath.getWorkingDirectory());
>>>   +    this.lastLocation = lastLocation;
>>>   +    this.tabletDirectory = location.toString();
>>>   +    this.conf = conf;
>>>   +    this.acuTableConf = tabletServer.getTableConfiguration(extent);
>>>   +
>>>   +    this.fs = fs;
>>>   +    this.extent = extent;
>>>   +    this.tabletResources = trm;
>>>   +
>>>   +    this.lastFlushID = initFlushID;
>>>   +    this.lastCompactID = initCompactID;
>>>   +
>>>   +    if (extent.isRootTablet()) {
>>>   +      long rtime = Long.MIN_VALUE;
>>>   +      for (FileRef ref : datafiles.keySet()) {
>>>   +        Path path = ref.path();
>>>   +        FileSystem ns = fs.getFileSystemByPath(path);
>>>   +        FileSKVIterator reader =
>>> FileOperations.getInstance().openReader(path.toString(), true, ns,
>>> ns.getConf(), tabletServer.getTableConfiguration(extent));
>>>   +        long maxTime = -1;
>>>   +        try {
>>>   +
>>>   +          while (reader.hasTop()) {
>>>   +            maxTime = Math.max(maxTime,
>>> reader.getTopKey().getTimestamp());
>>>   +            reader.next();
>>>   +          }
>>>   +
>>>   +        } finally {
>>>   +          reader.close();
>>>   +        }
>>>   +
>>>   +        if (maxTime > rtime) {
>>>   +          time = TabletTime.LOGICAL_TIME_ID + "" + maxTime;
>>>   +          rtime = maxTime;
>>>   +        }
>>>   +      }
>>>   +    }
>>>   +    if (time == null && datafiles.isEmpty() &&
>>> extent.equals(RootTable.OLD_EXTENT)) {
>>>   +      // recovery... old root tablet has no data, so time doesn't
>>> matter:
>>>   +      time = TabletTime.LOGICAL_TIME_ID + "" + Long.MIN_VALUE;
>>>   +    }
>>>   +
>>>   +    this.tabletServer = tabletServer;
>>>   +    this.logId = tabletServer.createLogId(extent);
>>>   +
>>>   +    this.timer = new TabletStatsKeeper();
>>>   +
>>>   +    setupDefaultSecurityLabels(extent);
>>>   +
>>>   +    tabletMemory = new TabletMemory();
>>>   +    tabletTime = TabletTime.getInstance(time);
>>>   +    persistedTime = tabletTime.getTime();
>>>   +
>>>   +    acuTableConf.addObserver(configObserver = new
>>> ConfigurationObserver() {
>>>   +
>>>   +      private void reloadConstraints() {
>>>   +        constraintChecker.set(new
>>> ConstraintChecker(getTableConfiguration()));
>>>   +      }
>>>   +
>>>   +      @Override
>>>   +      public void propertiesChanged() {
>>>   +        reloadConstraints();
>>>   +
>>>   +        try {
>>>   +          setupDefaultSecurityLabels(extent);
>>>   +        } catch (Exception e) {
>>>   +          log.error("Failed to reload default security labels for
>>> extent: " + extent.toString());
>>>   +        }
>>>   +      }
>>>   +
>>>   +      @Override
>>>   +      public void propertyChanged(String prop) {
>>>   +        if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.
>>> getKey()))
>>>   +          reloadConstraints();
>>>   +        else if
>>> (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) {
>>>   +          try {
>>>   +            log.info("Default security labels changed for extent: " +
>>> extent.toString());
>>>   +            setupDefaultSecurityLabels(extent);
>>>   +          } catch (Exception e) {
>>>   +            log.error("Failed to reload default security labels for
>>> extent: " + extent.toString());
>>>   +          }
>>>   +        }
>>>   +
>>>   +      }
>>>   +
>>>   +      @Override
>>>   +      public void sessionExpired() {
>>>   +        log.debug("Session expired, no longer updating per table
>>> props...");
>>>   +      }
>>>   +
>>>   +    });
>>>   +    // Force a load of any per-table properties
>>>   +    configObserver.propertiesChanged();
>>>   +
>>>   +    tabletResources.setTablet(this, acuTableConf);
>>>   +    if (!logEntries.isEmpty()) {
>>>   +      log.info("Starting Write-Ahead Log recovery for " +
>>> this.extent);
>>>   +      final long[] count = new long[2];
>>>   +      final CommitSession commitSession =
>>> tabletMemory.getCommitSession();
>>>   +      count[1] = Long.MIN_VALUE;
>>>   +      try {
>>>   +        Set<String> absPaths = new HashSet<String>();
>>>   +        for (FileRef ref : datafiles.keySet())
>>>   +          absPaths.add(ref.path().toString());
>>>   +
>>>   +        tabletServer.recover(this.tabletServer.getFileSystem(), this,
>>> logEntries, absPaths, new MutationReceiver() {
>>>   +          @Override
>>>   +          public void receive(Mutation m) {
>>>   +            // LogReader.printMutation(m);
>>>   +            Collection<ColumnUpdate> muts = m.getUpdates();
>>>   +            for (ColumnUpdate columnUpdate : muts) {
>>>   +              if (!columnUpdate.hasTimestamp()) {
>>>   +                // if it is not a user set timestamp, it must have
>>> been
>>> set
>>>   +                // by the system
>>>   +                count[1] = Math.max(count[1],
>>> columnUpdate.getTimestamp());
>>>   +              }
>>>   +            }
>>>   +            tabletMemory.mutate(commitSession,
>>> Collections.singletonList(m));
>>>   +            count[0]++;
>>>   +          }
>>>   +        });
>>>   +
>>>   +        if (count[1] != Long.MIN_VALUE) {
>>>   +          tabletTime.useMaxTimeFromWALog(count[1]);
>>>   +        }
>>>   +        commitSession.updateMaxCommittedTime(tabletTime.getTime());
>>>   +
>>> -         tabletMemory.updateMemoryUsageStats();
>>> -
>>>   +        if (count[0] == 0) {
>>>   +          MetadataTableUtil.removeUnusedWALEntries(extent,
>>> logEntries,
>>> tabletServer.getLock());
>>>   +          logEntries.clear();
>>>   +        }
>>>   +
>>>   +      } catch (Throwable t) {
>>>   +        if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE))
>>> {
>>>   +          log.warn("Error recovering from log files: ", t);
>>>   +        } else {
>>>   +          throw new RuntimeException(t);
>>>   +        }
>>>   +      }
>>>   +      // make some closed references that represent the recovered logs
>>>   +      currentLogs = new HashSet<DfsLogger>();
>>>   +      for (LogEntry logEntry : logEntries) {
>>>   +        for (String log : logEntry.logSet) {
>>>   +          currentLogs.add(new DfsLogger(tabletServer.
>>> getServerConfig(),
>>> log));
>>>   +        }
>>>   +      }
>>>   +
>>>   +      log.info("Write-Ahead Log recovery complete for " +
>>> this.extent +
>>> " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries()
>>>   +          + " entries created)");
>>>   +    }
>>>   +
>>>   +    String contextName = acuTableConf.get(Property.TABLE_CLASSPATH);
>>>   +    if (contextName != null && !contextName.equals("")) {
>>>   +      // initialize context classloader, instead of possibly waiting
>>> for
>>> it to initialize for a scan
>>>   +      // TODO this could hang, causing other tablets to fail to load -
>>> ACCUMULO-1292
>>>   +
>>>   AccumuloVFSClassLoader.getContextManager().
>>> getClassLoader(contextName);
>>>   +    }
>>>   +
>>>   +    // do this last after tablet is completely setup because it
>>>   +    // could cause major compaction to start
>>>   +    datafileManager = new DatafileManager(datafiles);
>>>   +
>>>   +    computeNumEntries();
>>>   +
>>>   +    datafileManager.removeFilesAfterScan(scanFiles);
>>>   +
>>>   +    // look for hints of a failure on the previous tablet server
>>>   +    if (!logEntries.isEmpty() ||
>>> needsMajorCompaction(MajorCompactionReason.NORMAL)) {
>>>   +      // look for any temp files hanging around
>>>   +      removeOldTemporaryFiles();
>>>   +    }
>>>   +
>>>   +    log.log(TLevel.TABLET_HIST, extent + " opened");
>>>   +  }
>>>   +
>>>   +  private void removeOldTemporaryFiles() {
>>>   +    // remove any temporary files created by a previous tablet server
>>>   +    try {
>>>   +      for (FileStatus tmp : fs.globStatus(new Path(location,
>>> "*_tmp"))) {
>>>   +        try {
>>>   +          log.debug("Removing old temp file " + tmp.getPath());
>>>   +          fs.delete(tmp.getPath());
>>>   +        } catch (IOException ex) {
>>>   +          log.error("Unable to remove old temp file " + tmp.getPath()
>>> +
>>> ": " + ex);
>>>   +        }
>>>   +      }
>>>   +    } catch (IOException ex) {
>>>   +      log.error("Error scanning for old temp files in " + location);
>>>   +    }
>>>   +  }
>>>   +
>>>   +  private void setupDefaultSecurityLabels(KeyExtent extent) {
>>>   +    if (extent.isMeta()) {
>>>   +      defaultSecurityLabel = new byte[0];
>>>   +    } else {
>>>   +      try {
>>>   +        ColumnVisibility cv = new
>>> ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_
>>> SCANTIME_VISIBILITY));
>>>   +        this.defaultSecurityLabel = cv.getExpression();
>>>   +      } catch (Exception e) {
>>>   +        log.error(e, e);
>>>   +        this.defaultSecurityLabel = new byte[0];
>>>   +      }
>>>   +    }
>>>   +  }
>>>   +
>>>   +  private static Collection<String> cleanUpFiles(VolumeManager fs,
>>> FileStatus[] files, boolean deleteTmp) throws IOException {
>>>   +    /*
>>>   +     * called in constructor and before major compactions
>>>   +     */
>>>   +    Collection<String> goodFiles = new ArrayList<String>(files.
>>> length);
>>>   +
>>>   +    for (FileStatus file : files) {
>>>   +
>>>   +      String path = file.getPath().toString();
>>>   +      String filename = file.getPath().getName();
>>>   +
>>>   +      // check for incomplete major compaction, this should only occur
>>>   +      // for root tablet
>>>   +      if (filename.startsWith("delete+")) {
>>>   +        String expectedCompactedFile = path.substring(0,
>>> path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1];
>>>   +        if (fs.exists(new Path(expectedCompactedFile))) {
>>>   +          // compaction finished, but did not finish deleting
>>> compacted
>>> files.. so delete it
>>>   +          if (!fs.deleteRecursively(file.getPath()))
>>>   +            log.warn("Delete of file: " + file.getPath().toString() +
>>> "
>>> return false");
>>>   +          continue;
>>>   +        }
>>>   +        // compaction did not finish, so put files back
>>>   +
>>>   +        // reset path and filename for rest of loop
>>>   +        filename = filename.split("\\+", 3)[2];
>>>   +        path = path.substring(0, path.lastIndexOf("/delete+")) + "/" +
>>> filename;
>>>   +
>>>   +        if (!fs.rename(file.getPath(), new Path(path)))
>>>   +          log.warn("Rename of " + file.getPath().toString() + " to " +
>>> path + " returned false");
>>>   +      }
>>>   +
>>>   +      if (filename.endsWith("_tmp")) {
>>>   +        if (deleteTmp) {
>>>   +          log.warn("cleaning up old tmp file: " + path);
>>>   +          if (!fs.deleteRecursively(file.getPath()))
>>>   +            log.warn("Delete of tmp file: " +
>>> file.getPath().toString()
>>> + " return false");
>>>   +
>>>   +        }
>>>   +        continue;
>>>   +      }
>>>   +
>>>   +      if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") &&
>>> !FileOperations.getValidExtensions().contains(filename.split("\\.")[1]))
>>> {
>>>   +        log.error("unknown file in tablet" + path);
>>>   +        continue;
>>>   +      }
>>>   +
>>>   +      goodFiles.add(path);
>>>   +    }
>>>   +
>>>   +    return goodFiles;
>>>   +  }
>>>   +
>>>   +  public static class KVEntry extends KeyValue {
>>>   +    public KVEntry(Key k, Value v) {
>>>   +      super(new Key(k), Arrays.copyOf(v.get(), v.get().length));
>>>   +    }
>>>   +
>>>   +    @Override
>>
>>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message