hbase-commits mailing list archives

From ramkris...@apache.org
Subject [1/2] hbase git commit: HBASE-14970 Backport HBASE-13082 and its sub-jira to branch-1 (Ram)
Date Thu, 21 Jan 2016 15:54:12 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 cea43788f -> 58521869b
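
Note on the pattern common to the test changes below: after HBASE-13082, a compaction no longer deletes its input store files immediately. The inputs are marked compacted-away and kept in the store's StoreFileManager until the new CompactedHFilesDischarger chore closes and removes them, so the tests now drive that chore explicitly. The snippet below condenses that flow using only calls that appear in this patch; it is an illustrative sketch (assuming a region and mocked RegionServerServices set up as in the new TestCompactedHFilesDischarger#setUp), not part of the commit itself.

    // Condensed lifecycle exercised by this patch. Assumes 'region' is an HRegion with
    // family 'fam' and 'rss' is a mocked RegionServerServices whose getOnlineRegions()
    // returns that region (see TestCompactedHFilesDischarger#setUp below).
    Store store = region.getStore(fam);
    region.compact(true);                        // major compaction writes one new store file
    // The compaction inputs are not deleted yet; they sit in the compacted-files set:
    Collection<StoreFile> compacted =
        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    // compacted.size() == number of store files that existed before the compaction
    // The discharger chore closes and removes compacted-away files with no active readers:
    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(1000, null, rss, false);
    cleaner.chore();
    // getCompactedfiles() is now empty; only the newly written file remains in the store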


http://git-wip-us.apache.org/repos/asf/hbase/blob/58521869/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
index 0b4cbd2..4ee765f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.*;
 
+import java.io.IOException;
 import java.security.Key;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import javax.crypto.spec.SecretKeySpec;
@@ -122,13 +124,14 @@ public class TestEncryptionKeyRotation {
 
     // And major compact
     TEST_UTIL.getHBaseAdmin().majorCompact(htd.getTableName());
+    final List<Path> updatePaths = findCompactedStorefilePaths(htd.getTableName());
     TEST_UTIL.waitFor(30000, 1000, true, new Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         // When compaction has finished, all of the original files will be
         // gone
         boolean found = false;
-        for (Path path: initialPaths) {
+        for (Path path: updatePaths) {
           found = TEST_UTIL.getTestFileSystem().exists(path);
           if (found) {
             LOG.info("Found " + path);
@@ -140,14 +143,20 @@ public class TestEncryptionKeyRotation {
     });
 
     // Verify we have store file(s) with only the new key
+    Thread.sleep(1000);
+    waitForCompaction(htd.getTableName());
     List<Path> pathsAfterCompaction = findStorefilePaths(htd.getTableName());
     assertTrue(pathsAfterCompaction.size() > 0);
     for (Path path: pathsAfterCompaction) {
-      assertFalse("Store file " + path + " retains initial key",
-        Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
       assertTrue("Store file " + path + " has incorrect key",
         Bytes.equals(secondCFKey.getEncoded(), extractHFileKey(path)));
     }
+    List<Path> compactedPaths = findCompactedStorefilePaths(htd.getTableName());
+    assertTrue(compactedPaths.size() > 0);
+    for (Path path: compactedPaths) {
+      assertTrue("Store file " + path + " retains initial key",
+        Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
+    }
   }
 
   @Test
@@ -193,6 +202,33 @@ public class TestEncryptionKeyRotation {
     }
   }
 
+  private static void waitForCompaction(TableName tableName)
+      throws IOException, InterruptedException {
+    boolean compacted = false;
+    for (Region region : TEST_UTIL.getRSForFirstRegionInTable(tableName)
+        .getOnlineRegions(tableName)) {
+      for (Store store : region.getStores()) {
+        compacted = false;
+        while (!compacted) {
+          if (store.getStorefiles() != null) {
+            while (store.getStorefilesCount() != 1) {
+              Thread.sleep(100);
+            }
+            for (StoreFile storefile : store.getStorefiles()) {
+              if (!storefile.isCompactedAway()) {
+                compacted = true;
+                break;
+              }
+              Thread.sleep(100);
+            }
+          } else {
+            break;
+          }
+        }
+      }
+    }
+  }
+  
   private static List<Path> findStorefilePaths(TableName tableName) throws Exception {
     List<Path> paths = new ArrayList<Path>();
     for (Region region:
@@ -206,6 +242,23 @@ public class TestEncryptionKeyRotation {
     return paths;
   }
 
+  private static List<Path> findCompactedStorefilePaths(TableName tableName) throws Exception {
+    List<Path> paths = new ArrayList<Path>();
+    for (Region region:
+        TEST_UTIL.getRSForFirstRegionInTable(tableName).getOnlineRegions(tableName)) {
+      for (Store store : region.getStores()) {
+        Collection<StoreFile> compactedfiles =
+            ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+        if (compactedfiles != null) {
+          for (StoreFile storefile : compactedfiles) {
+            paths.add(storefile.getPath());
+          }
+        }
+      }
+    }
+    return paths;
+  }
+
   private void createTableAndFlush(HTableDescriptor htd) throws Exception {
     HColumnDescriptor hcd = htd.getFamilies().iterator().next();
     // Create the test table

http://git-wip-us.apache.org/repos/asf/hbase/blob/58521869/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 78b5b31..851f5ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -166,9 +167,17 @@ public class TestHRegionReplayEvents {
     when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
     when(rss.getConfiguration()).thenReturn(CONF);
     when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
-
+    String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
+        .toString();
+    ExecutorService es = new ExecutorService(string);
+    es.startExecutorService(
+      string+"-"+string, 1);
+    when(rss.getExecutorService()).thenReturn(es);
     primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
     primaryRegion.close();
+    List<Region> regions = new ArrayList<Region>();
+    regions.add(primaryRegion);
+    when(rss.getOnlineRegions()).thenReturn(regions);
 
     primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
     secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
@@ -1369,6 +1378,11 @@ public class TestHRegionReplayEvents {
 
     // Test case 3: compact primary files
     primaryRegion.compactStores();
+    List<Region> regions = new ArrayList<Region>();
+    regions.add(primaryRegion);
+    when(rss.getOnlineRegions()).thenReturn(regions);
+    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
+    cleaner.chore();
     secondaryRegion.refreshStoreFiles();
     assertPathListsEqual(primaryRegion.getStoreFileList(families),
       secondaryRegion.getStoreFileList(families));

http://git-wip-us.apache.org/repos/asf/hbase/blob/58521869/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index 0ae5851..3eb0a55 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.util.StringUtils;
@@ -180,7 +182,7 @@ public class TestRegionMergeTransactionOnCluster {
       List<Pair<HRegionInfo, ServerName>> tableRegions = MetaTableAccessor
           .getTableRegionsAndLocations(master.getZooKeeper(), master.getConnection(), tableName);
       HRegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
-      HTableDescriptor tableDescritor = master.getTableDescriptors().get(
+      HTableDescriptor tableDescriptor = master.getTableDescriptors().get(
           tableName);
       Result mergedRegionResult = MetaTableAccessor.getRegionResult(
         master.getConnection(), mergedRegionInfo.getRegionName());
@@ -205,19 +207,46 @@ public class TestRegionMergeTransactionOnCluster {
       assertTrue(fs.exists(regionAdir));
       assertTrue(fs.exists(regionBdir));
 
+      HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
+      HRegionFileSystem hrfs = new HRegionFileSystem(
+        TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
+      int count = 0;
+      for(HColumnDescriptor colFamily : columnFamilies) {
+        count += hrfs.getStoreFiles(colFamily.getName()).size();
+      }
       admin.compactRegion(mergedRegionInfo.getRegionName());
-      // wait until merged region doesn't have reference file
+      // clean up the merged region store files
+      // wait until the merged region has new store files from the compaction
       long timeout = System.currentTimeMillis() + waitTime;
-      HRegionFileSystem hrfs = new HRegionFileSystem(
-          TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
+      int newcount = 0;
       while (System.currentTimeMillis() < timeout) {
-        if (!hrfs.hasReferences(tableDescritor)) {
+        for(HColumnDescriptor colFamily : columnFamilies) {
+          newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+        }
+        if(newcount > count) {
+          break;
+        }
+        Thread.sleep(50);
+      }
+      assertTrue(newcount > count);
+      List<RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster()
+          .getRegionServerThreads();
+      for (RegionServerThread rs : regionServerThreads) {
+        CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null,
+            rs.getRegionServer(), false);
+        cleaner.chore();
+        Thread.sleep(1000);
+      }
+      int newcount1 = 0;
+      while (System.currentTimeMillis() < timeout) {
+        for(HColumnDescriptor colFamily : columnFamilies) {
+          newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
+        }
+        if(newcount1 <= 1) {
           break;
         }
         Thread.sleep(50);
       }
-      assertFalse(hrfs.hasReferences(tableDescritor));
-
       // run CatalogJanitor to clean merge references in hbase:meta and archive the
       // files of merging regions
       int cleaned = admin.runCatalogScan();
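
The hunk above paces the discharger with fixed Thread.sleep() calls. A less timing-sensitive alternative would be to poll the store's compacted-files set until it drains; the helper below is only a sketch (the method name waitForDischarge is hypothetical and not part of this patch), but it reuses the same getCompactedfiles() accessor the patch already calls elsewhere.

    // Hypothetical test helper: wait until the discharger has dropped all compacted-away
    // files from a store, rather than sleeping for a fixed interval.
    private static void waitForDischarge(Store store, long timeoutMs)
        throws InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (System.currentTimeMillis() < deadline) {
        Collection<StoreFile> compacted =
            ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
        if (compacted == null || compacted.isEmpty()) {
          return;                     // nothing left to discharge
        }
        Thread.sleep(100);
      }
      throw new AssertionError("compacted files not discharged within " + timeoutMs + " ms");
    }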

http://git-wip-us.apache.org/repos/asf/hbase/blob/58521869/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index 654135e..6693ca5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
 import java.io.IOException;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -254,6 +256,7 @@ public class TestRegionReplicas {
       LOG.info("Flushing primary region");
       Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
       region.flush(true);
+      HRegion primaryRegion = (HRegion) region;
 
       // ensure that chore is run
       LOG.info("Sleeping for " + (4 * refreshPeriod));
@@ -283,7 +286,7 @@ public class TestRegionReplicas {
       assertGetRpc(hriSecondary, 1042, true);
       assertGetRpc(hriSecondary, 2042, true);
 
-      // ensure that we are see the 3 store files
+      // ensure that we see the 3 store files
       Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());
 
       // force compaction
@@ -298,7 +301,8 @@ public class TestRegionReplicas {
       }
 
       // ensure that we see the compacted file only
-      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());
+      // This will be 4 until the cleaner chore runs
+      Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());
 
     } finally {
       HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
@@ -457,7 +461,19 @@ public class TestRegionReplicas {
       LOG.info("Force Major compaction on primary region " + hriPrimary);
       primaryRegion.compact(true);
       Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
-
+      List<RegionServerThread> regionServerThreads = HTU.getMiniHBaseCluster()
+          .getRegionServerThreads();
+      HRegionServer hrs = null;
+      for (RegionServerThread rs : regionServerThreads) {
+        if (rs.getRegionServer()
+            .getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) {
+          hrs = rs.getRegionServer();
+          break;
+        }
+      }
+      CompactedHFilesDischarger cleaner =
+          new CompactedHFilesDischarger(100, null, hrs, false);
+      cleaner.chore();
       // scan all the hfiles on the secondary.
       // since there are no read on the secondary when we ask locations to
       // the NN a FileNotFound exception will be returned and the FileLink

http://git-wip-us.apache.org/repos/asf/hbase/blob/58521869/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 6c3f946..72899a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -1027,6 +1027,18 @@ public class TestStore {
     store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
   }
 
+  private void closeCompactedFile(int index) throws IOException {
+    Collection<StoreFile> files =
+        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
+    StoreFile sf = null;
+    Iterator<StoreFile> it = files.iterator();
+    for (int i = 0; i <= index; i++) {
+      sf = it.next();
+    }
+    sf.closeReader(true);
+    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
+  }
+
   @Test
   public void testRefreshStoreFiles() throws Exception {
     init(name.getMethodName());
@@ -1054,6 +1066,7 @@ public class TestStore {
     store.refreshStoreFiles();
     assertEquals(5, this.store.getStorefilesCount());
 
+    closeCompactedFile(0);
     archiveStoreFile(0);
 
     assertEquals(5, this.store.getStorefilesCount());

http://git-wip-us.apache.org/repos/asf/hbase/blob/58521869/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index f59524f..ba4ad3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -452,9 +452,9 @@ public class TestStoreScanner extends TestCase {
     // normally cause an NPE because scan.store is null.  So as long as we get through these
     // two calls we are good and the bug was quashed.
 
-    scan.updateReaders();
+    scan.updateReaders(new ArrayList<StoreFile>());
 
-    scan.updateReaders();
+    scan.updateReaders(new ArrayList<StoreFile>());
 
     scan.peek();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58521869/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
index 36a726d..a3d5631 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
@@ -137,6 +137,7 @@ public class TestStripeStoreFileManager {
     MockStoreFile stripe0a = createFile(0, 100, OPEN_KEY, KEY_B),
         stripe1 = createFile(KEY_B, OPEN_KEY);
     manager.addCompactionResults(al(l0File), al(stripe0a, stripe1));
+    manager.removeCompactedFiles(al(l0File));
     // If we want a key <= KEY_A, we should get everything except stripe1.
     ArrayList<StoreFile> sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_A));
     assertEquals(2, sfsDump.size());
@@ -162,6 +163,7 @@ public class TestStripeStoreFileManager {
     // a candidate from the first file, the old one should not be removed.
     StoreFile stripe0b = createFile(0, 101, OPEN_KEY, KEY_B);
     manager.addCompactionResults(al(l0File2), al(stripe0b));
+    manager.removeCompactedFiles(al(l0File2));
     sfs = manager.getCandidateFilesForRowKeyBefore(KV_A);
     assertEquals(stripe0b, sfs.next());
     sfs.remove();
@@ -350,10 +352,12 @@ public class TestStripeStoreFileManager {
     // Here, [B, C] is logically [B, inf), so we should be able to compact it to that only.
     verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
     manager.addCompactionResults(al(sf), al(createFile(KEY_B, OPEN_KEY)));
+    manager.removeCompactedFiles(al(sf));
     // Do the same for other variants.
     manager = createManager(al(sf, createFile(KEY_C, OPEN_KEY)));
     verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
     manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, KEY_C)));
+    manager.removeCompactedFiles(al(sf));
     manager = createManager(al(sf));
     verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
     manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, OPEN_KEY)));
@@ -379,6 +383,7 @@ public class TestStripeStoreFileManager {
     StoreFile sf_B2C_0 = createFile(KEY_B, KEY_C);
     StoreFile sf_C2i_0 = createFile(KEY_C, OPEN_KEY);
     manager.addCompactionResults(al(sf_L0_0a), al(sf_i2B_0, sf_B2C_0, sf_C2i_0));
+    manager.removeCompactedFiles(al(sf_L0_0a));
     verifyAllFiles(manager, al(sf_L0_0b, sf_i2B_0, sf_B2C_0, sf_C2i_0));
 
     // Add another l0 file, "compact" both L0 into two stripes
@@ -387,51 +392,61 @@ public class TestStripeStoreFileManager {
     StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C);
     manager.insertNewFiles(al(sf_L0_1));
     manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1));
+    manager.removeCompactedFiles(al(sf_L0_0b, sf_L0_1));
     verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1));
 
     // Try compacting with invalid file (no metadata) - should add files to L0.
     StoreFile sf_L0_2 = createFile(null, null);
     manager.addCompactionResults(al(), al(sf_L0_2));
+    manager.removeCompactedFiles(al());
     verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1, sf_L0_2));
     // Remove it...
     manager.addCompactionResults(al(sf_L0_2), al());
+    manager.removeCompactedFiles(al(sf_L0_2));
 
     // Do regular compaction in the first stripe.
     StoreFile sf_i2B_3 = createFile(OPEN_KEY, KEY_B);
     manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3));
+    manager.removeCompactedFiles(al(sf_i2B_0, sf_i2B_1));
     verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3));
 
     // Rebalance two stripes.
     StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D);
     StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY);
     manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4));
+    manager.removeCompactedFiles(al(sf_B2C_0, sf_C2i_0, sf_B2C_1));
     verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4));
 
     // Split the first stripe.
     StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A);
     StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B);
     manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5));
+    manager.removeCompactedFiles(al(sf_i2B_3));
     verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5));
 
     // Split the middle stripe.
     StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C);
     StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D);
     manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6));
+    manager.removeCompactedFiles(al(sf_B2D_4));
     verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6));
 
     // Merge two different middle stripes.
     StoreFile sf_A2C_7 = createFile(KEY_A, KEY_C);
     manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7));
+    manager.removeCompactedFiles(al(sf_A2B_5, sf_B2C_6));
     verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7));
 
     // Merge lower half.
     StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C);
     manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8));
+    manager.removeCompactedFiles(al(sf_i2A_5, sf_A2C_7));
     verifyAllFiles(manager, al(sf_D2i_4, sf_C2D_6, sf_i2C_8));
 
     // Merge all.
     StoreFile sf_i2i_9 = createFile(OPEN_KEY, OPEN_KEY);
     manager.addCompactionResults(al(sf_D2i_4, sf_C2D_6, sf_i2C_8), al(sf_i2i_9));
+    manager.removeCompactedFiles(al(sf_D2i_4, sf_C2D_6, sf_i2C_8));
     verifyAllFiles(manager, al(sf_i2i_9));
   }
 
@@ -451,12 +466,14 @@ public class TestStripeStoreFileManager {
     verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i, sf_c2i);
     // Remove these files.
     sfm.addCompactionResults(al(sf_i2d, sf_d2i), al());
+    sfm.removeCompactedFiles(al(sf_i2d, sf_d2i));
     assertEquals(0, sfm.getLevel0Files().size());
     // Add another file to stripe; then "rebalance" stripes w/o it - the file, which was
     // presumably flushed during compaction, should go to L0.
     StoreFile sf_i2c_2 = createFile(OPEN_KEY, KEY_C);
     sfm.insertNewFiles(al(sf_i2c_2));
     sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i));
+    sfm.removeCompactedFiles(al(sf_i2c, sf_c2i));
     assertEquals(1, sfm.getLevel0Files().size());
     verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2);
   }
@@ -472,9 +489,11 @@ public class TestStripeStoreFileManager {
     ArrayList<StoreFile> compacted = al(createFile(OPEN_KEY, KEY_B),
         createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY));
     manager.addCompactionResults(al(sf0a), compacted);
+    manager.removeCompactedFiles(al(sf0a));
     // Next L0 compaction only produces file for the first and last stripe.
+    ArrayList<StoreFile> compacted2 = al(createFile(OPEN_KEY, KEY_B), createFile(KEY_C, OPEN_KEY));
     manager.addCompactionResults(al(sf0b), compacted2);
+    manager.removeCompactedFiles(al(sf0b));
     compacted.addAll(compacted2);
     verifyAllFiles(manager, compacted);
   }
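
As the stripe-manager edits above show, replacing compacted files is now a two-step contract: addCompactionResults() registers the outputs and marks the inputs compacted-away, and removeCompactedFiles() then drops the inputs from the manager. A minimal sketch of that sequence follows; sfOld and sfNew are placeholder StoreFile instances, and al()/createFile()/verifyAllFiles() are this test's own helpers.

    // Two-step replacement of compacted files, mirroring the calls added in this patch.
    ArrayList<StoreFile> inputs = al(sfOld);        // files consumed by the compaction
    ArrayList<StoreFile> outputs = al(sfNew);       // files produced by the compaction
    manager.addCompactionResults(inputs, outputs);  // register outputs, mark inputs compacted-away
    manager.removeCompactedFiles(inputs);           // then drop the inputs from the manager
    verifyAllFiles(manager, outputs);               // only the new files remain visible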

http://git-wip-us.apache.org/repos/asf/hbase/blob/58521869/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
index 5a45190..7e86632 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
@@ -125,7 +125,7 @@ public class TestWideScanner extends HBaseTestCase {
           ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
         while (scanners.hasNext()) {
           StoreScanner ss = (StoreScanner)scanners.next();
-          ss.updateReaders();
+          ss.updateReaders(new ArrayList<StoreFile>());
         }
       } while (more);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/58521869/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
new file mode 100644
index 0000000..c23e794
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, RegionServerTests.class })
+public class TestCompactedHFilesDischarger {
+  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+  private Region region;
+  private final static byte[] fam = Bytes.toBytes("cf_1");
+  private final static byte[] qual1 = Bytes.toBytes("qf_1");
+  private final static byte[] val = Bytes.toBytes("val");
+  private static CountDownLatch latch = new CountDownLatch(3);
+  private static AtomicInteger counter = new AtomicInteger(0);
+  private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
+  private RegionServerServices rss;
+
+  @Before
+  public void setUp() throws Exception {
+    TableName tableName = TableName.valueOf(getClass().getSimpleName());
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(fam));
+    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
+    Path path = testUtil.getDataTestDir(getClass().getSimpleName());
+    region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
+    rss = mock(RegionServerServices.class);
+    List<Region> regions = new ArrayList<Region>();
+    regions.add(region);
+    when(rss.getOnlineRegions()).thenReturn(regions);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    counter.set(0);
+    scanCompletedCounter.set(0);
+    latch = new CountDownLatch(3);
+    HBaseTestingUtility.closeRegionAndWAL(region);
+    testUtil.cleanupTestDir();
+  }
+
+  @Test
+  public void testCompactedHFilesCleaner() throws Exception {
+    // Create the cleaner object
+    CompactedHFilesDischarger cleaner =
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
+    // Add some data to the region and do some flushes
+    for (int i = 1; i < 10; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 11; i < 20; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 21; i < 30; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+
+    Store store = region.getStore(fam);
+    assertEquals(3, store.getStorefilesCount());
+
+    Collection<StoreFile> storefiles = store.getStorefiles();
+    Collection<StoreFile> compactedfiles =
+        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    // None of the files should be in compacted state.
+    for (StoreFile file : storefiles) {
+      assertFalse(file.isCompactedAway());
+    }
+    // Try to run the cleaner without compaction. there should not be any change
+    cleaner.chore();
+    storefiles = store.getStorefiles();
+    // None of the files should be in compacted state.
+    for (StoreFile file : storefiles) {
+      assertFalse(file.isCompactedAway());
+    }
+    // now do some compaction
+    region.compact(true);
+    // Still the flushed files should be present until the cleaner runs. But the state of it should
+    // be in COMPACTED state
+    assertEquals(1, store.getStorefilesCount());
+    assertEquals(3,
+      ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+
+    // Run the cleaner
+    cleaner.chore();
+    assertEquals(1, store.getStorefilesCount());
+    storefiles = store.getStorefiles();
+    for (StoreFile file : storefiles) {
+      // Should not be in compacted state
+      assertFalse(file.isCompactedAway());
+    }
+    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    assertTrue(compactedfiles.size() == 0);
+    
+  }
+
+  @Test
+  public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
+    // Create the cleaner object
+    CompactedHFilesDischarger cleaner =
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
+    // Add some data to the region and do some flushes
+    for (int i = 1; i < 10; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 11; i < 20; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 21; i < 30; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+
+    Store store = region.getStore(fam);
+    assertEquals(3, store.getStorefilesCount());
+
+    Collection<StoreFile> storefiles = store.getStorefiles();
+    Collection<StoreFile> compactedfiles =
+        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    // None of the files should be in compacted state.
+    for (StoreFile file : storefiles) {
+      assertFalse(file.isCompactedAway());
+    }
+    // Do compaction
+    region.compact(true);
+    startScannerThreads();
+
+    storefiles = store.getStorefiles();
+    int usedReaderCount = 0;
+    int unusedReaderCount = 0;
+    for (StoreFile file : storefiles) {
+      if (file.getRefCount() == 3) {
+        usedReaderCount++;
+      }
+    }
+    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    for(StoreFile file : compactedfiles) {
+      assertEquals("Refcount should be 3", 0, file.getRefCount());
+      unusedReaderCount++;
+    }
+    // Though there are files we are not using them for reads
+    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
+    assertEquals("used reader count should be 1", 1, usedReaderCount);
+    // now run the cleaner
+    cleaner.chore();
+    countDown();
+    assertEquals(1, store.getStorefilesCount());
+    storefiles = store.getStorefiles();
+    for (StoreFile file : storefiles) {
+      // Should not be in compacted state
+      assertFalse(file.isCompactedAway());
+    }
+    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    assertTrue(compactedfiles.size() == 0);
+  }
+
+  @Test
+  public void testCleanerWithParallelScanners() throws Exception {
+    // Create the cleaner object
+    CompactedHFilesDischarger cleaner =
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
+    // Add some data to the region and do some flushes
+    for (int i = 1; i < 10; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 11; i < 20; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 21; i < 30; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+
+    Store store = region.getStore(fam);
+    assertEquals(3, store.getStorefilesCount());
+
+    Collection<StoreFile> storefiles = store.getStorefiles();
+    Collection<StoreFile> compactedfiles =
+        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    // None of the files should be in compacted state.
+    for (StoreFile file : storefiles) {
+      assertFalse(file.isCompactedAway());
+    }
+    startScannerThreads();
+    // Do compaction
+    region.compact(true);
+
+    storefiles = store.getStorefiles();
+    int usedReaderCount = 0;
+    int unusedReaderCount = 0;
+    for (StoreFile file : storefiles) {
+      if (file.getRefCount() == 0) {
+        unusedReaderCount++;
+      }
+    }
+    compactedfiles =
+        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    for(StoreFile file : compactedfiles) {
+      assertEquals("Refcount should be 3", 3, file.getRefCount());
+      usedReaderCount++;
+    }
+    // The newly compacted file will not be used by any scanner
+    assertEquals("unused reader count should be 1", 1, unusedReaderCount);
+    assertEquals("used reader count should be 3", 3, usedReaderCount);
+    // now run the cleaner
+    cleaner.chore();
+    countDown();
+    // No change in the number of store files as none of the compacted files could be cleaned up
+    assertEquals(1, store.getStorefilesCount());
+    assertEquals(3,
+      ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+    while (scanCompletedCounter.get() != 3) {
+      Thread.sleep(100);
+    }
+    // reset
+    latch = new CountDownLatch(3);
+    scanCompletedCounter.set(0);
+    counter.set(0);
+    // Try creating a new scanner and it should use only the new file created after compaction
+    startScannerThreads();
+    storefiles = store.getStorefiles();
+    usedReaderCount = 0;
+    unusedReaderCount = 0;
+    for (StoreFile file : storefiles) {
+      if (file.getRefCount() == 3) {
+        usedReaderCount++;
+      }
+    }
+    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    for(StoreFile file : compactedfiles) {
+      assertEquals("Refcount should be 0", 0, file.getRefCount());
+      unusedReaderCount++;
+    }
+    // Though there are files we are not using them for reads
+    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
+    assertEquals("used reader count should be 1", 1, usedReaderCount);
+    countDown();
+    while (scanCompletedCounter.get() != 3) {
+      Thread.sleep(100);
+    }
+    // Run the cleaner again
+    cleaner.chore();
+    // Now the cleaner should be able to clear it up because there are no active readers
+    assertEquals(1, store.getStorefilesCount());
+    storefiles = store.getStorefiles();
+    for (StoreFile file : storefiles) {
+      // Should not be in compacted state
+      assertFalse(file.isCompactedAway());
+    }
+    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    assertTrue(compactedfiles.size() == 0);
+  }
+
+  protected void countDown() {
+    // count down 3 times
+    latch.countDown();
+    latch.countDown();
+    latch.countDown();
+  }
+
+  protected void startScannerThreads() throws InterruptedException {
+    // Start parallel scan threads
+    ScanThread[] scanThreads = new ScanThread[3];
+    for (int i = 0; i < 3; i++) {
+      scanThreads[i] = new ScanThread((HRegion) region);
+    }
+    for (ScanThread thread : scanThreads) {
+      thread.start();
+    }
+    while (counter.get() != 3) {
+      Thread.sleep(100);
+    }
+  }
+
+  private static class ScanThread extends Thread {
+    private final HRegion region;
+
+    public ScanThread(HRegion region) {
+      this.region = region;
+    }
+
+    @Override
+    public void run() {
+      try {
+        initiateScan(region);
+      } catch (IOException e) {
+        // do nothing
+      }
+    }
+
+    private void initiateScan(HRegion region) throws IOException {
+      Scan scan = new Scan();
+      scan.setCaching(1);
+      RegionScanner resScanner = null;
+      try {
+        resScanner = region.getScanner(scan);
+        List<Cell> results = new ArrayList<Cell>();
+        boolean next = resScanner.next(results);
+        try {
+          counter.incrementAndGet();
+          latch.await();
+        } catch (InterruptedException e) {
+        }
+        while (!next) {
+          resScanner.next(results);
+        }
+      } finally {
+        scanCompletedCounter.incrementAndGet();
+        resScanner.close();
+      }
+    }
+  }
+}

