hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-14380 Correct data gets skipped along with bad data in importTsv bulk load thru TsvImporterTextMapper (Bhupendra Kumar Jain)
Date Fri, 11 Sep 2015 16:05:26 GMT
Repository: hbase
Updated Branches:
  refs/heads/master c438052cc -> a8730c283


HBASE-14380 Correct data gets skipped along with bad data in importTsv bulk load thru TsvImporterTextMapper
(Bhupendra Kumar Jain)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a8730c28
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a8730c28
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a8730c28

Branch: refs/heads/master
Commit: a8730c28390bf15cd68b728cfefd28817e918295
Parents: c438052
Author: tedyu <yuzhihong@gmail.com>
Authored: Fri Sep 11 09:05:17 2015 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Fri Sep 11 09:05:17 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/mapreduce/TextSortReducer.java |  4 +-
 .../hadoop/hbase/mapreduce/TestImportTsv.java   | 67 ++++++++++++++++++--
 2 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a8730c28/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 0aaeeb0..5056f0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -189,14 +189,14 @@ public class TextSortReducer extends
           if (skipBadLines) {
             System.err.println("Bad line." + badLine.getMessage());
             incrementBadLineCount(1);
-            return;
+            continue;
           }
           throw new IOException(badLine);
         } catch (IllegalArgumentException e) {
           if (skipBadLines) {
             System.err.println("Bad line." + e.getMessage());
             incrementBadLineCount(1);
-            return;
+            continue;
           } 
           throw new IOException(e);
         } 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a8730c28/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index ca19af4..099ebe1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -51,6 +51,9 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -346,13 +349,34 @@ public class TestImportTsv implements Configurable {
     util.deleteTable(table);
   }
 
+  /**
+   * If there are invalid data rows as inputs, then only those rows should be ignored.
+   */
+  @Test
+  public void testTsvImporterTextMapperWithInvalidData() throws Exception {
+    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
+    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
+    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
+    // 3 Rows of data as input. 2 Rows are valid and 1 row is invalid as it doesn't have TS
+    String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n";
+    doMROnTableTest(data, 1, 4);
+    util.deleteTable(table);
+  }
+
+  private Tool doMROnTableTest(String data, int valueMultiplier,int expectedKVCount)
+      throws Exception {
+    return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier,expectedKVCount);
+  }
+
   private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
-    return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier);
+    return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier,-1);
   }
 
   protected static Tool doMROnTableTest(HBaseTestingUtility util, String table,
       String family, String data, Map<String, String> args) throws Exception {
-    return doMROnTableTest(util, table, family, data, args, 1);
+    return doMROnTableTest(util, table, family, data, args, 1,-1);
   }
 
   /**
@@ -364,7 +388,7 @@ public class TestImportTsv implements Configurable {
    * @return The Tool instance used to run the test.
    */
   protected static Tool doMROnTableTest(HBaseTestingUtility util, String table,
-      String family, String data, Map<String, String> args, int valueMultiplier)
+      String family, String data, Map<String, String> args, int valueMultiplier,int expectedKVCount)
   throws Exception {
     Configuration conf = new Configuration(util.getConfiguration());
 
@@ -412,7 +436,7 @@ public class TestImportTsv implements Configurable {
                  ImportTsv.BULK_OUTPUT_CONF_KEY),
             fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)));
       } else {
-        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family);
+        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family,expectedKVCount);
       }
     } else {
       validateTable(conf, TableName.valueOf(table), family, valueMultiplier, isDryRun);
@@ -479,13 +503,14 @@ public class TestImportTsv implements Configurable {
   /**
    * Confirm ImportTsv via HFiles on fs.
    */
-  private static void validateHFiles(FileSystem fs, String outputPath, String family)
-      throws IOException {
+  private static void validateHFiles(FileSystem fs, String outputPath, String family,
+      int expectedKVCount) throws IOException {
     // validate number and content of output columns
     LOG.debug("Validating HFiles.");
     Set<String> configFamilies = new HashSet<String>();
     configFamilies.add(family);
     Set<String> foundFamilies = new HashSet<String>();
+    int actualKVCount = 0;
    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
       String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
       String cf = elements[elements.length - 1];
@@ -499,10 +524,40 @@ public class TestImportTsv implements Configurable {
         assertTrue(
           String.format("HFile %s appears to contain no data.", hfile.getPath()),
           hfile.getLen() > 0);
+        // count the number of KVs from all the hfiles
+        if (expectedKVCount > -1) {
+          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
+        }
       }
     }
     assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
         foundFamilies.contains(family));
+    if (expectedKVCount > -1) {
+      assertTrue(String.format(
+        "KV count in ouput hfile=<%d> doesn't match with expected KV count=<%d>", actualKVCount,
+        expectedKVCount), actualKVCount == expectedKVCount);
+    }
+  }
+
+  /**
+   * Method returns the total KVs in given hfile
+   * @param fs File System
+   * @param p HFile path
+   * @return KV count in the given hfile
+   * @throws IOException
+   */
+  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
+    Configuration conf = util.getConfiguration();
+    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), conf);
+    reader.loadFileInfo();
+    HFileScanner scanner = reader.getScanner(false, false);
+    scanner.seekTo();
+    int count = 0;
+    do {
+      count++;
+    } while (scanner.next());
+    reader.close();
+    return count;
   }
 }
 


Mime
View raw message