hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject [1/2] hbase git commit: HBASE-11861 Native MOB Compaction mechanisms (Jingcheng Du)
Date Fri, 06 Feb 2015 14:17:27 GMT
Repository: hbase
Updated Branches:
  refs/heads/hbase-11339 fbbb3249d -> 2c4934eda


http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
new file mode 100644
index 0000000..9a8b7d9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestMobFileCompactor.java
@@ -0,0 +1,652 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestMobFileCompactor {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private Configuration conf = null;
+  private String tableNameAsString;
+  private TableName tableName;
+  private static HTable hTable;
+  private static Admin admin;
+  private static HTableDescriptor desc;
+  private static HColumnDescriptor hcd1;
+  private static HColumnDescriptor hcd2;
+  private static FileSystem fs;
+  private final static String family1 = "family1";
+  private final static String family2 = "family2";
+  private final static String qf1 = "qualifier1";
+  private final static String qf2 = "qualifier2";
+  private static byte[] KEYS = Bytes.toBytes("012");
+  private static int regionNum = KEYS.length;
+  private static int delRowNum = 1;
+  private static int delCellNum = 6;
+  private static int cellNumPerRow = 3;
+  private static int rowNumPerFile = 2;
+  private static ExecutorService pool;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.startMiniCluster(1);
+    pool = createThreadPool(TEST_UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    pool.shutdown();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    fs = TEST_UTIL.getTestFileSystem();
+    conf = TEST_UTIL.getConfiguration();
+    long tid = System.currentTimeMillis();
+    tableNameAsString = "testMob" + tid;
+    tableName = TableName.valueOf(tableNameAsString);
+    hcd1 = new HColumnDescriptor(family1);
+    hcd1.setMobEnabled(true);
+    hcd1.setMobThreshold(0L);
+    hcd1.setMaxVersions(4);
+    hcd2 = new HColumnDescriptor(family2);
+    hcd2.setMobEnabled(true);
+    hcd2.setMobThreshold(0L);
+    hcd2.setMaxVersions(4);
+    desc = new HTableDescriptor(tableName);
+    desc.addFamily(hcd1);
+    desc.addFamily(hcd2);
+    admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc, getSplitKeys());
+    hTable = new HTable(conf, tableNameAsString);
+    hTable.setAutoFlush(false, false);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+    admin.close();
+    hTable.close();
+    fs.delete(TEST_UTIL.getDataTestDir(), true);
+  }
+
+  @Test
+  public void testCompactionWithoutDelFiles() throws Exception {
+    resetConf();
+    int count = 4;
+    // generate mob files
+    loadData(count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before compaction: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob file count", regionNum*count, countFiles(true, family1));
+    assertEquals("Before compaction: del file count", 0, countFiles(false, family1));
+
+    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("After compaction: mob file count", regionNum, countFiles(true, family1));
+    assertEquals("After compaction: del file count", 0, countFiles(false, family1));
+  }
+
+  @Test
+  public void testCompactionWithDelFiles() throws Exception {
+    resetConf();
+    int count = 4;
+    // generate mob files
+    loadData(count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
+        countMobCells(hTable));
+    assertEquals("Before deleting: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before deleting: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before compaction: family2 file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+
+    // do the mob file compaction
+    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("After compaction: family1 mob file count", regionNum,
+        countFiles(true, family1));
+    assertEquals("After compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1));
+    assertEquals("After compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+    assertRefFileNameEqual(family1);
+  }
+
+  private void assertRefFileNameEqual(String familyName) throws IOException {
+    Scan scan = new Scan();
+    scan.addFamily(Bytes.toBytes(familyName));
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner results = hTable.getScanner(scan);
+    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+        tableName), familyName);
+    List<Path> actualFilePaths = new ArrayList<>();
+    List<Path> expectFilePaths = new ArrayList<>();
+    for (Result res : results) {
+      for (Cell cell : res.listCells()) {
+        byte[] referenceValue = CellUtil.cloneValue(cell);
+        String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT,
+            referenceValue.length - Bytes.SIZEOF_INT);
+        Path targetPath = new Path(mobFamilyPath, fileName);
+        if(!actualFilePaths.contains(targetPath)) {
+          actualFilePaths.add(targetPath);
+        }
+      }
+    }
+    results.close();
+    if (fs.exists(mobFamilyPath)) {
+      FileStatus[] files = fs.listStatus(mobFamilyPath);
+      for (FileStatus file : files) {
+        if (!StoreFileInfo.isDelFile(file.getPath())) {
+          expectFilePaths.add(file.getPath());
+        }
+      }
+    }
+    Collections.sort(actualFilePaths);
+    Collections.sort(expectFilePaths);
+    assertEquals(expectFilePaths, actualFilePaths);
+  }
+
+  @Test
+  public void testCompactionWithDelFilesAndNotMergeAllFiles() throws Exception {
+    resetConf();
+    int mergeSize = 5000;
+    // change the mob compaction merge size
+    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+
+    int count = 4;
+    // generate mob files
+    loadData(count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before deleting: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before deleting: mob cells count", regionNum*cellNumPerRow*rowNumPerRegion,
+        countMobCells(hTable));
+    assertEquals("Before deleting: mob file count", regionNum*count, countFiles(true, family1));
+
+    int largeFilesCount = countLargeFiles(mergeSize, family1);
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+
+    // do the mob file compaction
+    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    // After the compaction, the files smaller than the mob compaction merge size
+    // is merge to one file
+    assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum,
+        countFiles(true, family1));
+    assertEquals("After compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After compaction: family1 del file count", regionNum,
+        countFiles(false, family1));
+    assertEquals("After compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+  }
+
+  @Test
+  public void testCompactionWithDelFilesAndWithSmallCompactionBatchSize() throws Exception {
+    resetConf();
+    int batchSize = 2;
+    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, batchSize);
+    int count = 4;
+    // generate mob files
+    loadData(count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    assertEquals("Before deleting: mob row count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("Before deleting: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before deleting: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+
+    // do the mob file compaction
+    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("After compaction: family1 mob file count", regionNum*(count/batchSize),
+        countFiles(true, family1));
+    assertEquals("After compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After compaction: family1 del file count", 0, countFiles(false, family1));
+    assertEquals("After compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+  }
+
+  @Test
+  public void testCompactionWithHFileLink() throws IOException, InterruptedException {
+    resetConf();
+    int count = 4;
+    // generate mob files
+    loadData(count, rowNumPerFile);
+    int rowNumPerRegion = count*rowNumPerFile;
+
+    long tid = System.currentTimeMillis();
+    byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid);
+    // take a snapshot
+    admin.snapshot(snapshotName1, tableName);
+
+    createDelFile();
+
+    assertEquals("Before compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("Before compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("Before compaction: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("Before compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("Before compaction: family1 del file count", regionNum,
+        countFiles(false, family1));
+    assertEquals("Before compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+
+    // do the mob file compaction
+    MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd1, pool);
+    compactor.compact();
+
+    assertEquals("After first compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
+        countMobRows(hTable));
+    assertEquals("After first compaction: mob cells count",
+        regionNum*(cellNumPerRow*rowNumPerRegion-delCellNum), countMobCells(hTable));
+    assertEquals("After first compaction: family1 mob file count", regionNum,
+        countFiles(true, family1));
+    assertEquals("After first compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After first compaction: family1 del file count", 0, countFiles(false, family1));
+    assertEquals("After first compaction: family2 del file count", regionNum,
+        countFiles(false, family2));
+    assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1));
+    assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2));
+
+    admin.disableTable(tableName);
+    // Restore from snapshot, the hfilelink will exist in mob dir
+    admin.restoreSnapshot(snapshotName1);
+    admin.enableTable(tableName);
+
+    assertEquals("After restoring snapshot: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("After restoring snapshot: mob cells count",
+        regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
+    assertEquals("After restoring snapshot: family1 mob file count", regionNum*count,
+        countFiles(true, family1));
+    assertEquals("After restoring snapshot: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After restoring snapshot: family1 del file count", 0,
+        countFiles(false, family1));
+    assertEquals("After restoring snapshot: family2 del file count", 0,
+        countFiles(false, family2));
+    assertEquals("After restoring snapshot: family1 hfilelink count", regionNum*count,
+        countHFileLinks(family1));
+    assertEquals("After restoring snapshot: family2 hfilelink count", 0,
+        countHFileLinks(family2));
+
+    compactor.compact();
+
+    assertEquals("After second compaction: mob rows count", regionNum*rowNumPerRegion,
+        countMobRows(hTable));
+    assertEquals("After second compaction: mob cells count",
+        regionNum*cellNumPerRow*rowNumPerRegion, countMobCells(hTable));
+    assertEquals("After second compaction: family1 mob file count", regionNum,
+        countFiles(true, family1));
+    assertEquals("After second compaction: family2 mob file count", regionNum*count,
+        countFiles(true, family2));
+    assertEquals("After second compaction: family1 del file count", 0, countFiles(false, family1));
+    assertEquals("After second compaction: family2 del file count", 0, countFiles(false, family2));
+    assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1));
+    assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2));
+  }
+
+  /**
+   * Gets the number of rows in the given table.
+   * @param table to get the  scanner
+   * @return the number of rows
+   */
+  private int countMobRows(final HTable table) throws IOException {
+    Scan scan = new Scan();
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner results = table.getScanner(scan);
+    int count = 0;
+    for (Result res : results) {
+      count++;
+    }
+    results.close();
+    return count;
+  }
+
+  /**
+   * Gets the number of cells in the given table.
+   * @param table to get the  scanner
+   * @return the number of cells
+   */
+  private int countMobCells(final HTable table) throws IOException {
+    Scan scan = new Scan();
+    // Do not retrieve the mob data when scanning
+    scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
+    ResultScanner results = table.getScanner(scan);
+    int count = 0;
+    for (Result res : results) {
+      for (Cell cell : res.listCells()) {
+        count++;
+      }
+    }
+    results.close();
+    return count;
+  }
+
+  /**
+   * Gets the number of files in the mob path.
+   * @param isMobFile gets number of the mob files or del files
+   * @param familyName the family name
+   * @return the number of the files
+   */
+  private int countFiles(boolean isMobFile, String familyName) throws IOException {
+    Path mobDirPath = MobUtils.getMobFamilyPath(
+        MobUtils.getMobRegionPath(conf, tableName), familyName);
+    int count = 0;
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = fs.listStatus(mobDirPath);
+      for (FileStatus file : files) {
+        if (isMobFile == true) {
+          if (!StoreFileInfo.isDelFile(file.getPath())) {
+            count++;
+          }
+        } else {
+          if (StoreFileInfo.isDelFile(file.getPath())) {
+            count++;
+          }
+        }
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Gets the number of HFileLink in the mob path.
+   * @param familyName the family name
+   * @return the number of the HFileLink
+   */
+  private int countHFileLinks(String familyName) throws IOException {
+    Path mobDirPath = MobUtils.getMobFamilyPath(
+        MobUtils.getMobRegionPath(conf, tableName), familyName);
+    int count = 0;
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = fs.listStatus(mobDirPath);
+      for (FileStatus file : files) {
+        if (HFileLink.isHFileLink(file.getPath())) {
+          count++;
+        }
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Gets the number of files.
+   * @param size the size of the file
+   * @param familyName the family name
+   * @return the number of files large than the size
+   */
+  private int countLargeFiles(int size, String familyName) throws IOException {
+    Path mobDirPath = MobUtils.getMobFamilyPath(
+        MobUtils.getMobRegionPath(conf, tableName), familyName);
+    int count = 0;
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = fs.listStatus(mobDirPath);
+      for (FileStatus file : files) {
+        // ignore the del files in the mob path
+        if ((!StoreFileInfo.isDelFile(file.getPath()))
+            && (file.getLen() > size)) {
+          count++;
+        }
+      }
+    }
+    return count;
+  }
+
+  /**
+   * loads some data to the table.
+   * @param count the mob file number
+   */
+  private void loadData(int fileNum, int rowNumPerFile) throws IOException,
+      InterruptedException {
+    if (fileNum <= 0) {
+      throw new IllegalArgumentException();
+    }
+    for (byte k0 : KEYS) {
+      byte[] k = new byte[] { k0 };
+      for (int i = 0; i < fileNum * rowNumPerFile; i++) {
+        byte[] key = Bytes.add(k, Bytes.toBytes(i));
+        byte[] mobVal = makeDummyData(10 * (i + 1));
+        Put put = new Put(key);
+        put.setDurability(Durability.SKIP_WAL);
+        put.add(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal);
+        put.add(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal);
+        put.add(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal);
+        hTable.put(put);
+        if ((i + 1) % rowNumPerFile == 0) {
+          hTable.flushCommits();
+          admin.flush(tableName);
+        }
+      }
+    }
+  }
+
+  /**
+   * delete the row, family and cell to create the del file
+   */
+  private void createDelFile() throws IOException, InterruptedException {
+    for (byte k0 : KEYS) {
+      byte[] k = new byte[] { k0 };
+      // delete a family
+      byte[] key1 = Bytes.add(k, Bytes.toBytes(0));
+      Delete delete1 = new Delete(key1);
+      delete1.deleteFamily(Bytes.toBytes(family1));
+      hTable.delete(delete1);
+      // delete one row
+      byte[] key2 = Bytes.add(k, Bytes.toBytes(2));
+      Delete delete2 = new Delete(key2);
+      hTable.delete(delete2);
+      // delete one cell
+      byte[] key3 = Bytes.add(k, Bytes.toBytes(4));
+      Delete delete3 = new Delete(key3);
+      delete3.deleteColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1));
+      hTable.delete(delete3);
+      hTable.flushCommits();
+      admin.flush(tableName);
+      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(
+          Bytes.toBytes(tableNameAsString));
+      for (HRegion region : regions) {
+        region.waitForFlushesAndCompactions();
+        region.compactStores(true);
+      }
+    }
+  }
+  /**
+   * Creates the dummy data with a specific size.
+   * @param the size of data
+   * @return the dummy data
+   */
+  private byte[] makeDummyData(int size) {
+    byte[] dummyData = new byte[size];
+    new Random().nextBytes(dummyData);
+    return dummyData;
+  }
+
+  /**
+   * Gets the split keys
+   */
+  public static byte[][] getSplitKeys() {
+    byte[][] splitKeys = new byte[KEYS.length - 1][];
+    for (int i = 0; i < splitKeys.length; ++i) {
+      splitKeys[i] = new byte[] { KEYS[i + 1] };
+    }
+    return splitKeys;
+  }
+
+  private static ExecutorService createThreadPool(Configuration conf) {
+    int maxThreads = 10;
+    long keepAliveTime = 60;
+    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads,
+        keepAliveTime, TimeUnit.SECONDS, queue,
+        Threads.newDaemonThreadFactory("MobFileCompactionChore"),
+        new RejectedExecutionHandler() {
+          @Override
+          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+            try {
+              // waiting for a thread to pick up instead of throwing exceptions.
+              queue.put(r);
+            } catch (InterruptedException e) {
+              throw new RejectedExecutionException(e);
+            }
+          }
+        });
+    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
+  /**
+   * Resets the configuration.
+   */
+  private void resetConf() {
+    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
+    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
new file mode 100644
index 0000000..ac66d95
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactionRequest.java
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartitionId;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestPartitionedMobFileCompactionRequest {
+
+  @Test
+  public void testCompactedPartitionId() {
+    String startKey1 = "startKey1";
+    String startKey2 = "startKey2";
+    String date1 = "date1";
+    String date2 = "date2";
+    CompactionPartitionId partitionId1 = new CompactionPartitionId(startKey1, date1);
+    CompactionPartitionId partitionId2 = new CompactionPartitionId(startKey2, date2);
+    CompactionPartitionId partitionId3 = new CompactionPartitionId(startKey1, date2);
+
+    Assert.assertTrue(partitionId1.equals(partitionId1));
+    Assert.assertFalse(partitionId1.equals(partitionId2));
+    Assert.assertFalse(partitionId1.equals(partitionId3));
+    Assert.assertFalse(partitionId2.equals(partitionId3));
+
+    Assert.assertEquals(startKey1, partitionId1.getStartKey());
+    Assert.assertEquals(date1, partitionId1.getDate());
+  }
+
+  @Test
+  public void testCompactedPartition() {
+    CompactionPartitionId partitionId = new CompactionPartitionId("startKey1", "date1");
+    CompactionPartition partition = new CompactionPartition(partitionId);
+    FileStatus file = new FileStatus(1, false, 1, 1024, 1, new Path("/test"));
+    partition.addFile(file);
+    Assert.assertEquals(file, partition.listFiles().get(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4934ed/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
new file mode 100644
index 0000000..1d64c0c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/filecompactions/TestPartitionedMobFileCompactor.java
@@ -0,0 +1,423 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob.filecompactions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.filecompactions.MobFileCompactionRequest.CompactionType;
+import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactionRequest.CompactionPartition;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestPartitionedMobFileCompactor {
+  // Mini cluster and compaction thread pool are shared by all tests in the class.
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static String family = "family";
+  private final static String qf = "qf";
+  private HColumnDescriptor hcd = new HColumnDescriptor(family);
+  private Configuration conf = TEST_UTIL.getConfiguration();
+  private CacheConfig cacheConf = new CacheConfig(conf);
+  private FileSystem fs;
+  // Populated by listFiles(): files whose name ends with "_del" go to delFiles,
+  // the rest to mobFiles; allFiles holds both.
+  private List<FileStatus> mobFiles = new ArrayList<>();
+  private List<FileStatus> delFiles = new ArrayList<>();
+  private List<FileStatus> allFiles = new ArrayList<>();
+  // Mob directory of the table/family under test; set by init().
+  private Path basePath;
+  // Random file-name suffixes, regenerated by init() so tests do not collide.
+  private String mobSuffix;
+  private String delSuffix;
+  private static ExecutorService pool;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
+    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    TEST_UTIL.startMiniCluster(1);
+    pool = createThreadPool(TEST_UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    // Stop the compaction thread pool before bringing the mini cluster down.
+    pool.shutdown();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Points basePath at the mob directory of the given table/family and
+   * generates fresh random file-name suffixes for this test.
+   */
+  private void init(String tableName) throws Exception {
+    fs = FileSystem.get(conf);
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path mobDir = new Path(rootDir, MobConstants.MOB_DIR_NAME);
+    basePath = new Path(new Path(mobDir, tableName), family);
+    // Del files are told apart from mob files only by the "_del" name suffix.
+    mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
+    delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
+  }
+
+  @Test
+  public void testCompactionSelectWithAllFiles() throws Exception {
+    resetConf();
+    String tableName = "testCompactionSelectWithAllFiles";
+    init(tableName);
+    int count = 10;
+    // 10 mob files plus 10 del files.
+    createStoreFiles(basePath, family, qf, count, Type.Put);
+    createStoreFiles(basePath, family, qf, count, Type.Delete);
+    listFiles();
+    // Every mob file below the default mergeable threshold should be selected.
+    long mergeSize = MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD;
+    List<String> expectedStartKeys = new ArrayList<>();
+    for (FileStatus mobFile : mobFiles) {
+      if (mobFile.getLen() < mergeSize) {
+        // The leading 32 characters of a mob file name hold its start-key portion.
+        expectedStartKeys.add(mobFile.getPath().getName().substring(0, 32));
+      }
+    }
+    testSelectFiles(tableName, CompactionType.ALL_FILES, expectedStartKeys);
+  }
+
+  @Test
+  public void testCompactionSelectWithPartFiles() throws Exception {
+    resetConf();
+    String tableName = "testCompactionSelectWithPartFiles";
+    init(tableName);
+    int count = 10;
+    // create 10 mob files.
+    createStoreFiles(basePath, family, qf, count, Type.Put);
+    // create 10 del files
+    createStoreFiles(basePath, family, qf, count, Type.Delete);
+    listFiles();
+    // Lower the mergeable threshold so that only part of the files qualify.
+    long mergeSize = 4000;
+    List<String> expectedStartKeys = new ArrayList<>();
+    for (FileStatus file : mobFiles) {
+      // Compare against mergeSize (was a magic literal 4000 duplicated here),
+      // keeping the expectation in sync with the threshold set below.
+      if (file.getLen() < mergeSize) {
+        String fileName = file.getPath().getName();
+        // The leading 32 characters of a mob file name hold its start-key portion.
+        String startKey = fileName.substring(0, 32);
+        expectedStartKeys.add(startKey);
+      }
+    }
+    // set the mob file compaction mergeable threshold
+    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
+    testSelectFiles(tableName, CompactionType.PART_FILES, expectedStartKeys);
+  }
+
+  @Test
+  public void testCompactDelFilesWithDefaultBatchSize() throws Exception {
+    resetConf();
+    String tableName = "testCompactDelFilesWithDefaultBatchSize";
+    init(tableName);
+    // 20 mob files plus 13 del files to compact.
+    createStoreFiles(basePath, family, qf, 20, Type.Put);
+    createStoreFiles(basePath, family, qf, 13, Type.Delete);
+    listFiles();
+    // With the default batch size all 13 del files merge into a single file,
+    // and no del cells are lost in the merge.
+    testCompactDelFiles(tableName, 1, 13);
+  }
+
+  @Test
+  public void testCompactDelFilesWithSmallBatchSize() throws Exception {
+    resetConf();
+    String tableName = "testCompactDelFilesWithSmallBatchSize";
+    init(tableName);
+    // 20 mob files plus 13 del files to compact.
+    createStoreFiles(basePath, family, qf, 20, Type.Put);
+    createStoreFiles(basePath, family, qf, 13, Type.Delete);
+    listFiles();
+
+    // Shrink the compaction batch size to force several merge rounds.
+    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 4);
+    // The del files must still collapse into one, keeping all 13 del cells.
+    testCompactDelFiles(tableName, 1, 13);
+  }
+
+  @Test
+  public void testCompactDelFilesChangeMaxDelFileCount() throws Exception {
+    resetConf();
+    // Use this test's own table name (was copy-pasted from
+    // testCompactDelFilesWithSmallBatchSize, making the two tests share a path).
+    String tableName = "testCompactDelFilesChangeMaxDelFileCount";
+    init(tableName);
+    // create 20 mob files.
+    createStoreFiles(basePath, family, qf, 20, Type.Put);
+    // create 13 del files
+    createStoreFiles(basePath, family, qf, 13, Type.Delete);
+    listFiles();
+
+    // set the max del file count
+    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, 5);
+    // set the mob file compaction batch size
+    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE, 2);
+    testCompactDelFiles(tableName, 4, 13);
+  }
+
+  /**
+   * Runs the compactor's file selection over the current files and verifies the
+   * resulting request: its compaction type, partition start keys and del files.
+   * @param tableName the table name
+   * @param type the expected compaction type
+   * @param expected the expected start keys
+   */
+  private void testSelectFiles(String tableName, final CompactionType type,
+      final List<String> expected) throws IOException {
+    PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
+      TableName.valueOf(tableName), hcd, pool) {
+      @Override
+      public List<Path> compact(List<FileStatus> files) throws IOException {
+        if (files == null || files.isEmpty()) {
+          return null;
+        }
+        PartitionedMobFileCompactionRequest request = select(files);
+        // assert the compaction type matches the expected one (parameterized,
+        // not always ALL_FILES as the old comment claimed)
+        Assert.assertEquals(type, request.type);
+        // assert the selection produced exactly the expected partitions
+        compareCompactedPartitions(expected, request.compactionPartitions);
+        // assert the selection picked up exactly the del files on disk
+        compareDelFiles(request.delFiles);
+        return null;
+      }
+    };
+    compactor.compact(allFiles);
+  }
+
+  /**
+   * Tests compactDelFiles: merges the request's del files and verifies both the
+   * number of files produced and that no del cells were lost.
+   * @param tableName the table name
+   * @param expectedFileCount the expected file count
+   * @param expectedCellCount the expected cell count
+   */
+  private void testCompactDelFiles(String tableName, final int expectedFileCount,
+      final int expectedCellCount) throws IOException {
+    PartitionedMobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs,
+      TableName.valueOf(tableName), hcd, pool) {
+      @Override
+      protected List<Path> performCompaction(PartitionedMobFileCompactionRequest request)
+          throws IOException {
+        List<Path> delFilePaths = new ArrayList<Path>();
+        for (FileStatus delFile : request.delFiles) {
+          delFilePaths.add(delFile.getPath());
+        }
+        List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
+        // assert the del files are merged.
+        Assert.assertEquals(expectedFileCount, newDelPaths.size());
+        Assert.assertEquals(expectedCellCount, countDelCellsInDelFiles(newDelPaths));
+        return null;
+      }
+    };
+
+    compactor.compact(allFiles);
+  }
+
+  /**
+   * Scans basePath and records every file, splitting them into del files
+   * (name ends with "_del") and regular mob files; allFiles gets both.
+   */
+  private void listFiles() throws IOException {
+    for (FileStatus file : fs.listStatus(basePath)) {
+      allFiles.add(file);
+      boolean isDelFile = file.getPath().getName().endsWith("_del");
+      (isDelFile ? delFiles : mobFiles).add(file);
+    }
+  }
+
+  /**
+   * Compares the start keys of the compacted partitions with the expected ones,
+   * ignoring order. The caller's expected list is left unmodified.
+   * @param expected the expected start keys
+   * @param partitions the collection of CompactionPartitions produced by selection
+   */
+  private void compareCompactedPartitions(List<String> expected,
+      Collection<CompactionPartition> partitions) {
+    List<String> actualKeys = new ArrayList<>();
+    for (CompactionPartition partition : partitions) {
+      actualKeys.add(partition.getPartitionId().getStartKey());
+    }
+    // Sort a copy: the original code sorted the caller-supplied list in place.
+    List<String> expectedKeys = new ArrayList<>(expected);
+    Collections.sort(expectedKeys);
+    Collections.sort(actualKeys);
+    // List equality checks both size and element-wise equality in one assertion.
+    Assert.assertEquals(expectedKeys, actualKeys);
+  }
+
+  /**
+   * Verifies that the passed del files are exactly the ones discovered by
+   * listFiles(), in the same order.
+   * @param allDelFiles all the del files from the compaction request
+   */
+  private void compareDelFiles(Collection<FileStatus> allDelFiles) {
+    // Check the count first: the original loop passed silently when
+    // allDelFiles contained fewer entries than expected.
+    Assert.assertEquals(delFiles.size(), allDelFiles.size());
+    int i = 0;
+    for (FileStatus file : allDelFiles) {
+      Assert.assertEquals(delFiles.get(i), file);
+      i++;
+    }
+  }
+
+  /**
+   * Creates store files under the given path following the mob file naming pattern.
+   * @param basePath the path to create the files under
+   * @param family the family name
+   * @param qualifier the column qualifier
+   * @param count the number of store files to create
+   * @param type the key type; Delete produces del files, anything else mob files
+   */
+  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
+      Type type) throws IOException {
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    String startKey = "row_";
+    for (int i = 0; i < count; i++) {
+      byte[] startRow = Bytes.toBytes(startKey + i);
+      // Del files and mob files only differ in the suffix baked into the name.
+      String suffix = type.equals(Type.Delete) ? delSuffix : mobSuffix;
+      MobFileName mobFileName = MobFileName.create(startRow,
+          MobUtils.formatDate(new Date()), suffix);
+      StoreFile.Writer mobFileWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
+          .withFileContext(meta)
+          .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+      // writeStoreFile closes the writer; value sizes grow with i so the
+      // threshold-based selection tests can split files by size.
+      writeStoreFile(mobFileWriter, startRow, Bytes.toBytes(family), Bytes.toBytes(qualifier),
+          type, (i + 1) * 1000);
+    }
+  }
+
+  /**
+   * Writes data to store file.
+   * @param writer the store file writer
+   * @param row the row key
+   * @param family the family name
+   * @param qualifier the column qualifier
+   * @param type the key type
+   * @param size the size of value
+   */
+  private static void writeStoreFile(final StoreFile.Writer writer, byte[]row, byte[] family,
+      byte[] qualifier, Type type, int size) throws IOException {
+    long now = System.currentTimeMillis();
+    try {
+      byte[] dummyData = new byte[size];
+      new Random().nextBytes(dummyData);
+      writer.append(new KeyValue(row, family, qualifier, now, type, dummyData));
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Counts the cells in the given del files by scanning them with
+   * COMPACT_RETAIN_DELETES so delete markers are kept in the results.
+   * @param paths the del file paths
+   * @return the number of del cells
+   */
+  private int countDelCellsInDelFiles(List<Path> paths) throws IOException {
+    List<StoreFile> sfs = new ArrayList<StoreFile>();
+    int size = 0;
+    for (Path path : paths) {
+      StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE);
+      sfs.add(sf);
+    }
+    // Typed list instead of the former raw List to avoid unchecked warnings.
+    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false,
+        true, false, null, HConstants.LATEST_TIMESTAMP);
+    Scan scan = new Scan();
+    scan.setMaxVersions(hcd.getMaxVersions());
+    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
+    long ttl = HStore.determineTTLFromFamily(hcd);
+    ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
+    StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES,
+        null, scanners, 0L, HConstants.LATEST_TIMESTAMP);
+    List<Cell> results = new ArrayList<>();
+    boolean hasMore = true;
+    while (hasMore) {
+      // Drain the scanner batch by batch, accumulating the cell count.
+      hasMore = scanner.next(results);
+      size += results.size();
+      results.clear();
+    }
+    scanner.close();
+    return size;
+  }
+
+  /**
+   * Creates the thread pool handed to the compactor: 1 core thread (allowed to
+   * time out), up to 10 threads, and a SynchronousQueue whose rejection handler
+   * blocks the submitter until a worker frees up instead of throwing.
+   * @param conf the configuration (currently unused; kept for the call sites)
+   */
+  private static ExecutorService createThreadPool(Configuration conf) {
+    int maxThreads = 10;
+    long keepAliveTime = 60;
+    final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
+      TimeUnit.SECONDS, queue, Threads.newDaemonThreadFactory("MobFileCompactionChore"),
+      new RejectedExecutionHandler() {
+        @Override
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+          try {
+            // waiting for a thread to pick up instead of throwing exceptions.
+            queue.put(r);
+          } catch (InterruptedException e) {
+            throw new RejectedExecutionException(e);
+          }
+        }
+      });
+    // pool is already declared as ThreadPoolExecutor; the former cast was redundant.
+    pool.allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
+  /**
+   * Restores the mob compaction settings touched by individual tests
+   * (mergeable threshold, max del file count, batch size) to their defaults.
+   */
+  private void resetConf() {
+    conf.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
+    conf.setInt(MobConstants.MOB_DELFILE_MAX_COUNT, MobConstants.DEFAULT_MOB_DELFILE_MAX_COUNT);
+    conf.setInt(MobConstants.MOB_FILE_COMPACTION_BATCH_SIZE,
+      MobConstants.DEFAULT_MOB_FILE_COMPACTION_BATCH_SIZE);
+  }
+}
\ No newline at end of file


Mime
View raw message