hbase-commits mailing list archives

From st...@apache.org
Subject [1/3] hbase git commit: HBASE-10201 Port 'Make flush decisions per column family' to trunk
Date Fri, 19 Dec 2014 00:06:19 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 9895604e9 -> 5d34d2d02


http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 43072ce..e40b48d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -31,10 +33,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -50,7 +54,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -65,15 +68,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
-import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
-import org.apache.hadoop.hbase.wal.WALProvider.Writer;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -81,6 +76,13 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.htrace.NullScope;
@@ -89,6 +91,7 @@ import org.htrace.Trace;
 import org.htrace.TraceScope;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.ExceptionHandler;
@@ -334,33 +337,35 @@ public class FSHLog implements WAL {
   // sequence id numbers are by region and unrelated to the ring buffer sequence number accounting
   // done above in failedSequence, highest sequence, etc.
   /**
-   * This lock ties all operations on oldestFlushingRegionSequenceIds and
-   * oldestFlushedRegionSequenceIds Maps with the exception of append's putIfAbsent call into
-   * oldestUnflushedSeqNums. We use these Maps to find out the low bound regions sequence id, or
-   * to find regions  with old sequence ids to force flush; we are interested in old stuff not the
-   * new additions (TODO: IS THIS SAFE?  CHECK!).
+   * This lock ties all operations on lowestFlushingStoreSequenceIds and
+   * oldestUnflushedStoreSequenceIds Maps with the exception of append's putIfAbsent call into
+   * oldestUnflushedStoreSequenceIds. We use these Maps to find the lower-bound sequence id
+   * of a region, or to find regions with old sequence ids to force flush; we are interested
+   * in the old stuff, not the new additions (TODO: IS THIS SAFE?  CHECK!).
    */
   private final Object regionSequenceIdLock = new Object();
 
   /**
-   * Map of encoded region names to their OLDEST -- i.e. their first, the longest-lived --
-   * sequence id in memstore. Note that this sequence id is the region sequence id.  This is not
-   * related to the id we use above for {@link #highestSyncedSequence} and
-   * {@link #highestUnsyncedSequence} which is the sequence from the disruptor ring buffer.
+   * Map of encoded region names and family names to their OLDEST -- i.e. their first,
+   * the longest-lived -- sequence id in memstore. Note that this sequence id is the region
+   * sequence id.  This is not related to the id we use above for {@link #highestSyncedSequence}
+   * and {@link #highestUnsyncedSequence} which is the sequence from the disruptor
+   * ring buffer.
    */
-  private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedRegionSequenceIds =
-    new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+  private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushedStoreSequenceIds
+    = new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
+      Bytes.BYTES_COMPARATOR);
 
   /**
-   * Map of encoded region names to their lowest or OLDEST sequence/edit id in memstore currently
-   * being flushed out to hfiles. Entries are moved here from
-   * {@link #oldestUnflushedRegionSequenceIds} while the lock {@link #regionSequenceIdLock} is held
+   * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id in
+   * memstore currently being flushed out to hfiles. Entries are moved here from
+   * {@link #oldestUnflushedStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held
    * (so movement between the Maps is atomic). This is not related to the id we use above for
    * {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from
    * the disruptor ring buffer, an internal detail.
    */
-  private final Map<byte[], Long> lowestFlushingRegionSequenceIds =
-    new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+  private final Map<byte[], Map<byte[], Long>> lowestFlushingStoreSequenceIds =
+    new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
 
  /**
   * Map of region encoded names to the latest region sequence id.  Updated on each append of
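
Not part of the patch: a minimal standalone sketch of the bookkeeping the hunk above introduces. The class and variable names are made up, and a plain lexicographic comparator stands in for Bytes.BYTES_COMPARATOR; the point is that oldest unflushed sequence ids are now keyed first by region and then by store, so the region-level answer is the minimum across the region's stores.

    import java.util.Comparator;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class PerStoreSeqIdSketch {
      // Plain lexicographic byte[] comparison, standing in for Bytes.BYTES_COMPARATOR.
      static final Comparator<byte[]> CMP = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
          int d = (a[i] & 0xff) - (b[i] & 0xff);
          if (d != 0) return d;
        }
        return a.length - b.length;
      };

      // Same shape as oldestUnflushedStoreSequenceIds: region -> (family -> oldest seq id).
      static final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushed =
          new ConcurrentSkipListMap<>(CMP);

      public static void main(String[] args) {
        byte[] region = "region-1".getBytes();
        // The first append touching a family records that family's oldest unflushed id;
        // later appends leave it alone, mirroring the putIfAbsent on the append path.
        oldestUnflushed.computeIfAbsent(region, r -> new ConcurrentSkipListMap<>(CMP))
            .putIfAbsent("f1".getBytes(), 7L);
        oldestUnflushed.get(region).putIfAbsent("f2".getBytes(), 12L);
        oldestUnflushed.get(region).putIfAbsent("f1".getBytes(), 9L); // ignored, 7 stays
        // The region-level earliest memstore seq num is the minimum across its stores.
        long lowest = Long.MAX_VALUE;
        for (long id : oldestUnflushed.get(region).values()) {
          lowest = Math.min(lowest, id);
        }
        System.out.println("lowest unflushed seq id = " + lowest); // prints 7
      }
    }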
@@ -735,6 +740,28 @@ public class FSHLog implements WAL {
     return DefaultWALProvider.createWriter(conf, fs, path, false);
   }
 
+  private long getLowestSeqId(Map<byte[], Long> seqIdMap) {
+    long result = HConstants.NO_SEQNUM;
+    for (Long seqNum: seqIdMap.values()) {
+      if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) {
+        result = seqNum.longValue();
+      }
+    }
+    return result;
+  }
+
+  private <T extends Map<byte[], Long>> Map<byte[], Long> copyMapWithLowestSeqId(
+      Map<byte[], T> mapToCopy) {
+    Map<byte[], Long> copied = Maps.newHashMap();
+    for (Map.Entry<byte[], T> entry: mapToCopy.entrySet()) {
+      long lowestSeqId = getLowestSeqId(entry.getValue());
+      if (lowestSeqId != HConstants.NO_SEQNUM) {
+        copied.put(entry.getKey(), lowestSeqId);
+      }
+    }
+    return copied;
+  }
+
   /**
    * Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits
    * have been flushed to hfiles.
@@ -747,22 +774,23 @@ public class FSHLog implements WAL {
    * @throws IOException
    */
   private void cleanOldLogs() throws IOException {
-    Map<byte[], Long> oldestFlushingSeqNumsLocal = null;
-    Map<byte[], Long> oldestUnflushedSeqNumsLocal = null;
+    Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null;
+    Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null;
     List<Path> logsToArchive = new ArrayList<Path>();
     // make a local copy so as to avoid locking when we iterate over these maps.
     synchronized (regionSequenceIdLock) {
-      oldestFlushingSeqNumsLocal = new HashMap<byte[], Long>(this.lowestFlushingRegionSequenceIds);
-      oldestUnflushedSeqNumsLocal =
-        new HashMap<byte[], Long>(this.oldestUnflushedRegionSequenceIds);
+      lowestFlushingRegionSequenceIdsLocal =
+          copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds);
+      oldestUnflushedRegionSequenceIdsLocal =
+          copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds);
     }
     for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) {
       // iterate over the log file.
       Path log = e.getKey();
       Map<byte[], Long> sequenceNums = e.getValue();
      // iterate over the map for this log file, and tell whether it should be archived or not.
-      if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal,
-          oldestUnflushedSeqNumsLocal)) {
+      if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal,
+          oldestUnflushedRegionSequenceIdsLocal)) {
         logsToArchive.add(log);
         LOG.debug("WAL file ready for archiving " + log);
       }
@@ -816,10 +844,11 @@ public class FSHLog implements WAL {
     List<byte[]> regionsToFlush = null;
     // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
     synchronized (regionSequenceIdLock) {
-      for (Map.Entry<byte[], Long> e : regionsSequenceNums.entrySet()) {
-        Long unFlushedVal = this.oldestUnflushedRegionSequenceIds.get(e.getKey());
-        if (unFlushedVal != null && unFlushedVal <= e.getValue()) {
-          if (regionsToFlush == null) regionsToFlush = new ArrayList<byte[]>();
+      for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
+        long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey());
+        if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
+          if (regionsToFlush == null)
+            regionsToFlush = new ArrayList<byte[]>();
           regionsToFlush.add(e.getKey());
         }
       }
@@ -1585,36 +1614,53 @@ public class FSHLog implements WAL {
     // +1 for current use log
     return getNumRolledLogFiles() + 1;
   }
-  
+
   // public only until class moves to o.a.h.h.wal
   /** @return the size of log files in use */
   public long getLogFileSize() {
     return this.totalLogSize.get();
   }
-  
+
   @Override
-  public boolean startCacheFlush(final byte[] encodedRegionName) {
-    Long oldRegionSeqNum = null;
+  public boolean startCacheFlush(final byte[] encodedRegionName,
+      Set<byte[]> flushedFamilyNames) {
+    Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
     if (!closeBarrier.beginOp()) {
       LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
         " - because the server is closing.");
       return false;
     }
     synchronized (regionSequenceIdLock) {
-      oldRegionSeqNum = this.oldestUnflushedRegionSequenceIds.remove(encodedRegionName);
-      if (oldRegionSeqNum != null) {
-        Long oldValue =
-          this.lowestFlushingRegionSequenceIds.put(encodedRegionName, oldRegionSeqNum);
-        assert oldValue ==
-          null : "Flushing map not cleaned up for " + Bytes.toString(encodedRegionName);
+      ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+          oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+      if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
+        for (byte[] familyName: flushedFamilyNames) {
+          Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName);
+          if (seqId != null) {
+            oldStoreSeqNum.put(familyName, seqId);
+          }
+        }
+        if (!oldStoreSeqNum.isEmpty()) {
+          Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put(
+              encodedRegionName, oldStoreSeqNum);
+          assert oldValue == null: "Flushing map not cleaned up for "
+              + Bytes.toString(encodedRegionName);
+        }
+        if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) {
+          // Remove it, otherwise it will stay in oldestUnflushedStoreSequenceIds forever,
+          // even if the region has already moved to another server.
+          // No data race to worry about: we hold the region's write lock when calling
+          // startCacheFlush, so no one can add values to the map we removed.
+          oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
+        }
       }
     }
-    if (oldRegionSeqNum == null) {
-      // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
-      //       the region is already flushing (which would make this call invalid), or there
-      //       were no appends after last flush, so why are we starting flush? Maybe we should
-      //       assert not null, and switch to "long" everywhere. Less rigorous, but safer,
-      //       alternative is telling the caller to stop. For now preserve old logic.
+    if (oldStoreSeqNum.isEmpty()) {
+      // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
+      // the region is already flushing (which would make this call invalid), or there
+      // were no appends after last flush, so why are we starting flush? Maybe we should
+      // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
+      // For now preserve old logic.
       LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
         + Bytes.toString(encodedRegionName) + "]");
     }
@@ -1624,30 +1670,59 @@ public class FSHLog implements WAL {
   @Override
   public void completeCacheFlush(final byte [] encodedRegionName) {
     synchronized (regionSequenceIdLock) {
-      this.lowestFlushingRegionSequenceIds.remove(encodedRegionName);
+      this.lowestFlushingStoreSequenceIds.remove(encodedRegionName);
     }
     closeBarrier.endOp();
   }
 
+  private ConcurrentMap<byte[], Long> getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(
+      byte[] encodedRegionName) {
+    ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+        oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+    if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
+      return oldestUnflushedStoreSequenceIdsOfRegion;
+    }
+    oldestUnflushedStoreSequenceIdsOfRegion =
+        new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    ConcurrentMap<byte[], Long> alreadyPut =
+        oldestUnflushedStoreSequenceIds.put(encodedRegionName,
+          oldestUnflushedStoreSequenceIdsOfRegion);
+    return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut;
+  }
+
   @Override
   public void abortCacheFlush(byte[] encodedRegionName) {
-    Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
+    Map<byte[], Long> storeSeqNumsBeforeFlushStarts;
+    Map<byte[], Long> currentStoreSeqNums = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
     synchronized (regionSequenceIdLock) {
-      seqNumBeforeFlushStarts = this.lowestFlushingRegionSequenceIds.remove(encodedRegionName);
-      if (seqNumBeforeFlushStarts != null) {
-        currentSeqNum =
-          this.oldestUnflushedRegionSequenceIds.put(encodedRegionName, seqNumBeforeFlushStarts);
+      storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove(
+        encodedRegionName);
+      if (storeSeqNumsBeforeFlushStarts != null) {
+        ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+            getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
+        for (Map.Entry<byte[], Long> familyNameAndSeqId: storeSeqNumsBeforeFlushStarts
+            .entrySet()) {
+          currentStoreSeqNums.put(familyNameAndSeqId.getKey(),
+            oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(),
+              familyNameAndSeqId.getValue()));
+        }
       }
     }
     closeBarrier.endOp();
-    if ((currentSeqNum != null)
-        && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
-      String errorStr = "Region " + Bytes.toString(encodedRegionName) +
-          "acquired edits out of order current memstore seq=" + currentSeqNum
-          + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
-      LOG.error(errorStr);
-      assert false : errorStr;
-      Runtime.getRuntime().halt(1);
+    if (storeSeqNumsBeforeFlushStarts != null) {
+      for (Map.Entry<byte[], Long> familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) {
+        Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey());
+        if (currentSeqNum != null
+            && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) {
+          String errorStr =
+              "Region " + Bytes.toString(encodedRegionName) + " family "
+                  + Bytes.toString(familyNameAndSeqId.getKey())
+                  + " acquired edits out of order current memstore seq=" + currentSeqNum
+                  + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue();
+          LOG.error(errorStr);
+          Runtime.getRuntime().halt(1);
+        }
+      }
     }
   }
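
Reading the hunks above together, a hedged lifecycle sketch of how the two maps interact across startCacheFlush / completeCacheFlush / abortCacheFlush (illustrative only: String keys, no locking, no sanity checks or closeBarrier handling):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    class FlushLifecycleSketch {
      // String keys for brevity; the real maps use byte[] with Bytes.BYTES_COMPARATOR,
      // and every transition below happens under regionSequenceIdLock.
      final Map<String, Map<String, Long>> oldestUnflushed = new HashMap<>();
      final Map<String, Map<String, Long>> lowestFlushing = new HashMap<>();

      void startCacheFlush(String region, Set<String> families) {
        Map<String, Long> ofRegion = oldestUnflushed.get(region);
        if (ofRegion == null) {
          return;
        }
        Map<String, Long> moved = new HashMap<>();
        for (String family : families) {
          Long seqId = ofRegion.remove(family); // only the families being flushed move
          if (seqId != null) {
            moved.put(family, seqId);
          }
        }
        if (!moved.isEmpty()) {
          lowestFlushing.put(region, moved);
        }
        if (ofRegion.isEmpty()) {
          oldestUnflushed.remove(region); // avoid leaking an empty per-region map
        }
      }

      void completeCacheFlush(String region) {
        lowestFlushing.remove(region); // edits are in hfiles now; ids can be dropped
      }

      void abortCacheFlush(String region) {
        Map<String, Long> moved = lowestFlushing.remove(region);
        if (moved != null) { // edits are still memstore-only, so restore their ids
          oldestUnflushed.computeIfAbsent(region, r -> new HashMap<>()).putAll(moved);
        }
      }
    }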
 
@@ -1678,8 +1753,23 @@ public class FSHLog implements WAL {
 
   @Override
   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
-    Long result = oldestUnflushedRegionSequenceIds.get(encodedRegionName);
-    return result == null ? HConstants.NO_SEQNUM : result.longValue();
+    ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+        this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+    return oldestUnflushedStoreSequenceIdsOfRegion != null ?
+        getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM;
+  }
+
+  @Override
+  public long getEarliestMemstoreSeqNum(byte[] encodedRegionName,
+      byte[] familyName) {
+    ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+        this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+    if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
+      Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName);
+      return result != null ? result.longValue() : HConstants.NO_SEQNUM;
+    } else {
+      return HConstants.NO_SEQNUM;
+    }
   }
 
   /**
@@ -1915,6 +2005,15 @@ public class FSHLog implements WAL {
       }
     }
 
+    private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName,
+        Set<byte[]> familyNameSet, Long lRegionSequenceId) {
+      ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+          getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
+      for (byte[] familyName : familyNameSet) {
+        oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId);
+      }
+    }
+
     /**
      * Append to the WAL.  Does all CP and WAL listener calls.
      * @param entry
@@ -1962,9 +2061,10 @@ public class FSHLog implements WAL {
         Long lRegionSequenceId = Long.valueOf(regionSequenceId);
         highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
         if (entry.isInMemstore()) {
-          oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId);
+          updateOldestUnflushedSequenceIds(encodedRegionName,
+              entry.getFamilyNames(), lRegionSequenceId);
         }
-        
+
         coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
         // Update metrics.
         postAppend(entry, EnvironmentEdgeManager.currentTime() - start);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index d9942b3..147a13d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -19,13 +19,21 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionUtils;
+
+import com.google.common.collect.Sets;
 
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -96,7 +104,7 @@ class FSWALEntry extends Entry {
    */
   long stampRegionSequenceId() throws IOException {
     long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
-    if (!this.getEdit().isReplay() && memstoreCells != null && !memstoreCells.isEmpty()) {
+    if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) {
       for (Cell cell : this.memstoreCells) {
         CellUtil.setSequenceId(cell, regionSequenceId);
       }
@@ -105,4 +113,21 @@ class FSWALEntry extends Entry {
     key.setLogSeqNum(regionSequenceId);
     return regionSequenceId;
   }
+
+  /**
+   * @return the family names which are affected by this edit.
+   */
+  Set<byte[]> getFamilyNames() {
+    ArrayList<Cell> cells = this.getEdit().getCells();
+    if (CollectionUtils.isEmpty(cells)) {
+      return Collections.<byte[]>emptySet();
+    }
+    Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
+    for (Cell cell : cells) {
+      if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+        familySet.add(CellUtil.cloneFamily(cell));
+      }
+    }
+    return familySet;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 70bfff1..1fbe151 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -183,7 +184,7 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
-    public boolean startCacheFlush(final byte[] encodedRegionName) {
+    public boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
       return !(closed.get());
     }
 
@@ -206,6 +207,11 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
+    public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
+      return HConstants.NO_SEQNUM;
+    }
+
+    @Override
     public String toString() {
       return "WAL disabled.";
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 23f8c9f..5a2b08d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.wal;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -152,7 +153,7 @@ public interface WAL {
   * @return true if the flush can proceed, false in case wal is closing (usually, when server is
    * closing) and flush couldn't be started.
    */
-  boolean startCacheFlush(final byte[] encodedRegionName);
+  boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames);
 
   /**
    * Complete the cache flush.
@@ -182,6 +183,14 @@ public interface WAL {
   long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
 
   /**
+   * Gets the earliest sequence number in the memstore for this particular region and store.
+   * @param encodedRegionName The region to get the number for.
+   * @param familyName The family to get the number for.
+   * @return The number if present, HConstants.NO_SEQNUM if absent.
+   */
+  long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName);
+
+  /**
    * Human readable identifying information about the state of this WAL.
    * Implementors are encouraged to include information appropriate for debugging.
    * Consumers are advised not to rely on the details of the returned String; it does
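
To show how a caller might combine the existing per-region accessor with the new per-family one (not part of the patch; the SeqNumSource interface below just mirrors the relevant slice of the WAL interface, and NO_SEQNUM stands in for HConstants.NO_SEQNUM):

    // A minimal caller-side sketch of the two accessors.
    interface SeqNumSource {
      long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
      long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName);
    }

    class FlushDecisionSketch {
      static final long NO_SEQNUM = -1L; // stand-in for HConstants.NO_SEQNUM

      // True if this family holds the oldest unflushed edits in the region, i.e.
      // flushing this store is what lets the WAL release its oldest files.
      static boolean familyPinsOldestEdits(SeqNumSource wal, byte[] region, byte[] family) {
        long regionLowest = wal.getEarliestMemstoreSeqNum(region);
        long familyLowest = wal.getEarliestMemstoreSeqNum(region, family);
        return familyLowest != NO_SEQNUM && familyLowest == regionLowest;
      }
    }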

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 48feb03..dd37e24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -266,7 +266,7 @@ public class TestIOFencing {
       compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
       LOG.info("Blocking compactions");
       compactingRegion.stopCompactions();
-      long lastFlushTime = compactingRegion.getLastFlushTime();
+      long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
       // Load some rows
       TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
 
@@ -282,7 +282,7 @@ public class TestIOFencing {
 
       // Wait till flush has happened, otherwise there won't be multiple store files
       long startWaitTime = System.currentTimeMillis();
-      while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
+      while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime ||
           compactingRegion.countStoreFiles() <= 1) {
         LOG.info("Waiting for the region to flush " + compactingRegion.getRegionNameAsString());
         Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
index 00bf09b..edb4926 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java
@@ -33,8 +33,8 @@ public class TestFlushRegionEntry {
 
   @Test
   public void test() {
-    FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class));
-    FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class));
+    FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class), true);
+    FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class), true);
 
     assertEquals(entry.hashCode(), other.hashCode());
     assertEquals(entry, other);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
index 2d63775..e69e735 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
@@ -112,11 +112,11 @@ public class TestHeapMemoryManager {
     long oldBlockCacheSize = blockCache.maxSize;
     heapMemoryManager.start();
     memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
-    memStoreFlusher.requestFlush(null);
-    memStoreFlusher.requestFlush(null);
-    memStoreFlusher.requestFlush(null);
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.requestFlush(null, false);
     memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
-    memStoreFlusher.requestFlush(null);
+    memStoreFlusher.requestFlush(null, false);
     Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
     assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
         memStoreFlusher.memstoreSize);
@@ -126,8 +126,8 @@ public class TestHeapMemoryManager {
     oldBlockCacheSize = blockCache.maxSize;
     // Do some more flushes before the next run of HeapMemoryTuner
     memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
-    memStoreFlusher.requestFlush(null);
-    memStoreFlusher.requestFlush(null);
+    memStoreFlusher.requestFlush(null, false);
+    memStoreFlusher.requestFlush(null, false);
     Thread.sleep(1500);
     assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
         memStoreFlusher.memstoreSize);
@@ -407,12 +407,12 @@ public class TestHeapMemoryManager {
     }
 
     @Override
-    public void requestFlush(HRegion region) {
+    public void requestFlush(HRegion region, boolean forceFlushAllStores) {
       this.listener.flushRequested(flushType, region);
     }
 
     @Override
-    public void requestDelayedFlush(HRegion region, long delay) {
+    public void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) {
 
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
new file mode 100644
index 0000000..43a9575
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
+import org.apache.hadoop.hbase.regionserver.FlushAllStoresPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushPolicy;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.hash.Hashing;
+
+/**
+ * This test verifies the correctness of the Per Column Family flushing strategy
+ */
+@Category(MediumTests.class)
+public class TestPerColumnFamilyFlush {
+  private static final Log LOG = LogFactory.getLog(TestPerColumnFamilyFlush.class);
+
+  HRegion region = null;
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
+
+  public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
+
+  public static final byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
+      Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
+
+  public static final byte[] FAMILY1 = families[0];
+
+  public static final byte[] FAMILY2 = families[1];
+
+  public static final byte[] FAMILY3 = families[2];
+
+  private void initHRegion(String callingMethod, Configuration conf) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+    for (byte[] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+    HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
+    Path path = new Path(DIR, callingMethod);
+    region = HRegion.createHRegion(info, path, conf, htd);
+  }
+
+  // A helper function to create puts.
+  private Put createPut(int familyNum, int putNum) {
+    byte[] qf = Bytes.toBytes("q" + familyNum);
+    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+    Put p = new Put(row);
+    p.add(families[familyNum - 1], qf, val);
+    return p;
+  }
+
+  // A helper function to create gets.
+  private Get createGet(int familyNum, int putNum) {
+    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+    return new Get(row);
+  }
+
+  // A helper function to verify edits.
+  void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
+    Result r = table.get(createGet(familyNum, putNum));
+    byte[] family = families[familyNum - 1];
+    byte[] qf = Bytes.toBytes("q" + familyNum);
+    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
+    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
+      r.getFamilyMap(family).get(qf));
+    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
+      Arrays.equals(r.getFamilyMap(family).get(qf), val));
+  }
+
+  @Test
+  public void testSelectiveFlushWhenEnabled() throws IOException {
+    // Set up the configuration
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+      FlushLargeStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+      100 * 1024);
+    // Initialize the HRegion
+    initHRegion("testSelectiveFlushWhenEnabled", conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize().get();
+
+    // Find the smallest LSNs for edits with respect to each CF.
+    long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
+    long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
+    long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Get the overall smallest LSN in the region's memstores.
+    long smallestSeqInRegionCurrentMemstore =
+        region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    // The overall smallest LSN in the region's memstores should be the same as
+    // the LSN of the smallest edit in CF1
+    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
+
+    // Some other sanity checks.
+    assertTrue(smallestSeqCF1 < smallestSeqCF2);
+    assertTrue(smallestSeqCF2 < smallestSeqCF3);
+    assertTrue(cf1MemstoreSize > 0);
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
+        + cf2MemstoreSize + cf3MemstoreSize);
+
+    // Flush!
+    region.flushcache(false);
+
+    // Will use these to check if anything changed.
+    long oldCF2MemstoreSize = cf2MemstoreSize;
+    long oldCF3MemstoreSize = cf3MemstoreSize;
+
+    // Recalculate everything
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    smallestSeqInRegionCurrentMemstore =
+        region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    // We should have cleared out only CF1, since we chose the flush thresholds
+    // and number of puts accordingly.
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+    // Nothing should have happened to CF2, ...
+    assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
+    // ... or CF3
+    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
+    // Now the smallest LSN in the region should be the same as the smallest
+    // LSN in the memstore of CF2.
+    assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
+    // Of course, this should hold too.
+    assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
+        + cf3MemstoreSize);
+
+    // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
+    for (int i = 1200; i < 2400; i++) {
+      region.put(createPut(2, i));
+
+      // Add only 100 puts for CF3
+      if (i - 1200 < 100) {
+        region.put(createPut(3, i));
+      }
+    }
+
+    // How much does the CF3 memstore occupy? Will be used later.
+    oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Flush again
+    region.flushcache(false);
+
+    // Recalculate everything
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    smallestSeqInRegionCurrentMemstore =
+        region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    // CF1 and CF2, both should be absent.
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
+    // CF3 shouldn't have been touched.
+    assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
+    assertEquals(totalMemstoreSize + DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
+    assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3);
+
+    // What happens when we hit the memstore limit, but we are not able to find
+    // any Column Family above the threshold?
+    // In that case, we should flush all the CFs.
+
+    // Clearing the existing memstores.
+    region.flushcache(true);
+
+    // The memstore limit is 200*1024 and the column family flush threshold is
+    // around 50*1024. We try to just hit the memstore limit with each CF's
+    // memstore being below the CF flush threshold.
+    for (int i = 1; i <= 300; i++) {
+      region.put(createPut(1, i));
+      region.put(createPut(2, i));
+      region.put(createPut(3, i));
+      region.put(createPut(4, i));
+      region.put(createPut(5, i));
+    }
+
+    region.flushcache(false);
+    // Since we won't find any CF above the threshold, and hence no specific
+    // store to flush, we should flush all the memstores.
+    assertEquals(0, region.getMemstoreSize().get());
+  }
+
+  @Test
+  public void testSelectiveFlushWhenNotEnabled() throws IOException {
+    // Set up the configuration
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
+
+    // Initialize the HRegion
+    initHRegion("testSelectiveFlushWhenNotEnabled", conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize().get();
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Some other sanity checks.
+    assertTrue(cf1MemstoreSize > 0);
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
+        + cf2MemstoreSize + cf3MemstoreSize);
+
+    // Flush!
+    region.flushcache(false);
+
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    long smallestSeqInRegionCurrentMemstore =
+        region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    // Everything should have been cleared
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
+    assertEquals(0, totalMemstoreSize);
+    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
+  }
+
+  // Find the (first) region which has the specified name.
+  private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
+      HRegionServer hrs = rsts.get(i).getRegionServer();
+      for (HRegion region : hrs.getOnlineRegions(tableName)) {
+        return Pair.newPair(region, hrs);
+      }
+    }
+    return null;
+  }
+
+  @Test
+  public void testLogReplay() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
+    // Carefully chosen limits so that the memstore just flushes when we're done
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+      FlushLargeStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 10000);
+    final int numRegionServers = 4;
+    TEST_UTIL.startMiniCluster(numRegionServers);
+    TEST_UTIL.getHBaseAdmin().createNamespace(
+      NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+    HTable table = TEST_UTIL.createTable(TABLENAME, families);
+    HTableDescriptor htd = table.getTableDescriptor();
+
+    for (byte[] family : families) {
+      if (!htd.hasFamily(family)) {
+        htd.addFamily(new HColumnDescriptor(family));
+      }
+    }
+
+    // Add 80 edits for CF1, 10 for CF2 and 10 for CF3.
+    // These will all be interleaved in the log.
+    for (int i = 1; i <= 80; i++) {
+      table.put(createPut(1, i));
+      if (i <= 10) {
+        table.put(createPut(2, i));
+        table.put(createPut(3, i));
+      }
+    }
+    table.flushCommits();
+    Thread.sleep(1000);
+
+    Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
+    HRegion desiredRegion = desiredRegionAndServer.getFirst();
+    assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
+
+    // Flush the region selectively.
+    desiredRegion.flushcache(false);
+
+    long totalMemstoreSize;
+    long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
+    totalMemstoreSize = desiredRegion.getMemstoreSize().get();
+
+    // Find the sizes of the memstores of each CF.
+    cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
+
+    // CF1 Should have been flushed
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+    // CF2 and CF3 shouldn't have been flushed.
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+    assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
+        + cf3MemstoreSize);
+
+    // Wait for the RS report to go across to the master, so that the master
+    // is aware of which sequence ids have been flushed, before we kill the RS.
+    // In production, if the RS dies before the report goes across, we will
+    // safely replay all the edits.
+    Thread.sleep(2000);
+
+    // Abort the region server where we have the region hosted.
+    HRegionServer rs = desiredRegionAndServer.getSecond();
+    rs.abort("testing");
+
+    // The aborted region server's regions will be eventually assigned to some
+    // other region server, and the get RPC call (inside verifyEdit()) will
+    // retry for some time till the regions come back up.
+
+    // Verify that all the edits are safe.
+    for (int i = 1; i <= 80; i++) {
+      verifyEdit(1, i, table);
+      if (i <= 10) {
+        verifyEdit(2, i, table);
+        verifyEdit(3, i, table);
+      }
+    }
+
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  // Test Log Replay with Distributed Replay on.
+  // In distributed log replay, the log splitters ask the master for the
+  // last flushed sequence id for a region. This test would ensure that we
+  // are doing the book-keeping correctly.
+  @Test
+  public void testLogReplayWithDistributedReplay() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    testLogReplay();
+  }
+
+  /**
+   * When a log roll is about to happen, we flush the regions that will be affected by the
+   * log roll. These flushes cannot be selective flushes, otherwise we cannot roll the logs. This
+   * test ensures that we do a full flush in that scenario.
+   * @throws IOException
+   */
+  @Test
+  public void testFlushingWhenLogRolling() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+      FlushLargeStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100000);
+
+    // Also, let us try real hard to get a log roll to happen.
+    // Keeping the log roll period to 2s.
+    conf.setLong("hbase.regionserver.logroll.period", 2000);
+    // Keep the block size small so that we fill up the log files very fast.
+    conf.setLong("hbase.regionserver.hlog.blocksize", 6144);
+    int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
+
+    final int numRegionServers = 4;
+    TEST_UTIL.startMiniCluster(numRegionServers);
+    TEST_UTIL.getHBaseAdmin().createNamespace(
+      NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+    HTable table = TEST_UTIL.createTable(TABLENAME, families);
+    HTableDescriptor htd = table.getTableDescriptor();
+
+    for (byte[] family : families) {
+      if (!htd.hasFamily(family)) {
+        htd.addFamily(new HColumnDescriptor(family));
+      }
+    }
+
+    HRegion desiredRegion = getRegionWithName(TABLENAME).getFirst();
+    assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
+
+    // Add some edits. Most will be for CF1, some for CF2 and CF3.
+    for (int i = 1; i <= 10000; i++) {
+      table.put(createPut(1, i));
+      if (i <= 200) {
+        table.put(createPut(2, i));
+        table.put(createPut(3, i));
+      }
+      table.flushCommits();
+      // Keep adding until we exceed the maximum number of log files, so that we
+      // are able to trigger the cleaning of old log files.
+      int currentNumLogFiles = ((FSHLog) (desiredRegion.getWAL())).getNumLogFiles();
+      if (currentNumLogFiles > maxLogs) {
+        LOG.info("The number of log files is now: " + currentNumLogFiles
+            + ". Expect a log roll and memstore flush.");
+        break;
+      }
+    }
+    table.close();
+    // Wait for some time till the flush caused by log rolling happens.
+    Thread.sleep(4000);
+
+    // We have artificially created the conditions for a log roll. When a
+    // log roll happens, we should flush all the column families. Testing that
+    // case here.
+
+    // Individual families should have been flushed.
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY1).getMemStoreSize());
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY2).getMemStoreSize());
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize());
+
+    // And of course, the total memstore should also be clean.
+    assertEquals(0, desiredRegion.getMemstoreSize().get());
+
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void doPut(HTableInterface table) throws IOException {
+    // cf1 gets 100B of value per row, cf2 200B and cf3 400B per row
+    byte[] qf = Bytes.toBytes("qf");
+    Random rand = new Random();
+    byte[] value1 = new byte[100];
+    byte[] value2 = new byte[200];
+    byte[] value3 = new byte[400];
+    for (int i = 0; i < 10000; i++) {
+      Put put = new Put(Bytes.toBytes("row-" + i));
+      rand.setSeed(i);
+      rand.nextBytes(value1);
+      rand.nextBytes(value2);
+      rand.nextBytes(value3);
+      put.add(FAMILY1, qf, value1);
+      put.add(FAMILY2, qf, value2);
+      put.add(FAMILY3, qf, value3);
+      table.put(put);
+    }
+  }
+
+  // Under the same write load, small stores should have fewer store files when
+  // per column family flush is enabled.
+  @Test
+  public void testCompareStoreFileCount() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+      400 * 1024);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      ConstantSizeRegionSplitPolicy.class.getName());
+
+    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+    htd.setCompactionEnabled(false);
+    htd.addFamily(new HColumnDescriptor(FAMILY1));
+    htd.addFamily(new HColumnDescriptor(FAMILY2));
+    htd.addFamily(new HColumnDescriptor(FAMILY3));
+
+    LOG.info("==============Test with selective flush disabled===============");
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.getHBaseAdmin().createNamespace(
+      NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+    TEST_UTIL.getHBaseAdmin().createTable(htd);
+    getRegionWithName(TABLENAME).getFirst();
+    HConnection conn = HConnectionManager.createConnection(conf);
+    HTableInterface table = conn.getTable(TABLENAME);
+    doPut(table);
+    table.close();
+    conn.close();
+
+    HRegion region = getRegionWithName(TABLENAME).getFirst();
+    int cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
+    int cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
+    int cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
+    TEST_UTIL.shutdownMiniCluster();
+
+    LOG.info("==============Test with selective flush enabled===============");
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
+      FlushLargeStoresPolicy.class.getName());
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.getHBaseAdmin().createNamespace(
+      NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+    TEST_UTIL.getHBaseAdmin().createTable(htd);
+    conn = HConnectionManager.createConnection(conf);
+    table = conn.getTable(TABLENAME);
+    doPut(table);
+    table.close();
+    conn.close();
+
+    region = getRegionWithName(TABLENAME).getFirst();
+    int cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
+    int cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
+    int cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
+    TEST_UTIL.shutdownMiniCluster();
+
+    LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount
+        + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "
+        + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount);
+    LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1
+        + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", "
+        + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1);
+    // The smaller column families should end up with fewer store files.
+    assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
+    assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
+  }
+
+  public static void main(String[] args) throws Exception {
+    int numRegions = Integer.parseInt(args[0]);
+    long numRows = Long.parseLong(args[1]);
+
+    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
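+    // A 10GB max file size with ConstantSizeRegionSplitPolicy keeps the regions
+    // from splitting under this manual write load.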
+    htd.setMaxFileSize(10L * 1024 * 1024 * 1024);
+    htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
+    htd.addFamily(new HColumnDescriptor(FAMILY1));
+    htd.addFamily(new HColumnDescriptor(FAMILY2));
+    htd.addFamily(new HColumnDescriptor(FAMILY3));
+
+    Configuration conf = HBaseConfiguration.create();
+    HConnection conn = HConnectionManager.createConnection(conf);
+    HBaseAdmin admin = new HBaseAdmin(conn);
+    if (admin.tableExists(TABLENAME)) {
+      admin.disableTable(TABLENAME);
+      admin.deleteTable(TABLENAME);
+    }
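+    // createTable with explicit start/end keys requires at least three regions.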
+    if (numRegions >= 3) {
+      byte[] startKey = new byte[16];
+      byte[] endKey = new byte[16];
+      Arrays.fill(endKey, (byte) 0xFF);
+      admin.createTable(htd, startKey, endKey, numRegions);
+    } else {
+      admin.createTable(htd);
+    }
+    admin.close();
+
+    HTableInterface table = conn.getTable(TABLENAME);
+    byte[] qf = Bytes.toBytes("qf");
+    Random rand = new Random();
+    byte[] value1 = new byte[16];
+    byte[] value2 = new byte[256];
+    byte[] value3 = new byte[4096];
+    for (long i = 0; i < numRows; i++) {
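+      // MD5-hash the row key so writes spread uniformly across the pre-split
+      // regions.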
+      Put put = new Put(Hashing.md5().hashLong(i).asBytes());
+      rand.setSeed(i);
+      rand.nextBytes(value1);
+      rand.nextBytes(value2);
+      rand.nextBytes(value3);
+      put.add(FAMILY1, qf, value1);
+      put.add(FAMILY2, qf, value2);
+      put.add(FAMILY3, qf, value3);
+      table.put(put);
+      if (i % 10000 == 0) {
+        LOG.info(i + " rows put");
+      }
+    }
+    table.close();
+    conn.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 669060c..8c152fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -42,8 +43,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -152,18 +153,15 @@ public class TestFSHLog {
     }
   }
 
-  protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
-                        int times, AtomicLong sequenceId) throws IOException {
-    HTableDescriptor htd = new HTableDescriptor();
-    htd.addFamily(new HColumnDescriptor("row"));
-
-    final byte [] row = Bytes.toBytes("row");
+  protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times,
+      AtomicLong sequenceId) throws IOException {
+    final byte[] row = Bytes.toBytes("row");
     for (int i = 0; i < times; i++) {
       long timestamp = System.currentTimeMillis();
       WALEdit cols = new WALEdit();
       cols.add(new KeyValue(row, row, row, timestamp, row));
-      log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
-          sequenceId, true, null);
+      log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
+        cols, sequenceId, true, null);
     }
     log.sync();
   }
@@ -173,8 +171,8 @@ public class TestFSHLog {
    * @param wal
    * @param regionEncodedName
    */
-  protected void flushRegion(WAL wal, byte[] regionEncodedName) {
-    wal.startCacheFlush(regionEncodedName);
+  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
+    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
     wal.completeCacheFlush(regionEncodedName);
   }
 
@@ -248,10 +246,14 @@ public class TestFSHLog {
     conf1.setInt("hbase.regionserver.maxlogs", 1);
     FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(),
         HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
-    TableName t1 = TableName.valueOf("t1");
-    TableName t2 = TableName.valueOf("t2");
-    HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-    HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    HTableDescriptor t1 =
+        new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
+    HTableDescriptor t2 =
+        new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
+    HRegionInfo hri1 =
+        new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    HRegionInfo hri2 =
+        new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
     // variables to mock region sequenceIds
     final AtomicLong sequenceId1 = new AtomicLong(1);
     final AtomicLong sequenceId2 = new AtomicLong(1);
@@ -278,12 +280,12 @@ public class TestFSHLog {
       assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
       // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
       // remain.
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
       wal.rollWriter();
       // only one wal should remain now (that is for the second region).
       assertEquals(1, wal.getNumRolledLogFiles());
       // flush the second region
-      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
       wal.rollWriter(true);
       // no wal should remain now.
       assertEquals(0, wal.getNumRolledLogFiles());
@@ -300,14 +302,14 @@ public class TestFSHLog {
       regionsToFlush = wal.findRegionsToForceFlush();
       assertEquals(2, regionsToFlush.length);
       // flush both regions
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
-      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
+      flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
       wal.rollWriter(true);
       assertEquals(0, wal.getNumRolledLogFiles());
       // Add an edit to region1, and roll the wal.
       addEdits(wal, hri1, t1, 2, sequenceId1);
       // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
-      wal.startCacheFlush(hri1.getEncodedNameAsBytes());
+      wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
       wal.rollWriter();
       wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
       assertEquals(1, wal.getNumRolledLogFiles());

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 3f551e4..b6e7c02 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -27,7 +27,10 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -786,13 +789,15 @@ public class TestWALReplay {
 
     // Add 1k to each family.
     final int countPerFamily = 1000;
+    Set<byte[]> familyNames = new HashSet<byte[]>();
     for (HColumnDescriptor hcd: htd.getFamilies()) {
       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
           ee, wal, htd, sequenceId);
+      familyNames.add(hcd.getName());
     }
 
     // Add a cache flush, shouldn't have any effect
-    wal.startCacheFlush(regionName);
+    wal.startCacheFlush(regionName, familyNames);
     wal.completeCacheFlush(regionName);
 
     // Add an edit to another family, should be skipped.
@@ -832,11 +837,11 @@ public class TestWALReplay {
           final HRegion region =
               new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
             @Override
-            protected FlushResult internalFlushcache(
-                final WAL wal, final long myseqid, MonitoredTask status)
+            protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
+                Collection<Store> storesToFlush, MonitoredTask status)
             throws IOException {
               LOG.info("InternalFlushCache Invoked");
-              FlushResult fs = super.internalFlushcache(wal, myseqid,
+              FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
                   Mockito.mock(MonitoredTask.class));
               flushcount.incrementAndGet();
               return fs;
@@ -958,16 +963,16 @@ public class TestWALReplay {
     private HRegion r;
 
     @Override
-    public void requestFlush(HRegion region) {
+    public void requestFlush(HRegion region, boolean forceFlushAllStores) {
       try {
-        r.flushcache();
+        r.flushcache(forceFlushAllStores);
       } catch (IOException e) {
         throw new RuntimeException("Exception flushing", e);
       }
     }
 
     @Override
-    public void requestDelayedFlush(HRegion region, long when) {
+    public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) {
       // TODO Auto-generated method stub
 
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
index 28cd849..2e9d312 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
@@ -146,18 +146,15 @@ public class TestDefaultWALProvider {
   }
 
 
-  protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
+  protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd,
                         int times, AtomicLong sequenceId) throws IOException {
-    HTableDescriptor htd = new HTableDescriptor();
-    htd.addFamily(new HColumnDescriptor("row"));
-
-    final byte [] row = Bytes.toBytes("row");
+    final byte[] row = Bytes.toBytes("row");
     for (int i = 0; i < times; i++) {
       long timestamp = System.currentTimeMillis();
       WALEdit cols = new WALEdit();
       cols.add(new KeyValue(row, row, row, timestamp, row));
-      log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
-          sequenceId, true, null);
+      log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
+        cols, sequenceId, true, null);
     }
     log.sync();
   }
@@ -174,8 +171,8 @@ public class TestDefaultWALProvider {
    * @param wal
    * @param regionEncodedName
    */
-  protected void flushRegion(WAL wal, byte[] regionEncodedName) {
-    wal.startCacheFlush(regionEncodedName);
+  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
+    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
     wal.completeCacheFlush(regionEncodedName);
   }
 
@@ -184,45 +181,47 @@ public class TestDefaultWALProvider {
   @Test
   public void testLogCleaning() throws Exception {
     LOG.info("testLogCleaning");
-    final TableName tableName =
-        TableName.valueOf("testLogCleaning");
-    final TableName tableName2 =
-        TableName.valueOf("testLogCleaning2");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("testLogCleaning")).addFamily(new HColumnDescriptor(
+            "row"));
+    final HTableDescriptor htd2 =
+        new HTableDescriptor(TableName.valueOf("testLogCleaning2"))
+            .addFamily(new HColumnDescriptor("row"));
     final Configuration localConf = new Configuration(conf);
     localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
     final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
     final AtomicLong sequenceId = new AtomicLong(1);
     try {
-      HRegionInfo hri = new HRegionInfo(tableName,
+      HRegionInfo hri = new HRegionInfo(htd.getTableName(),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-      HRegionInfo hri2 = new HRegionInfo(tableName2,
+      HRegionInfo hri2 = new HRegionInfo(htd2.getTableName(),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
       // we want to mix edits from regions, so pick our own identifier.
       final WAL log = wals.getWAL(UNSPECIFIED_REGION);
 
       // Add a single edit and make sure that rolling won't remove the file
       // Before HBASE-3198 it used to delete it
-      addEdits(log, hri, tableName, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
       log.rollWriter();
       assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // See if there's anything wrong with more than 1 edit
-      addEdits(log, hri, tableName, 2, sequenceId);
+      addEdits(log, hri, htd, 2, sequenceId);
       log.rollWriter();
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // Now mix edits from 2 regions, still no flushing
-      addEdits(log, hri, tableName, 1, sequenceId);
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      addEdits(log, hri, tableName, 1, sequenceId);
-      addEdits(log, hri2, tableName2, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
+      addEdits(log, hri2, htd2, 1, sequenceId);
       log.rollWriter();
       assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // Flush the first region, we expect to see the first two files getting
       // archived. We need to append something or writer won't be rolled.
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      log.startCacheFlush(hri.getEncodedNameAsBytes());
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(hri.getEncodedNameAsBytes());
       log.rollWriter();
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
@@ -230,8 +229,8 @@ public class TestDefaultWALProvider {
       // Flush the second region, which removes all the remaining output files
       // since the oldest was completely flushed and the two others only contain
       // flush information
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      log.startCacheFlush(hri2.getEncodedNameAsBytes());
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys());
       log.completeCacheFlush(hri2.getEncodedNameAsBytes());
       log.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(log));
@@ -254,21 +253,25 @@ public class TestDefaultWALProvider {
    * <p>
    * @throws IOException
    */
-  @Test 
+  @Test
   public void testWALArchiving() throws IOException {
     LOG.debug("testWALArchiving");
-    TableName table1 = TableName.valueOf("t1");
-    TableName table2 = TableName.valueOf("t2");
+    HTableDescriptor table1 =
+        new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
+    HTableDescriptor table2 =
+        new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
     final Configuration localConf = new Configuration(conf);
     localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
     final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
     try {
       final WAL wal = wals.getWAL(UNSPECIFIED_REGION);
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
-      HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
-          HConstants.EMPTY_END_ROW);
-      HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
-          HConstants.EMPTY_END_ROW);
+      HRegionInfo hri1 =
+          new HRegionInfo(table1.getTableName(), HConstants.EMPTY_START_ROW,
+              HConstants.EMPTY_END_ROW);
+      HRegionInfo hri2 =
+          new HRegionInfo(table2.getTableName(), HConstants.EMPTY_START_ROW,
+              HConstants.EMPTY_END_ROW);
       // ensure that we don't split the regions.
       hri1.setSplit(false);
       hri2.setSplit(false);
@@ -287,7 +290,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // add a waledit to table1, and flush the region.
       addEdits(wal, hri1, table1, 3, sequenceId1);
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys());
       // roll log; all old logs should be archived.
       wal.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
@@ -301,7 +304,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // add edits for table2, and flush hri1.
       addEdits(wal, hri2, table2, 2, sequenceId2);
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys());
       // the log : region-sequenceId map is
       // log1: region2 (unflushed)
       // log2: region1 (flushed)
@@ -311,7 +314,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // flush region2, and all logs should be archived.
       addEdits(wal, hri2, table2, 2, sequenceId2);
-      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys());
       wal.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index b163bd5..1530b6b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -479,8 +479,9 @@ public class TestWALFactory {
   @Test
   public void testEditAdd() throws IOException {
     final int COL_COUNT = 10;
-    final TableName tableName =
-        TableName.valueOf("tablename");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
+            "column"));
     final byte [] row = Bytes.toBytes("row");
     WAL.Reader reader = null;
     try {
@@ -495,16 +496,15 @@ public class TestWALFactory {
             Bytes.toBytes(Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
       }
-      HRegionInfo info = new HRegionInfo(tableName,
+      HRegionInfo info = new HRegionInfo(htd.getTableName(),
         row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
-      HTableDescriptor htd = new HTableDescriptor();
-      htd.addFamily(new HColumnDescriptor("column"));
       final WAL log = wals.getWAL(info.getEncodedNameAsBytes());
 
-      final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), cols, sequenceId, true, null);
+      final long txid = log.append(htd, info,
+        new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
+        cols, sequenceId, true, null);
       log.sync(txid);
-      log.startCacheFlush(info.getEncodedNameAsBytes());
+      log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(info.getEncodedNameAsBytes());
       log.shutdown();
       Path filename = DefaultWALProvider.getCurrentFileName(log);
@@ -518,7 +518,7 @@ public class TestWALFactory {
         WALKey key = entry.getKey();
         WALEdit val = entry.getEdit();
         assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
-        assertTrue(tableName.equals(key.getTablename()));
+        assertTrue(htd.getTableName().equals(key.getTablename()));
         Cell cell = val.getCells().get(0);
         assertTrue(Bytes.equals(row, cell.getRow()));
         assertEquals((byte)(i + '0'), cell.getValue()[0]);
@@ -537,8 +537,9 @@ public class TestWALFactory {
   @Test
   public void testAppend() throws IOException {
     final int COL_COUNT = 10;
-    final TableName tableName =
-        TableName.valueOf("tablename");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
+            "column"));
     final byte [] row = Bytes.toBytes("row");
     WAL.Reader reader = null;
     final AtomicLong sequenceId = new AtomicLong(1);
@@ -552,15 +553,14 @@ public class TestWALFactory {
           Bytes.toBytes(Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
       }
-      HRegionInfo hri = new HRegionInfo(tableName,
+      HRegionInfo hri = new HRegionInfo(htd.getTableName(),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-      HTableDescriptor htd = new HTableDescriptor();
-      htd.addFamily(new HColumnDescriptor("column"));
       final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
-      final long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), cols, sequenceId, true, null);
+      final long txid = log.append(htd, hri,
+        new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
+        cols, sequenceId, true, null);
       log.sync(txid);
-      log.startCacheFlush(hri.getEncodedNameAsBytes());
+      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(hri.getEncodedNameAsBytes());
       log.shutdown();
       Path filename = DefaultWALProvider.getCurrentFileName(log);
@@ -572,7 +572,7 @@ public class TestWALFactory {
       for (Cell val : entry.getEdit().getCells()) {
         assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
           entry.getKey().getEncodedRegionName()));
-        assertTrue(tableName.equals(entry.getKey().getTablename()));
+        assertTrue(htd.getTableName().equals(entry.getKey().getTablename()));
         assertTrue(Bytes.equals(row, val.getRow()));
         assertEquals((byte)(idx + '0'), val.getValue()[0]);
         System.out.println(entry.getKey() + " " + val);

