hbase-commits mailing list archives

From: jerry...@apache.org
Subject: hbase git commit: HBASE-15808 Reduce potential bulk load intermediate space usage and waste
Date: Thu, 12 May 2016 22:42:55 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 921f745b2 -> 0b59341d2


HBASE-15808 Reduce potential bulk load intermediate space usage and waste


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0b59341d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0b59341d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0b59341d

Branch: refs/heads/branch-1.3
Commit: 0b59341d2442f9d3ac591676f3277eb3a887005b
Parents: 921f745
Author: Jerry He <jerryjch@apache.org>
Authored: Thu May 12 15:22:56 2016 -0700
Committer: Jerry He <jerryjch@apache.org>
Committed: Thu May 12 15:29:56 2016 -0700
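
For context, LoadIncrementalHFiles (the "completebulkload" tool) walks a bulk-load output
directory whose subdirectories are named after column families, loads the HFiles under each
family into the target table, and splits any HFile that spans a region boundary. A minimal,
hypothetical driver is sketched below; the output path and table name are placeholders and
are not taken from this commit.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
  import org.apache.hadoop.util.ToolRunner;

  public class BulkLoadDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Arguments: <HFile output dir> <table name>.  Both values are placeholders.
      int exitCode = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
          new String[] { "/user/hbase/bulk-output", "mytable" });
      System.exit(exitCode);
    }
  }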

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 26 ++++++++++++--
 .../TestLoadIncrementalHFilesSplitRecovery.java | 36 ++++++++++++++++++++
 2 files changed, 59 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0b59341d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 07059bc..0893b2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -121,6 +121,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   public final static String CREATE_TABLE_CONF_KEY = "create.table";
 
+  // We use a '.' prefix which is ignored when walking directory trees
+  // above. It is an invalid family name.
+  final static String TMP_DIR = ".tmp";
+
   private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
 
@@ -203,6 +207,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       }
       Path familyDir = familyStat.getPath();
       byte[] familyName = familyDir.getName().getBytes();
+      // Skip invalid family
+      try {
+        HColumnDescriptor.isLegalFamilyName(familyName);
+      }
+      catch (IllegalArgumentException e) {
+        LOG.warn("Skipping invalid " + familyStat.getPath());
+        continue;
+      }
       TFamily family = visitor.bulkFamily(familyName);
 
       FileStatus[] hfileStatuses = fs.listStatus(familyDir);
@@ -660,9 +672,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       byte[] splitKey) throws IOException {
     final Path hfilePath = item.hfilePath;
 
-    // We use a '_' prefix which is ignored when walking directory trees
-    // above.
-    final String TMP_DIR = "_tmp";
     Path tmpDir = item.hfilePath.getParent();
     if (!tmpDir.getName().equals(TMP_DIR)) {
       tmpDir = new Path(tmpDir, TMP_DIR);
@@ -689,6 +698,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     lqis.add(new LoadQueueItem(item.family, botOut));
     lqis.add(new LoadQueueItem(item.family, topOut));
 
+    // If the current item is already the result of previous splits,
+    // we don't need it anymore. Clean up to save space.
+    // It is not part of the original input files.
+    try {
+      tmpDir = item.hfilePath.getParent();
+      if (tmpDir.getName().equals(TMP_DIR)) {
+        fs.delete(item.hfilePath, false);
+      }
+    } catch (IOException e) {
+      LOG.warn("Unable to delete temporary split file " + item.hfilePath);
+    }
     LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
     return lqis;
   }
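
The first two hunks lean on the fact that a directory name beginning with '.' can never be a
legal column family: HColumnDescriptor.isLegalFamilyName throws IllegalArgumentException for
such names, so the ".tmp" directory holding intermediate split output is simply skipped when
the loader walks the family directories. A minimal sketch of that pattern; the class and
method names here are illustrative, not the commit's code.

  import java.io.IOException;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HColumnDescriptor;

  public class FamilyDirWalkSketch {
    // Visit only directories whose names are legal column family names;
    // a name like ".tmp" fails validation and is skipped.
    static void walkFamilies(FileSystem fs, Path bulkDir) throws IOException {
      for (FileStatus stat : fs.listStatus(bulkDir)) {
        if (!stat.isDirectory()) continue;
        byte[] familyName = stat.getPath().getName().getBytes();
        try {
          HColumnDescriptor.isLegalFamilyName(familyName); // throws for ".tmp"
        } catch (IllegalArgumentException e) {
          continue; // not a family directory; ignore it
        }
        // ... process the HFiles under this family directory ...
      }
    }
  }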

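The last hunk is what bounds the intermediate space: when the file being split is itself the
product of an earlier split (its parent directory is ".tmp"), it is deleted as soon as its two
halves have been written, instead of lingering until the whole bulk load completes. Original
input files never live under ".tmp" and are never touched. A rough standalone sketch of that
cleanup step, with hypothetical names:

  import java.io.IOException;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class SplitCleanupSketch {
    static final String TMP_DIR = ".tmp";

    // After writing the bottom/top halves of a split, drop the source file
    // if it was itself produced by a previous split (its parent is ".tmp").
    static void cleanupAfterSplit(FileSystem fs, Path splitSource) {
      try {
        Path parent = splitSource.getParent();
        if (parent != null && parent.getName().equals(TMP_DIR)) {
          fs.delete(splitSource, false); // original input files are never deleted
        }
      } catch (IOException e) {
        // Best effort: a leftover file only wastes space, so just note it and move on.
        System.err.println("Unable to delete temporary split file " + splitSource);
      }
    }
  }
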
http://git-wip-us.apache.org/repos/asf/hbase/blob/0b59341d/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index e3024d0..26583f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequ
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.AfterClass;
@@ -416,6 +418,40 @@ public class TestLoadIncrementalHFilesSplitRecovery {
   }
 
   /**
+   * This test creates a table with many small regions.  The bulk load files
+   * are split multiple times before all of them can be loaded successfully.
+   */
+  @Test (timeout=120000)
+  public void testSplitTmpFileCleanUp() throws Exception {
+    final TableName table = TableName.valueOf("splitTmpFileCleanUp");
+    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
+        Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"),
+        Bytes.toBytes("row_00000040"), Bytes.toBytes("row_00000050")};
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
+
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
+
+      // create HFiles
+      Path bulk = buildBulkFiles(table, 2);
+      try (Table t = connection.getTable(table)) {
+        lih.doBulkLoad(bulk, (HTable) t);
+      }
+      // family path
+      Path tmpPath = new Path(bulk, family(0));
+      // TMP_DIR under family path
+      tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
+      FileSystem fs = bulk.getFileSystem(util.getConfiguration());
+      // HFiles have been split, so TMP_DIR exists
+      assertTrue(fs.exists(tmpPath));
+      // TMP_DIR should have been cleaned-up
+      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
+        FSUtils.listStatus(fs, tmpPath));
+      assertExpectedTable(connection, table, ROWCOUNT, 2);
+    }
+  }
+
+  /**
    * This simulates a remote exception which should cause LIHF to exit with an
    * exception.
    */
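
The new test creates a table with enough small regions that every generated HFile has to be
split more than once, then asserts via FSUtils.listStatus (which returns null for an empty or
missing directory) that the ".tmp" directory under the family path holds no leftovers. A
similar spot check could be run against a real bulk-load output directory; the paths below are
placeholders, and the ".tmp" constant is duplicated locally because LoadIncrementalHFiles.TMP_DIR
is package-private.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.util.FSUtils;

  public class TmpDirCheck {
    // Mirrors LoadIncrementalHFiles.TMP_DIR, which is package-private.
    private static final String TMP_DIR = ".tmp";

    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      Path familyDir = new Path("/user/hbase/bulk-output/f1"); // placeholder family dir
      Path tmpDir = new Path(familyDir, TMP_DIR);
      FileSystem fs = familyDir.getFileSystem(conf);
      // FSUtils.listStatus returns null when the directory is empty or absent.
      FileStatus[] leftovers = FSUtils.listStatus(fs, tmpDir);
      System.out.println(leftovers == null
          ? "No intermediate split files left under " + tmpDir
          : leftovers.length + " leftover file(s) under " + tmpDir);
    }
  }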

