hbase-commits mailing list archives

From st...@apache.org
Subject [2/3] hbase git commit: HBASE-14920: Compacting memstore
Date Fri, 20 May 2016 10:42:04 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index a465ea9..cdf5757 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -17,15 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.ExceptionHandler;
-import com.lmax.disruptor.LifecycleAware;
-import com.lmax.disruptor.TimeoutException;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -39,6 +30,15 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index 6e10f3c..53c501f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,8 +33,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.common.collect.Maps;
-
 /**
  * Accounting of sequence ids per region and then by column family. So we can keep our accounting
  * current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance
@@ -163,6 +162,38 @@ class SequenceIdAccounting {
     }
   }
 
+  /**
+   * Update the store sequence id, e.g., upon executing in-memory compaction
+   */
+  void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceId,
+      boolean onlyIfGreater) {
+    if (sequenceId == null) return;
+    Long highest = this.highestSequenceIds.get(encodedRegionName);
+    if (highest == null || sequenceId > highest) {
+      this.highestSequenceIds.put(encodedRegionName, sequenceId);
+    }
+    synchronized (this.tieLock) {
+      ConcurrentMap<byte[], Long> m = getOrCreateLowestSequenceIds(encodedRegionName);
+      boolean replaced = false;
+      while (!replaced) {
+        Long oldSeqId = m.get(familyName);
+        if (oldSeqId == null) {
+          m.put(familyName, sequenceId);
+          replaced = true;
+        } else if (onlyIfGreater) {
+          if (sequenceId > oldSeqId) {
+            replaced = m.replace(familyName, oldSeqId, sequenceId);
+          } else {
+            return;
+          }
+        } else { // replace even if sequence id is not greater than oldSeqId
+          m.put(familyName, sequenceId);
+          return;
+        }
+      }
+    }
+  }
+
   ConcurrentMap<byte[], Long> getOrCreateLowestSequenceIds(byte[] encodedRegionName) {
     // Intentionally, this access is done outside of this.regionSequenceIdLock. Done per append.
     ConcurrentMap<byte[], Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);

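The updateStore addition above is, at heart, a compare-and-swap retry loop over the
per-family lowest-unflushed-sequence-id map. A minimal standalone sketch of the same
semantics, assuming a plain ConcurrentHashMap in place of the patch's per-region family
map (class and field names here are illustrative, not HBase API; the sketch also uses
putIfAbsent for the absent case, slightly stricter than the patch's plain put under
tieLock):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class UpdateStoreSketch {
      // Stand-in for one region's lowestUnflushedSequenceIds family map.
      static final ConcurrentMap<String, Long> familySeqIds = new ConcurrentHashMap<>();

      static void updateStore(String family, Long sequenceId, boolean onlyIfGreater) {
        if (sequenceId == null) return;
        while (true) {
          Long old = familySeqIds.get(family);
          if (old == null) {
            if (familySeqIds.putIfAbsent(family, sequenceId) == null) return;
          } else if (onlyIfGreater) {
            if (sequenceId <= old) return;             // nothing to update
            if (familySeqIds.replace(family, old, sequenceId)) return; // CAS; retry on race
          } else {
            familySeqIds.put(family, sequenceId);      // unconditional overwrite
            return;
          }
        }
      }

      public static void main(String[] args) {
        updateStore("cf", 10L, true);
        updateStore("cf", 5L, true);   // ignored: 5 <= 10
        updateStore("cf", 3L, false);  // overwrites: now 3
        System.out.println(familySeqIds.get("cf"));  // prints 3
      }
    }
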
http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 10fe04c..b5ddd00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -25,19 +25,19 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.util.FSUtils;
 
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.FSUtils;
 
 /**
  * No-op implementation of {@link WALProvider} used when the WAL is disabled.
@@ -170,6 +170,10 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
+    public void updateStore(byte[] encodedRegionName, byte[] familyName,
+        Long sequenceid, boolean onlyIfGreater) { return; }
+
+    @Override
     public void sync() {
       if (!this.listeners.isEmpty()) {
         for (WALActionsListener listener : this.listeners) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 051ce54..af63b0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.wal;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Set;
@@ -35,8 +36,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
  * APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc).
@@ -118,6 +117,18 @@ public interface WAL {
   long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException;
 
   /**
+   * Updates the sequence number of a specific store. Depending on the flag, the current
+   * sequence number is replaced either only when the given sequence id is greater, or
+   * unconditionally, even when it is lower than the existing one.
+   * @param encodedRegionName the encoded name of the region
+   * @param familyName the column family name
+   * @param sequenceid the new sequence id to record
+   * @param onlyIfGreater if true, update only when the given id exceeds the current one
+   */
+  void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
+      boolean onlyIfGreater);
+
+  /**
    * Sync what we have in the WAL.
    * @throws IOException
    */

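A hedged sketch of how a caller might use the new WAL#updateStore hook after an
in-memory compaction; the enclosing class, method, and variable names are illustrative
assumptions, only the updateStore signature comes from this patch:

    import org.apache.hadoop.hbase.wal.WAL;

    // Illustrative only (not part of the patch): a compacting-memstore helper
    // reporting its new lowest unflushed sequence id to the WAL.
    class InMemoryCompactionHook {
      void onInMemoryCompactionFinished(WAL wal, byte[] encodedRegionName,
          byte[] familyName, long lowestUnflushedSeqId) {
        // onlyIfGreater=true: the accounting never moves backwards if a concurrent
        // writer has already advanced past what this compaction observed.
        wal.updateStore(encodedRegionName, familyName, lowestUnflushedSeqId, true);
      }
    }
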
http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 010e184..501075a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -219,7 +219,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       });
 
   /** This is for unit tests parameterized with a single boolean. */
-  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination()  ;
+  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
   /** Compression algorithms to use in testing */
   public static final Compression.Algorithm[] COMPRESSION_ALGORITHMS ={
       Compression.Algorithm.NONE, Compression.Algorithm.GZ
@@ -1518,7 +1518,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       desc.addFamily(hcd);
     }
     getHBaseAdmin().createTable(desc, splitKeys);
-    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
+    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
+    // assigned
     waitUntilAllRegionsAssigned(tableName);
     return (HTable) getConnection().getTable(tableName);
   }
@@ -1555,7 +1556,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       desc.addFamily(hcd);
     }
     getHBaseAdmin().createTable(desc);
-    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
+    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
+    // assigned
     waitUntilAllRegionsAssigned(tableName);
     return (HTable) getConnection().getTable(tableName);
   }
@@ -1573,7 +1575,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
         desc.addCoprocessor(cpName);
       }
       getHBaseAdmin().createTable(desc);
-      // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
+      // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
+      // assigned
       waitUntilAllRegionsAssigned(tableName);
       return (HTable) getConnection().getTable(tableName);
     }
@@ -1598,7 +1601,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       i++;
     }
     getHBaseAdmin().createTable(desc);
-    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
+    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
+    // assigned
     waitUntilAllRegionsAssigned(tableName);
     return (HTable) getConnection().getTable(tableName);
   }
@@ -1617,7 +1621,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     desc.addFamily(hcd);
     getHBaseAdmin().createTable(desc, splitRows);
-    // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are assigned
+    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are
+    // assigned
     waitUntilAllRegionsAssigned(tableName);
     return (HTable) getConnection().getTable(tableName);
   }
@@ -1829,10 +1834,27 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    */
   public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
       boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
+    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, isReadOnly,
+        durability, wal, null, families);
+  }
+
+  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
+      byte[] stopKey, boolean isReadOnly, Durability durability, WAL wal,
+      boolean[] compactedMemStore, byte[]... families) throws IOException {
     HTableDescriptor htd = new HTableDescriptor(tableName);
     htd.setReadOnly(isReadOnly);
+    int i = 0;
     for (byte[] family : families) {
       HColumnDescriptor hcd = new HColumnDescriptor(family);
+      if (compactedMemStore != null && i < compactedMemStore.length) {
+        hcd.setInMemoryCompaction(compactedMemStore[i]);
+      } else {
+        hcd.setInMemoryCompaction(false);
+      }
+      i++;
       // Set default to be three versions.
       hcd.setMaxVersions(Integer.MAX_VALUE);
       htd.addFamily(hcd);
@@ -1872,7 +1894,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * @param preserveRegions keep the existing split points
    * @return HTable for the new table
    */
-  public HTable truncateTable(final TableName tableName, final boolean preserveRegions) throws IOException {
+  public HTable truncateTable(final TableName tableName, final boolean preserveRegions)
+      throws IOException {
     Admin admin = getHBaseAdmin();
     if (!admin.isTableDisabled(tableName)) {
       admin.disableTable(tableName);
@@ -1947,7 +1970,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * @return Count of rows loaded.
    * @throws IOException
    */
-  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) throws IOException {
+  public int loadTable(final Table t, final byte[][] f, byte[] value,
+      boolean writeToWAL) throws IOException {
     List<Put> puts = new ArrayList<>();
     for (byte[] row : HBaseTestingUtility.ROWS) {
       Put put = new Put(row);
@@ -2005,7 +2029,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
             }
             if (count != expectedCount) {
               String row = new String(new byte[] {b1,b2,b3});
-              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount);
+              throw new RuntimeException("Row:" + row + " has a seen count of " + count
+                  + " instead of " + expectedCount);
             }
           }
         }
@@ -2515,7 +2540,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * Create a stubbed out RegionServerService, mainly for getting FS.
    * This version is used by TestTokenAuthentication
    */
-  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws IOException {
+  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
+      throws IOException {
     final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
     rss.setFileSystem(getTestFileSystem());
     rss.setRpcServer(rpc);
@@ -3124,7 +3150,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append its hard-coded to 5 and
    * makes tests linger.  Here is the exception you'll see:
    * <pre>
-   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683 failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
+   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
+   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
+   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
+   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
    * </pre>
    * @param stream A DFSClient.DFSOutputStream.
    * @param max

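A hedged usage sketch of the new createLocalHRegionWithInMemoryFlags helper above;
tableName, wal, and the family arrays are assumed to exist in the enclosing test, and
the boolean array is positional, one flag per family:

    // Illustrative test snippet (not part of the patch): enable in-memory
    // compaction for family1 but not for family2.
    boolean[] inMemoryCompaction = new boolean[] { true, false };
    HRegion region = testUtil.createLocalHRegionWithInMemoryFlags(
        tableName, null /* startKey */, null /* stopKey */, false /* isReadOnly */,
        Durability.USE_DEFAULT, wal, inMemoryCompaction, family1, family2);
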
http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index d8363d4..3c10ddc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -208,6 +209,10 @@ public class TestIOFencing {
 
     @Override public void finalizeFlush() {
     }
+
+    @Override public MemStore getMemStore() {
+      return null;
+    }
   }
 
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 5c79d72..4a4b0e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -305,19 +305,15 @@ public class TestHeapSize  {
     // DefaultMemStore Deep Overhead
     actual = DefaultMemStore.DEEP_OVERHEAD;
     expected = ClassSize.estimateBase(cl, false);
-    expected += (2 * ClassSize.estimateBase(AtomicLong.class, false));
-    expected += (2 * ClassSize.estimateBase(CellSet.class, false));
-    expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false));
-    expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false));
+    expected += ClassSize.estimateBase(AtomicLong.class, false);
+    expected += ClassSize.estimateBase(CellSet.class, false);
+    expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
+    expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
     if(expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
-      ClassSize.estimateBase(AtomicLong.class, true);
-      ClassSize.estimateBase(CellSet.class, true);
       ClassSize.estimateBase(CellSet.class, true);
       ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
-      ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
-      ClassSize.estimateBase(TimeRangeTracker.class, true);
       ClassSize.estimateBase(TimeRangeTracker.class, true);
       assertEquals(expected, actual);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
new file mode 100644
index 0000000..5c0e42b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -0,0 +1,729 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Compacting memstore test case.
+ */
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestCompactingMemStore extends TestDefaultMemStore {
+
+  private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class);
+  private static MemStoreChunkPool chunkPool;
+  private HRegion region;
+  private RegionServicesForStores regionServicesForStores;
+  private HStore store;
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Helpers
+  //////////////////////////////////////////////////////////////////////////////
+  private static byte[] makeQualifier(final int i1, final int i2) {
+    return Bytes.toBytes(Integer.toString(i1) + ";" +
+        Integer.toString(i2));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    chunkPool.clearChunks();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.internalSetUp();
+    Configuration conf = new Configuration();
+    conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true);
+    conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
+    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
+    HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
+    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+    this.region = hbaseUtility.createTestRegion("foobar", hcd);
+    this.regionServicesForStores = region.getRegionServicesForStores();
+    this.store = new HStore(region, hcd, conf);
+    this.memstore = new CompactingMemStore(HBaseConfiguration.create(), CellComparator.COMPARATOR,
+        store, regionServicesForStores);
+    chunkPool = MemStoreChunkPool.getPool(conf);
+    assertTrue(chunkPool != null);
+  }
+
+
+  /**
+   * A simple test which verifies the 3 possible states when scanning across snapshot.
+   *
+   * @throws IOException
+   * @throws CloneNotSupportedException
+   */
+  @Test
+  public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException {
+    // we are going to test scanning across a snapshot with two kvs
+    // kv1 should always be returned before kv2
+    final byte[] one = Bytes.toBytes(1);
+    final byte[] two = Bytes.toBytes(2);
+    final byte[] f = Bytes.toBytes("f");
+    final byte[] q = Bytes.toBytes("q");
+    final byte[] v = Bytes.toBytes(3);
+
+    final KeyValue kv1 = new KeyValue(one, f, q, 10, v);
+    final KeyValue kv2 = new KeyValue(two, f, q, 10, v);
+
+    // use case 1: both kvs in kvset
+    this.memstore.add(kv1.clone());
+    this.memstore.add(kv2.clone());
+    verifyScanAcrossSnapshot2(kv1, kv2);
+
+    // use case 2: both kvs in snapshot
+    this.memstore.snapshot();
+    verifyScanAcrossSnapshot2(kv1, kv2);
+
+    // use case 3: first in snapshot second in kvset
+    this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
+        CellComparator.COMPARATOR, store, regionServicesForStores);
+    this.memstore.add(kv1.clone());
+    // As compaction is starting in the background, the duplicate of kv1
+    // might be removed, BUT the scanners created earlier still refer to
+    // the OLD MutableCellSetSegment, so this should be OK...
+    this.memstore.snapshot();
+    this.memstore.add(kv2.clone());
+    verifyScanAcrossSnapshot2(kv1, kv2);
+  }
+
+  /**
+   * Test memstore snapshots
+   * @throws IOException
+   */
+  @Test
+  public void testSnapshotting() throws IOException {
+    final int snapshotCount = 5;
+    // Add some rows, run a snapshot. Do it a few times.
+    for (int i = 0; i < snapshotCount; i++) {
+      addRows(this.memstore);
+      runSnapshot(this.memstore, true);
+      assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount());
+    }
+  }
+
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Get tests
+  //////////////////////////////////////////////////////////////////////////////
+
+  /** Test getNextRow from memstore
+   * @throws InterruptedException
+   */
+  @Test
+  public void testGetNextRow() throws Exception {
+    addRows(this.memstore);
+    // Add more versions to make it a little more interesting.
+    Thread.sleep(1);
+    addRows(this.memstore);
+    Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY);
+    assertTrue(KeyValue.COMPARATOR.compareRows(closestToEmpty,
+        new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
+    for (int i = 0; i < ROW_COUNT; i++) {
+      Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i),
+          System.currentTimeMillis()));
+      if (i + 1 == ROW_COUNT) {
+        assertEquals(nr, null);
+      } else {
+        assertTrue(KeyValue.COMPARATOR.compareRows(nr,
+            new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0);
+      }
+    }
+    // starting from each row, validate that the results contain the starting row
+    Configuration conf = HBaseConfiguration.create();
+    for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
+      ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
+        KeepDeletedCells.FALSE, 0, this.memstore.getComparator());
+      ScanType scanType = ScanType.USER_SCAN;
+      InternalScanner scanner = new StoreScanner(new Scan(
+          Bytes.toBytes(startRowId)), scanInfo, scanType, null,
+          memstore.getScanners(0));
+      List<Cell> results = new ArrayList<Cell>();
+      for (int i = 0; scanner.next(results); i++) {
+        int rowId = startRowId + i;
+        Cell left = results.get(0);
+        byte[] row1 = Bytes.toBytes(rowId);
+        assertTrue("Row name",
+            CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0);
+        assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
+        List<Cell> row = new ArrayList<Cell>();
+        for (Cell kv : results) {
+          row.add(kv);
+        }
+        isExpectedRowWithoutTimestamps(rowId, row);
+        // Clear out set.  Otherwise row results accumulate.
+        results.clear();
+      }
+    }
+  }
+
+  @Test
+  public void testGet_memstoreAndSnapShot() throws IOException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] qf4 = Bytes.toBytes("testqualifier4");
+    byte[] qf5 = Bytes.toBytes("testqualifier5");
+    byte[] val = Bytes.toBytes("testval");
+
+    //Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, val));
+    memstore.add(new KeyValue(row, fam, qf2, val));
+    memstore.add(new KeyValue(row, fam, qf3, val));
+    //Pushing to pipeline
+    ((CompactingMemStore)memstore).flushInMemory();
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    //Creating a snapshot
+    memstore.snapshot();
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
+    //Adding value to "new" memstore
+    assertEquals(0, memstore.getActive().getCellsCount());
+    memstore.add(new KeyValue(row, fam, qf4, val));
+    memstore.add(new KeyValue(row, fam, qf5, val));
+    assertEquals(2, memstore.getActive().getCellsCount());
+  }
+
+
+  ////////////////////////////////////
+  //Test for upsert with MSLAB
+  ////////////////////////////////////
+
+  /**
+   * Test a pathological pattern that shows why we can't currently
+   * use the MSLAB for upsert workloads. This test inserts data
+   * in the following pattern:
+   *
+   * - row0001 through row1000 (fills up one 2M Chunk)
+   * - row0002 through row1001 (fills up another 2M chunk, leaves one reference
+   *   to the first chunk)
+   * - row0003 through row1002 (another chunk, another dangling reference)
+   *
+   * This causes OOME pretty quickly if we use MSLAB for upsert
+   * since each 2M chunk is held onto by a single reference.
+   */
+  @Test
+  public void testUpsertMSLAB() throws Exception {
+
+    int ROW_SIZE = 2048;
+    byte[] qualifier = new byte[ROW_SIZE - 4];
+
+    MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
+    for (int i = 0; i < 3; i++) { System.gc(); }
+    long usageBefore = bean.getHeapMemoryUsage().getUsed();
+
+    long size = 0;
+    long ts = 0;
+
+    for (int newValue = 0; newValue < 1000; newValue++) {
+      for (int row = newValue; row < newValue + 1000; row++) {
+        byte[] rowBytes = Bytes.toBytes(row);
+        size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts);
+      }
+    }
+    System.out.println("Wrote " + ts + " vals");
+    for (int i = 0; i < 3; i++) { System.gc(); }
+    long usageAfter = bean.getHeapMemoryUsage().getUsed();
+    System.out.println("Memory used: " + (usageAfter - usageBefore)
+        + " (heapsize: " + memstore.heapSize() +
+        " size: " + size + ")");
+  }
+
+  ////////////////////////////////////
+  // Test for periodic memstore flushes
+  // based on time of oldest edit
+  ////////////////////////////////////
+
+  /**
+   * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased
+   * as older keyvalues are deleted from the memstore.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testUpsertMemstoreSize() throws Exception {
+    long oldSize = memstore.size();
+
+    List<Cell> l = new ArrayList<Cell>();
+    KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
+    KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
+    KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");
+
+    kv1.setSequenceId(1);
+    kv2.setSequenceId(1);
+    kv3.setSequenceId(1);
+    l.add(kv1);
+    l.add(kv2);
+    l.add(kv3);
+
+    this.memstore.upsert(l, 2); // readpoint is 2
+    long newSize = this.memstore.size();
+    assertTrue(newSize > oldSize);
+    // The kv1 should be removed.
+    assertTrue(memstore.getActive().getCellsCount() == 2);
+
+    KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
+    kv4.setSequenceId(1);
+    l.clear();
+    l.add(kv4);
+    this.memstore.upsert(l, 3);
+    assertEquals(newSize, this.memstore.size());
+    // The kv2 should be removed.
+    assertTrue(memstore.getActive().getCellsCount() == 2);
+  }
+
+  /**
+   * Tests that the timeOfOldestEdit is updated correctly for the
+   * various edit operations in memstore.
+   * @throws Exception
+   */
+  @Test
+  public void testUpdateToTimeOfOldestEdit() throws Exception {
+    try {
+      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
+      EnvironmentEdgeManager.injectEdge(edge);
+      long t = memstore.timeOfOldestEdit();
+      assertEquals(t, Long.MAX_VALUE);
+
+      // test the case that the timeOfOldestEdit is updated after a KV add
+      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      t = memstore.timeOfOldestEdit();
+      assertTrue(t == 1234);
+      // The method will also assert
+      // the value is reset to Long.MAX_VALUE
+      t = runSnapshot(memstore, true);
+
+      // test the case that the timeOfOldestEdit is updated after a KV delete
+      memstore.delete(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      t = memstore.timeOfOldestEdit();
+      assertTrue(t == 1234);
+      t = runSnapshot(memstore, true);
+
+      // test the case that the timeOfOldestEdit is updated after a KV upsert
+      List<Cell> l = new ArrayList<Cell>();
+      KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
+      kv1.setSequenceId(100);
+      l.add(kv1);
+      memstore.upsert(l, 1000);
+      t = memstore.timeOfOldestEdit();
+      assertTrue(t == 1234);
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
+  private long runSnapshot(final AbstractMemStore hmc, boolean useForce)
+      throws IOException {
+    // Save off old state.
+    long oldHistorySize = hmc.getSnapshot().getSize();
+    long prevTimeStamp = hmc.timeOfOldestEdit();
+
+    hmc.snapshot();
+    MemStoreSnapshot snapshot = hmc.snapshot();
+    if (useForce) {
+      // Make some assertions about what just happened.
+      assertTrue("History size has not increased", oldHistorySize < snapshot.getSize());
+      long t = hmc.timeOfOldestEdit();
+      assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
+      hmc.clearSnapshot(snapshot.getId());
+    } else {
+      long t = hmc.timeOfOldestEdit();
+      assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp);
+    }
+    return prevTimeStamp;
+  }
+
+  private void isExpectedRowWithoutTimestamps(final int rowIndex,
+      List<Cell> kvs) {
+    int i = 0;
+    for (Cell kv : kvs) {
+      byte[] expectedColname = makeQualifier(rowIndex, i++);
+      assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname));
+      // Value is column name as bytes.  Usually result is
+      // 100 bytes in size at least. This is the default size
+      // for BytesWriteable.  For comparison, convert bytes to
+      // String and trim to remove trailing null bytes.
+      assertTrue("Content", CellUtil.matchingValue(kv, expectedColname));
+    }
+  }
+
+  @Test
+  public void testPuttingBackChunksAfterFlushing() throws IOException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] qf4 = Bytes.toBytes("testqualifier4");
+    byte[] qf5 = Bytes.toBytes("testqualifier5");
+    byte[] val = Bytes.toBytes("testval");
+
+    // Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, val));
+    memstore.add(new KeyValue(row, fam, qf2, val));
+    memstore.add(new KeyValue(row, fam, qf3, val));
+
+    // Creating a snapshot
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.getActive().getCellsCount());
+    memstore.add(new KeyValue(row, fam, qf4, val));
+    memstore.add(new KeyValue(row, fam, qf5, val));
+    assertEquals(2, memstore.getActive().getCellsCount());
+    memstore.clearSnapshot(snapshot.getId());
+
+    int chunkCount = chunkPool.getPoolSize();
+    assertTrue(chunkCount > 0);
+
+  }
+
+  @Test
+  public void testPuttingBackChunksWithOpeningScanner()
+      throws IOException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] qf4 = Bytes.toBytes("testqualifier4");
+    byte[] qf5 = Bytes.toBytes("testqualifier5");
+    byte[] qf6 = Bytes.toBytes("testqualifier6");
+    byte[] qf7 = Bytes.toBytes("testqualifier7");
+    byte[] val = Bytes.toBytes("testval");
+
+    // Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, val));
+    memstore.add(new KeyValue(row, fam, qf2, val));
+    memstore.add(new KeyValue(row, fam, qf3, val));
+
+    // Creating a snapshot
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    assertEquals(3, memstore.getSnapshot().getCellsCount());
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.getActive().getCellsCount());
+    memstore.add(new KeyValue(row, fam, qf4, val));
+    memstore.add(new KeyValue(row, fam, qf5, val));
+    assertEquals(2, memstore.getActive().getCellsCount());
+
+    // open a scanner before clearing the snapshot
+    List<KeyValueScanner> scanners = memstore.getScanners(0);
+    // The chunks should not be put back into the pool yet, since open scanners
+    // still reference their data
+    memstore.clearSnapshot(snapshot.getId());
+
+    assertTrue(chunkPool.getPoolSize() == 0);
+
+    // Chunks will be put back to the pool after the scanners are closed
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    assertTrue(chunkPool.getPoolSize() > 0);
+
+    // clear chunks
+    chunkPool.clearChunks();
+
+    // Creating another snapshot
+
+    snapshot = memstore.snapshot();
+    // Adding more value
+    memstore.add(new KeyValue(row, fam, qf6, val));
+    memstore.add(new KeyValue(row, fam, qf7, val));
+    // opening scanners
+    scanners = memstore.getScanners(0);
+    // close scanners before clearing the snapshot
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    // Since no scanner is open, the snapshot's chunks should be put back into
+    // the pool
+    memstore.clearSnapshot(snapshot.getId());
+    assertTrue(chunkPool.getPoolSize() > 0);
+  }
+
+  @Test
+  public void testPuttingBackChunksWithOpeningPipelineScanner()
+      throws IOException {
+    byte[] row = Bytes.toBytes("testrow");
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf1 = Bytes.toBytes("testqualifier1");
+    byte[] qf2 = Bytes.toBytes("testqualifier2");
+    byte[] qf3 = Bytes.toBytes("testqualifier3");
+    byte[] val = Bytes.toBytes("testval");
+
+    // Setting up memstore
+    memstore.add(new KeyValue(row, fam, qf1, 1, val));
+    memstore.add(new KeyValue(row, fam, qf2, 1, val));
+    memstore.add(new KeyValue(row, fam, qf3, 1, val));
+
+    // Creating a pipeline
+    ((CompactingMemStore)memstore).disableCompaction();
+    ((CompactingMemStore)memstore).flushInMemory();
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.getActive().getCellsCount());
+    memstore.add(new KeyValue(row, fam, qf1, 2, val));
+    memstore.add(new KeyValue(row, fam, qf2, 2, val));
+    assertEquals(2, memstore.getActive().getCellsCount());
+
+    // pipeline bucket 2
+    ((CompactingMemStore)memstore).flushInMemory();
+    // open a scanner before force flushing
+    List<KeyValueScanner> scanners = memstore.getScanners(0);
+    // The chunks should not be put back into the pool yet, since open scanners
+    // still reference their data
+    ((CompactingMemStore)memstore).enableCompaction();
+    // trigger compaction
+    ((CompactingMemStore)memstore).flushInMemory();
+
+    // Adding value to "new" memstore
+    assertEquals(0, memstore.getActive().getCellsCount());
+    memstore.add(new KeyValue(row, fam, qf3, 3, val));
+    memstore.add(new KeyValue(row, fam, qf2, 3, val));
+    memstore.add(new KeyValue(row, fam, qf1, 3, val));
+    assertEquals(3, memstore.getActive().getCellsCount());
+
+    while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+
+    assertTrue(chunkPool.getPoolSize() == 0);
+
+    // Chunks will be put back to the pool after the scanners are closed
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    assertTrue(chunkPool.getPoolSize() > 0);
+
+    // clear chunks
+    chunkPool.clearChunks();
+
+    // Creating another snapshot
+
+    MemStoreSnapshot snapshot = memstore.snapshot();
+    memstore.clearSnapshot(snapshot.getId());
+
+    snapshot = memstore.snapshot();
+    // Adding more value
+    memstore.add(new KeyValue(row, fam, qf2, 4, val));
+    memstore.add(new KeyValue(row, fam, qf3, 4, val));
+    // opening scanners
+    scanners = memstore.getScanners(0);
+    // close scanners before clearing the snapshot
+    for (KeyValueScanner scanner : scanners) {
+      scanner.close();
+    }
+    // Since no scanner is open, the snapshot's chunks should be put back into
+    // the pool
+    memstore.clearSnapshot(snapshot.getId());
+    assertTrue(chunkPool.getPoolSize() > 0);
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // Compaction tests
+  //////////////////////////////////////////////////////////////////////////////
+  @Test
+  public void testCompaction1Bucket() throws IOException {
+
+    String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
+
+    // test 1 bucket
+    addRowsByKeys(memstore, keys1);
+    assertEquals(704, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    long size = memstore.getFlushableSize();
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
+    while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(528, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(3, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  @Test
+  public void testCompaction2Buckets() throws IOException {
+
+    String[] keys1 = { "A", "A", "B", "C" };
+    String[] keys2 = { "A", "B", "D" };
+
+    addRowsByKeys(memstore, keys1);
+    assertEquals(704, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    long size = memstore.getFlushableSize();
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
+    while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(1000);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(528, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    addRowsByKeys(memstore, keys2);
+    assertEquals(1056, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    size = memstore.getFlushableSize();
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
+    while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(704, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(4, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
+  @Test
+  public void testCompaction3Buckets() throws IOException {
+
+    String[] keys1 = { "A", "A", "B", "C" };
+    String[] keys2 = { "A", "B", "D" };
+    String[] keys3 = { "D", "B", "B" };
+
+    addRowsByKeys(memstore, keys1);
+    assertEquals(704, region.getMemstoreSize());
+
+    long size = memstore.getFlushableSize();
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
+
+    String tstStr = "\n\nFlushable size after first flush in memory:" + size
+        + ". Is MemStore in compaction?:"
+        + ((CompactingMemStore)memstore).isMemStoreFlushingInMemory();
+    while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(528, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    addRowsByKeys(memstore, keys2);
+
+    tstStr += " After adding second part of the keys. Memstore size: " +
+        region.getMemstoreSize() + ", Memstore Total Size: " +
+        regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n";
+
+    assertEquals(1056, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    ((CompactingMemStore)memstore).disableCompaction();
+    size = memstore.getFlushableSize();
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(1056, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    addRowsByKeys(memstore, keys3);
+    assertEquals(1584, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    ((CompactingMemStore)memstore).enableCompaction();
+    size = memstore.getFlushableSize();
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
+    while (((CompactingMemStore)memstore).isMemStoreFlushingInMemory()) {
+      Threads.sleep(10);
+    }
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    assertEquals(704, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(4, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+
+    //assertTrue(tstStr, false);
+  }
+
+  private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
+    byte[] fam = Bytes.toBytes("testfamily");
+    byte[] qf = Bytes.toBytes("testqualifier");
+    for (int i = 0; i < keys.length; i++) {
+      long timestamp = System.currentTimeMillis();
+      Threads.sleep(1); // to make sure each kv gets a different ts
+      byte[] row = Bytes.toBytes(keys[i]);
+      byte[] val = Bytes.toBytes(keys[i] + i);
+      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
+      hmc.add(kv);
+      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp: " + kv.getTimestamp());
+      long size = AbstractMemStore.heapSizeChange(kv, true);
+      regionServicesForStores.addAndGetGlobalMemstoreSize(size);
+    }
+  }
+
+  private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
+    long t = 1234;
+
+    @Override
+    public long currentTime() {
+      return t;
+    }
+
+    public void setCurrentTimeMillis(long t) {
+      this.t = t;
+    }
+  }
+
+}

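The size constants asserted in the compaction tests above (704, 528, 1056, 1584) are
per-cell heap-size sums; a hedged back-of-the-envelope, assuming each small test
KeyValue is charged 176 bytes by AbstractMemStore.heapSizeChange (704 / 4 cells):

    public class CompactionSizeArithmetic {
      public static void main(String[] args) {
        final long perCell = 176;                      // assumed per-KeyValue heap charge
        System.out.println(4 * perCell);               // keys1 {A,A,B,C}             -> 704
        System.out.println(3 * perCell);               // compacted: duplicate A gone -> 528
        System.out.println(3 * perCell + 3 * perCell); // + keys2 {A,B,D}             -> 1056
        System.out.println(4 * perCell);               // compacted to {A,B,C,D}      -> 704
        System.out.println(9 * perCell);               // + keys3 {D,B,B}, no compaction -> 1584
      }
    }
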
http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 5e6007d..0c4029d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -21,11 +21,11 @@ package org.apache.hadoop.hbase.regionserver;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
@@ -48,7 +48,17 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
@@ -61,27 +71,38 @@ import java.util.concurrent.atomic.AtomicReference;
 
 /** memstore test case */
 @Category({RegionServerTests.class, MediumTests.class})
-public class TestDefaultMemStore extends TestCase {
+public class TestDefaultMemStore {
   private static final Log LOG = LogFactory.getLog(TestDefaultMemStore.class);
-  private DefaultMemStore memstore;
-  private static final int ROW_COUNT = 10;
-  private static final int QUALIFIER_COUNT = ROW_COUNT;
-  private static final byte [] FAMILY = Bytes.toBytes("column");
-  private MultiVersionConcurrencyControl mvcc;
-  private AtomicLong startSeqNum = new AtomicLong(0);
-
-  @Override
+  @Rule public TestName name = new TestName();
+  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
+    withLookingForStuckThread(true).build();
+  protected AbstractMemStore memstore;
+  protected static final int ROW_COUNT = 10;
+  protected static final int QUALIFIER_COUNT = ROW_COUNT;
+  protected static final byte[] FAMILY = Bytes.toBytes("column");
+  protected MultiVersionConcurrencyControl mvcc;
+  protected AtomicLong startSeqNum = new AtomicLong(0);
+
+  private String getName() {
+    return this.name.getMethodName();
+  }
+
+  @Before
   public void setUp() throws Exception {
-    super.setUp();
-    this.mvcc = new MultiVersionConcurrencyControl();
+    internalSetUp();
     this.memstore = new DefaultMemStore();
   }
 
+  protected void internalSetUp() throws Exception {
+    this.mvcc = new MultiVersionConcurrencyControl();
+  }
+
+  @Test
   public void testPutSameKey() {
-    byte [] bytes = Bytes.toBytes(getName());
+    byte[] bytes = Bytes.toBytes(getName());
     KeyValue kv = new KeyValue(bytes, bytes, bytes, bytes);
     this.memstore.add(kv);
-    byte [] other = Bytes.toBytes("somethingelse");
+    byte[] other = Bytes.toBytes("somethingelse");
     KeyValue samekey = new KeyValue(bytes, bytes, bytes, other);
     this.memstore.add(samekey);
     Cell found = this.memstore.getActive().first();
@@ -93,6 +114,7 @@ public class TestDefaultMemStore extends TestCase {
    * Test memstore snapshot happening while scanning.
    * @throws IOException
    */
+  @Test
   public void testScanAcrossSnapshot() throws IOException {
     int rowCount = addRows(this.memstore);
     List<KeyValueScanner> memstorescanners = this.memstore.getScanners(0);
@@ -180,6 +202,7 @@ public class TestDefaultMemStore extends TestCase {
    * @throws IOException
    * @throws CloneNotSupportedException
    */
+  @Test
   public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException {
     // we are going to test scanning across a snapshot with two kvs
     // kv1 should always be returned before kv2
@@ -209,7 +232,7 @@ public class TestDefaultMemStore extends TestCase {
     verifyScanAcrossSnapshot2(kv1, kv2);
   }
 
-  private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
+  protected void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
       throws IOException {
     List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
     assertEquals(1, memstorescanners.size());
@@ -220,7 +243,7 @@ public class TestDefaultMemStore extends TestCase {
     assertNull(scanner.next());
   }
 
-  private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)
+  protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)
       throws IOException {
     scanner.seek(KeyValueUtil.createFirstOnRow(new byte[]{}));
     List<Cell> returned = Lists.newArrayList();
@@ -238,6 +261,7 @@ public class TestDefaultMemStore extends TestCase {
     assertNull(scanner.peek());
   }
 
+  @Test
   public void testMemstoreConcurrentControl() throws IOException {
     final byte[] row = Bytes.toBytes(1);
     final byte[] f = Bytes.toBytes("family");
@@ -280,6 +304,7 @@ public class TestDefaultMemStore extends TestCase {
    * the same timestamp, we still need to provide consistent reads
    * for the same scanner.
    */
+  @Test
   public void testMemstoreEditsVisibilityWithSameKey() throws IOException {
     final byte[] row = Bytes.toBytes(1);
     final byte[] f = Bytes.toBytes("family");
@@ -334,6 +359,7 @@ public class TestDefaultMemStore extends TestCase {
    * the same timestamp, we still need to provide consistent reads
    * for the same scanner.
    */
+  @Test
   public void testMemstoreDeletesVisibilityWithSameKey() throws IOException {
     final byte[] row = Bytes.toBytes(1);
     final byte[] f = Bytes.toBytes("family");
@@ -438,6 +464,7 @@ public class TestDefaultMemStore extends TestCase {
     }
   }
 
+  @Test
   public void testReadOwnWritesUnderConcurrency() throws Throwable {
 
     int NUM_THREADS = 8;
@@ -463,6 +490,7 @@ public class TestDefaultMemStore extends TestCase {
    * Test memstore snapshots
    * @throws IOException
    */
+  @Test
   public void testSnapshotting() throws IOException {
     final int snapshotCount = 5;
     // Add some rows, run a snapshot. Do it a few times.
@@ -473,6 +501,7 @@ public class TestDefaultMemStore extends TestCase {
     }
   }
 
+  @Test
   public void testMultipleVersionsSimple() throws Exception {
     DefaultMemStore m = new DefaultMemStore(new Configuration(), CellComparator.COMPARATOR);
     byte [] row = Bytes.toBytes("testRow");
@@ -500,53 +529,56 @@ public class TestDefaultMemStore extends TestCase {
   /** Test getNextRow from memstore
    * @throws InterruptedException
    */
+  @Test
   public void testGetNextRow() throws Exception {
     addRows(this.memstore);
     // Add more versions to make it a little more interesting.
     Thread.sleep(1);
     addRows(this.memstore);
-    Cell closestToEmpty = this.memstore.getNextRow(KeyValue.LOWESTKEY);
+    Cell closestToEmpty = ((DefaultMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY);
     assertTrue(CellComparator.COMPARATOR.compareRows(closestToEmpty,
-      new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
+        new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
     for (int i = 0; i < ROW_COUNT; i++) {
-      Cell nr = this.memstore.getNextRow(new KeyValue(Bytes.toBytes(i),
-        System.currentTimeMillis()));
+      Cell nr = ((DefaultMemStore) this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i),
+          System.currentTimeMillis()));
       if (i + 1 == ROW_COUNT) {
         assertEquals(nr, null);
       } else {
         assertTrue(CellComparator.COMPARATOR.compareRows(nr,
-          new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0);
+            new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0);
       }
     }
     //starting from each row, validate results should contain the starting row
     Configuration conf = HBaseConfiguration.create();
     for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
       ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
-        KeepDeletedCells.FALSE, 0, this.memstore.getComparator());
+          KeepDeletedCells.FALSE, 0, this.memstore.getComparator());
       ScanType scanType = ScanType.USER_SCAN;
-      InternalScanner scanner = new StoreScanner(new Scan(
+      try (InternalScanner scanner = new StoreScanner(new Scan(
           Bytes.toBytes(startRowId)), scanInfo, scanType, null,
-          memstore.getScanners(0));
-      List<Cell> results = new ArrayList<Cell>();
-      for (int i = 0; scanner.next(results); i++) {
-        int rowId = startRowId + i;
-        Cell left = results.get(0);
-        byte[] row1 = Bytes.toBytes(rowId);
-        assertTrue(
-            "Row name",
-            CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0);
-        assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
-        List<Cell> row = new ArrayList<Cell>();
-        for (Cell kv : results) {
-          row.add(kv);
+          memstore.getScanners(0))) {
+        List<Cell> results = new ArrayList<Cell>();
+        for (int i = 0; scanner.next(results); i++) {
+          int rowId = startRowId + i;
+          Cell left = results.get(0);
+          byte[] row1 = Bytes.toBytes(rowId);
+          assertTrue(
+              "Row name",
+              CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0);
+          assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
+          List<Cell> row = new ArrayList<Cell>();
+          for (Cell kv : results) {
+            row.add(kv);
+          }
+          isExpectedRowWithoutTimestamps(rowId, row);
+          // Clear out set.  Otherwise row results accumulate.
+          results.clear();
         }
-        isExpectedRowWithoutTimestamps(rowId, row);
-        // Clear out set.  Otherwise row results accumulate.
-        results.clear();
       }
     }
   }
 
+  @Test
   public void testGet_memstoreAndSnapShot() throws IOException {
     byte [] row = Bytes.toBytes("testrow");
     byte [] fam = Bytes.toBytes("testfamily");
@@ -558,9 +590,9 @@ public class TestDefaultMemStore extends TestCase {
     byte [] val = Bytes.toBytes("testval");
 
     //Setting up memstore
-    memstore.add(new KeyValue(row, fam ,qf1, val));
-    memstore.add(new KeyValue(row, fam ,qf2, val));
-    memstore.add(new KeyValue(row, fam ,qf3, val));
+    memstore.add(new KeyValue(row, fam, qf1, val));
+    memstore.add(new KeyValue(row, fam, qf2, val));
+    memstore.add(new KeyValue(row, fam, qf3, val));
     //Creating a snapshot
     memstore.snapshot();
     assertEquals(3, memstore.getSnapshot().getCellsCount());
@@ -574,6 +606,7 @@ public class TestDefaultMemStore extends TestCase {
   //////////////////////////////////////////////////////////////////////////////
   // Delete tests
   //////////////////////////////////////////////////////////////////////////////
+  @Test
   public void testGetWithDelete() throws IOException {
     byte [] row = Bytes.toBytes("testrow");
     byte [] fam = Bytes.toBytes("testfamily");
@@ -584,7 +617,7 @@ public class TestDefaultMemStore extends TestCase {
     KeyValue put1 = new KeyValue(row, fam, qf1, ts1, val);
     long ts2 = ts1 + 1;
     KeyValue put2 = new KeyValue(row, fam, qf1, ts2, val);
-    long ts3 = ts2 +1;
+    long ts3 = ts2 + 1;
     KeyValue put3 = new KeyValue(row, fam, qf1, ts3, val);
     memstore.add(put1);
     memstore.add(put2);
@@ -608,6 +641,7 @@ public class TestDefaultMemStore extends TestCase {
     }
   }
 
+  @Test
   public void testGetWithDeleteColumn() throws IOException {
     byte [] row = Bytes.toBytes("testrow");
     byte [] fam = Bytes.toBytes("testfamily");
@@ -618,7 +652,7 @@ public class TestDefaultMemStore extends TestCase {
     KeyValue put1 = new KeyValue(row, fam, qf1, ts1, val);
     long ts2 = ts1 + 1;
     KeyValue put2 = new KeyValue(row, fam, qf1, ts2, val);
-    long ts3 = ts2 +1;
+    long ts3 = ts2 + 1;
     KeyValue put3 = new KeyValue(row, fam, qf1, ts3, val);
     memstore.add(put1);
     memstore.add(put2);
@@ -636,15 +670,14 @@ public class TestDefaultMemStore extends TestCase {
     expected.add(put2);
     expected.add(put1);
 
-
     assertEquals(4, memstore.getActive().getCellsCount());
     int i = 0;
-    for (Cell cell: memstore.getActive().getCellSet()) {
+    for (Cell cell : memstore.getActive().getCellSet()) {
       assertEquals(expected.get(i++), cell);
     }
   }
 
-
+  @Test
   public void testGetWithDeleteFamily() throws IOException {
     byte [] row = Bytes.toBytes("testrow");
     byte [] fam = Bytes.toBytes("testfamily");
@@ -675,15 +708,14 @@ public class TestDefaultMemStore extends TestCase {
     expected.add(put4);
     expected.add(put3);
 
-
-
     assertEquals(5, memstore.getActive().getCellsCount());
     int i = 0;
-    for (Cell cell: memstore.getActive().getCellSet()) {
+    for (Cell cell : memstore.getActive().getCellSet()) {
       assertEquals(expected.get(i++), cell);
     }
   }
 
+  @Test
   public void testKeepDeleteInmemstore() {
     byte [] row = Bytes.toBytes("testrow");
     byte [] fam = Bytes.toBytes("testfamily");
@@ -697,6 +729,7 @@ public class TestDefaultMemStore extends TestCase {
     assertEquals(delete, memstore.getActive().first());
   }
 
+  @Test
   public void testRetainsDeleteVersion() throws IOException {
     // add a put to memstore
     memstore.add(KeyValueTestUtil.create("row1", "fam", "a", 100, "dont-care"));
@@ -709,6 +742,8 @@ public class TestDefaultMemStore extends TestCase {
     assertEquals(2, memstore.getActive().getCellsCount());
     assertEquals(delete, memstore.getActive().first());
   }
+
+  @Test
   public void testRetainsDeleteColumn() throws IOException {
     // add a put to memstore
     memstore.add(KeyValueTestUtil.create("row1", "fam", "a", 100, "dont-care"));
@@ -721,6 +756,8 @@ public class TestDefaultMemStore extends TestCase {
     assertEquals(2, memstore.getActive().getCellsCount());
     assertEquals(delete, memstore.getActive().first());
   }
+
+  @Test
   public void testRetainsDeleteFamily() throws IOException {
     // add a put to memstore
     memstore.add(KeyValueTestUtil.create("row1", "fam", "a", 100, "dont-care"));
@@ -751,6 +788,7 @@ public class TestDefaultMemStore extends TestCase {
    * This causes OOME pretty quickly if we use MSLAB for upsert
    * since each 2M chunk is held onto by a single reference.
    */
+  @Test
   public void testUpsertMSLAB() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true);
@@ -793,6 +831,7 @@ public class TestDefaultMemStore extends TestCase {
    * as older keyvalues are deleted from the memstore.
    * @throws Exception
    */
+  @Test
   public void testUpsertMemstoreSize() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR);
@@ -832,6 +871,7 @@ public class TestDefaultMemStore extends TestCase {
    * various edit operations in memstore.
    * @throws Exception
    */
+  @Test
   public void testUpdateToTimeOfOldestEdit() throws Exception {
     try {
       EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
@@ -874,6 +914,7 @@ public class TestDefaultMemStore extends TestCase {
    * false.
    * @throws Exception
    */
+  @Test
   public void testShouldFlush() throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
@@ -883,7 +924,7 @@ public class TestDefaultMemStore extends TestCase {
     checkShouldFlush(conf, false);
   }
 
-  private void checkShouldFlush(Configuration conf, boolean expected) throws Exception {
+  protected void checkShouldFlush(Configuration conf, boolean expected) throws Exception {
     try {
       EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
       EnvironmentEdgeManager.injectEdge(edge);
@@ -898,7 +939,7 @@ public class TestDefaultMemStore extends TestCase {
       s.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
       edge.setCurrentTimeMillis(1234 + 100);
       StringBuffer sb = new StringBuffer();
-      assertTrue(region.shouldFlush(sb) == false);
+      assertTrue(!region.shouldFlush(sb));
       edge.setCurrentTimeMillis(1234 + 10000);
       assertTrue(region.shouldFlush(sb) == expected);
     } finally {
@@ -906,6 +947,7 @@ public class TestDefaultMemStore extends TestCase {
     }
   }
 
+  @Test
   public void testShouldFlushMeta() throws Exception {
     // write an edit in the META and ensure the shouldFlush (that the periodic memstore
     // flusher invokes) returns true after SYSTEM_CACHE_FLUSH_INTERVAL (even though
@@ -954,7 +996,7 @@ public class TestDefaultMemStore extends TestCase {
    * @return How many rows we added.
    * @throws IOException
    */
-  private int addRows(final MemStore hmc) {
+  protected int addRows(final AbstractMemStore hmc) {
     return addRows(hmc, HConstants.LATEST_TIMESTAMP);
   }
 
@@ -964,10 +1006,10 @@ public class TestDefaultMemStore extends TestCase {
    * @return How many rows we added.
    * @throws IOException
    */
-  private int addRows(final MemStore hmc, final long ts) {
+  protected int addRows(final MemStore hmc, final long ts) {
     for (int i = 0; i < ROW_COUNT; i++) {
-      long timestamp = ts == HConstants.LATEST_TIMESTAMP?
-        System.currentTimeMillis(): ts;
+      long timestamp = ts == HConstants.LATEST_TIMESTAMP ?
+        System.currentTimeMillis() : ts;
       for (int ii = 0; ii < QUALIFIER_COUNT; ii++) {
         byte [] row = Bytes.toBytes(i);
         byte [] qf = makeQualifier(i, ii);
@@ -977,7 +1019,7 @@ public class TestDefaultMemStore extends TestCase {
     return ROW_COUNT;
   }
 
-  private long runSnapshot(final DefaultMemStore hmc) throws UnexpectedStateException {
+  private long runSnapshot(final AbstractMemStore hmc) throws UnexpectedStateException {
     // Save off old state.
     int oldHistorySize = hmc.getSnapshot().getCellsCount();
     MemStoreSnapshot snapshot = hmc.snapshot();
@@ -993,7 +1035,7 @@ public class TestDefaultMemStore extends TestCase {
   private void isExpectedRowWithoutTimestamps(final int rowIndex,
       List<Cell> kvs) {
     int i = 0;
-    for (Cell kv: kvs) {
+    for (Cell kv : kvs) {
       byte[] expectedColname = makeQualifier(rowIndex, i++);
       assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname));
       // Value is column name as bytes.  Usually result is
@@ -1023,7 +1065,6 @@ public class TestDefaultMemStore extends TestCase {
     }
   }
 
-
   static void doScan(MemStore ms, int iteration) throws IOException {
     long nanos = System.nanoTime();
     KeyValueScanner s = ms.getScanners(0).get(0);
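
The rewritten testGetNextRow above is the one substantive change in this file beyond the JUnit4 @Test annotations and visibility widening: the StoreScanner now sits in a try-with-resources block, so it is closed even when an assertion throws mid-loop. Below is a minimal, self-contained sketch of that pattern; RowScanner is a hypothetical stand-in for InternalScanner, not an HBase class, and only the resource-handling shape mirrors the patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ScannerCloseSketch {
  // Hypothetical AutoCloseable scanner standing in for InternalScanner.
  static class RowScanner implements AutoCloseable {
    private final List<String> rows;
    private int pos = 0;
    RowScanner(List<String> rows) { this.rows = rows; }
    // Mirrors the InternalScanner.next(List) shape: fill 'out', report whether more remain.
    boolean next(List<String> out) {
      if (pos < rows.size()) {
        out.add(rows.get(pos++));
      }
      return pos < rows.size();
    }
    @Override
    public void close() {
      // Release underlying resources; try-with-resources guarantees this runs.
    }
  }

  public static void main(String[] args) {
    // The try-with-resources block closes the scanner even if the loop body
    // throws, which is what the patched test gains over the old bare declaration.
    try (RowScanner scanner = new RowScanner(Arrays.asList("row0", "row1", "row2"))) {
      List<String> results = new ArrayList<String>();
      while (scanner.next(results)) {
        System.out.println(results);
        results.clear(); // clear per iteration, as the test does, so rows don't accumulate
      }
    }
  }
}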

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 8cc04f7..31aebb7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -18,35 +18,10 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
-import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.security.PrivilegedExceptionAction;
@@ -180,6 +155,30 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 /**
  * Basic stand-alone testing of HRegion.  No clusters!
  *
@@ -199,7 +198,7 @@ public class TestHRegion {
 
   HRegion region = null;
   // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
-  private static HBaseTestingUtility TEST_UTIL;
+  protected static HBaseTestingUtility TEST_UTIL;
   public static Configuration CONF ;
   private String dir;
   private static FileSystem FILESYSTEM;
@@ -2449,7 +2448,7 @@ public class TestHRegion {
       // extract the key values out the memstore:
       // This is kinda hacky, but better than nothing...
       long now = System.currentTimeMillis();
-      DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore;
+      AbstractMemStore memstore = (AbstractMemStore)((HStore) region.getStore(fam1)).memstore;
       Cell firstCell = memstore.getActive().first();
       assertTrue(firstCell.getTimestamp() <= now);
       now = firstCell.getTimestamp();
@@ -5145,7 +5144,7 @@ public class TestHRegion {
    * @return A region on which you must call
    *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
    */
-  private static HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
+  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
       byte[]... families) throws IOException {
     return initHRegion(tableName, null, null, callingMethod, conf, false, families);
   }
@@ -5154,12 +5153,12 @@ public class TestHRegion {
    * @return A region on which you must call
    *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
    */
-  private static HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
+  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
       boolean isReadOnly, byte[]... families) throws IOException {
     return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
   }
 
-  public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
+  protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
       String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
       throws IOException {
     Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
@@ -5173,7 +5172,7 @@ public class TestHRegion {
    * @return A region on which you must call
    *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
    */
-  public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
+  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
       String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
       WAL wal, byte[]... families) throws IOException {
     return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
@@ -6604,7 +6603,7 @@ public class TestHRegion {
       qual2, 0, qual2.length));
   }
 
-  static HRegion initHRegion(TableName tableName, String callingMethod,
+  HRegion initHRegion(TableName tableName, String callingMethod,
       byte[]... families) throws IOException {
     return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
         families);
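
The visibility churn in the hunks above has one purpose: a static method cannot be overridden in Java, so each static initHRegion variant becomes a protected (or public) instance method that a subclass can redefine. A short sketch of that hook pattern follows, under assumed illustrative names.

public class RegionFactorySketch {
  // Hypothetical region type; only the flag matters for the sketch.
  static class Region {
    final boolean inMemoryFlush;
    Region(boolean inMemoryFlush) { this.inMemoryFlush = inMemoryFlush; }
  }

  static class BaseSuite {
    // Before the patch this was 'private static' and could not be
    // overridden; as a protected instance method it is a subclass hook.
    protected Region initRegion() {
      return new Region(false);
    }

    void runOneTest() {
      Region r = initRegion();
      System.out.println("in-memory flush enabled: " + r.inMemoryFlush);
    }
  }

  static class InMemoryFlushSuite extends BaseSuite {
    @Override
    protected Region initRegion() {
      return new Region(true);
    }
  }

  public static void main(String[] args) {
    new BaseSuite().runOneTest();          // prints: in-memory flush enabled: false
    new InMemoryFlushSuite().runOneTest(); // prints: in-memory flush enabled: true
  }
}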

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
new file mode 100644
index 0000000..e9c6b6f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.experimental.categories.Category;
+
+/**
+ * A test similar to TestHRegion, but with all column families set to use in-memory flush.
+ * Also checks WAL truncation after in-memory compaction.
+ */
+@Category({VerySlowRegionServerTests.class, LargeTests.class})
+@SuppressWarnings("deprecation")
+public class TestHRegionWithInMemoryFlush extends TestHRegion {
+  // Do not spin up clusters in here. If you need to spin up a cluster, do it
+  // over in TestHRegionOnCluster.
+  private static final Log LOG = LogFactory.getLog(TestHRegionWithInMemoryFlush.class);
+
+  /**
+   * @return A region on which you must call
+   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
+   */
+  @Override
+  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
+      String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
+      WAL wal, byte[]... families) throws IOException {
+    boolean[] inMemory = new boolean[families.length];
+    for (int i = 0; i < inMemory.length; i++) {
+      inMemory[i] = true;
+    }
+    return TEST_UTIL.createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey,
+        isReadOnly, durability, wal, inMemory, families);
+  }
+
+}
+
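
Nothing else is needed in the new subclass because JUnit4 discovers @Test methods inherited from a superclass, so this single override reruns every TestHRegion test against regions whose families all flush in-memory. A sketch of that inheritance behavior, with illustrative class names (only @Test, JUnitCore, and Result are real JUnit API):

import org.junit.Test;
import org.junit.runner.JUnitCore;
import org.junit.runner.Result;

public class InheritedSuiteSketch {
  public static class ParentTests {
    protected boolean inMemory() { return false; }

    @Test
    public void runsInEveryFlavor() {
      // Executes with whichever inMemory() the concrete class provides.
      System.out.println("in-memory flush flavor: " + inMemory());
    }
  }

  public static class InMemoryTests extends ParentTests {
    @Override
    protected boolean inMemory() { return true; }
  }

  public static void main(String[] args) {
    // Running the subclass executes the inherited @Test with the override
    // in effect; no test methods are redeclared in the subclass.
    Result r = JUnitCore.runClasses(InMemoryTests.class);
    System.out.println("ran=" + r.getRunCount() + " failures=" + r.getFailureCount());
  }
}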

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index 848b678..1615b99 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -17,16 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
+import com.google.common.hash.Hashing;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -59,7 +50,15 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * This test verifies the correctness of the Per Column Family flushing strategy
@@ -127,7 +126,7 @@ public class TestPerColumnFamilyFlush {
     // Set up the configuration
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
-    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
     conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
       100 * 1024);
     // Intialize the region
@@ -336,7 +335,7 @@ public class TestPerColumnFamilyFlush {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
     // Carefully chosen limits so that the memstore just flushes when we're done
-    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
     conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 10000);
     final int numRegionServers = 4;
     try {
@@ -450,7 +449,7 @@ public class TestPerColumnFamilyFlush {
     TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
-    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
     long cfFlushSizeLowerBound = 2048;
     conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
       cfFlushSizeLowerBound);
@@ -607,7 +606,7 @@ public class TestPerColumnFamilyFlush {
     }
 
     LOG.info("==============Test with selective flush enabled===============");
-    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
     // default value of per-cf flush lower bound is too big, set to a small enough value
     conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0);
     try {
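
The hunks in this file track a rename: the selective-flush policy configured under FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY is now FlushAllLargeStoresPolicy. The key holds a class name that a factory instantiates reflectively. Below is a simplified, self-contained sketch of that lookup; the policy interface, stand-in class, and key string are illustrative, not the real HBase types.

import org.apache.hadoop.conf.Configuration;

public class FlushPolicySketch {
  // Stand-ins for the real o.a.h.hbase.regionserver types.
  public interface FlushPolicy {
    String name();
  }

  public static class FlushAllLargeStoresPolicy implements FlushPolicy {
    public String name() { return "flush-all-large-stores"; }
  }

  // Assumed key string; the patch references it only as
  // FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY.
  static final String POLICY_KEY = "hbase.regionserver.flush.policy";

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same call shape as in the test: store the policy's class name.
    conf.set(POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());

    // Factory-style lookup: resolve and instantiate the configured class.
    Class<?> clazz = conf.getClassByName(conf.get(POLICY_KEY));
    FlushPolicy policy = (FlushPolicy) clazz.getDeclaredConstructor().newInstance();
    System.out.println("using policy: " + policy.name());
  }
}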

