hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r747672 [2/4] - in /hadoop/hbase/trunk: conf/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/io/hfile/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/...
Date Wed, 25 Feb 2009 05:59:27 GMT
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=747672&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Feb 25 05:59:26 2009
@@ -0,0 +1,1807 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.SequenceFile;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+  * A Store holds a column family in a Region.  Its a memcache and a set of zero
+  * or more StoreFiles, which stretch backwards over time.
+  *
+  * <p>There's no reason to consider append-logging at this level; all logging 
+  * and locking is handled at the HRegion level.  Store just provides
+  * services to manage sets of StoreFiles.  One of the most important of those
+  * services is compaction services where files are aggregated once they pass
+  * a configurable threshold.
+  *
+  * <p>The only thing having to do with logs that Store needs to deal with is
+  * the reconstructionLog.  This is a segment of an HRegion's log that might
+  * NOT be present upon startup.  If the param is NULL, there's nothing to do.
+  * If the param is non-NULL, we need to process the log to reconstruct
+  * a TreeMap that might not have been written to disk before the process
+  * died.
+  *
+  * <p>It's assumed that after this constructor returns, the reconstructionLog
+  * file will be deleted (by whoever has instantiated the Store).
+ *
+ * <p>Locking and transactions are handled at a higher level.  This API should
+ * not be called directly but by an HRegion manager.
+ */
+public class Store implements HConstants {
+  static final Log LOG = LogFactory.getLog(Store.class);
+  protected final Memcache memcache;
+  // This stores directory in the filesystem.
+  private final Path homedir;
+  private final HRegionInfo regioninfo;
+  private final HColumnDescriptor family;
+  final FileSystem fs;
+  private final HBaseConfiguration conf;
+  // ttl in milliseconds.
+  protected long ttl;
+  private long majorCompactionTime;
+  private int maxFilesToCompact;
+  private final long desiredMaxFileSize;
+  private volatile long storeSize = 0L;
+  private final Integer flushLock = new Integer(0);
+  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  final byte [] storeName;
+  private final String storeNameStr;
+
+  /*
+   * Sorted Map of readers keyed by maximum edit sequence id (Most recent should
+   * be last in in list).  ConcurrentSkipListMap is lazily consistent so no
+   * need to lock it down when iterating; iterator view is that of when the
+   * iterator was taken out.
+   */
+  private final NavigableMap<Long, StoreFile> storefiles =
+    new ConcurrentSkipListMap<Long, StoreFile>();
+
+  // All access must be synchronized.
+  private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
+    new CopyOnWriteArraySet<ChangedReadersObserver>();
+
+  // The most-recent log-seq-ID.  The most-recent such ID means we can ignore
+  // all log messages up to and including that ID (because they're already
+  // reflected in the TreeMaps).
+  private volatile long maxSeqId = -1;
+
+  private final Path compactionDir;
+  private final Integer compactLock = new Integer(0);
+  private final int compactionThreshold;
+
+  /**
+   * Constructor
+   * @param basedir qualified path under which the region directory lives;
+   * generally the table subdirectory
+   * @param info HRegionInfo for this region
+   * @param family HColumnDescriptor for this column
+   * @param fs file system object
+   * @param reconstructionLog existing log file to apply if any
+   * @param conf configuration object
+   * @param reporter Call on a period so hosting server can report we're
+   * making progress to master -- otherwise master might think region deploy
+   * failed.  Can be null.
+   * @throws IOException
+   */
+  protected Store(Path basedir, HRegionInfo info, HColumnDescriptor family,
+    FileSystem fs, Path reconstructionLog, HBaseConfiguration conf,
+    final Progressable reporter)
+  throws IOException {  
+    this.homedir = getStoreHomedir(basedir, info.getEncodedName(),
+      family.getName());
+    this.regioninfo = info;
+    this.family = family;
+    this.fs = fs;
+    this.conf = conf;
+    // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
+    this.ttl = family.getTimeToLive();
+    if (ttl != HConstants.FOREVER) {
+      this.ttl *= 1000;
+    }
+    this.memcache = new Memcache(this.ttl);
+    this.compactionDir = HRegion.getCompactionDir(basedir);
+    this.storeName = Bytes.toBytes(this.regioninfo.getEncodedName() + "/" +
+      Bytes.toString(this.family.getName()));
+    this.storeNameStr = Bytes.toString(this.storeName);
+
+    // By default, we compact if an HStore has more than
+    // MIN_COMMITS_FOR_COMPACTION map files
+    this.compactionThreshold =
+      conf.getInt("hbase.hstore.compactionThreshold", 3);
+
+    // By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
+    long maxFileSize = info.getTableDesc().getMaxFileSize();
+    if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) {
+      maxFileSize = conf.getLong("hbase.hregion.max.filesize",
+        HConstants.DEFAULT_MAX_FILE_SIZE);
+    }
+    this.desiredMaxFileSize = maxFileSize;
+
+    this.majorCompactionTime =
+      conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 86400000);
+    if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
+      String strCompactionTime =
+        family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
+      this.majorCompactionTime = (new Long(strCompactionTime)).longValue();
+    }
+
+    this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
+
+    // loadStoreFiles calculates this.maxSeqId. as side-effect.
+    this.storefiles.putAll(loadStoreFiles());
+    if (LOG.isDebugEnabled() && this.storefiles.size() > 0) {
+      LOG.debug("Loaded " + this.storefiles.size() + " file(s) in Store " +
+        Bytes.toString(this.storeName) + ", max sequence id " + this.maxSeqId);
+    }
+
+    // Do reconstruction log.
+    runReconstructionLog(reconstructionLog, this.maxSeqId, reporter);
+  }
+
+  HColumnDescriptor getFamily() {
+    return this.family;
+  }
+
+  long getMaxSequenceId() {
+    return this.maxSeqId;
+  }
+
+  /**
+   * @param tabledir
+   * @param encodedName Encoded region name.
+   * @param family
+   * @return Path to family/Store home directory.
+   */
+  public static Path getStoreHomedir(final Path tabledir,
+      final int encodedName, final byte [] family) {
+    return new Path(tabledir, new Path(Integer.toString(encodedName),
+      new Path(Bytes.toString(family))));
+  }
+
+  /*
+   * Run reconstruction log
+   * @param reconstructionLog
+   * @param msid
+   * @param reporter
+   * @throws IOException
+   */
+  private void runReconstructionLog(final Path reconstructionLog,
+    final long msid, final Progressable reporter)
+  throws IOException {
+    try {
+      doReconstructionLog(reconstructionLog, msid, reporter);
+    } catch (EOFException e) {
+      // Presume we got here because of lack of HADOOP-1700; for now keep going
+      // but this is probably not what we want long term.  If we got here there
+      // has been data-loss
+      LOG.warn("Exception processing reconstruction log " + reconstructionLog +
+        " opening " + this.storeName +
+        " -- continuing.  Probably lack-of-HADOOP-1700 causing DATA LOSS!", e);
+    } catch (IOException e) {
+      // Presume we got here because of some HDFS issue. Don't just keep going.
+      // Fail to open the HStore.  Probably means we'll fail over and over
+      // again until human intervention but alternative has us skipping logs
+      // and losing edits: HBASE-642.
+      LOG.warn("Exception processing reconstruction log " + reconstructionLog +
+        " opening " + this.storeName, e);
+      throw e;
+    }
+  }
+
+  /*
+   * Read the reconstructionLog to see whether we need to build a brand-new 
+   * file out of non-flushed log entries.  
+   *
+   * We can ignore any log message that has a sequence ID that's equal to or 
+   * lower than maxSeqID.  (Because we know such log messages are already 
+   * reflected in the MapFiles.)
+   */
+  @SuppressWarnings("unchecked")
+  private void doReconstructionLog(final Path reconstructionLog,
+    final long maxSeqID, final Progressable reporter)
+  throws UnsupportedEncodingException, IOException {
+    if (reconstructionLog == null || !this.fs.exists(reconstructionLog)) {
+      // Nothing to do.
+      return;
+    }
+    // Check its not empty.
+    FileStatus [] stats = this.fs.listStatus(reconstructionLog);
+    if (stats == null || stats.length == 0) {
+      LOG.warn("Passed reconstruction log " + reconstructionLog +
+        " is zero-length");
+      return;
+    }
+    // TODO: This could grow large and blow heap out.  Need to get it into
+    // general memory usage accounting.
+    long maxSeqIdInLog = -1;
+    NavigableMap<HStoreKey, byte []> reconstructedCache =
+      new TreeMap<HStoreKey, byte []>(new HStoreKey.HStoreKeyWritableComparator());
+    SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
+      reconstructionLog, this.conf);
+    try {
+      HLogKey key = new HLogKey();
+      HLogEdit val = new HLogEdit();
+      long skippedEdits = 0;
+      long editsCount = 0;
+      // How many edits to apply before we send a progress report.
+      int reportInterval =
+        this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
+      while (logReader.next(key, val)) {
+        maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
+        if (key.getLogSeqNum() <= maxSeqID) {
+          skippedEdits++;
+          continue;
+        }
+        // Check this edit is for me. Also, guard against writing
+        // METACOLUMN info such as HBASE::CACHEFLUSH entries
+        byte [] column = val.getColumn();
+        if (val.isTransactionEntry() || Bytes.equals(column, HLog.METACOLUMN)
+            || !Bytes.equals(key.getRegionName(), regioninfo.getRegionName())
+            || !HStoreKey.matchingFamily(family.getName(), column)) {
+          continue;
+        }
+        HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
+        reconstructedCache.put(k, val.getVal());
+        editsCount++;
+        // Every 2k edits, tell the reporter we're making progress.
+        // Have seen 60k edits taking 3minutes to complete.
+        if (reporter != null && (editsCount % reportInterval) == 0) {
+          reporter.progress();
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
+          " because sequence id <= " + maxSeqID);
+      }
+    } finally {
+      logReader.close();
+    }
+    
+    if (reconstructedCache.size() > 0) {
+      // We create a "virtual flush" at maxSeqIdInLog+1.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("flushing reconstructionCache");
+      }
+      internalFlushCache(reconstructedCache, maxSeqIdInLog + 1);
+    }
+  }
+
+  /*
+   * Creates a series of StoreFile loaded from the given directory.
+   * @throws IOException
+   */
+  private Map<Long, StoreFile> loadStoreFiles()
+  throws IOException {
+    Map<Long, StoreFile> results = new HashMap<Long, StoreFile>();
+    FileStatus files[] = this.fs.listStatus(this.homedir);
+    for (int i = 0; files != null && i < files.length; i++) {
+      // Skip directories.
+      if (files[i].isDir()) {
+        continue;
+      }
+      Path p = files[i].getPath();
+      // Check for empty file.  Should never be the case but can happen
+      // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
+      if (this.fs.getFileStatus(p).getLen() <= 0) {
+        LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
+        continue;
+      }
+      StoreFile curfile = new StoreFile(fs, p);
+      long storeSeqId = curfile.getMaxSequenceId();
+      if (storeSeqId > this.maxSeqId) {
+        this.maxSeqId = storeSeqId;
+      }
+      long length = curfile.getReader().length();
+      this.storeSize += length;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" +
+          curfile.isReference() + ", sequence id=" + storeSeqId +
+          ", length=" + length + ", majorCompaction=" +
+          curfile.isMajorCompaction());
+      }
+      results.put(Long.valueOf(storeSeqId), curfile);
+    }
+    return results;
+  }
+
+  /**
+   * Adds a value to the memcache
+   * 
+   * @param key
+   * @param value
+   * @return memcache size delta
+   */
+  protected long add(HStoreKey key, byte[] value) {
+    lock.readLock().lock();
+    try {
+      return this.memcache.add(key, value);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * @return All store files.
+   */
+  NavigableMap<Long, StoreFile> getStorefiles() {
+    return this.storefiles;
+  }
+
+  /**
+   * Close all the readers
+   * 
+   * We don't need to worry about subsequent requests because the HRegion holds
+   * a write lock that will prevent any more reads or writes.
+   * 
+   * @throws IOException
+   */
+  List<StoreFile> close() throws IOException {
+    this.lock.writeLock().lock();
+    try {
+      ArrayList<StoreFile> result =
+        new ArrayList<StoreFile>(storefiles.values());
+      // Clear so metrics doesn't find them.
+      this.storefiles.clear();
+      for (StoreFile f: result) {
+        f.close();
+      }
+      LOG.debug("closed " + this.storeNameStr);
+      return result;
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Snapshot this stores memcache.  Call before running
+   * {@link #flushCache(long)} so it has some work to do.
+   */
+  void snapshot() {
+    this.memcache.snapshot();
+  }
+
+  /**
+   * Write out current snapshot.  Presumes {@link #snapshot()} has been called
+   * previously.
+   * @param logCacheFlushId flush sequence number
+   * @return true if a compaction is needed
+   * @throws IOException
+   */
+  boolean flushCache(final long logCacheFlushId) throws IOException {
+    // Get the snapshot to flush.  Presumes that a call to
+    // this.memcache.snapshot() has happened earlier up in the chain.
+    SortedMap<HStoreKey, byte []> cache = this.memcache.getSnapshot();
+    // If an exception happens flushing, we let it out without clearing
+    // the memcache snapshot.  The old snapshot will be returned when we say
+    // 'snapshot', the next time flush comes around.
+    StoreFile sf = internalFlushCache(cache, logCacheFlushId);
+    if (sf == null) {
+      return false;
+    }
+    // Add new file to store files.  Clear snapshot too while we have the
+    // Store write lock.
+    int size = updateStorefiles(logCacheFlushId, sf, cache);
+    return size >= this.compactionThreshold;
+  }
+
+  /*
+   * @param cache
+   * @param logCacheFlushId
+   * @return StoreFile created.
+   * @throws IOException
+   */
+  private StoreFile internalFlushCache(final SortedMap<HStoreKey, byte []> cache,
+    final long logCacheFlushId)
+  throws IOException {
+    HFile.Writer writer = null;
+    long flushed = 0;
+    // Don't flush if there are no entries.
+    if (cache.size() == 0) {
+      return null;
+    }
+    long now = System.currentTimeMillis();
+    // TODO:  We can fail in the below block before we complete adding this
+    // flush to list of store files.  Add cleanup of anything put on filesystem
+    // if we fail.
+    synchronized (flushLock) {
+      // A. Write the map out to the disk
+      writer = StoreFile.getWriter(this.fs, this.homedir);
+      int entries = 0;
+      try {
+        for (Map.Entry<HStoreKey, byte []> es: cache.entrySet()) {
+          HStoreKey curkey = es.getKey();
+          byte[] bytes = es.getValue();
+          if (!isExpired(curkey, ttl, now)) {
+            writer.append(curkey.getBytes(), bytes);
+            entries++;
+            flushed += this.memcache.heapSize(curkey, bytes, null);
+          }
+        }
+        // B. Write out the log sequence number that corresponds to this output
+        // MapFile.  The MapFile is current up to and including logCacheFlushId.
+        StoreFile.appendMetadata(writer, logCacheFlushId);
+      } finally {
+        writer.close();
+      }
+    }
+    StoreFile sf = new StoreFile(this.fs, writer.getPath());
+    this.storeSize += sf.getReader().length();
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Added " + sf + ", entries=" + sf.getReader().getEntries() +
+        ", sequenceid=" + logCacheFlushId +
+        ", memsize=" + StringUtils.humanReadableInt(flushed) +
+        ", filesize=" + StringUtils.humanReadableInt(sf.getReader().length()) +
+        " to " + this.regioninfo.getRegionNameAsString());
+    }
+    return sf;
+  }
+  
+  /*
+   * Change storefiles adding into place the Reader produced by this new flush.
+   * @param logCacheFlushId
+   * @param sf
+   * @param cache That was used to make the passed file <code>p</code>.
+   * @throws IOException
+   * @return Count of store files.
+   */
+  private int updateStorefiles(final long logCacheFlushId,
+    final StoreFile sf, final SortedMap<HStoreKey, byte []> cache)
+  throws IOException {
+    int count = 0;
+    this.lock.writeLock().lock();
+    try {
+      this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
+      count = this.storefiles.size();
+      // Tell listeners of the change in readers.
+      notifyChangedReadersObservers();
+      this.memcache.clearSnapshot(cache);
+      return count;
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  /*
+   * Notify all observers that set of Readers has changed.
+   * @throws IOException
+   */
+  private void notifyChangedReadersObservers() throws IOException {
+    for (ChangedReadersObserver o: this.changedReaderObservers) {
+      o.updateReaders();
+    }
+  }
+
+  /*
+   * @param o Observer who wants to know about changes in set of Readers
+   */
+  void addChangedReaderObserver(ChangedReadersObserver o) {
+    this.changedReaderObservers.add(o);
+  }
+
+  /*
+   * @param o Observer no longer interested in changes in set of Readers.
+   */
+  void deleteChangedReaderObserver(ChangedReadersObserver o) {
+    if (!this.changedReaderObservers.remove(o)) {
+      LOG.warn("Not in set" + o);
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Compaction
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Compact the StoreFiles.  This method may take some time, so the calling 
+   * thread must be able to block for long periods.
+   * 
+   * <p>During this time, the Store can work as usual, getting values from
+   * MapFiles and writing new MapFiles from the Memcache.
+   * 
+   * Existing MapFiles are not destroyed until the new compacted TreeMap is 
+   * completely written-out to disk.
+   *
+   * The compactLock prevents multiple simultaneous compactions.
+   * The structureLock prevents us from interfering with other write operations.
+   * 
+   * We don't want to hold the structureLock for the whole time, as a compact() 
+   * can be lengthy and we want to allow cache-flushes during this period.
+   * 
+   * @param mc True to force a major compaction regardless of
+   * thresholds
+   * @return row to split around if a split is needed, null otherwise
+   * @throws IOException
+   */
+  StoreSize compact(final boolean mc) throws IOException {
+    boolean forceSplit = this.regioninfo.shouldSplit(false);
+    boolean majorcompaction = mc;
+    synchronized (compactLock) {
+      long maxId = -1;
+      // filesToCompact are sorted oldest to newest.
+      List<StoreFile> filesToCompact = null;
+      filesToCompact = new ArrayList<StoreFile>(this.storefiles.values());
+      if (filesToCompact.size() <= 0) {
+        LOG.debug(this.storeNameStr + ": no store files to compact");
+        return null;
+      }
+      // The max-sequenceID in any of the to-be-compacted TreeMaps is the 
+      // last key of storefiles.
+      maxId = this.storefiles.lastKey().longValue();
+      // Check to see if we need to do a major compaction on this region.
+      // If so, change doMajorCompaction to true to skip the incremental
+      // compacting below. Only check if doMajorCompaction is not true.
+      if (!majorcompaction) {
+        majorcompaction = isMajorCompaction(filesToCompact);
+      }
+      boolean references = hasReferences(filesToCompact);
+      if (!majorcompaction && !references && 
+          (forceSplit || (filesToCompact.size() < compactionThreshold))) {
+        return checkSplit(forceSplit);
+      }
+      if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) {
+        LOG.warn("Mkdir on " + compactionDir.toString() + " failed");
+        return checkSplit(forceSplit);
+      }
+
+      // HBASE-745, preparing all store file sizes for incremental compacting
+      // selection.
+      int countOfFiles = filesToCompact.size();
+      long totalSize = 0;
+      long[] fileSizes = new long[countOfFiles];
+      long skipped = 0;
+      int point = 0;
+      for (int i = 0; i < countOfFiles; i++) {
+        StoreFile file = filesToCompact.get(i);
+        Path path = file.getPath();
+        if (path == null) {
+          LOG.warn("Path is null for " + file);
+          return null;
+        }
+        long len = file.getReader().length();
+        fileSizes[i] = len;
+        totalSize += len;
+      }
+      if (!majorcompaction && !references) {
+        // Here we select files for incremental compaction.  
+        // The rule is: if the largest(oldest) one is more than twice the 
+        // size of the second, skip the largest, and continue to next...,
+        // until we meet the compactionThreshold limit.
+        for (point = 0; point < countOfFiles - 1; point++) {
+          if ((fileSizes[point] < fileSizes[point + 1] * 2) && 
+               (countOfFiles - point) <= maxFilesToCompact) {
+            break;
+          }
+          skipped += fileSizes[point];
+        }
+        filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(point,
+          countOfFiles));
+        if (filesToCompact.size() <= 1) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skipped compaction of 1 file; compaction size of " +
+              this.storeNameStr + ": " +
+              StringUtils.humanReadableInt(totalSize) + "; Skipped " + point +
+              " files, size: " + skipped);
+          }
+          return checkSplit(forceSplit);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Compaction size of " + this.storeNameStr + ": " +
+            StringUtils.humanReadableInt(totalSize) + "; Skipped " + point +
+            " file(s), size: " + skipped);
+        }
+      }
+ 
+      // Step through them, writing to the brand-new file
+      HFile.Writer writer = StoreFile.getWriter(this.fs, this.homedir);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Started compaction of " + filesToCompact.size() + " file(s)" +
+          (references? ", hasReferences=true,": " ") + " into " +
+          FSUtils.getPath(writer.getPath()));
+      }
+      try {
+        compact(writer, filesToCompact, majorcompaction);
+      } finally {
+        // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
+        StoreFile.appendMetadata(writer, maxId, majorcompaction);
+        writer.close();
+      }
+
+      // Move the compaction into place.
+      completeCompaction(filesToCompact, writer);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Completed " + (majorcompaction? "major": "") +
+          " compaction of " + this.storeNameStr +
+          " store size is " + StringUtils.humanReadableInt(storeSize));
+      }
+    }
+    return checkSplit(forceSplit);
+  }
+
+  /*
+   * @param files
+   * @return True if any of the files in <code>files</code> are References.
+   */
+  private boolean hasReferences(Collection<StoreFile> files) {
+    if (files != null && files.size() > 0) {
+      for (StoreFile hsf: files) {
+        if (hsf.isReference()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /*
+   * Gets lowest timestamp from files in a dir
+   * 
+   * @param fs
+   * @param dir
+   * @throws IOException
+   */
+  private static long getLowestTimestamp(FileSystem fs, Path dir)
+  throws IOException {
+    FileStatus[] stats = fs.listStatus(dir);
+    if (stats == null || stats.length == 0) {
+      return 0l;
+    }
+    long lowTimestamp = Long.MAX_VALUE;
+    for (int i = 0; i < stats.length; i++) {
+      long timestamp = stats[i].getModificationTime();
+      if (timestamp < lowTimestamp){
+        lowTimestamp = timestamp;
+      }
+    }
+    return lowTimestamp;
+  }
+
+  /*
+   * @return True if we should run a major compaction.
+   */
+  boolean isMajorCompaction() throws IOException {
+    List<StoreFile> filesToCompact = null;
+    // filesToCompact are sorted oldest to newest.
+    filesToCompact = new ArrayList<StoreFile>(this.storefiles.values());
+    return isMajorCompaction(filesToCompact);
+  }
+
+  /*
+   * @param filesToCompact Files to compact. Can be null.
+   * @return True if we should run a major compaction.
+   */
+  private boolean isMajorCompaction(final List<StoreFile> filesToCompact)
+  throws IOException {
+    boolean result = false;
+    if (filesToCompact == null || filesToCompact.size() <= 0) {
+      return result;
+    }
+    long lowTimestamp = getLowestTimestamp(fs,
+      filesToCompact.get(0).getPath().getParent());
+    long now = System.currentTimeMillis();
+    if (lowTimestamp > 0l && lowTimestamp < (now - this.majorCompactionTime)) {
+      // Major compaction time has elapsed.
+      long elapsedTime = now - lowTimestamp;
+      if (filesToCompact.size() == 1 &&
+          filesToCompact.get(0).isMajorCompaction() &&
+          (this.ttl == HConstants.FOREVER || elapsedTime < this.ttl)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping major compaction of " + this.storeNameStr +
+            " because one (major) compacted file only and elapsedTime " +
+            elapsedTime + "ms is < ttl=" + this.ttl);
+        }
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+            "; time since last major compaction " + (now - lowTimestamp) + "ms");
+        }
+        result = true;
+      }
+    }
+    return result;
+  }
+
+  /*
+   * @param r StoreFile list to reverse
+   * @return A new array of content of <code>readers</code>, reversed.
+   */
+  private StoreFile [] reverse(final List<StoreFile> r) {
+    List<StoreFile> copy = new ArrayList<StoreFile>(r);
+    Collections.reverse(copy);
+    // MapFile.Reader is instance of StoreFileReader so this should be ok.
+    return copy.toArray(new StoreFile[0]);
+  }
+
+  /*
+   * @param rdrs List of StoreFiles
+   * @param keys Current keys
+   * @param done Which readers are done
+   * @return The lowest current key in passed <code>rdrs</code>
+   */
+  private int getLowestKey(final HFileScanner [] rdrs, final ByteBuffer [] keys,
+      final boolean [] done) {
+    int lowestKey = -1;
+    for (int i = 0; i < rdrs.length; i++) {
+      if (done[i]) {
+        continue;
+      }
+      if (lowestKey < 0) {
+        lowestKey = i;
+      } else {
+        RawComparator<byte []> c = rdrs[i].getReader().getComparator();
+        if (c.compare(keys[i].array(), keys[i].arrayOffset(), keys[i].limit(),
+            keys[lowestKey].array(), keys[lowestKey].arrayOffset(),
+              keys[lowestKey].limit()) < 0) {
+          lowestKey = i;
+        }
+      }
+    }
+    return lowestKey;
+  }
+
+  /*
+   * Compact a list of StoreFiles.
+   * 
+   * We work by iterating through the readers in parallel looking at newest
+   * store file first. We always increment the lowest-ranked one. Updates to a
+   * single row/column will appear ranked by timestamp.
+   * @param compactedOut Where to write compaction.
+   * @param pReaders List of readers sorted oldest to newest.
+   * @param majorCompaction True to force a major compaction regardless of
+   * thresholds
+   * @throws IOException
+   */
+  private void compact(final HFile.Writer compactedOut,
+      final List<StoreFile> pReaders, final boolean majorCompaction)
+  throws IOException {
+    // Reverse order so newest store file is first.
+    StoreFile[] files = reverse(pReaders);
+    HFileScanner[] rdrs = new HFileScanner[files.length];
+    ByteBuffer[] keys = new ByteBuffer[rdrs.length];
+    ByteBuffer[] vals = new ByteBuffer[rdrs.length];
+    boolean[] done = new boolean[rdrs.length];
+    // Now, advance through the readers in order. This will have the
+    // effect of a run-time sort of the entire dataset.
+    int numDone = 0;
+    for (int i = 0; i < rdrs.length; i++) {
+      rdrs[i] = files[i].getReader().getScanner();
+      done[i] = !rdrs[i].seekTo();
+      if (done[i]) {
+        numDone++;
+      } else {
+        keys[i] = rdrs[i].getKey();
+        vals[i] = rdrs[i].getValue();
+      }
+    }
+
+    long now = System.currentTimeMillis();
+    int timesSeen = 0;
+    HStoreKey lastSeen = new HStoreKey();
+    HStoreKey lastDelete = null;
+    while (numDone < done.length) {
+      // Get lowest key in all store files.
+      int lowestKey = getLowestKey(rdrs, keys, done);
+      // TODO: Suboptimal. And below where we are going from ByteBuffer to
+      // byte array. FIX!! Can we get rid of HSK instantiations?
+      HStoreKey hsk = HStoreKey.create(keys[lowestKey]);
+      // If its same row and column as last key, increment times seen.
+      if (HStoreKey.equalsTwoRowKeys(lastSeen.getRow(), hsk.getRow())
+          && Bytes.equals(lastSeen.getColumn(), hsk.getColumn())) {
+        timesSeen++;
+        // Reset last delete if not exact timestamp -- lastDelete only stops
+        // exactly the same key making it out to the compacted store file.
+        if (lastDelete != null
+            && lastDelete.getTimestamp() != hsk.getTimestamp()) {
+          lastDelete = null;
+        }
+      } else {
+        timesSeen = 1;
+        lastDelete = null;
+      }
+
+      // Don't write empty rows or columns. Only remove cells on major
+      // compaction. Remove if expired of > VERSIONS
+      if (hsk.getRow().length != 0 && hsk.getColumn().length != 0) {
+        ByteBuffer value = vals[lowestKey];
+        if (!majorCompaction) {
+          // Write out all values if not a major compaction.
+          compactedOut.append(hsk.getBytes(), Bytes.toBytes(value));
+        } else {
+          boolean expired = false;
+          boolean deleted = false;
+          if (timesSeen <= family.getMaxVersions()
+              && !(expired = isExpired(hsk, ttl, now))) {
+            // If this value key is same as a deleted key, skip
+            if (lastDelete != null && hsk.equals(lastDelete)) {
+              deleted = true;
+            } else if (HLogEdit.isDeleted(value)) {
+              // If a deleted value, skip
+              deleted = true;
+              lastDelete = hsk;
+            } else {
+              compactedOut.append(hsk.getBytes(), Bytes.toBytes(value));
+            }
+          }
+          if (expired || deleted) {
+            // HBASE-855 remove one from timesSeen because it did not make it
+            // past expired check -- don't count against max versions.
+            timesSeen--;
+          }
+        }
+      }
+
+      // Update last-seen items
+      lastSeen = hsk;
+
+      // Advance the smallest key. If that reader's all finished, then
+      // mark it as done.
+      if (!rdrs[lowestKey].next()) {
+        done[lowestKey] = true;
+        rdrs[lowestKey] = null;
+        numDone++;
+      } else {
+        keys[lowestKey] = rdrs[lowestKey].getKey();
+        vals[lowestKey] = rdrs[lowestKey].getValue();
+      }
+    }
+  }
+
+  /*
+   * It's assumed that the compactLock  will be acquired prior to calling this 
+   * method!  Otherwise, it is not thread-safe!
+   *
+   * It works by processing a compaction that's been written to disk.
+   * 
+   * <p>It is usually invoked at the end of a compaction, but might also be
+   * invoked at HStore startup, if the prior execution died midway through.
+   * 
+   * <p>Moving the compacted TreeMap into place means:
+   * <pre>
+   * 1) Moving the new compacted MapFile into place
+   * 2) Unload all replaced MapFiles, close and collect list to delete.
+   * 3) Loading the new TreeMap.
+   * 4) Compute new store size
+   * </pre>
+   * 
+   * @param compactedFiles list of files that were compacted
+   * @param compactedFile HStoreFile that is the result of the compaction
+   * @throws IOException
+   */
+  private void completeCompaction(final List<StoreFile> compactedFiles,
+    final HFile.Writer compactedFile)
+  throws IOException {
+    // 1. Moving the new files into place.
+    Path p = null;
+    try {
+      p = StoreFile.rename(this.fs, compactedFile.getPath(),
+        StoreFile.getRandomFilename(fs, this.homedir));
+    } catch (IOException e) {
+      LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
+      return;
+    }
+    StoreFile finalCompactedFile = new StoreFile(this.fs, p);
+    this.lock.writeLock().lock();
+    try {
+      try {
+        // 3. Loading the new TreeMap.
+        // Change this.storefiles so it reflects new state but do not
+        // delete old store files until we have sent out notification of
+        // change in case old files are still being accessed by outstanding
+        // scanners.
+        for (Map.Entry<Long, StoreFile> e: this.storefiles.entrySet()) {
+          if (compactedFiles.contains(e.getValue())) {
+            this.storefiles.remove(e.getKey());
+          }
+        }
+        // Add new compacted Reader and store file.
+        Long orderVal = Long.valueOf(finalCompactedFile.getMaxSequenceId());
+        this.storefiles.put(orderVal, finalCompactedFile);
+        // Tell observers that list of Readers has changed.
+        notifyChangedReadersObservers();
+        // Finally, delete old store files.
+        for (StoreFile hsf: compactedFiles) {
+          hsf.delete();
+        }
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.error("Failed replacing compacted files for " +
+            this.storeNameStr +
+            ". Compacted file is " + finalCompactedFile.toString() +
+            ".  Files replaced are " + compactedFiles.toString() +
+            " some of which may have been already removed", e);
+      }
+      // 4. Compute new store size
+      this.storeSize = 0L;
+      for (StoreFile hsf : this.storefiles.values()) {
+        this.storeSize += hsf.getReader().length();
+      }
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  // ////////////////////////////////////////////////////////////////////////////
+  // Accessors.
+  // (This is the only section that is directly useful!)
+  //////////////////////////////////////////////////////////////////////////////
+  
+  /**
+   * Return all the available columns for the given key.  The key indicates a 
+   * row and timestamp, but not a column name.
+   *
+   * The returned object should map column names to Cells.
+   */
+  void getFull(HStoreKey key, final Set<byte []> columns,
+      final int numVersions, Map<byte [], Cell> results)
+  throws IOException {
+    int versions = versionsToReturn(numVersions);
+    Map<byte [], Long> deletes =
+      new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+    // if the key is null, we're not even looking for anything. return.
+    if (key == null) {
+      return;
+    }
+    
+    this.lock.readLock().lock();
+    // get from the memcache first.
+    this.memcache.getFull(key, columns, versions, deletes, results);
+    try {
+      Map<Long, StoreFile> m = this.storefiles.descendingMap();
+      for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
+          i.hasNext();) {
+        getFullFromStoreFile(i.next().getValue(), key, columns, versions, deletes, results);
+      }
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  private void getFullFromStoreFile(StoreFile f, HStoreKey key, 
+    Set<byte []> columns, int numVersions, Map<byte [], Long> deletes,
+    Map<byte [], Cell> results) 
+  throws IOException {
+    long now = System.currentTimeMillis();
+    HFileScanner scanner = f.getReader().getScanner();
+    if (!getClosest(scanner, key.getBytes())) {
+      return;
+    }
+    do {
+      HStoreKey readkey = HStoreKey.create(scanner.getKey());
+      byte[] readcol = readkey.getColumn();
+
+      // if we're looking for this column (or all of them), and there isn't
+      // already a value for this column in the results map or there is a value
+      // but we haven't collected enough versions yet, and the key we
+      // just read matches, then we'll consider it
+      if ((columns == null || columns.contains(readcol)) &&
+          (!results.containsKey(readcol) ||
+            results.get(readcol).getNumValues() < numVersions) &&
+          key.matchesWithoutColumn(readkey)) {
+        // if the value of the cell we're looking at right now is a delete,
+        // we need to treat it differently
+        ByteBuffer value = scanner.getValue();
+        if (HLogEdit.isDeleted(value)) {
+          // if it's not already recorded as a delete or recorded with a more
+          // recent delete timestamp, record it for later
+          if (!deletes.containsKey(readcol)
+              || deletes.get(readcol).longValue() < readkey.getTimestamp()) {
+            deletes.put(readcol, Long.valueOf(readkey.getTimestamp()));
+          }
+        } else if (!(deletes.containsKey(readcol) && deletes.get(readcol)
+            .longValue() >= readkey.getTimestamp())) {
+          // So the cell itself isn't a delete, but there may be a delete
+          // pending from earlier in our search. Only record this result if
+          // there aren't any pending deletes.
+          if (!(deletes.containsKey(readcol) && deletes.get(readcol)
+              .longValue() >= readkey.getTimestamp())) {
+            if (!isExpired(readkey, ttl, now)) {
+              if (!results.containsKey(readcol)) {
+                results.put(readcol, new Cell(value, readkey.getTimestamp()));
+              } else {
+                results.get(readcol).add(Bytes.toBytes(value),
+                    readkey.getTimestamp());
+              }
+            }
+          }
+        }
+      } else if (HStoreKey.compareTwoRowKeys(key.getRow(), readkey.getRow()) < 0) {
+        // if we've crossed into the next row, then we can just stop
+        // iterating
+        break;
+      }
+    } while (scanner.next());
+  }
+
+  /*
+   * @param wantedVersions How many versions were asked for.
+   * @return wantedVersions or this families' MAX_VERSIONS.
+   */
+  private int versionsToReturn(final int wantedVersions) {
+    if (wantedVersions <= 0) {
+      throw new IllegalArgumentException("Number of versions must be > 0");
+    }
+    // Make sure we do not return more than maximum versions for this store.
+    return wantedVersions > this.family.getMaxVersions()?
+      this.family.getMaxVersions(): wantedVersions;
+  }
+  
+  /**
+   * Get the value for the indicated HStoreKey.  Grab the target value and the 
+   * previous <code>numVersions - 1</code> values, as well.
+   *
+   * Use {@link HConstants.ALL_VERSIONS} to retrieve all versions.
+   * @param key
+   * @param numVersions Number of versions to fetch.  Must be > 0.
+   * @return values for the specified versions
+   * @throws IOException
+   */
+  Cell[] get(final HStoreKey key, final int numVersions) throws IOException {
+    // This code below is very close to the body of the getKeys method.  Any 
+    // changes in the flow below should also probably be done in getKeys.
+    // TODO: Refactor so same code used.
+    long now = System.currentTimeMillis();
+    int versions = versionsToReturn(numVersions);
+    // Keep a list of deleted cell keys.  We need this because as we go through
+    // the memcache and store files, the cell with the delete marker may be
+    // in one store and the old non-delete cell value in a later store.
+    // If we don't keep around the fact that the cell was deleted in a newer
+    // record, we end up returning the old value if user is asking for more
+    // than one version. This List of deletes should not be large since we
+    // are only keeping rows and columns that match those set on the get and
+    // which have delete values.  If memory usage becomes an issue, could
+    // redo as bloom filter.
+    Set<HStoreKey> deletes = new HashSet<HStoreKey>();
+    this.lock.readLock().lock();
+    try {
+      // Check the memcache
+      List<Cell> results = this.memcache.get(key, versions, deletes, now);
+      // If we got sufficient versions from memcache, return. 
+      if (results.size() == versions) {
+        return results.toArray(new Cell[results.size()]);
+      }
+      Map<Long, StoreFile> m = this.storefiles.descendingMap();
+      byte [] keyBytes = key.getBytes();
+      for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
+          i.hasNext() && !hasEnoughVersions(versions, results);) {
+        StoreFile f = i.next().getValue();
+        HFileScanner scanner = f.getReader().getScanner();
+        if (!getClosest(scanner, keyBytes)) {
+          continue;
+        }
+        HStoreKey readkey = HStoreKey.create(scanner.getKey());
+        if (!readkey.matchesRowCol(key)) {
+          continue;
+        }
+        if (get(readkey, scanner.getValue(), versions, results, deletes, now)) {
+          break;
+        }
+        while (scanner.next()) {
+          readkey = HStoreKey.create(scanner.getKey());
+          if (!readkey.matchesRowCol(key)) {
+            break;
+          }
+          if (get(readkey, scanner.getValue(), versions, results, deletes, now)) {
+            break;
+          }
+        }
+      }
+      return results.size() == 0 ?
+        null : results.toArray(new Cell[results.size()]);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+  
+  /*
+   * Look at one key/value.
+   * @param key
+   * @param value
+   * @param versions
+   * @param results
+   * @param deletes
+   * @param now
+   * @return True if we have enough versions.
+   */
+  private boolean get(final HStoreKey key, ByteBuffer value,
+      final int versions, final List<Cell> results,
+      final Set<HStoreKey> deletes, final long now) {
+    if (!HLogEdit.isDeleted(value)) {
+      if (notExpiredAndNotInDeletes(this.ttl, key, now, deletes)) {
+        results.add(new Cell(value, key.getTimestamp()));
+      }
+      // Perhaps only one version is wanted.  I could let this
+      // test happen later in the for loop test but it would cost
+      // the allocation of an ImmutableBytesWritable.
+      if (hasEnoughVersions(versions, results)) {
+        return true;
+      }
+    } else {
+      // Is this copy necessary?
+      deletes.add(new HStoreKey(key));
+    }
+    return false;
+  }
+
+  /*
+   * Small method to check if we are over the max number of versions
+   * or we acheived this family max versions. 
+   * The later happens when we have the situation described in HBASE-621.
+   * @param versions
+   * @param c
+   * @return 
+   */
+  private boolean hasEnoughVersions(final int versions, final List<Cell> c) {
+    return c.size() >= versions;
+  }
+
+  /**
+   * Get <code>versions</code> of keys matching the origin key's
+   * row/column/timestamp and those of an older vintage.
+   * @param origin Where to start searching.
+   * @param versions How many versions to return. Pass
+   * {@link HConstants#ALL_VERSIONS} to retrieve all.
+   * @param now
+   * @param columnPattern regex pattern for column matching. if columnPattern
+   * is not null, we use column pattern to match columns. And the columnPattern
+   * only works when origin's column is null or its length is zero.
+   * @return Matching keys.
+   * @throws IOException
+   */
+  public List<HStoreKey> getKeys(final HStoreKey origin, final int versions,
+    final long now, final Pattern columnPattern)
+  throws IOException {
+    // This code below is very close to the body of the get method.  Any 
+    // changes in the flow below should also probably be done in get.
+    // TODO: Refactor so same code used.
+    Set<HStoreKey> deletes = new HashSet<HStoreKey>();
+    this.lock.readLock().lock();
+    try {
+      // Check the memcache
+      List<HStoreKey> keys =
+        this.memcache.getKeys(origin, versions, deletes, now, columnPattern);
+      // If we got sufficient versions from memcache, return.
+      if (keys.size() >= versions) {
+        return keys;
+      }
+      Map<Long, StoreFile> m = this.storefiles.descendingMap();
+      for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
+          i.hasNext() && keys.size() < versions;) {
+        StoreFile f = i.next().getValue();
+        HFileScanner scanner = f.getReader().getScanner();
+        if (!getClosest(scanner, origin.getBytes())) {
+          continue;
+        }
+        do {
+          HStoreKey readkey = HStoreKey.create(scanner.getKey());
+          // if the row and column matches, we might want this one.
+          if (rowMatches(origin, readkey)) {
+            // if the column pattern is not null, we use it for column matching.
+            // we will skip the keys whose column doesn't match the pattern.
+            if (columnPattern != null) {
+              if (!(columnPattern.
+                  matcher(Bytes.toString(readkey.getColumn())).matches())) {
+                continue;
+              }
+            }
+            // if the cell address matches, then we definitely want this key.
+            if (cellMatches(origin, readkey)) {
+              ByteBuffer readval = scanner.getValue();
+              // Store key if isn't deleted or superceded by memcache
+              if (!HLogEdit.isDeleted(readval)) {
+                if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
+                  keys.add(readkey);
+                }
+                if (keys.size() >= versions) {
+                  break;
+                }
+              } else {
+                deletes.add(readkey);
+              }
+            } else {
+              // the cell doesn't match, but there might be more with different
+              // timestamps, so move to the next key
+              continue;
+            }
+          } else {
+            // the row doesn't match, so we've gone too far.
+            break;
+          }
+        } while (scanner.next()); // advance to the next key
+      }
+      return keys;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Find the key that matches <i>row</i> exactly, or the one that immediately
+   * preceeds it. WARNING: Only use this method on a table where writes occur 
+   * with stricly increasing timestamps. This method assumes this pattern of 
+   * writes in order to make it reasonably performant.
+   * @param row
+   * @return Found row
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  byte [] getRowKeyAtOrBefore(final byte [] row)
+  throws IOException{
+    // Map of HStoreKeys that are candidates for holding the row key that
+    // most closely matches what we're looking for. We'll have to update it as
+    // deletes are found all over the place as we go along before finally
+    // reading the best key out of it at the end.
+    NavigableMap<HStoreKey, Long> candidateKeys = new TreeMap<HStoreKey, Long>(
+      new HStoreKey.HStoreKeyWritableComparator());
+    
+    // Keep a list of deleted cell keys.  We need this because as we go through
+    // the store files, the cell with the delete marker may be in one file and
+    // the old non-delete cell value in a later store file. If we don't keep
+    // around the fact that the cell was deleted in a newer record, we end up
+    // returning the old value if user is asking for more than one version.
+    // This List of deletes should not be large since we are only keeping rows
+    // and columns that match those set on the scanner and which have delete
+    // values.  If memory usage becomes an issue, could redo as bloom filter.
+    Set<HStoreKey> deletes = new HashSet<HStoreKey>();
+    this.lock.readLock().lock();
+    try {
+      // First go to the memcache.  Pick up deletes and candidates.
+      this.memcache.getRowKeyAtOrBefore(row, candidateKeys, deletes);
+      // Process each store file.  Run through from newest to oldest.
+      // This code below is very close to the body of the getKeys method.
+      Map<Long, StoreFile> m = this.storefiles.descendingMap();
+      for (Map.Entry<Long, StoreFile> e: m.entrySet()) {
+        // Update the candidate keys from the current map file
+        rowAtOrBeforeFromStoreFile(e.getValue(), row, candidateKeys, deletes);
+      }
+      // Return the best key from candidateKeys
+      byte [] result =
+        candidateKeys.isEmpty()? null: candidateKeys.lastKey().getRow();
+      return result;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /*
+   * Check an individual MapFile for the row at or before a given key 
+   * and timestamp
+   * @param f
+   * @param row
+   * @param candidateKeys
+   * @throws IOException
+   */
+  private void rowAtOrBeforeFromStoreFile(final StoreFile f,
+    final byte [] row, final SortedMap<HStoreKey, Long> candidateKeys,
+    final Set<HStoreKey> deletes)
+  throws IOException {
+    HFileScanner scanner = f.getReader().getScanner();
+    // TODO: FIX THIS PROFLIGACY!!!
+    if (!scanner.seekBefore(new HStoreKey(row).getBytes())) {
+      return;
+    }
+    long now = System.currentTimeMillis();
+    HStoreKey startKey = HStoreKey.create(scanner.getKey());
+    // if there aren't any candidate keys yet, we'll do some things different 
+    if (candidateKeys.isEmpty()) {
+      rowAtOrBeforeCandidate(startKey, f, row, candidateKeys, deletes, now);
+    } else {
+      rowAtOrBeforeWithCandidates(startKey, f, row, candidateKeys, deletes, now);
+    }
+  }
+  
+  /* Find a candidate for row that is at or before passed row in passed
+   * mapfile.
+   * @param startKey First key in the mapfile.
+   * @param map
+   * @param row
+   * @param candidateKeys
+   * @param now
+   * @throws IOException
+   */
+  private void rowAtOrBeforeCandidate(final HStoreKey startKey,
+    final StoreFile f, final byte[] row,
+    final SortedMap<HStoreKey, Long> candidateKeys,
+    final Set<HStoreKey> deletes, final long now) 
+  throws IOException {
+    // if the row we're looking for is past the end of this mapfile, set the
+    // search key to be the last key.  If its a deleted key, then we'll back
+    // up to the row before and return that.
+    HStoreKey finalKey = HStoreKey.create(f.getReader().getLastKey());
+    HStoreKey searchKey = null;
+    if (HStoreKey.compareTwoRowKeys(finalKey.getRow(), row) < 0) {
+      searchKey = finalKey;
+    } else {
+      searchKey = new HStoreKey(row);
+      if (searchKey.compareTo(startKey) < 0) {
+        searchKey = startKey;
+      }
+    }
+    rowAtOrBeforeCandidate(f, searchKey, candidateKeys, deletes, now);
+  }
+
+  /* 
+   * @param ttlSetting
+   * @param hsk
+   * @param now
+   * @param deletes
+   * @return True if key has not expired and is not in passed set of deletes.
+   */
+  static boolean notExpiredAndNotInDeletes(final long ttl,
+      final HStoreKey hsk, final long now, final Set<HStoreKey> deletes) {
+    return !isExpired(hsk, ttl, now) &&
+      (deletes == null || !deletes.contains(hsk));
+  }
+  
+  static boolean isExpired(final HStoreKey hsk, final long ttl,
+      final long now) {
+    return ttl != HConstants.FOREVER && now > hsk.getTimestamp() + ttl;
+  }
+
+  /* Find a candidate for row that is at or before passed key, sk, in mapfile.
+   * @param f
+   * @param sk Key to go search the mapfile with.
+   * @param candidateKeys
+   * @param now
+   * @throws IOException
+   * @see {@link #rowAtOrBeforeCandidate(HStoreKey, org.apache.hadoop.io.MapFile.Reader, byte[], SortedMap, long)}
+   */
+  private void rowAtOrBeforeCandidate(final StoreFile f,
+    final HStoreKey sk, final SortedMap<HStoreKey, Long> candidateKeys,
+    final Set<HStoreKey> deletes, final long now)
+  throws IOException {
+    HStoreKey searchKey = sk;
+    HStoreKey readkey = null;
+    HStoreKey knownNoGoodKey = null;
+    HFileScanner scanner = f.getReader().getScanner();
+    for (boolean foundCandidate = false; !foundCandidate;) {
+      // Seek to the exact row, or the one that would be immediately before it
+      int result = scanner.seekTo(searchKey.getBytes());
+      if (result < 0) {
+        // Not in file.
+        continue;
+      }
+      HStoreKey deletedOrExpiredRow = null;
+      do {
+        readkey = HStoreKey.create(scanner.getKey());
+        ByteBuffer value = scanner.getValue();
+        // If we have an exact match on row, and it's not a delete, save this
+        // as a candidate key
+        if (HStoreKey.equalsTwoRowKeys(readkey.getRow(), searchKey.getRow())) {
+          if (!HLogEdit.isDeleted(value)) {
+            if (handleNonDelete(readkey, now, deletes, candidateKeys)) {
+              foundCandidate = true;
+              // NOTE! Continue.
+              continue;
+            }
+          }
+          HStoreKey copy = addCopyToDeletes(readkey, deletes);
+          if (deletedOrExpiredRow == null) {
+            deletedOrExpiredRow = copy;
+          }
+        } else if (HStoreKey.compareTwoRowKeys(readkey.getRow(),
+            searchKey.getRow()) > 0) {
+          // if the row key we just read is beyond the key we're searching for,
+          // then we're done.
+          break;
+        } else {
+          // So, the row key doesn't match, but we haven't gone past the row
+          // we're seeking yet, so this row is a candidate for closest
+          // (assuming that it isn't a delete).
+          if (!HLogEdit.isDeleted(value)) {
+            if (handleNonDelete(readkey, now, deletes, candidateKeys)) {
+              foundCandidate = true;
+              // NOTE: Continue
+              continue;
+            }
+          }
+          HStoreKey copy = addCopyToDeletes(readkey, deletes);
+          if (deletedOrExpiredRow == null) {
+            deletedOrExpiredRow = copy;
+          }
+        }
+      } while(scanner.next() && (knownNoGoodKey == null ||
+          readkey.compareTo(knownNoGoodKey) < 0));
+
+      // If we get here and have no candidates but we did find a deleted or
+      // expired candidate, we need to look at the key before that
+      if (!foundCandidate && deletedOrExpiredRow != null) {
+        knownNoGoodKey = deletedOrExpiredRow;
+        if (!scanner.seekBefore(deletedOrExpiredRow.getBytes())) {
+          // Is this right?
+          break;
+        }
+        searchKey = HStoreKey.create(scanner.getKey());
+      } else {
+        // No candidates and no deleted or expired candidates. Give up.
+        break;
+      }
+    }
+    
+    // Arriving here just means that we consumed the whole rest of the map
+    // without going "past" the key we're searching for. we can just fall
+    // through here.
+  }
+  
+  /*
+   * @param key Key to copy and add to <code>deletes</code>
+   * @param deletes
+   * @return Instance of the copy added to <code>deletes</code>
+   */
+  private HStoreKey addCopyToDeletes(final HStoreKey key,
+      final Set<HStoreKey> deletes) {
+    HStoreKey copy = new HStoreKey(key);
+    deletes.add(copy);
+    return copy;
+  }
+
+  private void rowAtOrBeforeWithCandidates(final HStoreKey startKey,
+    final StoreFile f, final byte[] row,
+    final SortedMap<HStoreKey, Long> candidateKeys,
+    final Set<HStoreKey> deletes, final long now) 
+  throws IOException {
+    // if there are already candidate keys, we need to start our search 
+    // at the earliest possible key so that we can discover any possible
+    // deletes for keys between the start and the search key.  Back up to start
+    // of the row in case there are deletes for this candidate in this mapfile
+    // BUT do not backup before the first key in the store file.
+    // TODO: FIX THIS PROFLIGATE OBJECT MAKING!!!
+    byte [] searchKey =
+      new HStoreKey(candidateKeys.firstKey().getRow()).getBytes();
+    if (f.getReader().getComparator().compare(searchKey, 0, searchKey.length,
+        startKey.getRow(), 0, startKey.getRow().length) < 0) {
+      searchKey = startKey.getBytes();
+    }
+
+    // Seek to the exact row, or the one that would be immediately before it
+    HFileScanner scanner = f.getReader().getScanner();
+    int result = scanner.seekTo(searchKey);
+    if (result < 0) {
+      // Key is before start of this file.  Return.
+      return;
+    }
+    do {
+      HStoreKey k = HStoreKey.create(scanner.getKey());
+      ByteBuffer v = scanner.getValue();
+      // if we have an exact match on row, and it's not a delete, save this
+      // as a candidate key
+      if (HStoreKey.equalsTwoRowKeys(k.getRow(), row)) {
+        handleKey(k, v, now, deletes, candidateKeys);
+      } else if (HStoreKey.compareTwoRowKeys(k.getRow(), row) > 0 ) {
+        // if the row key we just read is beyond the key we're searching for,
+        // then we're done.
+        break;
+      } else {
+        // So, the row key doesn't match, but we haven't gone past the row
+        // we're seeking yet, so this row is a candidate for closest 
+        // (assuming that it isn't a delete).
+        handleKey(k, v, now, deletes, candidateKeys);
+      }
+    } while(scanner.next());
+  }
+
+  /*
+   * @param readkey
+   * @param now
+   * @param deletes
+   * @param candidateKeys
+   */
+  private void handleKey(final HStoreKey readkey, ByteBuffer value,
+      final long now, final Set<HStoreKey> deletes,
+      final SortedMap<HStoreKey, Long> candidateKeys) {
+    if (!HLogEdit.isDeleted(value)) {
+      handleNonDelete(readkey, now, deletes, candidateKeys);
+    } else {
+      // Pass copy because readkey will change next time next is called.
+      handleDeleted(new HStoreKey(readkey), candidateKeys, deletes);
+    }
+  }
+
+  /*
+   * @param readkey
+   * @param now
+   * @param deletes
+   * @param candidateKeys
+   * @return True if we added a candidate.
+   */
+  private boolean handleNonDelete(final HStoreKey readkey, final long now,
+      final Set<HStoreKey> deletes, final Map<HStoreKey, Long> candidateKeys) {
+    if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
+      candidateKeys.put(stripTimestamp(readkey),
+        Long.valueOf(readkey.getTimestamp()));
+      return true;
+    }
+    return false;
+  }
+
+  /* Handle keys whose values hold deletes.
+   * Add to the set of deletes and then if the candidate keys contain any that
+   * might match by timestamp, then check for a match and remove it if it's too
+   * young to survive the delete 
+   * @param k Be careful; if key was gotten from a Mapfile, pass in a copy.
+   * Values gotten by 'nexting' out of Mapfiles will change in each invocation.
+   * @param candidateKeys
+   * @param deletes
+   */
+  static void handleDeleted(final HStoreKey k,
+      final SortedMap<HStoreKey, Long> candidateKeys,
+      final Set<HStoreKey> deletes) {
+    deletes.add(k);
+    HStoreKey strippedKey = stripTimestamp(k);
+    if (candidateKeys.containsKey(strippedKey)) {
+      long bestCandidateTs = 
+        candidateKeys.get(strippedKey).longValue();
+      if (bestCandidateTs <= k.getTimestamp()) {
+        candidateKeys.remove(strippedKey);
+      }
+    }
+  }
+
+  static HStoreKey stripTimestamp(HStoreKey key) {
+    return new HStoreKey(key.getRow(), key.getColumn());
+  }
+    
+  /*
+   * Test that the <i>target</i> matches the <i>origin</i> cell address. If the 
+   * <i>origin</i> has an empty column, then it's assumed to mean any column 
+   * matches and only match on row and timestamp. Otherwise, it compares the
+   * keys with HStoreKey.matchesRowCol().
+   * @param origin The key we're testing against
+   * @param target The key we're testing
+   */
+  private boolean cellMatches(HStoreKey origin, HStoreKey target){
+    // if the origin's column is empty, then we're matching any column
+    if (Bytes.equals(origin.getColumn(), HConstants.EMPTY_BYTE_ARRAY)) {
+      // if the row matches, then...
+      if (HStoreKey.equalsTwoRowKeys(target.getRow(), origin.getRow())) {
+        // check the timestamp
+        return target.getTimestamp() <= origin.getTimestamp();
+      }
+      return false;
+    }
+    // otherwise, we want to match on row and column
+    return target.matchesRowCol(origin);
+  }
+    
+  /*
+   * Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
+   * has an empty column, then it just tests row equivalence. Otherwise, it uses
+   * HStoreKey.matchesRowCol().
+   * @param origin Key we're testing against
+   * @param target Key we're testing
+   */
+  private boolean rowMatches(final HStoreKey origin, final HStoreKey target){
+    // if the origin's column is empty, then we're matching any column
+    if (Bytes.equals(origin.getColumn(), HConstants.EMPTY_BYTE_ARRAY)) {
+      // if the row matches, then...
+      return HStoreKey.equalsTwoRowKeys(target.getRow(), origin.getRow());
+    }
+    // otherwise, we want to match on row and column
+    return target.matchesRowCol(origin);
+  }
+
+  /**
+   * Determines if HStore can be split
+   * @param force Whether to force a split or not.
+   * @return a StoreSize if store can be split, null otherwise.
+   */
+  StoreSize checkSplit(final boolean force) {
+    this.lock.readLock().lock();
+    try {
+      // Iterate through all store files
+      if (this.storefiles.size() <= 0) {
+        return null;
+      }
+      if (!force && (storeSize < this.desiredMaxFileSize)) {
+        return null;
+      }
+      // Not splitable if we find a reference store file present in the store.
+      boolean splitable = true;
+      long maxSize = 0L;
+      Long mapIndex = Long.valueOf(0L);
+      for (Map.Entry<Long, StoreFile> e: storefiles.entrySet()) {
+        StoreFile curHSF = e.getValue();
+        if (splitable) {
+          splitable = !curHSF.isReference();
+          if (!splitable) {
+            // RETURN IN MIDDLE OF FUNCTION!!! If not splitable, just return.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(curHSF +  " is not splittable");
+            }
+            return null;
+          }
+        }
+        long size = curHSF.getReader().length();
+        if (size > maxSize) {
+          // This is the largest one so far
+          maxSize = size;
+          mapIndex = e.getKey();
+        }
+      }
+
+      HFile.Reader r = this.storefiles.get(mapIndex).getReader();
+      // Get first, last, and mid keys.  Midkey is the key that starts block
+      // in middle of hfile.  Has column and timestamp.  Need to return just
+      // the row we want to split on as midkey.
+      byte [] midkey = r.midkey();
+      if (midkey != null) {
+        HStoreKey mk = HStoreKey.create(midkey);
+        HStoreKey firstKey = HStoreKey.create(r.getFirstKey());
+        HStoreKey lastKey = HStoreKey.create(r.getLastKey());
+        // if the midkey is the same as the first and last keys, then we cannot
+        // (ever) split this region. 
+        if (HStoreKey.equalsTwoRowKeys(mk.getRow(), firstKey.getRow()) && 
+            HStoreKey.equalsTwoRowKeys( mk.getRow(), lastKey.getRow())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("cannot split because midkey is the same as first or " +
+              "last row");
+          }
+          return null;
+        }
+        return new StoreSize(maxSize, mk.getRow());
+      }
+    } catch(IOException e) {
+      LOG.warn("Failed getting store size for " + this.storeNameStr, e);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+    return null;
+  }
+  
+  /** @return aggregate size of HStore */
+  public long getSize() {
+    return storeSize;
+  }
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // File administration
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Return a scanner for both the memcache and the HStore files
+   */
+  protected InternalScanner getScanner(long timestamp, byte [][] targetCols,
+      byte [] firstRow, RowFilterInterface filter)
+  throws IOException {
+    lock.readLock().lock();
+    try {
+      return new StoreScanner(this, targetCols, firstRow, timestamp, filter);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return this.storeNameStr;
+  }
+
+  /**
+   * @return Count of store files
+   */
+  int getStorefilesCount() {
+    return this.storefiles.size();
+  }
+
+  /**
+   * @return The size of the store file indexes, in bytes.
+   * @throws IOException if there was a problem getting file sizes from the
+   * filesystem
+   */
+  long getStorefilesIndexSize() throws IOException {
+    long size = 0;
+    for (StoreFile s: storefiles.values())
+      size += s.getReader().indexSize();
+    return size;
+  }
+
+  /*
+   * Datastructure that holds size and row to split a file around.
+   */
+  class StoreSize {
+    private final long size;
+    private final byte[] key;
+    StoreSize(long size, byte[] key) {
+      this.size = size;
+      this.key = new byte[key.length];
+      System.arraycopy(key, 0, this.key, 0, key.length);
+    }
+    /* @return the size */
+    long getSize() {
+      return size;
+    }
+    /* @return the key */
+    byte[] getSplitRow() {
+      return key;
+    }
+  }
+
+  HRegionInfo getHRegionInfo() {
+    return this.regioninfo;
+  }
+
+  /**
+   * Convenience method that implements the old MapFile.getClosest on top of
+   * HFile Scanners.  getClosest used seek to the asked-for key or just after
+   * (HFile seeks to the key or just before).
+   * @param s
+   * @param b
+   * @return True if we were able to seek the scanner to <code>b</code> or to
+   * the key just after.
+   * @throws IOException 
+   */
+  static boolean getClosest(final HFileScanner s, final byte [] b)
+  throws IOException {
+    int result = s.seekTo(b);
+    if (result < 0) {
+      // Not in file.  Will the first key do?
+      if (!s.seekTo()) {
+        return false;
+      }
+    } else if (result > 0) {
+      // Less than what was asked for but maybe < because we're asking for
+      // r/c/LATEST_TIMESTAMP -- what was returned was r/c-1/SOME_TS...
+      // A next will get us a r/c/SOME_TS.
+      if (!s.next()) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=747672&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Wed Feb 25 05:59:26 2009
@@ -0,0 +1,443 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.HalfHFileReader;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * A Store data file.  Stores usually have one or more of these files.  They
+ * are produced by flushing the memcache to disk.  To
+ * create, call {@link #getWriter(FileSystem, Path)} and append data.  Be
+ * sure to add any metadata before calling close on the Writer
+ * (Use the appendMetadata convenience methods). On close, a StoreFile is
+ * sitting in the Filesystem.  To refer to it, create a StoreFile instance
+ * passing filesystem and path.  To read, call {@link #getReader()}.
+ * <p>StoreFiles may also reference store files in another Store.
+ */
+public class StoreFile implements HConstants {
+  static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
+  
+  // Make default block size for StoreFiles 8k while testing.  TODO: FIX!
+  // Need to make it 8k for testing.
+  private static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
+
+  private final FileSystem fs;
+  // This file's path.
+  private final Path path;
+  // If this storefile references another, this is the reference instance.
+  private Reference reference;
+  // If this StoreFile references another, this is the other files path.
+  private Path referencePath;
+
+  // Keys for metadata stored in backing HFile.
+  private static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
+  // Set when we obtain a Reader.
+  private long sequenceid = -1;
+
+  private static final byte [] MAJOR_COMPACTION_KEY =
+    Bytes.toBytes("MAJOR_COMPACTION_KEY");
+  // If true, this file was product of a major compaction.  Its then set
+  // whenever you get a Reader.
+  private AtomicBoolean majorCompaction = null;
+
+  /*
+   * Regex that will work for straight filenames and for reference names.
+   * If reference, then the regex has more than just one group.  Group 1 is
+   * this files id.  Group 2 the referenced region name, etc.
+   */
+  private static final Pattern REF_NAME_PARSER =
+    Pattern.compile("^(\\d+)(?:\\.(.+))?$");
+
+  private volatile HFile.Reader reader;
+
+  // Used making file ids.
+  private final static Random rand = new Random();
+
+  /**
+   * Constructor.
+   * Loads up a Reader (and its indices, etc.).
+   * @param fs Filesystem.
+   * @param p qualified path
+   * @throws IOException
+   */
+  StoreFile(final FileSystem fs, final Path p)
+  throws IOException {
+    this.fs = fs;
+    this.path = p;
+    if (isReference(p)) {
+      this.reference = Reference.read(fs, p);
+      this.referencePath = getReferredToFile(this.path);
+    }
+    this.reader = open();
+  }
+
+  /**
+   * @return Path or null if this StoreFile was made with a Stream.
+   */
+  Path getPath() {
+    return this.path;
+  }
+
+  /**
+   * @return The Store/ColumnFamily this file belongs to.
+   */
+  byte [] getFamily() {
+    return Bytes.toBytes(this.path.getParent().getName());
+  }
+
+  /**
+   * @return True if this is a StoreFile Reference; call after {@link #open()}
+   * else may get wrong answer.
+   */
+  boolean isReference() {
+    return this.reference != null;
+  }
+
+  /**
+   * @param p Path to check.
+   * @return True if the path has format of a HStoreFile reference.
+   */
+  public static boolean isReference(final Path p) {
+    return isReference(p, REF_NAME_PARSER.matcher(p.getName()));
+  }
+
+  /**
+   * @param p Path to check.
+   * @param m Matcher to use.
+   * @return True if the path has format of a HStoreFile reference.
+   */
+  public static boolean isReference(final Path p, final Matcher m) {
+    if (m == null || !m.matches()) {
+      LOG.warn("Failed match of store file name " + p.toString());
+      throw new RuntimeException("Failed match of store file name " +
+          p.toString());
+    }
+    return m.groupCount() > 1 && m.group(2) != null;
+  }
+
+  /*
+   * Return path to the file referred to by a Reference.  Presumes a directory
+   * hierarchy of <code>${hbase.rootdir}/tablename/regionname/familyname</code>.
+   * @param p Path to a Reference file.
+   * @return Calculated path to parent region file.
+   * @throws IOException
+   */
+  static Path getReferredToFile(final Path p) throws IOException {
+    Matcher m = REF_NAME_PARSER.matcher(p.getName());
+    if (m == null || !m.matches()) {
+      LOG.warn("Failed match of store file name " + p.toString());
+      throw new RuntimeException("Failed match of store file name " +
+          p.toString());
+    }
+    // Other region name is suffix on the passed Reference file name
+    String otherRegion = m.group(2);
+    // Tabledir is up two directories from where Reference was written.
+    Path tableDir = p.getParent().getParent().getParent();
+    String nameStrippedOfSuffix = m.group(1);
+    // Build up new path with the referenced region in place of our current
+    // region in the reference path.  Also strip regionname suffix from name.
+    return new Path(new Path(new Path(tableDir, otherRegion),
+      p.getParent().getName()), nameStrippedOfSuffix);
+  }
+
+  /**
+   * @return True if this file was made by a major compaction.
+   */
+  boolean isMajorCompaction() {
+    if (this.majorCompaction == null) {
+      throw new NullPointerException("This has not been set yet");
+    }
+    return this.majorCompaction.get();
+  }
+
+  /**
+   * @return This files maximum edit sequence id.
+   */
+  public long getMaxSequenceId() {
+    if (this.sequenceid == -1) {
+      throw new IllegalAccessError("Has not been initialized");
+    }
+    return this.sequenceid;
+  }
+
+  /**
+   * Opens reader on this store file.  Called by Constructor.
+   * @return Reader for the store file.
+   * @throws IOException
+   * @see #close()
+   */
+  protected HFile.Reader open()
+  throws IOException {
+    if (this.reader != null) {
+      throw new IllegalAccessError("Already open");
+    }
+    if (isReference()) {
+      this.reader = new HalfHFileReader(this.fs, this.referencePath, null,
+        this.reference);
+    } else {
+      this.reader = new HFile.Reader(this.fs, this.path, null);
+    }
+    // Load up indices and fileinfo.
+    Map<byte [], byte []> map = this.reader.loadFileInfo();
+    // Read in our metadata.
+    byte [] b = map.get(MAX_SEQ_ID_KEY);
+    if (b != null) {
+      // By convention, if halfhfile, top half has a sequence number > bottom
+      // half. Thats why we add one in below. Its done for case the two halves
+      // are ever merged back together --rare.  Without it, on open of store,
+      // since store files are distingushed by sequence id, the one half would
+      // subsume the other.
+      this.sequenceid = Bytes.toLong(b);
+      if (isReference()) {
+        if (Reference.isTopFileRegion(this.reference.getFileRegion())) {
+          this.sequenceid += 1;
+        }
+      }
+      
+    }
+    b = map.get(MAJOR_COMPACTION_KEY);
+    if (b != null) {
+      boolean mc = Bytes.toBoolean(b);
+      if (this.majorCompaction == null) {
+        this.majorCompaction = new AtomicBoolean(mc);
+      } else {
+        this.majorCompaction.set(mc);
+      }
+    }
+    return this.reader;
+  }
+
+  /**
+   * @return Current reader.  Must call open first.
+   */
+  public HFile.Reader getReader() {
+    if (this.reader == null) {
+      throw new IllegalAccessError("Call open first");
+    }
+    return this.reader;
+  }
+
+  /**
+   * @throws IOException
+   * @see #open()
+   */
+  public synchronized void close() throws IOException {
+    if (this.reader != null) {
+      this.reader.close();
+      this.reader = null;
+    }
+  }
+
+  public String toString() {
+    return this.path.toString() +
+      (isReference()? "-" + this.referencePath + "-" + reference.toString(): "");
+  }
+
+  /**
+   * Delete this file
+   * @throws IOException 
+   */
+  public void delete() throws IOException {
+    close();
+    this.fs.delete(getPath(), true);
+  }
+
+  /**
+   * Utility to help with rename.
+   * @param fs
+   * @param src
+   * @param tgt
+   * @return True if succeeded.
+   * @throws IOException
+   */
+  public static Path rename(final FileSystem fs, final Path src,
+      final Path tgt)
+  throws IOException {
+    if (!fs.exists(src)) {
+      throw new FileNotFoundException(src.toString());
+    }
+    if (!fs.rename(src, tgt)) {
+      throw new IOException("Failed rename of " + src + " to " + tgt);
+    }
+    return tgt;
+  }
+
+  /**
+   * Get a store file writer. Client is responsible for closing file when done.
+   * If metadata, add BEFORE closing using
+   * {@link #appendMetadata(org.apache.hadoop.hbase.io.hfile.HFile.Writer, long)}.
+   * @param fs
+   * @param dir Path to family directory.  Makes the directory if doesn't exist.
+   * Creates a file with a unique name in this directory.
+   * @return HFile.Writer
+   * @throws IOException
+   */
+  public static HFile.Writer getWriter(final FileSystem fs, final Path dir)
+  throws IOException {
+    return getWriter(fs, dir, DEFAULT_BLOCKSIZE_SMALL, null, null);
+  }
+
+  /**
+   * Get a store file writer. Client is responsible for closing file when done.
+   * If metadata, add BEFORE closing using
+   * {@link #appendMetadata(org.apache.hadoop.hbase.io.hfile.HFile.Writer, long)}.
+   * @param fs
+   * @param dir Path to family directory.  Makes the directory if doesn't exist.
+   * Creates a file with a unique name in this directory.
+   * @param blocksize
+   * @param algorithm Pass null to get default.
+   * @param c Pass null to get default.
+   * @return HFile.Writer
+   * @throws IOException
+   */
+  public static HFile.Writer getWriter(final FileSystem fs, final Path dir,
+    final int blocksize, final String algorithm, final RawComparator<byte []> c)
+  throws IOException {
+    if (!fs.exists(dir)) {
+      fs.mkdirs(dir);
+    }
+    Path path = getUniqueFile(fs, dir);
+    return new HFile.Writer(fs, path, blocksize,
+      algorithm == null? HFile.DEFAULT_COMPRESSION: algorithm,
+      c == null? HStoreKey.BYTECOMPARATOR: c);
+  }
+
+  /**
+   * @param fs
+   * @param p
+   * @return random filename inside passed <code>dir</code>
+   */
+  static Path getUniqueFile(final FileSystem fs, final Path p)
+  throws IOException {
+    if (!fs.getFileStatus(p).isDir()) {
+      throw new IOException("Expecting a directory");
+    }
+    return fs.getFileStatus(p).isDir()? getRandomFilename(fs, p): p;
+  }
+
+  /**
+   * @param fs
+   * @param dir
+   * @param encodedRegionName
+   * @param family
+   * @return Path to a file that doesn't exist at time of this invocation.
+   * @throws IOException
+   */
+  static Path getRandomFilename(final FileSystem fs, final Path dir)
+  throws IOException {
+    return getRandomFilename(fs, dir, null);
+  }
+
+  /**
+   * @param fs
+   * @param dir
+   * @param encodedRegionName
+   * @param family
+   * @param suffix
+   * @return Path to a file that doesn't exist at time of this invocation.
+   * @throws IOException
+   */
+  static Path getRandomFilename(final FileSystem fs, final Path dir,
+      final String suffix)
+  throws IOException {
+    long id = -1;
+    Path p = null;
+    do {
+      id = Math.abs(rand.nextLong());
+      p = new Path(dir, Long.toString(id) +
+        ((suffix == null || suffix.length() <= 0)? "": suffix));
+    } while(fs.exists(p));
+    return p;
+  }
+
+  /**
+   * Write file metadata.
+   * Call before you call close on the passed <code>w</code> since its written
+   * as metadata to that file.
+   * 
+   * @param filesystem file system
+   * @param maxSequenceId Maximum sequence id.
+   * @throws IOException
+   */
+  static void appendMetadata(final HFile.Writer w, final long maxSequenceId)
+  throws IOException {
+    appendMetadata(w, maxSequenceId, false);
+  }
+
+  /**
+   * Writes metadata.
+   * Call before you call close on the passed <code>w</code> since its written
+   * as metadata to that file.
+   * @param maxSequenceId Maximum sequence id.
+   * @param mc True if this file is product of a major compaction
+   * @throws IOException
+   */
+  static void appendMetadata(final HFile.Writer w, final long maxSequenceId,
+    final boolean mc)
+  throws IOException {
+    w.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
+    w.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(mc));
+  }
+
+  /*
+   * Write out a split reference.
+   * @param fs
+   * @param splitDir Presumes path format is actually
+   * <code>SOME_DIRECTORY/REGIONNAME/FAMILY</code>.
+   * @param f File to split.
+   * @param splitRow
+   * @param range
+   * @return Path to created reference.
+   * @throws IOException
+   */
+  static Path split(final FileSystem fs, final Path splitDir,
+    final StoreFile f, final byte [] splitRow, final Reference.Range range)
+  throws IOException {
+    // A reference to the bottom half of the hsf store file.
+    Reference r = new Reference(new HStoreKey(splitRow).getBytes(), range);
+    // Add the referred-to regions name as a dot separated suffix. 
+    // See REF_NAME_PARSER regex above.  The referred-to regions name is
+    // up in the path of the passed in <code>f</code> -- parentdir is family,
+    // then the directory above is the region name.
+    String parentRegionName = f.getPath().getParent().getParent().getName();
+    // Write reference with same file id only with the other region name as
+    // suffix and into the new region location (under same family).
+    Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
+    return r.write(fs, p);
+  }
+}
\ No newline at end of file



Mime
View raw message