hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-13702 ImportTsv: Add dry-run functionality and log bad rows (Apekshit Sharma)
Date Sat, 04 Jul 2015 14:25:08 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 67b61005f -> 9e54e195f


HBASE-13702 ImportTsv: Add dry-run functionality and log bad rows (Apekshit Sharma)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9e54e195
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9e54e195
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9e54e195

Branch: refs/heads/branch-1
Commit: 9e54e195f60689bfde26279630f80825214d0219
Parents: 67b6100
Author: tedyu <yuzhihong@gmail.com>
Authored: Sat Jul 4 07:25:00 2015 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Sat Jul 4 07:25:00 2015 -0700

----------------------------------------------------------------------
 .../mapreduce/IntegrationTestImportTsv.java     |  23 +-
 .../hadoop/hbase/mapreduce/ImportTsv.java       |  85 ++++-
 .../hbase/mapreduce/TsvImporterMapper.java      |  22 +-
 .../hbase/mapreduce/TsvImporterTextMapper.java  |  17 +-
 .../hadoop/hbase/mapreduce/TestImportTsv.java   | 337 ++++++++++++-------
 5 files changed, 311 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9e54e195/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
index da6f68a..7a1dee6 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -38,11 +40,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -187,19 +189,18 @@ public class IntegrationTestImportTsv implements Configurable, Tool {
     Path hfiles = new Path(
         util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
 
-    String[] args = {
-        format("-D%s=%s", ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles),
-        format("-D%s=HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2",
-          ImportTsv.COLUMNS_CONF_KEY, cf, cf),
-        // configure the test harness to NOT delete the HFiles after they're
-        // generated. We need those for doLoadIncrementalHFiles
-        format("-D%s=false", TestImportTsv.DELETE_AFTER_LOAD_CONF),
-        table.getNameAsString()
-    };
+
+    Map<String, String> args = new HashMap<String, String>();
+    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+    args.put(ImportTsv.COLUMNS_CONF_KEY,
+        format("HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", cf, cf));
+    // configure the test harness to NOT delete the HFiles after they're
+    // generated. We need those for doLoadIncrementalHFiles
+    args.put(TestImportTsv.DELETE_AFTER_LOAD_CONF, "false");
 
     // run the job, complete the load.
     util.createTable(table, new String[]{cf});
-    Tool t = TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
+    Tool t = TestImportTsv.doMROnTableTest(util, table.getNameAsString(), cf, simple_tsv, args);
     doLoadIncrementalHFiles(hfiles, table);
 
     // validate post-conditions

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e54e195/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 90f2f0e..5d22e27 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -53,6 +54,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
@@ -86,6 +88,9 @@ public class ImportTsv extends Configured implements Tool {
   public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
   // TODO: the rest of these configs are used exclusively by TsvImporterMapper.
   // Move them out of the tool and let the mapper handle its own validation.
+  public final static String DRY_RUN_CONF_KEY = "importtsv.dry.run";
+  // If true, bad lines are logged to stderr. Default: false.
+  public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines";
   public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
   public final static String COLUMNS_CONF_KEY = "importtsv.columns";
   public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
@@ -99,6 +104,11 @@ public class ImportTsv extends Configured implements Tool {
   final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
   public final static String CREATE_TABLE_CONF_KEY = "create.table";
   public final static String NO_STRICT_COL_FAMILY = "no.strict";
+  /**
+   * If table didn't exist and was created in dry-run mode, this flag is
+   * flipped to delete it when MR ends.
+   */
+  private static boolean dryRunTableCreated;
 
   public static class TsvParser {
     /**
@@ -450,9 +460,10 @@ public class ImportTsv extends Configured implements Tool {
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
-  public static Job createSubmittableJob(Configuration conf, String[] args)
+  protected static Job createSubmittableJob(Configuration conf, String[] args)
       throws IOException, ClassNotFoundException {
     Job job = null;
+    boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
     try (Connection connection = ConnectionFactory.createConnection(conf)) {
       try (Admin admin = connection.getAdmin()) {
         // Support non-XML supported characters
@@ -476,6 +487,7 @@ public class ImportTsv extends Configured implements Tool {
           FileInputFormat.setInputPaths(job, inputDir);
           job.setInputFormatClass(TextInputFormat.class);
           job.setMapperClass(mapperClass);
+          job.setMapOutputKeyClass(ImmutableBytesWritable.class);
           String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
           String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
           if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
@@ -486,13 +498,19 @@ public class ImportTsv extends Configured implements Tool {
 
           if (hfileOutPath != null) {
             if (!admin.tableExists(tableName)) {
-              String errorMsg = format("Table '%s' does not exist.", tableName);
+              LOG.warn(format("Table '%s' does not exist.", tableName));
               if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
-                LOG.warn(errorMsg);
                 // TODO: this is backwards. Instead of depending on the existence of a table,
                 // create a sane splits file for HFileOutputFormat based on data sampling.
                 createTable(admin, tableName, columns);
+                if (isDryRun) {
+                  LOG.warn("Dry run: Table will be deleted at end of dry run.");
+                  dryRunTableCreated = true;
+                }
               } else {
+                String errorMsg =
+                    format("Table '%s' does not exist and '%s' is set to no.", tableName,
+                        CREATE_TABLE_CONF_KEY);
                 LOG.error(errorMsg);
                 throw new TableNotFoundException(errorMsg);
               }
@@ -523,21 +541,22 @@ public class ImportTsv extends Configured implements Tool {
                       + "=true.\n";
                   usage(msg);
                   System.exit(-1);
-                } 
+                }
               }
-              job.setReducerClass(PutSortReducer.class);
-              Path outputDir = new Path(hfileOutPath);
-              FileOutputFormat.setOutputPath(job, outputDir);
-              job.setMapOutputKeyClass(ImmutableBytesWritable.class);
               if (mapperClass.equals(TsvImporterTextMapper.class)) {
                 job.setMapOutputValueClass(Text.class);
                 job.setReducerClass(TextSortReducer.class);
               } else {
                 job.setMapOutputValueClass(Put.class);
                 job.setCombinerClass(PutCombiner.class);
+                job.setReducerClass(PutSortReducer.class);
+              }
+              if (!isDryRun) {
+                Path outputDir = new Path(hfileOutPath);
+                FileOutputFormat.setOutputPath(job, outputDir);
+                HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
+                    regionLocator);
               }
-              HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
-                  regionLocator);
             }
           } else {
             if (!admin.tableExists(tableName)) {
@@ -552,13 +571,20 @@ public class ImportTsv extends Configured implements Tool {
                   + " or custom mapper whose value type is Put.");
               System.exit(-1);
             }
-            // No reducers. Just write straight to table. Call initTableReducerJob
-            // to set up the TableOutputFormat.
-            TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null,
-                job);
+            if (!isDryRun) {
+              // No reducers. Just write straight to table. Call initTableReducerJob
+              // to set up the TableOutputFormat.
+              TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
+            }
             job.setNumReduceTasks(0);
           }
-
+          if (isDryRun) {
+            job.setOutputFormatClass(NullOutputFormat.class);
+            job.getConfiguration().setStrings("io.serializations",
+                job.getConfiguration().get("io.serializations"),
+                MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+                KeyValueSerialization.class.getName());
+          }
           TableMapReduceUtil.addDependencyJars(job);
           TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
               com.google.common.base.Function.class /* Guava used by TsvParser */);
@@ -579,7 +605,24 @@ public class ImportTsv extends Configured implements Tool {
       tableName, cfSet));
     admin.createTable(htd);
   }
-  
+
+  private static void deleteTable(Configuration conf, String[] args) {
+    TableName tableName = TableName.valueOf(args[0]);
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+         Admin admin = connection.getAdmin()) {
+      try {
+        admin.disableTable(tableName);
+      } catch (TableNotEnabledException e) {
+        LOG.debug("Dry mode: Table: " + tableName + " already disabled, so just deleting it.");
+      }
+      admin.deleteTable(tableName);
+    } catch (IOException e) {
+      LOG.error(format("***Dry run: Failed to delete table '%s'.***\n%s", tableName, e.toString()));
+      return;
+    }
+    LOG.info(format("Dry run: Deleted table '%s'.", tableName));
+  }
+
   private static Set<String> getColumnFamilies(String[] columns) {
     Set<String> cfSet = new HashSet<String>();
     for (String aColumn : columns) {
@@ -630,7 +673,10 @@ public class ImportTsv extends Configured implements Tool {
       "  Note: if you do not use this option, then the target table must already exist in
HBase\n" +
       "\n" +
       "Other options that may be specified with -D include:\n" +
+      "  -D" + DRY_RUN_CONF_KEY + "=true - Dry run mode. Data is not actually populated into"
+
+      " table. If table does not exist, it is created but deleted in the end.\n" +
       "  -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
+      "  -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" +
       "  '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
       "  -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for
the import\n" +
       "  -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of "
+
@@ -717,8 +763,13 @@ public class ImportTsv extends Configured implements Tool {
     // system time
     getConf().setLong(TIMESTAMP_CONF_KEY, timstamp);
 
+    dryRunTableCreated = false;
     Job job = createSubmittableJob(getConf(), otherArgs);
-    return job.waitForCompletion(true) ? 0 : 1;
+    boolean success = job.waitForCompletion(true);
+    if (dryRunTableCreated) {
+      deleteTable(getConf(), args);
+    }
+    return success ? 0 : 1;
   }
 
   public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e54e195/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index 270de75..9f1b4c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -57,6 +57,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
   /** Should skip bad lines */
   private boolean skipBadLines;
   private Counter badLineCount;
+  private boolean logBadLines;
 
   protected ImportTsv.TsvParser parser;
 
@@ -129,6 +130,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
     skipBadLines = context.getConfiguration().getBoolean(
         ImportTsv.SKIP_LINES_CONF_KEY, true);
     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+    logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
     hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
   }
 
@@ -163,26 +165,16 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
         populatePut(lineBytes, parsed, put, i);
       }
       context.write(rowKey, put);
-    } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
-      if (skipBadLines) {
-        System.err.println(
-            "Bad line at offset: " + offset.get() + ":\n" +
-            badLine.getMessage());
-        incrementBadLineCount(1);
-        return;
-      } else {
-        throw new IOException(badLine);
+    } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) {
+      if (logBadLines) {
+        System.err.println(value);
       }
-    } catch (IllegalArgumentException e) {
+      System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
       if (skipBadLines) {
-        System.err.println(
-            "Bad line at offset: " + offset.get() + ":\n" +
-            e.getMessage());
         incrementBadLineCount(1);
         return;
-      } else {
-        throw new IOException(e);
       }
+      throw new IOException(badLine);
     } catch (InterruptedException e) {
       e.printStackTrace();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e54e195/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
index 9d97cab..7744ea7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
@@ -45,6 +45,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
   /** Should skip bad lines */
   private boolean skipBadLines;
   private Counter badLineCount;
+  private boolean logBadLines;
 
   private ImportTsv.TsvParser parser;
 
@@ -97,6 +98,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
     }
 
     skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
   }
 
@@ -110,21 +112,16 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
       ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
           value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond());
       context.write(rowKey, value);
-    } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+    } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) {
+      if (logBadLines) {
+        System.err.println(value);
+      }
+      System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
       if (skipBadLines) {
-        System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
         incrementBadLineCount(1);
         return;
       } 
       throw new IOException(badLine);
-    } catch (IllegalArgumentException e) {
-      if (skipBadLines) {
-        System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage());
-        incrementBadLineCount(1);
-        return;
-      } else {
-        throw new IOException(e);
-      }
     } catch (InterruptedException e) {
       e.printStackTrace();
       Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e54e195/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index 2ad796a..9a87510 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -19,13 +19,15 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
@@ -39,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -53,13 +56,17 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 @Category(LargeTests.class)
 public class TestImportTsv implements Configurable {
@@ -68,10 +75,7 @@ public class TestImportTsv implements Configurable {
   protected static final String NAME = TestImportTsv.class.getSimpleName();
   protected static HBaseTestingUtility util = new HBaseTestingUtility();
 
-  /**
-   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
-   * false.
-   */
+  // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true.
   protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
 
   /**
@@ -80,6 +84,11 @@ public class TestImportTsv implements Configurable {
   protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
 
   private final String FAMILY = "FAM";
+  private String table;
+  private Map<String, String> args;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
 
   public Configuration getConf() {
     return util.getConfiguration();
@@ -101,112 +110,80 @@ public class TestImportTsv implements Configurable {
     util.shutdownMiniCluster();
   }
 
-  @Test
-  public void testMROnTable() throws Exception {
-    String table = "test-" + UUID.randomUUID();
-
+  @Before
+  public void setup() throws Exception {
+    table = "test-" + UUID.randomUUID();
+    args = new HashMap<String, String>();
     // Prepare the arguments required for the test.
-    String[] args = new String[] {
-        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
-        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
-        table
-    };
+    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B");
+    args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b");
+  }
 
+  @Test
+  public void testMROnTable() throws Exception {
     util.createTable(TableName.valueOf(table), FAMILY);
-    doMROnTableTest(util, FAMILY, null, args, 1);
+    doMROnTableTest(null, 1);
     util.deleteTable(table);
   }
   
   @Test
   public void testMROnTableWithTimestamp() throws Exception {
-    String table = "test-" + UUID.randomUUID();
-
-    // Prepare the arguments required for the test.
-    String[] args = new String[] {
-        "-D" + ImportTsv.COLUMNS_CONF_KEY
-            + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
-        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
-        table
-    };
+    util.createTable(TableName.valueOf(table), FAMILY);
+    args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
+    args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
     String data = "KEY,1234,VALUE1,VALUE2\n";
 
-    util.createTable(TableName.valueOf(table), FAMILY);
-    doMROnTableTest(util, FAMILY, data, args, 1);
+    doMROnTableTest(data, 1);
     util.deleteTable(table);
   }
-  
 
   @Test
   public void testMROnTableWithCustomMapper()
   throws Exception {
-    String table = "test-" + UUID.randomUUID();
-
-    // Prepare the arguments required for the test.
-    String[] args = new String[] {
-        "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
-        table
-    };
-
     util.createTable(TableName.valueOf(table), FAMILY);
-    doMROnTableTest(util, FAMILY, null, args, 3);
+    args.put(ImportTsv.MAPPER_CONF_KEY,
+        "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper");
+
+    doMROnTableTest(null, 3);
     util.deleteTable(table);
   }
   
   @Test
   public void testBulkOutputWithoutAnExistingTable() throws Exception {
-    String table = "test-" + UUID.randomUUID();
-
     // Prepare the arguments required for the test.
     Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
-    String[] args = new String[] {
-        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
-        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
-        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
-        table
-    };
+    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
 
-    doMROnTableTest(util, FAMILY, null, args, 3);
+    doMROnTableTest(null, 3);
     util.deleteTable(table);
   }
 
   @Test
   public void testBulkOutputWithAnExistingTable() throws Exception {
-    String table = "test-" + UUID.randomUUID();
+    util.createTable(TableName.valueOf(table), FAMILY);
 
     // Prepare the arguments required for the test.
     Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
-    String[] args = new String[] {
-        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
-        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
-        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
-        table
-    };
+    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
 
-    util.createTable(TableName.valueOf(table), FAMILY);
-    doMROnTableTest(util, FAMILY, null, args, 3);
+    doMROnTableTest(null, 3);
     util.deleteTable(table);
   }
   
   @Test
   public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception {
-    String table = "test-" + UUID.randomUUID();
+    util.createTable(TableName.valueOf(table), FAMILY);
+
     // Prepare the arguments required for the test.
     Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
-    String[] args = new String[] {
-        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
-        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
-        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
-        "-D" + ImportTsv.NO_STRICT_COL_FAMILY + "=true",
-        table
-    };
-    util.createTable(TableName.valueOf(table), FAMILY);
-    doMROnTableTest(util, FAMILY, null, args, 3);
+    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+    args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true");
+    doMROnTableTest(null, 3);
     util.deleteTable(table);
   }
 
   @Test
   public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {
-    String table = "test-" + UUID.randomUUID();
     Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles");
     String INPUT_FILE = "InputFile1.csv";
     // Prepare the arguments required for the test.
@@ -220,59 +197,164 @@ public class TestImportTsv implements Configurable {
             "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table,
             INPUT_FILE
             };
-    GenericOptionsParser opts = new GenericOptionsParser(util.getConfiguration(), args);
+    Configuration conf = new Configuration(util.getConfiguration());
+    GenericOptionsParser opts = new GenericOptionsParser(conf, args);
     args = opts.getRemainingArgs();
-    Job job = ImportTsv.createSubmittableJob(util.getConfiguration(), args);
-    assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
-    assertTrue(job.getReducerClass().equals(TextSortReducer.class));
-    assertTrue(job.getMapOutputValueClass().equals(Text.class));
+    assertEquals("running test job configuration failed.", 0,
+        ToolRunner.run(conf, new ImportTsv() {
+          @Override
+          public int run(String[] args) throws Exception {
+            Job job = createSubmittableJob(getConf(), args);
+            assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class));
+            assertTrue(job.getReducerClass().equals(TextSortReducer.class));
+            assertTrue(job.getMapOutputValueClass().equals(Text.class));
+            return 0;
+          }
+        }, args));
+    // Delete table created by createSubmittableJob.
+    util.deleteTable(table);
   }
 
   @Test
   public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
-    String table = "test-" + UUID.randomUUID();
-    String FAMILY = "FAM";
     Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles");
-    // Prepare the arguments required for the test.
-    String[] args =
-        new String[] {
-            "-D" + ImportTsv.MAPPER_CONF_KEY
-                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
-            "-D" + ImportTsv.COLUMNS_CONF_KEY
-                + "=HBASE_ROW_KEY,FAM:A,FAM:B",
-            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
-            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), table
-            };
+    args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper");
+    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
     String data = "KEY\u001bVALUE4\u001bVALUE8\n";
-    doMROnTableTest(util, FAMILY, data, args, 4);
+    doMROnTableTest(data, 4);
+    util.deleteTable(table);
   }
 
-  @Test(expected = TableNotFoundException.class)
+  @Test
   public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
-    String table = "test-" + UUID.randomUUID();
-    String[] args =
-        new String[] { table, "/inputFile" };
+    String[] args = new String[] { table, "/inputFile" };
 
     Configuration conf = new Configuration(util.getConfiguration());
     conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A");
     conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output");
     conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
-    ImportTsv.createSubmittableJob(conf, args);
+    exception.expect(TableNotFoundException.class);
+    assertEquals("running test job configuration failed.", 0,
+        ToolRunner.run(conf, new ImportTsv() {
+          @Override public int run(String[] args) throws Exception {
+            createSubmittableJob(getConf(), args);
+            return 0;
+          }
+        }, args));
   }
 
-  @Test(expected = TableNotFoundException.class)
+  @Test
   public void testMRWithoutAnExistingTable() throws Exception {
-    String table = "test-" + UUID.randomUUID();
     String[] args =
         new String[] { table, "/inputFile" };
 
-    Configuration conf = new Configuration(util.getConfiguration());
-    ImportTsv.createSubmittableJob(conf, args);
+    exception.expect(TableNotFoundException.class);
+    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
+        new Configuration(util.getConfiguration()),
+        new ImportTsv() {
+          @Override
+          public int run(String[] args) throws Exception {
+            createSubmittableJob(getConf(), args);
+            return 0;
+          }
+        }, args));
   }
 
-  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
-      String data, String[] args) throws Exception {
-    return doMROnTableTest(util, family, data, args, 1);
+  @Test
+  public void testJobConfigurationsWithDryMode() throws Exception {
+    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table),"hfiles");
+    String INPUT_FILE = "InputFile1.csv";
+    // Prepare the arguments required for the test.
+    String[] argsArray = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
+        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(),
+        "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true",
+        table,
+        INPUT_FILE };
+    assertEquals("running test job configuration failed.", 0, ToolRunner.run(
+        new Configuration(util.getConfiguration()),
+        new ImportTsv() {
+          @Override
+          public int run(String[] args) throws Exception {
+            Job job = createSubmittableJob(getConf(), args);
+            assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class));
+            return 0;
+          }
+        }, argsArray));
+    // Delete table created by createSubmittableJob.
+    util.deleteTable(table);
+  }
+
+  @Test
+  public void testDryModeWithoutBulkOutputAndTableExists() throws Exception {
+    util.createTable(TableName.valueOf(table), FAMILY);
+    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+    doMROnTableTest(null, 1);
+    // Dry mode should not delete an existing table. If it's not present,
+    // this will throw TableNotFoundException.
+    util.deleteTable(table);
+  }
+
+  /**
+   * If table is not present in non-bulk mode, dry run should fail just like
+   * normal mode.
+   */
+  @Test
+  public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception {
+    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+    exception.expect(TableNotFoundException.class);
+    doMROnTableTest(null, 1);
+  }
+
+  @Test public void testDryModeWithBulkOutputAndTableExists() throws Exception {
+    util.createTable(TableName.valueOf(table), FAMILY);
+    // Prepare the arguments required for the test.
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+    doMROnTableTest(null, 1);
+    // Dry mode should not delete an existing table. If it's not present,
+    // this will throw TableNotFoundException.
+    util.deleteTable(table);
+  }
+
+  /**
+   * If table is not present in bulk mode and create.table is not set to yes,
+   * import should fail with TableNotFoundException.
+   */
+  @Test
+  public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws
+      Exception {
+    // Prepare the arguments required for the test.
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no");
+    exception.expect(TableNotFoundException.class);
+    doMROnTableTest(null, 1);
+  }
+
+  @Test
+  public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception {
+    // Prepare the arguments required for the test.
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
+    args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString());
+    args.put(ImportTsv.DRY_RUN_CONF_KEY, "true");
+    args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes");
+    doMROnTableTest(null, 1);
+    // Verify temporary table was deleted.
+    exception.expect(TableNotFoundException.class);
+    util.deleteTable(table);
+  }
+
+  private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
+    return doMROnTableTest(util, table, FAMILY, data, args, valueMultiplier);
+  }
+
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String table,
+      String family, String data, Map<String, String> args) throws Exception {
+    return doMROnTableTest(util, table, family, data, args, 1);
   }
 
   /**
@@ -283,10 +365,9 @@ public class TestImportTsv implements Configurable {
    * @param args Any arguments to pass BEFORE inputFile path is appended.
    * @return The Tool instance used to run the test.
    */
-  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family,
-      String data, String[] args, int valueMultiplier)
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String table,
+      String family, String data, Map<String, String> args, int valueMultiplier)
   throws Exception {
-    String table = args[args.length - 1];
     Configuration conf = new Configuration(util.getConfiguration());
 
     // populate input file
@@ -305,32 +386,40 @@ public class TestImportTsv implements Configurable {
       conf.setInt("mapreduce.map.combine.minspills", 1);
     }
 
+    // Build args array.
+    String[] argsArray = new String[args.size() + 2];
+    Iterator it = args.entrySet().iterator();
+    int i = 0;
+    while (it.hasNext()) {
+      Map.Entry pair = (Map.Entry) it.next();
+      argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue();
+      i++;
+    }
+    argsArray[i] = table;
+    argsArray[i + 1] = inputPath.toString();
+
     // run the import
-    List<String> argv = new ArrayList<String>(Arrays.asList(args));
-    argv.add(inputPath.toString());
     Tool tool = new ImportTsv();
-    LOG.debug("Running ImportTsv with arguments: " + argv);
-    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+    LOG.debug("Running ImportTsv with arguments: " + argsArray);
+    assertEquals(0, ToolRunner.run(conf, tool, argsArray));
 
     // Perform basic validation. If the input args did not include
     // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
     // Otherwise, validate presence of hfiles.
-    boolean createdHFiles = false;
-    String outputPath = null;
-    for (String arg : argv) {
-      if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
-        createdHFiles = true;
-        // split '-Dfoo=bar' on '=' and keep 'bar'
-        outputPath = arg.split("=")[1];
-        break;
+    boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY) &&
+        "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY));
+    if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
+      if (isDryRun) {
+        assertFalse(String.format("Dry run mode, %s should not have been created.",
+                 ImportTsv.BULK_OUTPUT_CONF_KEY),
+            fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY)));
+      } else {
+        validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family);
       }
+    } else {
+      validateTable(conf, TableName.valueOf(table), family, valueMultiplier, isDryRun);
     }
 
-    if (createdHFiles)
-      validateHFiles(fs, outputPath, family);
-    else
-      validateTable(conf, TableName.valueOf(table), family, valueMultiplier);
-
     if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
       LOG.debug("Deleting test subdirectory");
       util.cleanupDataTestDirOnTestFS(table);
@@ -342,7 +431,7 @@ public class TestImportTsv implements Configurable {
    * Confirm ImportTsv via data in online table.
    */
   private static void validateTable(Configuration conf, TableName tableName,
-      String family, int valueMultiplier) throws IOException {
+      String family, int valueMultiplier, boolean isDryRun) throws IOException {
 
     LOG.debug("Validating table.");
     Table table = new HTable(conf, tableName);
@@ -355,8 +444,10 @@ public class TestImportTsv implements Configurable {
         // Scan entire family.
         scan.addFamily(Bytes.toBytes(family));
         ResultScanner resScanner = table.getScanner(scan);
+        int numRows = 0;
         for (Result res : resScanner) {
-          assertTrue(res.size() == 2);
+          numRows++;
+          assertEquals(2, res.size());
           List<Cell> kvs = res.listCells();
           assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
           assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
@@ -364,6 +455,11 @@ public class TestImportTsv implements Configurable {
           assertTrue(CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
           // Only one result set is expected, so let it loop.
         }
+        if (isDryRun) {
+          assertEquals(0, numRows);
+        } else {
+          assertEquals(1, numRows);
+        }
         verified = true;
         break;
       } catch (NullPointerException e) {
@@ -385,7 +481,6 @@ public class TestImportTsv implements Configurable {
    */
   private static void validateHFiles(FileSystem fs, String outputPath, String family)
       throws IOException {
-
     // validate number and content of output columns
     LOG.debug("Validating HFiles.");
     Set<String> configFamilies = new HashSet<String>();
@@ -397,7 +492,7 @@ public class TestImportTsv implements Configurable {
       foundFamilies.add(cf);
       assertTrue(
         String.format(
-          "HFile ouput contains a column family (%s) not present in input families (%s)",
+          "HFile output contains a column family (%s) not present in input families (%s)",
           cf, configFamilies),
           configFamilies.contains(cf));
       for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
@@ -406,6 +501,8 @@ public class TestImportTsv implements Configurable {
           hfile.getLen() > 0);
       }
     }
+    assertTrue(String.format("HFile output does not contain the input family '%s'.", family),
+        foundFamilies.contains(family));
   }
 }
 


Mime
View raw message