hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [24/44] hbase git commit: HBASE-13253 LoadIncrementalHFiles unify hfiles discovery
Date Mon, 23 Mar 2015 22:23:09 GMT
HBASE-13253 LoadIncrementalHFiles unify hfiles discovery


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f9a17edc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f9a17edc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f9a17edc

Branch: refs/heads/hbase-12439
Commit: f9a17edc252a88c5a1a2c7764e3f9f65623e0ced
Parents: 99ec366
Author: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Authored: Tue Mar 17 19:38:39 2015 +0000
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Tue Mar 17 19:38:39 2015 +0000

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 196 ++++++++++---------
 .../mapreduce/TestLoadIncrementalHFiles.java    |  39 ++--
 2 files changed, 127 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f9a17edc/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 0f9ade3..d96c9e4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -160,6 +159,75 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         + "\n");
   }
 
+  private static interface BulkHFileVisitor<TFamily> {
+    TFamily bulkFamily(final byte[] familyName)
+      throws IOException;
+    void bulkHFile(final TFamily family, final FileStatus hfileStatus)
+      throws IOException;
+  }
+
+  /**
+   * Iterate over the bulkDir hfiles.
+   * Skip reference, HFileLink, files starting with "_" and non-valid hfiles.
+   */
+  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
+    final BulkHFileVisitor<TFamily> visitor) throws IOException {
+    if (!fs.exists(bulkDir)) {
+      throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
+    }
+
+    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
+    if (familyDirStatuses == null) {
+      throw new FileNotFoundException("No families found in " + bulkDir);
+    }
+
+    for (FileStatus familyStat : familyDirStatuses) {
+      if (!familyStat.isDirectory()) {
+        LOG.warn("Skipping non-directory " + familyStat.getPath());
+        continue;
+      }
+      Path familyDir = familyStat.getPath();
+      byte[] familyName = familyDir.getName().getBytes();
+      TFamily family = visitor.bulkFamily(familyName);
+
+      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
+      for (FileStatus hfileStatus : hfileStatuses) {
+        if (!fs.isFile(hfileStatus.getPath())) {
+          LOG.warn("Skipping non-file " + hfileStatus);
+          continue;
+        }
+
+        Path hfile = hfileStatus.getPath();
+        // Skip "_", reference, HFileLink
+        String fileName = hfile.getName();
+        if (fileName.startsWith("_")) {
+          continue;
+        }
+        if (StoreFileInfo.isReference(fileName)) {
+          LOG.warn("Skipping reference " + fileName);
+          continue;
+        }
+        if (HFileLink.isHFileLink(fileName)) {
+          LOG.warn("Skipping HFileLink " + fileName);
+          continue;
+        }
+
+        // Validate HFile Format
+        try {
+          if (!HFile.isHFileFormat(fs, hfile)) {
+            LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
+            continue;
+          }
+        } catch (FileNotFoundException e) {
+          LOG.warn("the file " + hfile + " was removed");
+          continue;
+        }
+
+        visitor.bulkHFile(family, hfileStatus);
+      }
+    }
+  }
+
   /**
    * Represents an HFile waiting to be loaded. An queue is used
    * in this class in order to support the case where a region has
@@ -186,54 +254,25 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * Walk the given directory for all HFiles, and return a Queue
    * containing all such files.
    */
-  private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
+  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir)
   throws IOException {
     fs = hfofDir.getFileSystem(getConf());
-
-    if (!fs.exists(hfofDir)) {
-      throw new FileNotFoundException("HFileOutputFormat dir " +
-          hfofDir + " not found");
-    }
-
-    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
-    if (familyDirStatuses == null) {
-      throw new FileNotFoundException("No families found in " + hfofDir);
-    }
-
-    for (FileStatus stat : familyDirStatuses) {
-      if (!stat.isDirectory()) {
-        LOG.warn("Skipping non-directory " + stat.getPath());
-        continue;
+    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
+      @Override
+      public byte[] bulkFamily(final byte[] familyName) {
+        return familyName;
       }
-      Path familyDir = stat.getPath();
-      byte[] family = familyDir.getName().getBytes();
-      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
-      for (FileStatus hfileStatus : hfileStatuses) {
-        if (!hfileStatus.isFile()) {
-          LOG.warn("Skipping non-file " + hfileStatus);
-          continue;
-        }
-        long length = hfileStatus.getLen();
-        Path hfile = hfileStatus.getPath();
-        // Skip "_", reference, HFileLink
-        String fileName = hfile.getName();
-        if (fileName.startsWith("_")) continue;
-        if (StoreFileInfo.isReference(fileName)) {
-          LOG.warn("Skipping reference " + fileName);
-          continue;
-        }
-        if (HFileLink.isHFileLink(fileName)) {
-          LOG.warn("Skipping HFileLink " + fileName);
-          continue;
-        }
-        if(length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
+      @Override
+      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException
{
+        long length = hfile.getLen();
+        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
             HConstants.DEFAULT_MAX_FILE_SIZE)) {
-          LOG.warn("Trying to bulk load hfile " + hfofDir.toString() + " with size: " +
+          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
               length + " bytes can be problematic as it may lead to oversplitting.");
         }
-        ret.add(new LoadQueueItem(family, hfile));
+        ret.add(new LoadQueueItem(family, hfile.getPath()));
       }
-    }
+    });
   }
 
   /**
@@ -297,20 +336,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         LoadQueueItem lqi = queueIter.next();
         String familyNameInHFile = Bytes.toString(lqi.family);
         if (!familyNames.contains(familyNameInHFile)) {
-          boolean isValid = false;
-          try {
-            isValid = HFile.isHFileFormat(lqi.hfilePath.getFileSystem(getConf()), lqi.hfilePath);
-            if (!isValid) {
-              LOG.warn("the file " + lqi + " doesn't seems to be an hfile. skipping");
-            }
-          } catch (FileNotFoundException e) {
-            LOG.warn("the file " + lqi + " was removed");
-          }
-          if (isValid) {
-            unmatchedFamilies.add(familyNameInHFile);
-          } else {
-            queueIter.remove();
-          }
+          unmatchedFamilies.add(familyNameInHFile);
         }
       }
       if (unmatchedFamilies.size() > 0) {
@@ -864,46 +890,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * More modifications necessary if we want to avoid doing it.
    */
   private void createTable(TableName tableName, String dirPath) throws Exception {
-    Path hfofDir = new Path(dirPath);
-    FileSystem fs = hfofDir.getFileSystem(getConf());
-
-    if (!fs.exists(hfofDir)) {
-      throw new FileNotFoundException("HFileOutputFormat dir " +
-          hfofDir + " not found");
-    }
-
-    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
-    if (familyDirStatuses == null) {
-      throw new FileNotFoundException("No families found in " + hfofDir);
-    }
-
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    HColumnDescriptor hcd;
+    final Path hfofDir = new Path(dirPath);
+    final FileSystem fs = hfofDir.getFileSystem(getConf());
 
     // Add column families
     // Build a set of keys
-    byte[][] keys;
-    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-
-    for (FileStatus stat : familyDirStatuses) {
-      if (!stat.isDirectory()) {
-        LOG.warn("Skipping non-directory " + stat.getPath());
-        continue;
+    final HTableDescriptor htd = new HTableDescriptor(tableName);
+    final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
+      @Override
+      public HColumnDescriptor bulkFamily(final byte[] familyName) {
+        HColumnDescriptor hcd = new HColumnDescriptor(familyName);
+        htd.addFamily(hcd);
+        return hcd;
       }
-      Path familyDir = stat.getPath();
-      byte[] family = familyDir.getName().getBytes();
-
-      hcd = new HColumnDescriptor(family);
-      htd.addFamily(hcd);
-
-      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
-      for (Path hfile : hfiles) {
-        String fileName = hfile.getName();
-        if (fileName.startsWith("_") || StoreFileInfo.isReference(fileName)
-            || HFileLink.isHFileLink(fileName)) continue;
+      @Override
+      public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
+          throws IOException {
+        Path hfile = hfileStatus.getPath();
         HFile.Reader reader = HFile.createReader(fs, hfile,
             new CacheConfig(getConf()), getConf());
-        final byte[] first, last;
         try {
           if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
             hcd.setCompressionType(reader.getFileContext().getCompression());
@@ -911,8 +917,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
                      " for family " + hcd.toString());
           }
           reader.loadFileInfo();
-          first = reader.getFirstRowKey();
-          last =  reader.getLastRowKey();
+          byte[] first = reader.getFirstRowKey();
+          byte[] last  = reader.getLastRowKey();
 
           LOG.info("Trying to figure out region boundaries hfile=" + hfile +
             " first=" + Bytes.toStringBinary(first) +
@@ -924,13 +930,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
           value = map.containsKey(last)? map.get(last):0;
           map.put(last, value-1);
-        }  finally {
+        } finally {
           reader.close();
         }
       }
-    }
+    });
 
-    keys = LoadIncrementalHFiles.inferBoundaries(map);
+    byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
     this.hbAdmin.createTable(htd,keys);
 
     LOG.info("Table "+ tableName +" is available!!");

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9a17edc/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index e356049..570f812 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -131,7 +131,7 @@ public class TestLoadIncrementalHFiles {
           new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
     });
   }
-  
+
   /**
    * Test loading into a column family that has a ROWCOL bloom filter.
    */
@@ -271,7 +271,7 @@ public class TestLoadIncrementalHFiles {
           file.getPath().getName() != "DONOTERASE");
       }
     }
-    
+
     util.deleteTable(tableName);
   }
 
@@ -307,20 +307,30 @@ public class TestLoadIncrementalHFiles {
     }
   }
 
+  @Test(timeout = 60000)
+  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
+    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
+  }
+
+  @Test(timeout = 60000)
+  public void testNonHfileFolder() throws Exception {
+    testNonHfileFolder("testNonHfileFolder", false);
+  }
+
   /**
    * Write a random data file and a non-file in a dir with a valid family name
    * but not part of the table families. we should we able to bulkload without
    * getting the unmatched family exception. HBASE-13037/HBASE-13227
    */
-  @Test(timeout = 60000)
-  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
-    Path dir = util.getDataTestDirOnTestFS("testNonHfileFolderWithUnmatchedFamilyName");
+  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception
{
+    Path dir = util.getDataTestDirOnTestFS(tableName);
     FileSystem fs = util.getTestFileSystem();
     dir = dir.makeQualified(fs);
 
     Path familyDir = new Path(dir, Bytes.toString(FAMILY));
     HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"),
         FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
+    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
 
     final String NON_FAMILY_FOLDER = "_logs";
     Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
@@ -330,10 +340,13 @@ public class TestLoadIncrementalHFiles {
 
     Table table = null;
     try {
-      final String TABLE_NAME = "mytable_testNonHfileFolderWithUnmatchedFamilyName";
-      table = util.createTable(TableName.valueOf(TABLE_NAME), FAMILY);
+      if (preCreateTable) {
+        table = util.createTable(TableName.valueOf(tableName), FAMILY);
+      } else {
+        table = util.getConnection().getTable(TableName.valueOf(tableName));
+      }
 
-      final String[] args = {dir.toString(), TABLE_NAME};
+      final String[] args = {dir.toString(), tableName};
       new LoadIncrementalHFiles(util.getConfiguration()).run(args);
       assertEquals(500, util.countRows(table));
     } finally {
@@ -421,8 +434,8 @@ public class TestLoadIncrementalHFiles {
      *
      * Should be inferred as:
      * a-----------------k   m-------------q   r--------------t  u---------x
-     * 
-     * The output should be (m,r,u) 
+     *
+     * The output should be (m,r,u)
      */
 
     String first;
@@ -430,7 +443,7 @@ public class TestLoadIncrementalHFiles {
 
     first = "a"; last = "e";
     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-    
+
     first = "r"; last = "s";
     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
 
@@ -451,14 +464,14 @@ public class TestLoadIncrementalHFiles {
 
     first = "s"; last = "t";
     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-    
+
     first = "u"; last = "w";
     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
 
     byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
     byte[][] compare = new byte[3][];
     compare[0] = "m".getBytes();
-    compare[1] = "r".getBytes(); 
+    compare[1] = "r".getBytes();
     compare[2] = "u".getBytes();
 
     assertEquals(keysArray.length, 3);


Mime
View raw message