hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-14380 Correct data gets skipped along with bad data in importTsv bulk load thru TsvImporterTextMapper (Bhupendra Kumar Jain)
Date Mon, 14 Sep 2015 13:46:33 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 8cb32383e -> 4e408e2c5


HBASE-14380 Correct data gets skipped along with bad data in importTsv bulk load thru TsvImporterTextMapper
(Bhupendra Kumar Jain)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4e408e2c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4e408e2c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4e408e2c

Branch: refs/heads/branch-1.2
Commit: 4e408e2c5e95f1da17cf1e5e9563f5fbcfb01e81
Parents: 8cb3238
Author: tedyu <yuzhihong@gmail.com>
Authored: Mon Sep 14 06:46:22 2015 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Mon Sep 14 06:46:22 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/mapreduce/TextSortReducer.java |  4 +-
 .../hadoop/hbase/mapreduce/TestImportTsv.java   | 76 +++++++++++++++++---
 2 files changed, 70 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4e408e2c/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index b3981a1..62b62f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -188,14 +188,14 @@ public class TextSortReducer extends
           if (skipBadLines) {
             System.err.println("Bad line." + badLine.getMessage());
             incrementBadLineCount(1);
-            return;
+            continue;
           }
           throw new IOException(badLine);
         } catch (IllegalArgumentException e) {
           if (skipBadLines) {
             System.err.println("Bad line." + e.getMessage());
             incrementBadLineCount(1);
-            return;
+            continue;
           } 
           throw new IOException(e);
         } 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4e408e2c/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index 2ad796a..b613823 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.HTable;
@@ -49,6 +48,10 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
@@ -270,11 +273,38 @@ public class TestImportTsv implements Configurable {
     ImportTsv.createSubmittableJob(conf, args);
   }
 
+  /**
+   * Invalid input rows must be skipped one at a time: valid rows around them are still loaded.
+   */
+  @Test
+  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
+    String table = "test-" + UUID.randomUUID();
+    String FAMILY = "FAM";
+    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table };
+    // 3 input rows sharing one key: 2 valid, 1 invalid (no TS); valid rows yield 2 x 2 = 4 KVs
+    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
+    doMROnTableTest(util, FAMILY, data, args, 1, 4);
+    util.deleteTable(table);
+  }
+
   protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
       String data, String[] args) throws Exception {
     return doMROnTableTest(util, family, data, args, 1);
   }
 
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier) throws Exception {
+    return doMROnTableTest(util, family, data, args, valueMultiplier, -1); // -1: skip KV check
+  }
+
   /**
    * Run an ImportTsv job and perform basic validation on the results.
    * Returns the ImportTsv <code>Tool</code> instance so that other tests can
@@ -283,8 +313,8 @@ public class TestImportTsv implements Configurable {
    * @param args Any arguments to pass BEFORE inputFile path is appended.
    * @return The Tool instance used to run the test.
    */
-  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
-      String data, String[] args, int valueMultiplier)
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier, int expectedKVCount)
   throws Exception {
     String table = args[args.length - 1];
     Configuration conf = new Configuration(util.getConfiguration());
@@ -327,7 +357,7 @@ public class TestImportTsv implements Configurable {
     }
 
     if (createdHFiles)
-      validateHFiles(fs, outputPath, family);
+      validateHFiles(fs, outputPath, family, expectedKVCount);
     else
       validateTable(conf, TableName.valueOf(table), family, valueMultiplier);
 
@@ -383,29 +413,59 @@ public class TestImportTsv implements Configurable {
   /**
    * Confirm ImportTsv via HFiles on fs.
    */
-  private static void validateHFiles(FileSystem fs, String outputPath, String family)
-      throws IOException {
-
+  private static void validateHFiles(FileSystem fs, String outputPath, String family,
+      int expectedKVCount) throws IOException {
     // validate number and content of output columns
     LOG.debug("Validating HFiles.");
     Set<String> configFamilies = new HashSet<String>();
     configFamilies.add(family);
     Set<String> foundFamilies = new HashSet<String>();
+    int actualKVCount = 0;
     for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
       String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
       String cf = elements[elements.length - 1];
       foundFamilies.add(cf);
       assertTrue(
         String.format(
-          "HFile ouput contains a column family (%s) not present in input families (%s)",
+          "HFile output contains a column family (%s) not present in input families (%s)",
           cf, configFamilies),
           configFamilies.contains(cf));
       for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
         assertTrue(
           String.format("HFile %s appears to contain no data.", hfile.getPath()),
           hfile.getLen() > 0);
+        // expectedKVCount == -1 disables the check; otherwise sum KVs across all hfiles
+        if (expectedKVCount > -1) {
+          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
+        }
       }
     }
+    if (expectedKVCount > -1) {
+      assertTrue(String.format(
+        "KV count in output hfile=<%d> doesn't match with expected KV count=<%d>",
+        actualKVCount, expectedKVCount), actualKVCount == expectedKVCount);
+    }
+  }
+
+  /**
+   * Returns the total number of KVs in the given hfile, or 0 if the hfile holds no cells.
+   * @param fs File System
+   * @param p HFile path
+   * @return KV count in the given hfile
+   * @throws IOException if the hfile cannot be opened or scanned
+   */
+  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
+    Configuration conf = util.getConfiguration();
+    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf);
+    try {
+      reader.loadFileInfo();
+      HFileScanner scanner = reader.getScanner(false, false);
+      int count = 0;
+      for (boolean more = scanner.seekTo(); more; more = scanner.next()) count++;
+      return count;
+    } finally {
+      reader.close();
+    }
+  }
 }
 


Mime
View raw message