hbase-commits mailing list archives

From syuanji...@apache.org
Subject [34/50] [abbrv] hbase git commit: HBASE-13082 Coarsen StoreScanner locks to RegionScanner (Ram)
Date Mon, 07 Dec 2015 16:56:11 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index 870e963..67258aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -248,6 +249,7 @@ public class TestRegionReplicas {
       LOG.info("Flushing primary region");
       Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
       region.flush(true);
+      HRegion primaryRegion = (HRegion) region;
 
       // ensure that chore is run
       LOG.info("Sleeping for " + (4 * refreshPeriod));
@@ -277,7 +279,7 @@ public class TestRegionReplicas {
       assertGetRpc(hriSecondary, 1042, true);
       assertGetRpc(hriSecondary, 2042, true);
 
-      // ensure that we are see the 3 store files
+      // ensure that we see the 3 store files
       Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());
 
       // force compaction
@@ -292,7 +294,8 @@ public class TestRegionReplicas {
       }
 
       // ensure that we see the compacted file only
-      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());
+      // This will be 4 until the cleaner chore runs
+      Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());
 
     } finally {
       HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
@@ -451,7 +454,9 @@ public class TestRegionReplicas {
       LOG.info("Force Major compaction on primary region " + hriPrimary);
       primaryRegion.compact(true);
       Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
-
+      CompactedHFilesDischarger cleaner =
+          new CompactedHFilesDischarger(100, null, (HRegion) primaryRegion);
+      cleaner.chore();
       // scan all the hfiles on the secondary.
       // since there are no read on the secondary when we ask locations to
       // the NN a FileNotFound exception will be returned and the FileLink
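
For reference, the test drives the new discharger by hand rather than waiting for the scheduled chore service. A minimal sketch of that pattern, assuming an HRegion that has just been major-compacted (the interval and Stoppable arguments mirror the constructor call above; compactedRegion is a hypothetical variable):

    // Sketch only: run a single cleanup pass of the discharger synchronously in a test.
    // 'compactedRegion' is a hypothetical HRegion that has just been major-compacted.
    CompactedHFilesDischarger discharger =
        new CompactedHFilesDischarger(100, null, compactedRegion);
    // Closes and drops compacted-away store files that have no active readers.
    discharger.chore();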

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 85c75b1..9e846c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -1030,6 +1030,18 @@ public class TestStore {
     store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
   }
 
+  private void closeCompactedFile(int index) throws IOException {
+    Collection<StoreFile> files =
+        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
+    StoreFile sf = null;
+    Iterator<StoreFile> it = files.iterator();
+    for (int i = 0; i <= index; i++) {
+      sf = it.next();
+    }
+    sf.closeReader(true);
+    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
+  }
+
   @Test
   public void testRefreshStoreFiles() throws Exception {
     init(name.getMethodName());
@@ -1057,6 +1069,7 @@ public class TestStore {
     store.refreshStoreFiles();
     assertEquals(5, this.store.getStorefilesCount());
 
+    closeCompactedFile(0);
     archiveStoreFile(0);
 
     assertEquals(5, this.store.getStorefilesCount());

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
index 3bb0384..e9d34ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
@@ -137,6 +137,7 @@ public class TestStripeStoreFileManager {
     MockStoreFile stripe0a = createFile(0, 100, OPEN_KEY, KEY_B),
         stripe1 = createFile(KEY_B, OPEN_KEY);
     manager.addCompactionResults(al(l0File), al(stripe0a, stripe1));
+    manager.removeCompactedFiles(al(l0File));
     // If we want a key <= KEY_A, we should get everything except stripe1.
     ArrayList<StoreFile> sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_A));
     assertEquals(2, sfsDump.size());
@@ -162,6 +163,7 @@ public class TestStripeStoreFileManager {
     // a candidate from the first file, the old one should not be removed.
     StoreFile stripe0b = createFile(0, 101, OPEN_KEY, KEY_B);
     manager.addCompactionResults(al(l0File2), al(stripe0b));
+    manager.removeCompactedFiles(al(l0File2));
     sfs = manager.getCandidateFilesForRowKeyBefore(KV_A);
     assertEquals(stripe0b, sfs.next());
     sfs.remove();
@@ -350,10 +352,12 @@ public class TestStripeStoreFileManager {
     // Here, [B, C] is logically [B, inf), so we should be able to compact it to that only.
     verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
     manager.addCompactionResults(al(sf), al(createFile(KEY_B, OPEN_KEY)));
+    manager.removeCompactedFiles(al(sf));
     // Do the same for other variants.
     manager = createManager(al(sf, createFile(KEY_C, OPEN_KEY)));
     verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
     manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, KEY_C)));
+    manager.removeCompactedFiles(al(sf));
     manager = createManager(al(sf));
     verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
     manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, OPEN_KEY)));
@@ -379,6 +383,7 @@ public class TestStripeStoreFileManager {
     StoreFile sf_B2C_0 = createFile(KEY_B, KEY_C);
     StoreFile sf_C2i_0 = createFile(KEY_C, OPEN_KEY);
     manager.addCompactionResults(al(sf_L0_0a), al(sf_i2B_0, sf_B2C_0, sf_C2i_0));
+    manager.removeCompactedFiles(al(sf_L0_0a));
     verifyAllFiles(manager, al(sf_L0_0b, sf_i2B_0, sf_B2C_0, sf_C2i_0));
 
     // Add another l0 file, "compact" both L0 into two stripes
@@ -387,51 +392,61 @@ public class TestStripeStoreFileManager {
     StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C);
     manager.insertNewFiles(al(sf_L0_1));
     manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1));
+    manager.removeCompactedFiles(al(sf_L0_0b, sf_L0_1));
     verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1));
 
     // Try compacting with invalid file (no metadata) - should add files to L0.
     StoreFile sf_L0_2 = createFile(null, null);
     manager.addCompactionResults(al(), al(sf_L0_2));
+    manager.removeCompactedFiles(al());
     verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1, sf_L0_2));
     // Remove it...
     manager.addCompactionResults(al(sf_L0_2), al());
+    manager.removeCompactedFiles(al(sf_L0_2));
 
     // Do regular compaction in the first stripe.
     StoreFile sf_i2B_3 = createFile(OPEN_KEY, KEY_B);
     manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3));
+    manager.removeCompactedFiles(al(sf_i2B_0, sf_i2B_1));
     verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3));
 
     // Rebalance two stripes.
     StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D);
     StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY);
     manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4));
+    manager.removeCompactedFiles(al(sf_B2C_0, sf_C2i_0, sf_B2C_1));
     verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4));
 
     // Split the first stripe.
     StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A);
     StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B);
     manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5));
+    manager.removeCompactedFiles(al(sf_i2B_3));
     verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5));
 
     // Split the middle stripe.
     StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C);
     StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D);
     manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6));
+    manager.removeCompactedFiles(al(sf_B2D_4));
     verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6));
 
     // Merge two different middle stripes.
     StoreFile sf_A2C_7 = createFile(KEY_A, KEY_C);
     manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7));
+    manager.removeCompactedFiles(al(sf_A2B_5, sf_B2C_6));
     verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7));
 
     // Merge lower half.
     StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C);
     manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8));
+    manager.removeCompactedFiles(al(sf_i2A_5, sf_A2C_7));
     verifyAllFiles(manager, al(sf_D2i_4, sf_C2D_6, sf_i2C_8));
 
     // Merge all.
     StoreFile sf_i2i_9 = createFile(OPEN_KEY, OPEN_KEY);
     manager.addCompactionResults(al(sf_D2i_4, sf_C2D_6, sf_i2C_8), al(sf_i2i_9));
+    manager.removeCompactedFiles(al(sf_D2i_4, sf_C2D_6, sf_i2C_8));
     verifyAllFiles(manager, al(sf_i2i_9));
   }
 
@@ -451,12 +466,14 @@ public class TestStripeStoreFileManager {
     verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i, sf_c2i);
     // Remove these files.
     sfm.addCompactionResults(al(sf_i2d, sf_d2i), al());
+    sfm.removeCompactedFiles(al(sf_i2d, sf_d2i));
     assertEquals(0, sfm.getLevel0Files().size());
     // Add another file to stripe; then "rebalance" stripes w/o it - the file, which was
     // presumably flushed during compaction, should go to L0.
     StoreFile sf_i2c_2 = createFile(OPEN_KEY, KEY_C);
     sfm.insertNewFiles(al(sf_i2c_2));
     sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i));
+    sfm.removeCompactedFiles(al(sf_i2c, sf_c2i));
     assertEquals(1, sfm.getLevel0Files().size());
     verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2);
   }
@@ -472,9 +489,11 @@ public class TestStripeStoreFileManager {
     ArrayList<StoreFile> compacted = al(createFile(OPEN_KEY, KEY_B),
         createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY));
     manager.addCompactionResults(al(sf0a), compacted);
+    manager.removeCompactedFiles(al(sf0a));
     // Next L0 compaction only produces file for the first and last stripe.
     ArrayList<StoreFile> compacted2 = al(createFile(OPEN_KEY, KEY_B), createFile(KEY_C, OPEN_KEY));
     manager.addCompactionResults(al(sf0b), compacted2);
+    manager.removeCompactedFiles(al(sf0b));
     compacted.addAll(compacted2);
     verifyAllFiles(manager, compacted);
   }
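
The change repeated throughout this test follows a single pattern: addCompactionResults() now only marks the compaction inputs as compacted away, and the test must discharge them explicitly before the manager's file view matches the expected post-compaction state. A condensed sketch of that sequence, using hypothetical store files in place of the ones above and assuming they are already tracked by the manager (e.g. via createManager):

    // Hypothetical store files for illustration; al(...) and createFile(...) are the
    // helpers used throughout this test, and 'manager' is a StripeStoreFileManager.
    StoreFile oldA = createFile(OPEN_KEY, KEY_B);        // compaction input, first stripe
    StoreFile oldB = createFile(KEY_B, OPEN_KEY);        // compaction input, second stripe
    StoreFile merged = createFile(OPEN_KEY, OPEN_KEY);   // compaction output covering both
    // Register the compaction: 'merged' becomes live, the inputs are only marked compacted away.
    manager.addCompactionResults(al(oldA, oldB), al(merged));
    // Explicit discharge step this patch adds after every addCompactionResults() call.
    manager.removeCompactedFiles(al(oldA, oldB));
    verifyAllFiles(manager, al(merged));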

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b3d1f14/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
new file mode 100644
index 0000000..40539c4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, RegionServerTests.class })
+public class TestCompactedHFilesDischarger {
+  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+  private Region region;
+  private final static byte[] fam = Bytes.toBytes("cf_1");
+  private final static byte[] qual1 = Bytes.toBytes("qf_1");
+  private final static byte[] val = Bytes.toBytes("val");
+  private static CountDownLatch latch = new CountDownLatch(3);
+  private static AtomicInteger counter = new AtomicInteger(0);
+  private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
+
+  @Before
+  public void setUp() throws Exception {
+    TableName tableName = TableName.valueOf(getClass().getSimpleName());
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(fam));
+    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
+    Path path = testUtil.getDataTestDir(getClass().getSimpleName());
+    region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    counter.set(0);
+    scanCompletedCounter.set(0);
+    latch = new CountDownLatch(3);
+    HBaseTestingUtility.closeRegionAndWAL(region);
+    testUtil.cleanupTestDir();
+  }
+
+  @Test
+  public void testCompactedHFilesCleaner() throws Exception {
+    // Create the cleaner object
+    CompactedHFilesDischarger cleaner =
+        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+    // Add some data to the region and do some flushes
+    for (int i = 1; i < 10; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 11; i < 20; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 21; i < 30; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+
+    Store store = region.getStore(fam);
+    assertEquals(3, store.getStorefilesCount());
+
+    Collection<StoreFile> storefiles = store.getStorefiles();
+    Collection<StoreFile> compactedfiles =
+        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    // None of the files should be in compacted state.
+    for (StoreFile file : storefiles) {
+      assertFalse(file.isCompactedAway());
+    }
+    // Try to run the cleaner without compaction. there should not be any change
+    cleaner.chore();
+    storefiles = store.getStorefiles();
+    // None of the files should be in compacted state.
+    for (StoreFile file : storefiles) {
+      assertFalse(file.isCompactedAway());
+    }
+    // now do some compaction
+    region.compact(true);
+    // The flushed files should still be present until the cleaner runs, but their state should
+    // be COMPACTED
+    assertEquals(1, store.getStorefilesCount());
+    assertEquals(3,
+      ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+
+    // Run the cleaner
+    cleaner.chore();
+    assertEquals(1, store.getStorefilesCount());
+    storefiles = store.getStorefiles();
+    for (StoreFile file : storefiles) {
+      // Should not be in compacted state
+      assertFalse(file.isCompactedAway());
+    }
+    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    assertTrue(compactedfiles.size() == 0);
+    
+  }
+
+  @Test
+  public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
+    // Create the cleaner object
+    CompactedHFilesDischarger cleaner =
+        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+    // Add some data to the region and do some flushes
+    for (int i = 1; i < 10; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 11; i < 20; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 21; i < 30; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+
+    Store store = region.getStore(fam);
+    assertEquals(3, store.getStorefilesCount());
+
+    Collection<StoreFile> storefiles = store.getStorefiles();
+    Collection<StoreFile> compactedfiles =
+        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    // None of the files should be in compacted state.
+    for (StoreFile file : storefiles) {
+      assertFalse(file.isCompactedAway());
+    }
+    // Do compaction
+    region.compact(true);
+    startScannerThreads();
+
+    storefiles = store.getStorefiles();
+    int usedReaderCount = 0;
+    int unusedReaderCount = 0;
+    for (StoreFile file : storefiles) {
+      if (file.getRefCount() == 3) {
+        usedReaderCount++;
+      }
+    }
+    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    for(StoreFile file : compactedfiles) {
+      assertEquals("Refcount should be 0", 0, file.getRefCount());
+      unusedReaderCount++;
+    }
+    // Though there are files we are not using them for reads
+    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
+    assertEquals("used reader count should be 1", 1, usedReaderCount);
+    // now run the cleaner
+    cleaner.chore();
+    countDown();
+    assertEquals(1, store.getStorefilesCount());
+    storefiles = store.getStorefiles();
+    for (StoreFile file : storefiles) {
+      // Should not be in compacted state
+      assertFalse(file.isCompactedAway());
+    }
+    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    assertTrue(compactedfiles.size() == 0);
+  }
+
+  @Test
+  public void testCleanerWithParallelScanners() throws Exception {
+    // Create the cleaner object
+    CompactedHFilesDischarger cleaner =
+        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+    // Add some data to the region and do some flushes
+    for (int i = 1; i < 10; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 11; i < 20; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 21; i < 30; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+
+    Store store = region.getStore(fam);
+    assertEquals(3, store.getStorefilesCount());
+
+    Collection<StoreFile> storefiles = store.getStorefiles();
+    Collection<StoreFile> compactedfiles =
+        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    // None of the files should be in compacted state.
+    for (StoreFile file : storefiles) {
+      assertFalse(file.isCompactedAway());
+    }
+    startScannerThreads();
+    // Do compaction
+    region.compact(true);
+
+    storefiles = store.getStorefiles();
+    int usedReaderCount = 0;
+    int unusedReaderCount = 0;
+    for (StoreFile file : storefiles) {
+      if (file.getRefCount() == 0) {
+        unusedReaderCount++;
+      }
+    }
+    compactedfiles =
+        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    for(StoreFile file : compactedfiles) {
+      assertEquals("Refcount should be 3", 3, file.getRefCount());
+      usedReaderCount++;
+    }
+    // The newly compacted file will not be used by any scanner
+    assertEquals("unused reader count should be 1", 1, unusedReaderCount);
+    assertEquals("used reader count should be 3", 3, usedReaderCount);
+    // now run the cleaner
+    cleaner.chore();
+    countDown();
+    // No change in the number of store files as none of the compacted files could be cleaned up
+    assertEquals(1, store.getStorefilesCount());
+    assertEquals(3,
+      ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+    while (scanCompletedCounter.get() != 3) {
+      Thread.sleep(100);
+    }
+    // reset
+    latch = new CountDownLatch(3);
+    scanCompletedCounter.set(0);
+    counter.set(0);
+    // Try creating a new scanner and it should use only the new file created after compaction
+    startScannerThreads();
+    storefiles = store.getStorefiles();
+    usedReaderCount = 0;
+    unusedReaderCount = 0;
+    for (StoreFile file : storefiles) {
+      if (file.getRefCount() == 3) {
+        usedReaderCount++;
+      }
+    }
+    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    for(StoreFile file : compactedfiles) {
+      assertEquals("Refcount should be 0", 0, file.getRefCount());
+      unusedReaderCount++;
+    }
+    // Though there are files we are not using them for reads
+    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
+    assertEquals("used reader count should be 1", 1, usedReaderCount);
+    countDown();
+    while (scanCompletedCounter.get() != 3) {
+      Thread.sleep(100);
+    }
+    // Run the cleaner again
+    cleaner.chore();
+    // Now the cleaner should be able to clear it up because there are no active readers
+    assertEquals(1, store.getStorefilesCount());
+    storefiles = store.getStorefiles();
+    for (StoreFile file : storefiles) {
+      // Should not be in compacted state
+      assertFalse(file.isCompactedAway());
+    }
+    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+    assertTrue(compactedfiles.size() == 0);
+  }
+
+  protected void countDown() {
+    // count down 3 times
+    latch.countDown();
+    latch.countDown();
+    latch.countDown();
+  }
+
+  protected void startScannerThreads() throws InterruptedException {
+    // Start parallel scan threads
+    ScanThread[] scanThreads = new ScanThread[3];
+    for (int i = 0; i < 3; i++) {
+      scanThreads[i] = new ScanThread((HRegion) region);
+    }
+    for (ScanThread thread : scanThreads) {
+      thread.start();
+    }
+    while (counter.get() != 3) {
+      Thread.sleep(100);
+    }
+  }
+
+  private static class ScanThread extends Thread {
+    private final HRegion region;
+
+    public ScanThread(HRegion region) {
+      this.region = region;
+    }
+
+    @Override
+    public void run() {
+      try {
+        initiateScan(region);
+      } catch (IOException e) {
+        // do nothing
+      }
+    }
+
+    private void initiateScan(HRegion region) throws IOException {
+      Scan scan = new Scan();
+      scan.setCaching(1);
+      RegionScanner resScanner = null;
+      try {
+        resScanner = region.getScanner(scan);
+        List<Cell> results = new ArrayList<Cell>();
+        boolean next = resScanner.next(results);
+        try {
+          counter.incrementAndGet();
+          latch.await();
+        } catch (InterruptedException e) {
+        }
+        while (next) {
+          next = resScanner.next(results);
+        }
+      } finally {
+        scanCompletedCounter.incrementAndGet();
+        resScanner.close();
+      }
+    }
+  }
+}
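
The assertions above all hinge on StoreFile.getRefCount(): a compacted-away file can only be discharged once no scanner still references it. A rough, hedged sketch of that gating (illustrative only, not the actual CompactedHFilesDischarger implementation), built from calls that appear elsewhere in this patch:

    // Illustrative sketch, not the real discharger: close and remove compacted-away
    // files only when no scanner still holds a reference to them.
    List<StoreFile> filesToRemove = new ArrayList<StoreFile>();
    Collection<StoreFile> compacted =
        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (StoreFile file : compacted) {
      if (file.isCompactedAway() && file.getRefCount() == 0) {
        file.closeReader(true);      // release the reader and evict its blocks
        filesToRemove.add(file);
      }
    }
    ((HStore) store).getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);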

