hbase-commits mailing list archives

From li...@apache.org
Subject svn commit: r1524512 [1/2] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ main/resources/ test/java/org/apache/hadoop/hbase/ test/...
Date Wed, 18 Sep 2013 18:18:24 GMT
Author: liyin
Date: Wed Sep 18 18:18:23 2013
New Revision: 1524512

URL: http://svn.apache.org/r1524512
Log:
[HBASE-3149] Per CF Flushing of the Memstores (Reloaded)

Author: gauravm

Summary:
Creating a new diff for the per-CF flushing of the memstore.

The idea is that if the memstore has hit the threshold, we can flush only the families that occupy a large chunk of the memstore. This avoids creating lots of smaller files and de-amortizes our IO when flushing.

For example, if the memstore flush limit is 128 MB, we can set the limit for flushing individual stores to 32 MB. When the total memstore size hits 128 MB, we flush only those column families that are above 32 MB. If no column family qualifies, we flush all of them.
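The selection policy above can be sketched as follows. This is a minimal standalone illustration, not the committed code; the class name, the map-based store sizes, and the threshold constant are hypothetical stand-ins for the real `Store`/`HRegion` machinery in the diff below.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the per-CF flush selection policy described above.
public class FlushSelector {
    // Per-column-family flush threshold (32 MB in the example).
    static final long CF_FLUSH_SIZE = 32L * 1024 * 1024;

    /** Given each column family's memstore size, pick the CFs to flush. */
    static List<String> selectStoresToFlush(Map<String, Long> cfSizes) {
        List<String> toFlush = new ArrayList<>();
        for (Map.Entry<String, Long> e : cfSizes.entrySet()) {
            if (e.getValue() > CF_FLUSH_SIZE) {
                toFlush.add(e.getKey());
            }
        }
        // If no single CF is above the per-CF threshold, fall back to
        // flushing every column family, as the commit message describes.
        if (toFlush.isEmpty()) {
            toFlush.addAll(cfSizes.keySet());
        }
        return toFlush;
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("cf1", 90L * 1024 * 1024);  // dominates the memstore
        sizes.put("cf2", 20L * 1024 * 1024);
        sizes.put("cf3", 18L * 1024 * 1024);
        // Only cf1 is above 32 MB, so only cf1 is selected for flushing.
        System.out.println(selectStoresToFlush(sizes));
    }
}
```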

Test Plan:
- Unit Tests

- Tested this on a couple of machines on my mini-shadow.

- Plotted graphs of the size of memstores of all the column families in a
particular region over time:
The x-axis is time and the y-axis is size (in MB). The black line is the total size
of the memstores; the other lines are for specific column families. When the black
line hits 128 MB, a flush happens.

Graph 1 - Without the change (https://www.facebook.com/pxlcld/kwtv )
As you can see, when the black line hits 128 MB, it falls to 0, and so do all the other lines. Then they all rise again at the same time, and fall back together.

Graph 2 - With the change (https://www.facebook.com/pxlcld/kwts )
When the black line hits 128 MB, only the column families above 32 MB drop to zero, while the other column families retain their size. After a couple of flushes, all the families are about the same size, but none is above 32 MB; in that case, all of them are flushed.

Notice that once we have flushed the first time, we do four flushes (128/32 = 4), each of which takes about a quarter of the time of the first flush, since we are flushing only 32 MB (1/4 of 128 MB) in each of the smaller flushes.

Reviewers: aaiyer, liyintang, manukranthk

Reviewed By: aaiyer

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D947028

Task ID: 2220663

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ReadOnlyStore.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.89-fb/src/main/resources/hbase-default.xml
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed Sep 18 18:18:23 2013
@@ -350,6 +350,21 @@ public final class HConstants {
   public static final String HREGION_MEMSTORE_FLUSH_SIZE =
       "hbase.hregion.memstore.flush.size";
 
+  /** Conf key for enabling Per Column Family flushing of memstores */
+  public static final String HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH =
+      "hbase.hregion.memstore.percolumnfamilyflush.enabled";
+
+  /** Default value for the Per Column Family flush knob */
+  public static final Boolean DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH =
+    false;
+
+  /**
+   * If Per Column Family flushing is enabled, this is the minimum size
+   * at which a column family's memstore is flushed.
+   */
+  public static final String HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE =
+      "hbase.hregion.memstore.percolumnfamilyflush.flush.size";
+
   public static final String HREGION_MEMSTORE_BLOCK_MULTIPLIER =
       "hbase.hregion.memstore.block.multiplier";
   public static final String HREGION_MEMSTORE_WAIT_ON_BLOCK =

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java Wed Sep 18 18:18:23 2013
@@ -99,6 +99,9 @@ public class HTableDescriptor implements
 
   public static final long DEFAULT_MEMSTORE_FLUSH_SIZE = 1024*1024*64L;
 
+  public static final long DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE =
+    1024*1024*16L;
+
   public static final long DEFAULT_MAX_FILESIZE = 1024*1024*256L;
 
   public static final boolean DEFAULT_DEFERRED_LOG_FLUSH = true;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java Wed Sep 18 18:18:23 2013
@@ -31,6 +31,11 @@ public interface FlushRequester {
    * Tell the listener the cache needs to be flushed.
    *
    * @param region the HRegion requesting the cache flush
+   * @param selectiveFlushRequest is this a selective flush request? This means
+   *                              that if some column families are dominating
+   *                              the memstore size, only those column families
+   *                              would be flushed.
+   *
    */
-  void request(HRegion region);
-}
\ No newline at end of file
+  void request(HRegion region, boolean selectiveFlushRequest);
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Sep 18 18:18:23 2013
@@ -30,7 +30,9 @@ import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -269,13 +271,20 @@ public class HRegion implements HeapSize
 
   final long timestampTooNew;
   final long memstoreFlushSize;
-  private volatile long lastFlushTime; // start time for the last successful flush.
+  // The maximum size a column family's memstore can grow up to,
+  // before being flushed.
+  final long columnfamilyMemstoreFlushSize;
+  // Last flush time for each Store. Useful when we are flushing for each column
+  private Map<Store, Long> lastStoreFlushTimeMap
+    = new ConcurrentHashMap<Store, Long>();
   private List<Pair<Long,Long>> recentFlushes
     = new ArrayList<Pair<Long,Long>>();
   final FlushRequester flushListener;
   private final long blockingMemStoreSize;
   private final boolean waitOnMemstoreBlock;
   final long threadWakeFrequency;
+   // Selective flushing of Column Families which dominate the memstore?
+  final boolean perColumnFamilyFlushEnabled;
   // Used to guard splits and closes
   private final ReentrantReadWriteLock splitsAndClosesLock =
     new ReentrantReadWriteLock();
@@ -432,10 +441,12 @@ public class HRegion implements HeapSize
     this.fs = null;
     this.timestampTooNew = HConstants.LATEST_TIMESTAMP;
     this.memstoreFlushSize = 0L;
+    this.columnfamilyMemstoreFlushSize = 0L;
     this.log = null;
     this.regiondir = null;
     this.regionInfo = null;
     this.threadWakeFrequency = 0L;
+    this.perColumnFamilyFlushEnabled = false;
     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
     this.openDate = 0;
   }
@@ -494,6 +505,10 @@ public class HRegion implements HeapSize
     this.flushListener = flushListener;
     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY,
         10 * 1000);
+    this.perColumnFamilyFlushEnabled = conf.getBoolean(
+            HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
+            HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
+    LOG.debug("Per Column Family Flushing: " + perColumnFamilyFlushEnabled);
     String encodedNameStr = this.regionInfo.getEncodedName();
     this.regiondir = new Path(tableDir, encodedNameStr);
     if (LOG.isDebugEnabled()) {
@@ -518,6 +533,9 @@ public class HRegion implements HeapSize
     }
     this.disableWAL = regionInfo.getTableDesc().isWALDisabled();
     this.memstoreFlushSize = flushSize;
+    this.columnfamilyMemstoreFlushSize = conf.getLong(
+            HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE,
+            HTableDescriptor.DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE);
     this.blockingMemStoreSize = (long)(this.memstoreFlushSize *
       conf.getFloat(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 2));
     this.waitOnMemstoreBlock =
@@ -640,7 +658,11 @@ public class HRegion implements HeapSize
       }
 
       this.writestate.compacting = 0;
-      this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
+      long startTime = EnvironmentEdgeManager.currentTimeMillis();
+      for (Store store : stores.values()) {
+        this.lastStoreFlushTimeMap.put(store, startTime);
+      }
+
       // Use maximum of log sequenceid or that which was found in stores
       // (particularly if no recovered edits, seqid will be -1).
       long nextSeqid = maxSeqId + 1;
@@ -954,9 +976,22 @@ public class HRegion implements HeapSize
     return this.fs;
   }
 
-  /** @return the last time the region was flushed */
-  public long getLastFlushTime() {
-    return this.lastFlushTime;
+  /**
+   * @return Returns the earliest time a store in the region
+   *         was flushed. All other stores in the region would
+   *         have been flushed either at, or after this time.
+   */
+  public long getMinFlushTimeForAllStores() {
+    return Collections.min(this.lastStoreFlushTimeMap.values());
+  }
+
+  /**
+   * Returns the last time a particular store was flushed
+   * @param store The store in question
+   * @return The last time this store was flushed
+   */
+  public long getLastStoreFlushTime(Store store) {
+    return this.lastStoreFlushTimeMap.get(store);
   }
 
   /** @return how info about the last flushes <time, size> */
@@ -1245,6 +1280,16 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * Flush the cache, while disabling selective flushing.
+   *
+   * @return
+   * @throws IOException
+   */
+  public boolean flushcache() throws IOException {
+    return flushcache(false);
+  }
+
+  /**
    * Flush the cache.
    *
    * When this method is called the cache will be flushed unless:
@@ -1258,11 +1303,22 @@ public class HRegion implements HeapSize
    * <p>This method may block for some time, so it should not be called from a
    * time-sensitive thread.
    *
+   * @param selectiveFlushRequest If true, selectively flush column families
+   *                              which dominate the memstore size, provided it
+   *                              is enabled in the configuration.
+   *
    * @return true if cache was flushed
    *
    * @throws IOException general io exceptions
    */
-  public boolean flushcache() throws IOException {
+  public boolean flushcache(boolean selectiveFlushRequest) throws IOException {
+    // If a selective flush was requested, but the per-column family switch is
+    // off, we cannot do a selective flush.
+    if (selectiveFlushRequest && !perColumnFamilyFlushEnabled) {
+      LOG.debug("Disabling selective flushing of Column Families' memstores.");
+      selectiveFlushRequest = false;
+    }
+
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
     try {
       if (this.closed.get()) {
@@ -1284,12 +1340,41 @@ public class HRegion implements HeapSize
           return false;
         }
       }
+      Collection<Store> specificStoresToFlush = null;
       try {
         // Prevent splits and closes
         status.setStatus("Acquiring readlock on region");
         splitsAndClosesLock.readLock().lock();
         try {
-          boolean result = internalFlushcache(status);
+          // We now have to flush the memstore since it has
+          // reached the threshold, however, we might not need
+          // to flush the entire memstore. If there are certain
+          // column families that are dominating the memstore size,
+          // we will flush just those. The second behavior only
+          // happens when selectiveFlushRequest is true.
+          boolean result;
+
+          // If it is okay to flush the memstore by selecting the
+          // column families which dominate the size, we are going
+          // to populate the specificStoresToFlush set.
+          if (selectiveFlushRequest) {
+            specificStoresToFlush = new HashSet<Store>();
+            for (Store store : stores.values()) {
+              if (shouldFlushStore(store)) {
+                specificStoresToFlush.add(store);
+                LOG.debug("Column Family: " + store.getColumnFamilyName() +
+                          " of region " + this + " will be flushed");
+              }
+            }
+            // Didn't find any CFs which were above the threshold for selection.
+            if (specificStoresToFlush.size() == 0) {
+              LOG.debug("Since none of the CFs were above the size, flushing all.");
+              specificStoresToFlush = stores.values();
+            }
+          } else {
+            specificStoresToFlush = stores.values();
+          }
+          result = internalFlushcache(specificStoresToFlush, status);
           status.markComplete("Flush successful");
           return result;
         } finally {
@@ -1342,33 +1427,65 @@ public class HRegion implements HeapSize
    * @throws IOException general io exceptions
    */
   protected boolean internalFlushcache(MonitoredTask status)
-    throws IOException {
-    return internalFlushcache(this.log, -1, status);
+          throws IOException {
+    return internalFlushcache(this.log, -1L, stores.values(), status);
+  }
+
+  /**
+   * See {@link #internalFlushcache(org.apache.hadoop.hbase.monitoring.MonitoredTask)}
+   * @param storesToFlush The specific stores to flush.
+   * @param status
+   * @return
+   * @throws IOException
+   */
+  protected boolean internalFlushcache(Collection<Store> storesToFlush,
+                                       MonitoredTask status)
+          throws IOException {
+    return internalFlushcache(this.log, -1L, storesToFlush, status);
+  }
+
+  protected boolean internalFlushcache(final HLog wal, final long myseqid,
+                                       MonitoredTask status)
+          throws IOException {
+    return internalFlushcache(wal, myseqid, stores.values(), status);
   }
 
   /**
    * @param wal Null if we're NOT to go via hlog/wal.
    * @param myseqid The seqid to use if <code>wal</code> is null writing out
    * flush file.
+   * @param storesToFlush The list of stores to flush.
    * @param status
    * @return true if the region needs compacting
    * @throws IOException
    * @see {@link #internalFlushcache()}
    */
   protected boolean internalFlushcache(final HLog wal, final long myseqid,
-      MonitoredTask status) throws IOException {
+      Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
     // If nothing to flush, return and avoid logging start/stop flush.
     if (this.memstoreSize.get() <= 0) {
-      // Record latest flush time
-      this.lastFlushTime = startTime;
+      // Since there is nothing to flush, we will reset the flush times for all
+      // the stores.
+      for (Store store : stores.values()) {
+        lastStoreFlushTimeMap.put(store, startTime);
+      }
       return false;
     }
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Started memstore flush for region " + this +
-        ". Current region memstore size " +
+      LOG.debug("Started a " +
+        "memstore flush for region " + this + ", with current memstore size: " +
         StringUtils.humanReadableInt(this.memstoreSize.get()) +
+        ", and " + storesToFlush.size() + "/" + stores.size() +
+        " column families' memstores are being flushed." +
         ((wal != null)? "": "; wal is null, using passed myseqid=" + myseqid));
+      for (Store store : storesToFlush) {
+        LOG.debug("Flushing Column Family: " + store.getColumnFamilyName() +
+                  " which was occupying " +
+                  StringUtils.humanReadableInt(store.getMemStoreSize()) +
+                  " of memstore.");
+      }
     }
 
     // Stop updates while we snapshot the memstore of all stores. We only have
@@ -1389,17 +1506,39 @@ public class HRegion implements HeapSize
     this.updatesLock.writeLock().lock();
     t0 = EnvironmentEdgeManager.currentTimeMillis();
     status.setStatus("Preparing to flush by snapshotting stores");
-    final long currentMemStoreSize = this.memstoreSize.get();
+    long totalMemstoreSizeOfFlushableStores = 0;
+
+    Set<Store> storesNotToFlush = new HashSet<Store>(stores.values());
+    storesNotToFlush.removeAll(storesToFlush);
+
+    // Calculate the smallest LSN numbers for edits in the stores that will
+    // be flushed and the ones which won't be. This will be used to populate
+    // the firstSeqWrittenInCurrentMemstore and
+    // firstSeqWrittenInSnapshotMemstore maps correctly.
+    long firstSeqIdInStoresToFlush = Long.MAX_VALUE;
+    for (Store store : storesToFlush) {
+      firstSeqIdInStoresToFlush = Math.min(firstSeqIdInStoresToFlush,
+        store.getSmallestSeqNumberInMemstore());
+    }
+
+    long firstSeqIdInStoresNotToFlush = Long.MAX_VALUE;
+    for (Store store : storesNotToFlush) {
+      firstSeqIdInStoresNotToFlush = Math.min(firstSeqIdInStoresNotToFlush,
+        store.getSmallestSeqNumberInMemstore());
+    }
+
     //copy the array of per column family memstore values
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(
-        stores.size());
+        storesToFlush.size());
     try {
       if (wal != null) {
-        sequenceId = wal.startCacheFlush(this.regionInfo.getRegionName());
+        sequenceId = wal.startCacheFlush(this.regionInfo.getRegionName(),
+          firstSeqIdInStoresToFlush, firstSeqIdInStoresNotToFlush);
       } else {
         sequenceId = myseqid;
       }
-      for (Store s : stores.values()) {
+      for (Store s : storesToFlush) {
+        totalMemstoreSizeOfFlushableStores += s.getMemStoreSize();
         storeFlushers.add(s.getStoreFlusher(sequenceId));
       }
 
@@ -1476,7 +1615,7 @@ public class HRegion implements HeapSize
       storeFlushers.clear();
 
       // Set down the memstore size by amount of flush.
-      this.incMemoryUsage(-currentMemStoreSize);
+      this.incMemoryUsage(-totalMemstoreSizeOfFlushableStores);
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
       // The hlog needs to be replayed so its content is restored to memstore.
@@ -1495,7 +1634,9 @@ public class HRegion implements HeapSize
     // If we get to here, the HStores have been written. If we get an
     // error in completeCacheFlush it will release the lock it is holding
     // update lastFlushTime after the HStores have been written.
-    this.lastFlushTime = startTime;
+    for (Store store : storesToFlush) {
+      this.lastStoreFlushTimeMap.put(store, startTime);
+    }
 
     // B.  Write a FLUSHCACHE-COMPLETE message to the log.
     //     This tells future readers that the HStores were emitted correctly,
@@ -1522,13 +1663,14 @@ public class HRegion implements HeapSize
     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
     if (LOG.isDebugEnabled()) {
       LOG.info("Finished memstore flush of ~" +
-        StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
-        this + " in " + time + "ms, sequence id=" + sequenceId +
-        ", compaction requested=" + compactionRequested +
+        StringUtils.humanReadableInt(totalMemstoreSizeOfFlushableStores) +
+        " for region " + this + " in " + time + "ms, sequence id=" +
+        sequenceId + ", compaction requested=" + compactionRequested +
         ((wal == null)? "; wal=null": ""));
       status.setStatus("Finished memstore flush");
     }
-    this.recentFlushes.add(new Pair<Long,Long>(time/1000,currentMemStoreSize));
+    this.recentFlushes.add(
+            new Pair<Long,Long>(time/1000, totalMemstoreSizeOfFlushableStores));
 
     return compactionRequested;
   }
@@ -1785,6 +1927,7 @@ public class HRegion implements HeapSize
     try {
       prepareDeleteTimestamps(familyMap, byteNow);
 
+      long seqNum = -1;
       if (!this.disableWAL && writeToWAL) {
         // write/sync to WAL should happen before we touch memstore.
         //
@@ -1796,12 +1939,12 @@ public class HRegion implements HeapSize
         // single WALEdit.
         WALEdit walEdit = new WALEdit();
         addFamilyMapToWALEdit(familyMap, walEdit);
-        this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
-            walEdit, now);
+        seqNum = this.log.append(regionInfo,
+                regionInfo.getTableDesc().getName(), walEdit, now);
       }
 
       // Now make changes to the memstore.
-      long addedSize = applyFamilyMapToMemstore(familyMap);
+      long addedSize = applyFamilyMapToMemstore(familyMap, seqNum);
       flush = isFlushSize(this.incMemoryUsage(addedSize));
     } finally {
       this.updatesLock.readLock().unlock();
@@ -2056,6 +2199,7 @@ public class HRegion implements HeapSize
         }
       }
 
+      long seqNum = -1;
       // ------------------------------------
       // STEP 3. Write to WAL
       // ----------------------------------
@@ -2071,8 +2215,9 @@ public class HRegion implements HeapSize
         }
 
         // Append the edit to WAL
-        this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
-            walEdit, now);
+        seqNum = this.log.append(regionInfo,
+                regionInfo.getTableDesc().getName(),
+                walEdit, now);
       }
 
       // ------------------------------------
@@ -2084,7 +2229,7 @@ public class HRegion implements HeapSize
         if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
 
         Mutation op = batchOp.operations[i].getFirst();
-        addedSize += applyFamilyMapToMemstore(op.getFamilyMap());
+        addedSize += applyFamilyMapToMemstore(op.getFamilyMap(), seqNum);
         batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
       }
       success = true;
@@ -2292,14 +2437,15 @@ public class HRegion implements HeapSize
       // If order is reversed, i.e. we write to memstore first, and
       // for some reason fail to write/sync to commit log, the memstore
       // will contain uncommitted transactions.
+      long seqNum = -1;
       if (!this.disableWAL && writeToWAL) {
         WALEdit walEdit = new WALEdit();
         addFamilyMapToWALEdit(familyMap, walEdit);
-        this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
-           walEdit, now);
+        seqNum = this.log.append(regionInfo,
+                regionInfo.getTableDesc().getName(), walEdit, now);
       }
 
-      long addedSize = applyFamilyMapToMemstore(familyMap);
+      long addedSize = applyFamilyMapToMemstore(familyMap, seqNum);
       flush = isFlushSize(this.incMemoryUsage(addedSize));
     } finally {
       this.updatesLock.readLock().unlock();
@@ -2323,15 +2469,20 @@ public class HRegion implements HeapSize
    * should already have locked updatesLock.readLock(). This also does
    * <b>not</b> check the families for validity.
    *
+   * @param familyMap
+   * @param seqNum The log sequence number associated with the edits.
+   *
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    */
-  private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
-    return applyFamilyMapToMemstore(familyMap, null);
+  private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
+                                        long seqNum) {
+    return applyFamilyMapToMemstore(familyMap, null, seqNum);
   }
 
   private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
-                 MultiVersionConsistencyControl.WriteEntry writeEntryToUse) {
+                 MultiVersionConsistencyControl.WriteEntry writeEntryToUse,
+                 long seqNum) {
     // Increment the rowUpdatedCnt
     this.rowUpdateCnt.incrementAndGet();
     
@@ -2349,7 +2500,7 @@ public class HRegion implements HeapSize
         Store store = getStore(family);
         for (KeyValue kv: edits) {
           kv.setMemstoreTS(w.getWriteNumber());
-          size += store.add(kv);
+          size += store.add(kv, seqNum);
         }
       }
     } finally {
@@ -2424,7 +2575,9 @@ public class HRegion implements HeapSize
       writestate.flushRequested = true;
     }
     // Make request outside of synchronize block; HBASE-818.
-    this.flushListener.request(this);
+    // Request for a selective flush of the memstore, if possible.
+    // This function is called by put(), delete(), etc.
+    this.flushListener.request(this, this.perColumnFamilyFlushEnabled);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Flush requested on " + this);
     }
@@ -2439,6 +2592,15 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * @param store
+   * @return true if the size of the store is above the flush threshold for column families
+   */
+  private boolean shouldFlushStore(Store store) {
+    return (store.getMemStoreSize() > this.columnfamilyMemstoreFlushSize) ?
+            true : false;
+  }
+
+  /**
    * Read the edits log put under this region by wal log splitting process.  Put
    * the recovered edits back up into this region.
    *
@@ -2507,6 +2669,8 @@ public class HRegion implements HeapSize
 
     if (seqid > -1){
       // In case we added some edits to memory, we should flush
+      // We do not want to write to the log again, hence passing the log
+      // parameter as null.
       internalFlushcache(null, seqid, status);
     }
 
@@ -2585,9 +2749,11 @@ public class HRegion implements HeapSize
             // Once we are over the limit, restoreEdit will keep returning true to
             // flush -- but don't flush until we've played all the kvs that make up
             // the WALEdit.
-            flush = restoreEdit(store, kv);
+            flush = restoreEdit(store, kv, key.getLogSeqNum());
             editsCount++;
           }
+          // We do not want to write to the WAL again, and hence setting the WAL
+          // parameter to null.
           if (flush) internalFlushcache(null, currentEditSeqId, status);
 
           // Every 'interval' edits, tell the reporter we're making progress.
@@ -2644,10 +2810,11 @@ public class HRegion implements HeapSize
    * Used by tests
    * @param s Store to add edit too.
    * @param kv KeyValue to add.
+   * @param seqNum The sequence number for the edit.
    * @return True if we should flush.
    */
-  protected boolean restoreEdit(final Store s, final KeyValue kv) {
-    return isFlushSize(this.incMemoryUsage(s.add(kv)));
+  protected boolean restoreEdit(final Store s, final KeyValue kv, long seqNum) {
+    return isFlushSize(this.incMemoryUsage(s.add(kv, seqNum)));
   }
 
   /*
@@ -3536,12 +3703,13 @@ public class HRegion implements HeapSize
 
         // 6. append/sync all edits at once
         // TODO: Do batching as in doMiniBatchPut
-        this.log.append(regionInfo, this.getTableDesc().getName(), walEdit, now);
+        long seqNum = this.log.append(regionInfo, this.getTableDesc().getName(),
+                walEdit, now);
 
         // 7. apply to memstore
         long addedSize = 0;
         for (Mutation m : rm.getMutations()) {
-          addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
+          addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w, seqNum);
         }
         flush = isFlushSize(this.incMemoryUsage(addedSize));
       } finally {
@@ -3616,19 +3784,21 @@ public class HRegion implements HeapSize
           qualifier, EnvironmentEdgeManager.currentTimeMillis(),
           Bytes.toBytes(result));
 
+      long seqNum = -1;
       // now log it:
       if (writeToWAL) {
         long now = EnvironmentEdgeManager.currentTimeMillis();
         WALEdit walEdit = new WALEdit();
         walEdit.add(newKv);
-        this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+        seqNum = this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
           walEdit, now);
       }
 
       // Now request the ICV to the store, this will set the timestamp
       // appropriately depending on if there is a value in memcache or not.
       // returns the
-      long size = store.updateColumnValue(row, family, qualifier, result);
+      long size = store.updateColumnValue(row, family, qualifier, result,
+                                          seqNum);
 
       size = this.incMemoryUsage(size);
       flush = isFlushSize(size);
@@ -3669,7 +3839,7 @@ public class HRegion implements HeapSize
   public static final long FIXED_OVERHEAD = ClassSize.align(
       (2 * Bytes.SIZEOF_BOOLEAN) +
       (6 * Bytes.SIZEOF_LONG) + 2 * ClassSize.ARRAY +
-      (28 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
+      (29 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +

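The hunks above change HLog.append to return the log sequence number (LSN) so it can be threaded into the memstore apply path. A minimal standalone sketch of that pattern, with WalLog and Memstore as stand-ins (not the HBase classes):

```java
import java.util.concurrent.atomic.AtomicLong;

/** Sketch: the WAL append returns an LSN, which the memstore uses to track
 *  the smallest sequence number it still holds. Names are hypothetical. */
public class WalThenMemstore {
    static class WalLog {
        private final AtomicLong nextSeq = new AtomicLong(1);
        /** Append an edit and return its sequence number, or -1 if the WAL is disabled. */
        long append(String edit, boolean enabled) {
            if (!enabled) {
                return -1L;
            }
            return nextSeq.getAndIncrement();
        }
    }

    static class Memstore {
        long smallestSeq = Long.MAX_VALUE;
        /** Apply an edit, remembering the smallest non-sentinel LSN seen. */
        void add(String kv, long seqNum) {
            if (seqNum >= 0 && seqNum < smallestSeq) {
                smallestSeq = seqNum;
            }
        }
    }
}
```

The -1 sentinel mirrors the patch's convention for edits that bypass the WAL (and for test-only code paths).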
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Sep 18 18:18:23 2013
@@ -3168,12 +3168,11 @@ public class HRegionServer implements HR
        throw new IllegalArgumentException("No region : " + new String(regionName)
        + " available");
      }
-     if (region.getLastFlushTime() < ifOlderThanTS) region.flushcache();
+     if (region.getMinFlushTimeForAllStores() < ifOlderThanTS) region.flushcache();
    }
 
   /**
-   * Gets last flush time for the given region
-   * @return the last flush time for a region
+   * @return the earliest time a store in the given region was flushed.
    */
   public long getLastFlushTime(byte[] regionName) {
     HRegion region = getOnlineRegion(regionName);
@@ -3181,7 +3180,7 @@ public class HRegionServer implements HR
       throw new IllegalArgumentException("No region : " + new String(regionName)
       + " available");
     }
-    return region.getLastFlushTime();
+    return region.getMinFlushTimeForAllStores();
   }
 
   @Override
@@ -3189,7 +3188,7 @@ public class HRegionServer implements HR
      MapWritable map = new MapWritable();
      for (HRegion region: this.getOnlineRegions()) {
        map.put(new BytesWritable(region.getRegionName()),
-           new LongWritable(region.getLastFlushTime()));
+           new LongWritable(region.getMinFlushTimeForAllStores()));
      }
      return map;
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Wed Sep 18 18:18:23 2013
@@ -237,7 +237,8 @@ public class HRegionThriftServer extends
     public Map<ByteBuffer, Long> getLastFlushTimes() throws TException {
       Map<ByteBuffer, Long> regionToFlushTime = new HashMap<ByteBuffer, Long>();
       for (HRegion region: rs.getOnlineRegions()) {
-        regionToFlushTime.put(ByteBuffer.wrap(region.getRegionName()), region.getLastFlushTime());
+        regionToFlushTime.put(ByteBuffer.wrap(region.getRegionName()),
+                              region.getMinFlushTimeForAllStores());
       }
       return regionToFlushTime;
     }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Wed Sep 18 18:18:23 2013
@@ -138,7 +138,10 @@ class LogRoller extends HasThread implem
     if (r != null) {
       requester = this.server.getFlushRequester();
       if (requester != null) {
-        requester.request(r);
+        // If we do a selective flush, some column families might remain in
+        // the memstore for a long time, and might cause old logs to
+        // accumulate. Hence, we do not request a selective flush here.
+        requester.request(r, false);
         scheduled = true;
       }
     }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Wed Sep 18 18:18:23 2013
@@ -69,6 +69,9 @@ public class MemStore implements HeapSiz
   // Snapshot of memstore.  Made for flusher.
   volatile KeyValueSkipListSet snapshot;
 
+  // Smallest LSN amongst all the edits in the Memstore
+  volatile AtomicLong smallestSeqNumber = new AtomicLong();
+
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   final KeyValue.KVComparator comparator;
@@ -132,6 +135,7 @@ public class MemStore implements HeapSiz
       this.chunkPool = null;
     }
 
+    this.smallestSeqNumber.set(Long.MAX_VALUE);
   }
 
   void dump() {
@@ -144,6 +148,31 @@ public class MemStore implements HeapSiz
   }
 
   /**
+   * Get the smallest LSN amongst all the edits in the memstore.
+   * @return the smallest LSN
+   */
+  long getSmallestSeqNumber() {
+    return smallestSeqNumber.get();
+  }
+
+  /**
+   * Update the smallest LSN
+   * @param seqNum
+   */
+  void updateSmallestSeqNumber(long seqNum) {
+    if (seqNum < 0) {
+      return;
+    }
+
+    // Do a Compare-and-Set instead of synchronized here.
+    long smallestSeqNumberVal;
+    do {
+      smallestSeqNumberVal = smallestSeqNumber.get();
+    } while (!smallestSeqNumber.compareAndSet(smallestSeqNumberVal,
+             Math.min(smallestSeqNumberVal, seqNum)));
+  }
+
+  /**
    * Creates a snapshot of the current memstore.
    * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet<KeyValue>)}
    * To get the snapshot made by this method, use {@link #getSnapshot()}
@@ -160,6 +189,8 @@ public class MemStore implements HeapSiz
         if (!this.kvset.isEmpty()) {
           this.snapshot = this.kvset;
           this.kvset = new KeyValueSkipListSet(this.comparator);
+          // Reset the smallest sequence number
+          this.smallestSeqNumber.set(Long.MAX_VALUE);
           this.snapshotTimeRangeTracker = this.timeRangeTracker;
           this.timeRangeTracker = new TimeRangeTracker();
 
@@ -226,11 +257,22 @@ public class MemStore implements HeapSiz
   }
 
   /**
+   * Write an update.
+   * This method should only be used by tests, since it does not specify the
+   * LSN for the edit.
+   * @param kv
+   * @return approximate size of the passed key and value
+   */
+  long add(final KeyValue kv) {
+    return add(kv, -1L);
+  }
+
+  /**
    * Write an update
    * @param kv
    * @return approximate size of the passed key and value.
    */
-  long add(final KeyValue kv) {
+  long add(final KeyValue kv, long seqNum) {
     long s = -1;
     this.lock.readLock().lock();
     try {
@@ -241,6 +283,7 @@ public class MemStore implements HeapSiz
       if (toAdd.isDelete()) {
         this.numDeletesInKvSet.incrementAndGet();
       }
+      updateSmallestSeqNumber(seqNum);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -272,8 +315,17 @@ public class MemStore implements HeapSiz
    * @param delete
    * @return approximate size of the passed key and value.
    */
+  long delete(final KeyValue delete, long seqNum) {
+    return add(delete, seqNum);
+  }
+
+  /**
+   * Should only be used in tests, since it does not provide a seqNum.
+   * @param delete
+   * @return approximate size of the passed key and value
+   */
   long delete(final KeyValue delete) {
-    return add(delete);
+    return delete(delete, -1);
   }
 
   /**
@@ -422,13 +474,15 @@ public class MemStore implements HeapSiz
    * @param qualifier
    * @param newValue
    * @param now
+   * @param seqNum The LSN for the edit.
    * @return
    */
   public long updateColumnValue(byte[] row,
                                 byte[] family,
                                 byte[] qualifier,
                                 long newValue,
-                                long now) {
+                                long now,
+                                long seqNum) {
    this.lock.readLock().lock();
     try {
       // create a new KeyValue with 'now' and a 0 memstoreTS == immediately visible
@@ -436,7 +490,7 @@ public class MemStore implements HeapSiz
           now,
           Bytes.toBytes(newValue));
 
-      long addedSize = add(newKv);
+      long addedSize = add(newKv, seqNum);
 
       // now find and RM the old one(s) to prevent version explosion:
     KeyValue firstKv = KeyValue.createFirstOnRow(
@@ -783,7 +837,7 @@ public class MemStore implements HeapSiz
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (16 * ClassSize.REFERENCE));
+      ClassSize.OBJECT + (17 * ClassSize.REFERENCE));
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +

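The updateSmallestSeqNumber change above uses a compare-and-set loop instead of synchronized to keep a running minimum. A self-contained sketch of that lock-free pattern (this is an illustration, not the patched MemStore code):

```java
import java.util.concurrent.atomic.AtomicLong;

/** Lock-free "track the smallest LSN" pattern, as in MemStore above. */
public class SmallestSeqTracker {
    private final AtomicLong smallest = new AtomicLong(Long.MAX_VALUE);

    /** Lower the tracked value to seqNum if it is smaller; ignore sentinels (< 0). */
    public void update(long seqNum) {
        if (seqNum < 0) {
            return;
        }
        long cur;
        do {
            cur = smallest.get();
            // Exit early if the current value is already small enough.
        } while (cur > seqNum && !smallest.compareAndSet(cur, seqNum));
    }

    public long get() {
        return smallest.get();
    }
}
```

Resetting to Long.MAX_VALUE (as the patch does in snapshot()) makes the next update() establish a fresh minimum for the new memstore generation.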
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Wed Sep 18 18:18:23 2013
@@ -61,6 +61,7 @@ class MemStoreFlusher implements FlushRe
   private final Map<HRegion, FlushQueueEntry> regionsInQueue =
     new HashMap<HRegion, FlushQueueEntry>();
 
+  private final boolean perColumnFamilyFlushEnabled;
   private final long threadWakeFrequency;
   private final HRegionServer server;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -109,6 +110,9 @@ class MemStoreFlusher implements FlushRe
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
       90000);
 
+    this.perColumnFamilyFlushEnabled = conf.getBoolean(
+            HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
+            HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
     // number of "memstore flusher" threads per region server
     this.handlerCount = conf.getInt("hbase.regionserver.flusher.count", 2);
 
@@ -178,11 +182,15 @@ class MemStoreFlusher implements FlushRe
   }
 
   public void request(HRegion r) {
+    request(r, false);
+  }
+
+  public void request(HRegion r, boolean selectiveFlushRequest) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
         // queue.  It'll come out near immediately.
-        FlushQueueEntry fqe = new FlushQueueEntry(r);
+        FlushQueueEntry fqe = new FlushQueueEntry(r, selectiveFlushRequest);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
       }
@@ -282,23 +290,25 @@ class MemStoreFlusher implements FlushRe
         return true;
       }
     }
-    return flushRegion(region, why, false);
+    return flushRegion(region, why, false, fqe.isSelectiveFlushRequest());
   }
 
-  /*
+  /**
    * Flush a region.
    * @param region Region to flush.
    * @param emergencyFlush Set if we are being force flushed. If true the region
    * needs to be removed from the flush queue. If false, when we were called
    * from the main flusher run loop and we got the entry to flush by calling
    * poll on the flush queue (which removed it).
+   * @param selectiveFlushRequest Do we want to selectively flush only the
+   * column families that dominate the memstore size?
    *
    * @return true if the region was successfully flushed, false otherwise. If
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
    */
   private boolean flushRegion(final HRegion region, String why,
-    final boolean emergencyFlush) {
+    final boolean emergencyFlush, boolean selectiveFlushRequest) {
 
     synchronized (this.regionsInQueue) {
       FlushQueueEntry fqe = this.regionsInQueue.remove(region);
@@ -310,7 +320,7 @@ class MemStoreFlusher implements FlushRe
      lock.readLock().lock();
     }
     try {
-      if (region.flushcache()) {
+      if (region.flushcache(selectiveFlushRequest)) {
         server.compactSplitThread.requestCompaction(region, why);
       }
       server.getMetrics().addFlush(region.getRecentFlushInfo());
@@ -379,7 +389,7 @@ class MemStoreFlusher implements FlushRe
         " exceeded; currently " +
         StringUtils.humanReadableInt(globalMemStoreSize) + " and flushing till " +
         StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-      if (!flushRegion(biggestMemStoreRegion, "emergencyFlush", true)) {
+      if (!flushRegion(biggestMemStoreRegion, "emergencyFlush", true, false)) {
         LOG.warn("Flush failed");
         break;
       }
@@ -403,11 +413,28 @@ class MemStoreFlusher implements FlushRe
     private final long createTime;
     private long whenToExpire;
     private int requeueCount = 0;
+    private boolean selectiveFlushRequest;
 
-    FlushQueueEntry(final HRegion r) {
+    /**
+     * @param r The region to flush
+     * @param selectiveFlushRequest Do we want to flush only the column
+     *                              families that dominate the memstore size,
+     *                              i.e., do a selective flush? If we are
+     *                              doing log rolling, then we should not do a
+     *                              selective flush.
+     */
+    FlushQueueEntry(final HRegion r, boolean selectiveFlushRequest) {
       this.region = r;
       this.createTime = System.currentTimeMillis();
       this.whenToExpire = this.createTime;
+      this.selectiveFlushRequest = selectiveFlushRequest;
+    }
+
+    /**
+     * @return Is this a request for a selective flush?
+     */
+    public boolean isSelectiveFlushRequest() {
+      return selectiveFlushRequest;
     }
 
     /**

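The selection rule described in the commit message (flush only families above the per-CF limit; if none qualify, flush everything) can be sketched as follows. This is a hypothetical helper, not code from the patch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch of the selective-flush rule: pick families whose memstore
 *  exceeds the per-family limit; fall back to a full flush if none do. */
public class FlushSelector {
    public static List<String> pickFamiliesToFlush(Map<String, Long> memstoreSizes,
                                                   long perFamilyFlushSize) {
        List<String> toFlush = new ArrayList<String>();
        for (Map.Entry<String, Long> e : memstoreSizes.entrySet()) {
            if (e.getValue() > perFamilyFlushSize) {
                toFlush.add(e.getKey());
            }
        }
        // If no family dominates the memstore, flush all of them (just as usual).
        if (toFlush.isEmpty()) {
            toFlush.addAll(memstoreSizes.keySet());
        }
        return toFlush;
    }
}
```

With the numbers from the summary (128 MB region limit, 32 MB per-CF limit), a family holding 40 MB would be flushed while a 10 MB family would be retained.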
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ReadOnlyStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ReadOnlyStore.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ReadOnlyStore.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ReadOnlyStore.java Wed Sep 18 18:18:23 2013
@@ -137,12 +137,12 @@ public class ReadOnlyStore extends Store
   }
 
   @Override
-  protected long add(final KeyValue kv) {
+  protected long add(final KeyValue kv, long seqNum) {
     throw new UnsupportedOperationException("Cannot add KeyValues to ReadOnlyStore");
   }
 
   @Override
-  protected long delete(final KeyValue kv) {
+  protected long delete(final KeyValue kv, long seqNum) {
     throw new UnsupportedOperationException("Cannot delete KeyValues from ReadOnlyStore");
   }
 
@@ -168,7 +168,7 @@ public class ReadOnlyStore extends Store
 
   @Override
   public long updateColumnValue(byte [] row, byte [] f,
-      byte [] qualifier, long newValue) {
+      byte [] qualifier, long newValue, long seqNum) {
     throw new UnsupportedOperationException("Not supported on ReadOnlyStore");
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Sep 18 18:18:23 2013
@@ -507,13 +507,14 @@ public class Store extends SchemaConfigu
   /**
    * Adds a value to the memstore
    *
-   * @param kv
-   * @return memstore size delta
+   * @param kv The KV to be added.
+   * @param seqNum The LSN associated with the key.
+   * @return memstore size delta
    */
-  protected long add(final KeyValue kv) {
+  protected long add(final KeyValue kv, long seqNum) {
     lock.readLock().lock();
     try {
-      return this.memstore.add(kv);
+      return this.memstore.add(kv, seqNum);
     } finally {
       lock.readLock().unlock();
     }
@@ -525,10 +526,10 @@ public class Store extends SchemaConfigu
    * @param kv
    * @return memstore size delta
    */
-  protected long delete(final KeyValue kv) {
+  protected long delete(final KeyValue kv, long seqNum) {
     lock.readLock().lock();
     try {
-      return this.memstore.delete(kv);
+      return this.memstore.delete(kv, seqNum);
     } finally {
       lock.readLock().unlock();
     }
@@ -1859,8 +1860,18 @@ public class Store extends SchemaConfigu
   /**
    * @return The size of this store's memstore, in bytes
    */
-  long getMemStoreSize() {
-    return this.memstore.heapSize();
+  public long getMemStoreSize() {
+    // Use memstore.keySize() instead of heapSize() since heapSize() gives the
+    // size of the keys + size of the map.
+    return this.memstore.keySize();
+  }
+
+  /**
+   * A helper function to get the smallest LSN in the memstore.
+   * @return the smallest LSN in the memstore
+   */
+  public long getSmallestSeqNumberInMemstore() {
+    return this.memstore.getSmallestSeqNumber();
   }
 
   /**
@@ -1892,12 +1903,13 @@ public class Store extends SchemaConfigu
    * @param row
    * @param f
    * @param qualifier
-   * @param newValue the new value to set into memstore
-   * @return memstore size delta
+   * @param newValue the new value to set into memstore
+   * @param seqNum The LSN associated with the edit.
+   * @return memstore size delta
    * @throws IOException
    */
   public long updateColumnValue(byte [] row, byte [] f,
-                                byte [] qualifier, long newValue)
+                                byte [] qualifier, long newValue, long seqNum)
       throws IOException {
 
     this.lock.readLock().lock();
@@ -1908,7 +1920,8 @@ public class Store extends SchemaConfigu
           f,
           qualifier,
           newValue,
-          now);
+          now,
+          seqNum);
 
     } finally {
       this.lock.readLock().unlock();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Sep 18 18:18:23 2013
@@ -157,6 +157,7 @@ public class HLog implements Syncable {
   private final FileSystem fs;
   private final Path dir;
   private final Configuration conf;
+  private final boolean perColumnFamilyFlushEnabled;
   private final LogRollListener listener;
   private final long optionalFlushInterval;
   private final long blocksize;
@@ -435,6 +436,9 @@ public class HLog implements Syncable {
         RuntimeHaltAbortStrategy.INSTANCE : RuntimeExceptionAbortStrategy.INSTANCE;
     this.fs = fs;
     this.conf = conf;
+    this.perColumnFamilyFlushEnabled = conf.getBoolean(
+            HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
+            HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
     this.listener = listener;
     this.flushlogentries =
       conf.getInt("hbase.regionserver.flushlogentries", 1);
@@ -1028,12 +1032,15 @@ public class HLog implements Syncable {
    * @param tableName
    * @param edits
    * @param now
+   * @return The log sequence number for the edit. -1 if the WAL is disabled.
+   *          This sequence number is used for book-keeping for per-CF flush
+   *          of the memstore.
    * @throws IOException
    */
-  public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
+  public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
     final long now) throws IOException {
     if (!this.enabled || edits.isEmpty()) {
-      return;
+      return -1L;
     }
     if (logSyncerThread.syncerShuttingDown) {
       // can't acquire lock for the duration of append()
@@ -1043,6 +1050,7 @@ public class HLog implements Syncable {
 
     long len = edits.getTotalKeyValueLength();
     long txid = 0;
+    long seqNum;
 
     long start = System.currentTimeMillis();
     byte[] regionName = info.getRegionName();
@@ -1053,7 +1061,7 @@ public class HLog implements Syncable {
       // memstore). . When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      long seqNum = obtainSeqNum();
+      seqNum = obtainSeqNum();
       this.firstSeqWrittenInCurrentMemstore.putIfAbsent(regionName, seqNum);
       HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
 
@@ -1095,6 +1103,7 @@ public class HLog implements Syncable {
       // update sync time
       pData.addLong(ProfilingData.HLOG_SYNC_TIME_MS, syncTime);
     }
+    return seqNum;
   }
 
   /**
@@ -1306,6 +1315,38 @@ public class HLog implements Syncable {
     return 0;
   }
 
+  /**
+   * This is a utility method for tests to find the sequence number of the first
+   * KV in a given region's memstore
+   *
+   * @param region
+   * @return the first sequence number written, or Long.MAX_VALUE if none
+   */
+  public long getFirstSeqWrittenInCurrentMemstoreForRegion(HRegion region) {
+    Long value = firstSeqWrittenInCurrentMemstore.get(region.getRegionName());
+    if (value != null) {
+      return value.longValue();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  /**
+   * This is a utility method for tests to find the sequence number of the first
+   * KV in a given region's snapshot memstore
+   *
+   * @param region
+   * @return the first sequence number in the snapshot, or Long.MAX_VALUE if none
+   */
+  public long getFirstSeqWrittenInSnapshotMemstoreForRegion(HRegion region) {
+    Long value = firstSeqWrittenInSnapshotMemstore.get(region.getRegionName());
+    if (value != null) {
+      return value.longValue();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
   boolean canGetCurReplicas() {
     return this.getNumCurrentReplicas != null;
   }
@@ -1334,10 +1375,18 @@ public class HLog implements Syncable {
   }
 
   /** @return the number of log files in use */
-  int getNumLogFiles() {
+  public int getNumLogFiles() {
     return outputfiles.size();
   }
 
+  /**
+   * See {@link #startCacheFlush(byte[], long, long)}
+   * @param regionName
+   * @return
+   */
+  public long startCacheFlush(final byte[] regionName) {
+    return startCacheFlush(regionName, -1L, -1L);
+  }
 
   /**
    * Acquire a lock so that we do not close between the start and
@@ -1352,26 +1401,65 @@ public class HLog implements Syncable {
    * lsn of the earliest in-memory lsn - which is now in the memstore snapshot -
    * is saved temporarily in the firstSeqWritten map while the flush is active.
    *
+   * In case the per-CF flush is enabled, we cannot simply clear the
+   * firstSeqWritten entry for the region to be flushed. There might be certain
+   * CFs whose memstores won't be flushed. Therefore, we need the first LSNs for
+   * the stores that will be flushed, and first LSNs for the stores that won't
+   * be flushed.
+   *
+   * @param regionName
+   * @param firstSeqIdInStoresToFlush
+   * @param firstSeqIdInStoresNotToFlush
    * @return sequence ID to pass {@link #completeCacheFlush(byte[], byte[], long, boolean)}
    * (byte[], byte[], long)}
    * @see #completeCacheFlush(byte[], byte[], long, boolean)
    * @see #abortCacheFlush()
    */
-  public long startCacheFlush(final byte [] regionName) {
+  public long startCacheFlush(final byte [] regionName,
+                              long firstSeqIdInStoresToFlush,
+                              long firstSeqIdInStoresNotToFlush) {
+    long num = -1;
     this.closeLock.readLock().lock();
     synchronized (oldestSeqNumsLock) {
       if (this.firstSeqWrittenInSnapshotMemstore.containsKey(regionName)) {
         LOG.warn("Requested a startCacheFlush while firstSeqWrittenInSnapshotMemstore still"
-            + " contains " + Bytes.toString(regionName) + " . Did the previous flush fail?"
-            + " Will try to complete it");
+          + " contains " + Bytes.toString(regionName) + " . Did the previous flush fail?"
+          + " Will try to complete it");
       } else {
-        Long seq = this.firstSeqWrittenInCurrentMemstore.remove(regionName);
-        if (seq != null) {
-          this.firstSeqWrittenInSnapshotMemstore.put(regionName, seq);
+
+        // If we are flushing the entire memstore, remove the entry from the
+        // current memstores.
+        if (firstSeqIdInStoresNotToFlush == Long.MAX_VALUE) {
+          Long seq = this.firstSeqWrittenInCurrentMemstore.remove(regionName);
+          if (seq != null) {
+            this.firstSeqWrittenInSnapshotMemstore.put(regionName, seq);
+          }
+          num = obtainSeqNum();
+        } else {
+          // Amongst the Stores not being flushed, what is the smallest sequence
+          // number? Put that in this map.
+          this.firstSeqWrittenInCurrentMemstore.replace(regionName,
+            firstSeqIdInStoresNotToFlush);
+
+          // Amongst the Stores being flushed, what is the smallest sequence
+          // number? Put that in this map.
+          this.firstSeqWrittenInSnapshotMemstore.put(regionName,
+            firstSeqIdInStoresToFlush);
+
+          // During log replay, we can safely discard any edits whose
+          // sequence number is less than the smallest sequence id amongst
+          // the stores that we are not flushing. This might re-apply some
+          // edits belonging to stores that are going to be flushed, but we
+          // expect those operations to be idempotent anyway, and this is
+          // simpler.
+          num = firstSeqIdInStoresNotToFlush - 1;
         }
       }
     }
-    return obtainSeqNum();
+    if (num == -1) {
+      num = obtainSeqNum();
+    }
+    return num;
   }
 
   /**

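The startCacheFlush change above picks the sequence number up to which a flush is considered complete: for a full flush it is a freshly obtained sequence number, while for a selective flush it is one less than the smallest LSN still held by the unflushed stores, so their edits survive log replay. A small sketch of just that rule (names are illustrative):

```java
/** Sketch of the replay-point rule in startCacheFlush: a selective flush
 *  may only be recorded as covering sequence numbers strictly below the
 *  smallest LSN remaining in the stores that were not flushed. */
public class ReplayPoint {
    public static long flushedUpTo(long firstSeqIdInStoresNotToFlush, long nextSeqNum) {
        if (firstSeqIdInStoresNotToFlush == Long.MAX_VALUE) {
            // Full flush: no store retains edits, so cover everything so far.
            return nextSeqNum;
        }
        // Selective flush: stop just before the oldest edit still in memory.
        return firstSeqIdInStoresNotToFlush - 1;
    }
}
```

As the patched comment notes, replay may then re-apply some edits belonging to the flushed stores, which is acceptable because those operations are expected to be idempotent.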
Modified: hbase/branches/0.89-fb/src/main/resources/hbase-default.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/resources/hbase-default.xml?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/resources/hbase-default.xml (original)
+++ hbase/branches/0.89-fb/src/main/resources/hbase-default.xml Wed Sep 18 18:18:23 2013
@@ -313,6 +313,19 @@
     </description>
   </property>
   <property>
+    <name>hbase.columnfamily.memstore.flush.size</name>
+    <value>16777216</value>
+    <description>
+    If per column family flushing is turned on, then every time we hit the
+    total memstore limit, we find all the column families whose memstores
+    exceed this value and flush only those, while retaining the others whose
+    memstores are below this limit. If none of the families have a memstore
+    larger than this value, all the memstores are flushed (just as usual).
+    This value should be less than half of the total memstore threshold
+    (hbase.hregion.memstore.flush.size).
+    </description>
+  </property>
+  <property>
     <name>hbase.hregion.preclose.flush.size</name>
     <value>5242880</value>
     <description>

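The description above advises keeping hbase.columnfamily.memstore.flush.size below half of hbase.hregion.memstore.flush.size. A hypothetical sanity check for that sizing guidance (not part of the patch):

```java
/** Sketch of the sizing guidance: the per-family flush limit should be
 *  positive and below half of the region-wide memstore flush limit. */
public class FlushConfigCheck {
    public static boolean isSaneColumnFamilyFlushSize(long perFamily, long perRegion) {
        return perFamily > 0 && perFamily < perRegion / 2;
    }
}
```

For example, the default of 16 MB is sane against a 128 MB region limit, while a 100 MB per-family limit would not be.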
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java?rev=1524512&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java Wed Sep 18 18:18:23 2013
@@ -0,0 +1,544 @@
+/*
+ * Copyright 2013 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This test verifies the correctness of the Per Column Family flushing strategy
+ */
+public class TestPerColumnFamilyFlush extends TestCase {
+  private static final Log LOG =
+          LogFactory.getLog(TestPerColumnFamilyFlush.class);
+  HRegion region = null;
+  private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final String DIR = TEST_UTIL.getTestDir() +
+          "/TestHRegion/";
+
+  public static final String TABLENAME_STR = "t1";
+  public static final byte[] TABLENAME = Bytes.toBytes("t1");
+  public static final byte[][] families = { Bytes.toBytes("f1"),
+          Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4"),
+          Bytes.toBytes("f5") };
+  public static final byte[] FAMILY1 = families[0];
+  public static final byte[] FAMILY2 = families[1];
+  public static final byte[] FAMILY3 = families[2];
+
+  private void initHRegion (String callingMethod,
+                            HBaseConfiguration conf)
+          throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+    for(byte [] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+    HRegionInfo info = new HRegionInfo(htd, null, null, false);
+    Path path = new Path(DIR + callingMethod);
+    region = HRegion.createHRegion(info, path, conf);
+  }
+
+  // A helper function to create puts.
+  Put createPut(int familyNum, int putNum) {
+    byte[] qf = Bytes.toBytes("q" + familyNum);
+    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+    Put p = new Put(row);
+    p.add(families[familyNum - 1], qf, val);
+    return p;
+  }
+
+  // A helper function to create gets.
+  Get createGet(int familyNum, int putNum) {
+    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+    return new Get(row);
+  }
+
+  // A helper function to verify edits.
+  void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
+    Result r = table.get(createGet(familyNum, putNum));
+    byte[] family = families[familyNum - 1];
+    byte[] qf = Bytes.toBytes("q" + familyNum);
+    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
+            r.getFamilyMap(family));
+    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
+            r.getFamilyMap(family).get(qf));
+    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
+            Arrays.equals(r.getFamilyMap(family).get(qf), val));
+  }
+
+  @Test
+  public void testSelectiveFlushWhenEnabled() throws IOException {
+    // Set up the configuration
+    HBaseConfiguration conf = new HBaseConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+    conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
+    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 100 * 1024);
+
+    // Initialize the HRegion
+    initHRegion(getName(), conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize().get();
+
+    // Find the smallest LSN for the edits in each CF.
+    long smallestSeqCF1 =
+            region.getStore(FAMILY1).getSmallestSeqNumberInMemstore();
+    long smallestSeqCF2 =
+            region.getStore(FAMILY2).getSmallestSeqNumberInMemstore();
+    long smallestSeqCF3 =
+            region.getStore(FAMILY3).getSmallestSeqNumberInMemstore();
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSize =
+            region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSize =
+            region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSize =
+            region.getStore(FAMILY3).getMemStoreSize();
+
+    // Get the overall smallest LSN in the region's memstores.
+    long smallestSeqInRegionCurrentMemstore =
+            region.getLog().
+                    getFirstSeqWrittenInCurrentMemstoreForRegion(region);
+
+    // The overall smallest LSN in the region's memstores should be the same as
+    // the LSN of the smallest edit in CF1
+    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
+
+    // Some other sanity checks.
+    assertTrue(smallestSeqCF1 < smallestSeqCF2);
+    assertTrue(smallestSeqCF2 < smallestSeqCF3);
+    assertTrue(cf1MemstoreSize > 0);
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    assertTrue(totalMemstoreSize ==
+                (cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize));
+
+    // Flush!
+    region.flushcache(true);
+
+    // Will use these to check if anything changed.
+    long oldCF2MemstoreSize = cf2MemstoreSize;
+    long oldCF3MemstoreSize = cf3MemstoreSize;
+
+    // Recalculate everything
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    smallestSeqInRegionCurrentMemstore = region.getLog()
+            .getFirstSeqWrittenInCurrentMemstoreForRegion(region);
+
+    // We should have cleared out only CF1, since we chose the flush thresholds
+    // and number of puts accordingly.
+    assertEquals(0, cf1MemstoreSize);
+    // Nothing should have happened to CF2, ...
+    assertTrue(cf2MemstoreSize == oldCF2MemstoreSize);
+    // ... or CF3
+    assertTrue(cf3MemstoreSize == oldCF3MemstoreSize);
+    // Now the smallest LSN in the region should be the same as the smallest
+    // LSN in the memstore of CF2.
+    assertTrue(smallestSeqInRegionCurrentMemstore == smallestSeqCF2);
+    // Of course, this should hold too.
+    assertTrue(totalMemstoreSize == (cf2MemstoreSize + cf3MemstoreSize));
+
+    // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
+    for (int i = 1200; i < 2400; i++) {
+      region.put(createPut(2, i));
+
+      // Add only 100 puts for CF3
+      if (i - 1200 < 100) {
+        region.put(createPut(3, i));
+      }
+    }
+
+    // How much does the CF3 memstore occupy? Will be used later.
+    oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Flush again
+    region.flushcache(true);
+
+    // Recalculate everything
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    smallestSeqInRegionCurrentMemstore = region.getLog()
+            .getFirstSeqWrittenInCurrentMemstoreForRegion(region);
+
+    // CF1 and CF2, both should be absent.
+    assertEquals(0, cf1MemstoreSize);
+    assertEquals(0, cf2MemstoreSize);
+    // CF3 shouldn't have been touched.
+    assertTrue(cf3MemstoreSize == oldCF3MemstoreSize);
+    assertTrue(totalMemstoreSize == cf3MemstoreSize);
+    assertTrue(smallestSeqInRegionCurrentMemstore == smallestSeqCF3);
+
+    // What happens when we hit the memstore limit, but we are not able to find
+    // any Column Family above the threshold?
+    // In that case, we should flush all the CFs.
+
+    // Clearing the existing memstores.
+    region.flushcache(false);
+
+    // The memstore limit is 200 * 1024 and the column family flush threshold
+    // is 100 * 1024. We try to just hit the memstore limit with each CF's
+    // memstore staying below the CF flush threshold.
+    for (int i = 1; i <= 300; i++) {
+      region.put(createPut(1, i));
+      region.put(createPut(2, i));
+      region.put(createPut(3, i));
+      region.put(createPut(4, i));
+      region.put(createPut(5, i));
+    }
+
+    region.flushcache(true);
+    // Since no CF is above the threshold, there is no specific store to
+    // flush, so all the memstores should be flushed.
+    Assert.assertEquals(0, region.getMemstoreSize().get());
+  }
+
+  @Test
+  public void testSelectiveFlushWhenNotEnabled() throws IOException {
+    // Set up the configuration
+    HBaseConfiguration conf = new HBaseConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+    conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, false);
+    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 100 * 1024);
+
+    // Initialize the HRegion
+    initHRegion(getName(), conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize().get();
+
+    // Find the smallest LSN for the edits in each CF.
+    long smallestSeqCF1 =
+            region.getStore(FAMILY1).getSmallestSeqNumberInMemstore();
+    long smallestSeqCF2 =
+            region.getStore(FAMILY2).getSmallestSeqNumberInMemstore();
+    long smallestSeqCF3 =
+            region.getStore(FAMILY3).getSmallestSeqNumberInMemstore();
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSize =
+            region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSize =
+            region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSize =
+            region.getStore(FAMILY3).getMemStoreSize();
+
+    // Get the overall smallest LSN in the region's memstores.
+    long smallestSeqInRegionCurrentMemstore =
+            region.getLog().
+                    getFirstSeqWrittenInCurrentMemstoreForRegion(region);
+
+    // The overall smallest LSN in the region's memstores should be the same as
+    // the LSN of the smallest edit in CF1
+    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
+
+    // Some other sanity checks.
+    assertTrue(smallestSeqCF1 < smallestSeqCF2);
+    assertTrue(smallestSeqCF2 < smallestSeqCF3);
+    assertTrue(cf1MemstoreSize > 0);
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    assertTrue(totalMemstoreSize ==
+            (cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize));
+
+    // Flush!
+    region.flushcache(true);
+
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    smallestSeqInRegionCurrentMemstore = region.getLog()
+            .getFirstSeqWrittenInCurrentMemstoreForRegion(region);
+
+    // Everything should have been cleared
+    assertEquals(0, cf1MemstoreSize);
+    assertEquals(0, cf2MemstoreSize);
+    assertEquals(0, cf3MemstoreSize);
+    assertEquals(0, totalMemstoreSize);
+    assertEquals(Long.MAX_VALUE, smallestSeqInRegionCurrentMemstore);
+  }
+
+  // Find the (first) region whose name starts with a particular prefix.
+  private HRegion getRegionWithNameStartingWith(String regionPrefix) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts =
+      cluster.getRegionServerThreads();
+    for (JVMClusterUtil.RegionServerThread rst : rsts) {
+      HRegionServer hrs = rst.getRegionServer();
+      for (HRegion region : hrs.getOnlineRegions()) {
+        if (region.getRegionNameAsString().startsWith(regionPrefix)) {
+          return region;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Test
+  public void testLogReplay() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
+    // Carefully chosen limits so that the memstore just flushes when we're done
+    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 10000);
+    final int numRegionServers = 4;
+    try {
+      TEST_UTIL.startMiniCluster(numRegionServers);
+    } catch (Exception e) {
+      LOG.error("Could not start the mini cluster. Terminating.");
+      e.printStackTrace();
+      throw e;
+    }
+
+    TEST_UTIL.createTable(TABLENAME, families);
+    HTable table = new HTable(conf, TABLENAME);
+    HTableDescriptor htd = table.getTableDescriptor();
+
+    for (byte [] family : families) {
+      if (!htd.hasFamily(family)) {
+        htd.addFamily(new HColumnDescriptor(family));
+      }
+    }
+
+    // Add 80 edits for CF1, 10 each for CF2 and CF3.
+    // These will all be interleaved in the log.
+    for (int i = 1; i <= 80; i++) {
+      table.put(createPut(1, i));
+      if (i <= 10) {
+        table.put(createPut(2, i));
+        table.put(createPut(3, i));
+      }
+    }
+    table.flushCommits();
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      throw e;
+    }
+
+    HRegion desiredRegion = getRegionWithNameStartingWith(TABLENAME_STR);
+    assertTrue("Could not find a region hosting the new table.",
+      desiredRegion != null);
+
+    // Flush the region selectively.
+    desiredRegion.flushcache(true);
+
+    long totalMemstoreSize;
+    long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
+    totalMemstoreSize = desiredRegion.getMemstoreSize().get();
+
+    // Find the sizes of the memstores of each CF.
+    cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
+
+    // CF1 Should have been flushed
+    assertEquals(0, cf1MemstoreSize);
+    // CF2 and CF3 shouldn't have been flushed.
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+    assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize);
+
+    // Wait for the RS report to go across to the master, so that the master
+    // is aware of which sequence ids have been flushed, before we kill the RS.
+    // In production, if the RS dies before the report goes across, we will
+    // safely replay all the edits.
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      throw e;
+    }
+
+    // Abort the region server where we have the region hosted.
+    HRegionServer rs = desiredRegion.getRegionServer();
+    rs.abort("testing");
+
+    // The aborted region server's regions will be eventually assigned to some
+    // other region server, and the get RPC call (inside verifyEdit()) will
+    // retry for some time till the regions come back up.
+
+    // Verify that all the edits are safe.
+    for (int i = 1; i <= 80; i++) {
+      verifyEdit(1, i, table);
+      if (i <= 10) {
+        verifyEdit(2, i, table);
+        verifyEdit(3, i, table);
+      }
+    }
+
+    try {
+      TEST_UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.error("Could not shutdown the mini cluster. Terminating.");
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  // Test Log Replay with Distributed Splitting on.
+  // In distributed log splitting, the log splitters ask the master for the
+  // last flushed sequence id for a region. This test would ensure that we
+  // are doing the book-keeping correctly.
+  @Test
+  public void testLogReplayWithDistributedSplitting() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(
+            HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
+    testLogReplay();
+  }
+
+  /**
+   * When a log roll is about to happen, we flush the regions that will be
+   * affected by the log roll. These flushes cannot be selective flushes,
+   * otherwise we cannot roll the logs. This test ensures that we do a
+   * full flush in that scenario.
+   * @throws Exception
+   */
+  @Test
+  public void testFlushingWhenLogRolling() throws Exception {
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000);
+    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 100000);
+
+    // Also, let us try real hard to get a log roll to happen.
+    // Keeping the log roll period to 2s.
+    conf.setLong("hbase.regionserver.logroll.period", 2000);
+    // Keep the block size small so that we fill up the log files very fast.
+    conf.setLong("hbase.regionserver.hlog.blocksize", 6144);
+    int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
+
+    final int numRegionServers = 4;
+    try {
+      TEST_UTIL.startMiniCluster(numRegionServers);
+    } catch (Exception e) {
+      LOG.error("Could not start the mini cluster. Terminating.");
+      e.printStackTrace();
+      throw e;
+    }
+
+    TEST_UTIL.createTable(TABLENAME, families);
+    HTable table = new HTable(conf, TABLENAME);
+    HTableDescriptor htd = table.getTableDescriptor();
+
+    for (byte [] family : families) {
+      if (!htd.hasFamily(family)) {
+        htd.addFamily(new HColumnDescriptor(family));
+      }
+    }
+
+    HRegion desiredRegion = getRegionWithNameStartingWith(TABLENAME_STR);
+    assertTrue("Could not find a region hosting the new table.",
+      desiredRegion != null);
+
+    // Add some edits. Most will be for CF1, some for CF2 and CF3.
+    for (int i = 1; i <= 10000; i++) {
+      table.put(createPut(1, i));
+      if (i <= 200) {
+        table.put(createPut(2, i));
+        table.put(createPut(3, i));
+      }
+      table.flushCommits();
+      // Keep adding until we exceed the number of log files, so that we are
+      // able to trigger the cleaning of old log files.
+      int currentNumLogFiles = desiredRegion.getLog().getNumLogFiles();
+      if (currentNumLogFiles > maxLogs) {
+        LOG.info("The number of log files is now: " + currentNumLogFiles +
+                 ". Expect a log roll and memstore flush.");
+        break;
+      }
+    }
+
+    // Wait for some time till the flush caused by log rolling happens.
+    Thread.sleep(4000);
+
+    // We have artificially created the conditions for a log roll. When a
+    // log roll happens, we should flush all the column families. Testing that
+    // case here.
+
+    // Individual families should have been flushed.
+    assertEquals(0, desiredRegion.getStore(FAMILY1).getMemStoreSize());
+    assertEquals(0, desiredRegion.getStore(FAMILY2).getMemStoreSize());
+    assertEquals(0, desiredRegion.getStore(FAMILY3).getMemStoreSize());
+
+    // And of course, the total memstore should also be clean.
+    assertEquals(0, desiredRegion.getMemstoreSize().get());
+  }
+}
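The selection policy these tests exercise (flush only the stores above the per-CF threshold, and fall back to flushing everything when none qualifies) can be sketched in plain Java. This is a minimal, hypothetical sketch for illustration, not the actual HRegion code; the class and method names are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the per-CF flush selection, not the real HRegion code.
public class PerCFFlushPolicy {
  // Given each family's memstore size and the per-CF flush threshold, pick
  // the families to flush: those at or above the threshold, or all of them
  // when none qualifies (so the region-level limit is still honored).
  public static List<String> selectStoresToFlush(
      Map<String, Long> memstoreSizes, long perCFFlushSize) {
    List<String> selected = new ArrayList<>();
    for (Map.Entry<String, Long> e : memstoreSizes.entrySet()) {
      if (e.getValue() >= perCFFlushSize) {
        selected.add(e.getKey());
      }
    }
    // Fallback: no single family is large enough, so flush everything.
    if (selected.isEmpty()) {
      selected.addAll(memstoreSizes.keySet());
    }
    return selected;
  }

  public static void main(String[] args) {
    Map<String, Long> sizes = new LinkedHashMap<>();
    sizes.put("f1", 120L * 1024);  // above the 100 KB threshold
    sizes.put("f2", 20L * 1024);
    sizes.put("f3", 10L * 1024);
    System.out.println(selectStoresToFlush(sizes, 100L * 1024));
    sizes.put("f1", 30L * 1024);   // now no family qualifies: flush all
    System.out.println(selectStoresToFlush(sizes, 100L * 1024));
  }
}
```

This mirrors the two cases in testSelectiveFlushWhenEnabled: a selective flush of the oversized family, and the all-families fallback when the region limit is hit but every family is small.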

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=1524512&r1=1524511&r2=1524512&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Wed Sep 18 18:18:23 2013
@@ -833,7 +833,7 @@ public class TestMemStore extends TestCa
     for (int newValue = 0; newValue < 1000; newValue++) {
       for (int row = newValue; row < newValue + 1000; row++) {
         byte[] rowBytes = Bytes.toBytes(row);
-        size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts);
+        size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts, -1L);
       }
     }
     System.out.println("Wrote " + ts + " vals");
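The assertions on getFirstSeqWrittenInCurrentMemstoreForRegion in the tests above boil down to one invariant: after a selective flush, the region's WAL replay point is the smallest LSN still held in an unflushed memstore, and Long.MAX_VALUE once everything is flushed. A hedged sketch of that invariant (a hypothetical helper, not the actual HLog bookkeeping):

```java
// Hypothetical helper, not the real HLog code: computes a region's WAL
// replay point from each store's smallest in-memstore LSN.
public class ReplayPoint {
  // seqPerStore[i] is the smallest unflushed LSN of store i;
  // flushed[i] marks stores whose memstore is empty after a selective flush.
  public static long smallestUnflushedSeq(long[] seqPerStore, boolean[] flushed) {
    long min = Long.MAX_VALUE;  // nothing unflushed => nothing to replay
    for (int i = 0; i < seqPerStore.length; i++) {
      if (!flushed[i] && seqPerStore[i] < min) {
        min = seqPerStore[i];
      }
    }
    return min;
  }

  public static void main(String[] args) {
    long[] seqs = {1L, 101L, 151L};  // smallest LSNs of CF1, CF2, CF3
    // Before any flush, the replay point is CF1's smallest LSN.
    System.out.println(smallestUnflushedSeq(seqs, new boolean[]{false, false, false}));
    // After selectively flushing CF1, it advances to CF2's smallest LSN.
    System.out.println(smallestUnflushedSeq(seqs, new boolean[]{true, false, false}));
    // After a full flush there is nothing left to replay.
    System.out.println(smallestUnflushedSeq(seqs, new boolean[]{true, true, true}));
  }
}
```

This is exactly what testSelectiveFlushWhenEnabled checks (replay point advancing from smallestSeqCF1 to smallestSeqCF2 to smallestSeqCF3) and what testSelectiveFlushWhenNotEnabled checks after the full flush (Long.MAX_VALUE).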


