incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [18/20] git commit: Changed the CsvBlurDriver class to be much easier to use.
Date Mon, 05 Aug 2013 18:56:55 GMT
Changed the CsvBlurDriver class to be much easier to use.
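
For context: the driver now takes POSIX-style options instead of five fixed positional arguments. Below is a minimal sketch of driving the new entry point programmatically, mirroring the new CsvBlurDriverTest included in this commit; the connection string, table, family, column, and path values are illustrative, and imports are as in the CsvBlurDriver diff.

    Configuration conf = new Configuration();
    CsvBlurDriver.ControllerPool pool = new CsvBlurDriver.ControllerPool() {
      @Override
      public Iface getClient(String controllerConnectionStr) {
        return BlurClient.getClient(controllerConnectionStr);
      }
    };
    // setupJob returns null when the options cannot be parsed.
    Job job = CsvBlurDriver.setupJob(conf, pool,
        "-c", "controller1:40010",          // Thrift controller connection string
        "-t", "table1",                     // Blur table name
        "-d", "family1", "col1", "col2",    // CSV field-to-column mapping
        "-i", "hdfs://namenode/input/in1"); // input directory to index
    if (job != null) {
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }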


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/dd067453
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/dd067453
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/dd067453

Branch: refs/heads/0.2.0-newtypesystem
Commit: dd067453c635c5775c55d4b7421e4c65ebbd9d24
Parents: 08dfc4b
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Aug 5 13:31:06 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Aug 5 13:31:06 2013 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/CsvBlurDriver.java       | 285 +++++++++++++++++--
 .../lib/CsvBlurDriverFamilyPerInput.java        |  73 -----
 .../blur/mapreduce/lib/CsvBlurMapper.java       | 155 ++++++----
 .../blur/mapreduce/lib/CsvBlurDriverTest.java   |  98 +++++++
 .../blur/mapreduce/lib/CsvBlurMapperTest.java   |  31 +-
 pom.xml                                         |   2 +-
 6 files changed, 477 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dd067453/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
index b144598..824ce23 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
@@ -17,54 +17,283 @@ package org.apache.blur.mapreduce.lib;
  * limitations under the License.
  */
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Set;
 
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thrift.BlurClient;
 import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 
+@SuppressWarnings("static-access")
 public class CsvBlurDriver {
 
-  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException,
-      BlurException, TException {
+  interface ControllerPool {
+    Iface getClient(String controllerConnectionStr);
+  }
+
+  public static void main(String... args) throws Exception {
+
     Configuration configuration = new Configuration();
     String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-    if (otherArgs.length != 5) {
-      System.err
-          .println("Usage: csvindexer <thrift controller connection str> <tablename>
<column family definitions> <auto generate record id> <in>");
-      System.exit(2);
-    }
-    int c = 0;
-    final String controllerConnectionStr = otherArgs[c++];
-    final String tableName = otherArgs[c++];
-    final String columnDefs = otherArgs[c++];
-    final Boolean autoGenerateRecordIds = Boolean.parseBoolean(otherArgs[c++]);
-    final String input = otherArgs[c++];
-
-    final Iface client = BlurClient.getClient(controllerConnectionStr);
+
+    Job job = setupJob(configuration, new ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return BlurClient.getClient(controllerConnectionStr);
+      }
+    }, otherArgs);
+    if (job == null) {
+      System.exit(1);
+    }
+
+    boolean waitForCompletion = job.waitForCompletion(true);
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+
+  public static Job setupJob(Configuration configuration, ControllerPool controllerPool, String... otherArgs)
+      throws Exception {
+    CommandLine cmd = parse(otherArgs);
+    if (cmd == null) {
+      return null;
+    }
+
+    final String controllerConnectionStr = cmd.getOptionValue("c");
+    final String tableName = cmd.getOptionValue("t");
+
+    final Iface client = controllerPool.getClient(controllerConnectionStr);
     TableDescriptor tableDescriptor = client.describe(tableName);
-    
-    Job job = new Job(configuration, "Blur indexer [" + tableName + "] [" + input + "]");
-    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(job, autoGenerateRecordIds);
+
+    Job job = new Job(configuration, "Blur indexer [" + tableName + "]");
     job.setJarByClass(CsvBlurDriver.class);
     job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
+
+    if (cmd.hasOption("a")) {
+      CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(job, true);
+    }
+    if (cmd.hasOption("A")) {
+      CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(job, true);
+    }
+    if (cmd.hasOption("S")) {
+      job.setInputFormatClass(SequenceFileInputFormat.class);
+    } else {
+      job.setInputFormatClass(TextInputFormat.class);
+    }
     
-    FileInputFormat.addInputPath(job, new Path(input));
-    CsvBlurMapper.setColumns(job, columnDefs);
+    if (cmd.hasOption("C")) {
+      if (cmd.hasOption("S")) {
+        String[] optionValues = cmd.getOptionValues("C");
+        job.setInputFormatClass(CsvBlurCombineSequenceFileInputFormat.class);
+        CombineFileInputFormat.setMinInputSplitSize(job, Long.parseLong(optionValues[0]));
+        CombineFileInputFormat.setMaxInputSplitSize(job, Long.parseLong(optionValues[1]));
+      } else {
+        System.err.println("'C' can only be used with option 'S'");
+        return null;
+      }
+    }
+
+    if (cmd.hasOption("i")) {
+      for (String input : cmd.getOptionValues("i")) {
+        Path path = new Path(input);
+        Set<Path> pathSet = recurisvelyGetPathesContainingFiles(path, job.getConfiguration());
+        if (pathSet.isEmpty()) {
+          FileInputFormat.addInputPath(job, path);
+        } else {
+          for (Path p : pathSet) {
+            FileInputFormat.addInputPath(job, p);
+          }
+        }
+      }
+    }
+    // processing the 'I' option
+    if (cmd.hasOption("I")) {
+      Option[] options = cmd.getOptions();
+      for (Option option : options) {
+        if (option.getOpt().equals("I")) {
+          String[] values = option.getValues();
+          if (values.length < 2) {
+            System.err.println("'I' parameter missing minimum args of (family path*)");
+            return null;
+          }
+          for (String p : getSubArray(values, 1)) {
+            CsvBlurMapper.addFamilyPath(job, values[0], new Path(p));
+          }
+        }
+      }
+    }
+
+    if (cmd.hasOption("s")) {
+      CsvBlurMapper.setSeparator(job, StringEscapeUtils.unescapeJava(cmd.getOptionValue("s")));
+    }
+    if (cmd.hasOption("o")) {
+      BlurOutputFormat.setOptimizeInFlight(job, false);
+    }
+    if (cmd.hasOption("l")) {
+      BlurOutputFormat.setIndexLocally(job, false);
+    }
+    if (cmd.hasOption("b")) {
+      int maxDocumentBufferSize = Integer.parseInt(cmd.getOptionValue("b"));
+      BlurOutputFormat.setMaxDocumentBufferSize(job, maxDocumentBufferSize);
+    }
+    if (cmd.hasOption("r")) {
+      int reducerMultiplier = Integer.parseInt(cmd.getOptionValue("r"));
+      BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
+    }
+    // processing the 'd' option
+    Option[] options = cmd.getOptions();
+    for (Option option : options) {
+      if (option.getOpt().equals("d")) {
+        String[] values = option.getValues();
+        if (values.length < 2) {
+          System.err.println("'d' parameter missing minimum args of (family columname*)");
+          return null;
+        }
+        CsvBlurMapper.addColumns(job, values[0], getSubArray(values, 1));
+      }
+    }
     BlurOutputFormat.setupJob(job, tableDescriptor);
-    
-//    CsvBlurCombineFileInputFormat
+    return job;
+  }
+
+  private static String[] getSubArray(String[] array, int starting) {
+    String[] result = new String[array.length - starting];
+    System.arraycopy(array, starting, result, 0, result.length);
+    return result;
+  }
+
+  private static Set<Path> recurisvelyGetPathesContainingFiles(Path path, Configuration
configuration)
+      throws IOException {
+    Set<Path> pathSet = new HashSet<Path>();
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus status : listStatus) {
+      if (status.isDir()) {
+        pathSet.addAll(recursivelyGetPathsContainingFiles(status.getPath(), configuration));
+      } else {
+        pathSet.add(status.getPath().getParent());
+      }
+    }
+    return pathSet;
+  }
+
+  private static CommandLine parse(String... otherArgs) throws ParseException {
+    Options options = new Options();
+    options.addOption(OptionBuilder.withArgName("controller").hasArgs().isRequired(true)
+        .withDescription("* Thrift controller connection string. (host1:40010 host2:40010
...)").create("c"));
+    options.addOption(OptionBuilder.withArgName("tablename").hasArg().isRequired(true)
+        .withDescription("* Blur table name.").create("t"));
+    options.addOption(OptionBuilder.withArgName("family and column definitions").hasArgs().isRequired(true)
+        .withDescription("* Define the mapping of fields in the CSV file to column names.
(family col1 col2 col3 ...)")
+        .create("d"));
+    options.addOption(OptionBuilder
+        .withArgName("file delimiter")
+        .hasArg()
+        .withDescription(
+            "The file delimiter to be used. (default value ',')  NOTE: For special "
+                + "charactors like the default hadoop separator of ASCII value 1, you can
use standard "
+                + "java escaping (\\u0001)").create("s"));
+    options.addOption(OptionBuilder.withArgName("file input").hasArg()
+        .withDescription("The directory to index. (hdfs://namenode/input/in1)").create("i"));
+    options.addOption(OptionBuilder.withArgName("file input").hasArgs()
+        .withDescription("The directory to index with family name. (family hdfs://namenode/input/in1)").create("I"));
+    options.addOption(OptionBuilder
+        .withArgName("auto generate record ids")
+        .withDescription(
+            "Automatically generate record ids for each record based on a MD5 has of the
data within the record.")
+        .create("a"));
+    options.addOption(OptionBuilder
+        .withArgName("auto generate row ids")
+        .withDescription(
+            "Automatically generate row ids for each record based on a MD5 has of the data
within the record.")
+        .create("A"));
+    options.addOption(OptionBuilder.withArgName("disable optimize indexes during copy")
+        .withDescription("Disable optimize indexes during copy, this has very little overhead.
(enabled by default)")
+        .create("o"));
+    options.addOption(OptionBuilder
+        .withArgName("disable index locally")
+        .withDescription(
+            "Disable the use storage local on the server that is running the reducing "
+                + "task and copy to Blur table once complete. (enabled by default)").create("l"));
+    options.addOption(OptionBuilder.withArgName("sequence files inputs")
+        .withDescription("The input files are sequence files.").create("S"));
+    options.addOption(OptionBuilder
+        .withArgName("lucene document buffer size")
+        .hasArg()
+        .withDescription(
+            "The maximum number of Lucene documents to buffer in the reducer for a single
"
+                + "row before spilling over to disk. (default 1000)").create("b"));
+    options.addOption(OptionBuilder
+        .withArgName("reducer multiplier")
+        .hasArg()
+        .withDescription(
+            "The reducer multipler allows for an increase in the number of reducers per "
+                + "shard in the given table.  For example if the table has 128 shards and
the "
+                + "reducer multiplier is 4 the total number of redcuer will be 512, 4 reducers
"
+                + "per shard. (default 1)").create("r"));
+    options.addOption(OptionBuilder
+        .withArgName("minimum maximum")
+        .hasArgs(2)
+        .withDescription(
+            "Enables a combine file input to help deal with many small files as the input.
Provide " +
+            "the minimum and maximum size per mapper.  For a minimum of 1GB and a maximum
of " +
+            "2.5GB: (1000000000 2500000000)").create("C"));
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = null;
+    try {
+      cmd = parser.parse(options, otherArgs);
+    } catch (ParseException e) {
+      System.err.println(e.getMessage());
+      HelpFormatter formatter = new HelpFormatter();
+      PrintWriter pw = new PrintWriter(System.err, true);
+      formatter.printHelp(pw, HelpFormatter.DEFAULT_WIDTH, "csvindexer", null, options, HelpFormatter.DEFAULT_LEFT_PAD,
+          HelpFormatter.DEFAULT_DESC_PAD, null, false);
+      return null;
+    }
+
+    if (!(cmd.hasOption("I") || cmd.hasOption("i"))) {
+      System.err.println("Missing input directory, see options 'i' and 'I'.");
+      HelpFormatter formatter = new HelpFormatter();
+      PrintWriter pw = new PrintWriter(System.err, true);
+      formatter.printHelp(pw, HelpFormatter.DEFAULT_WIDTH, "csvindexer", null, options, HelpFormatter.DEFAULT_LEFT_PAD,
+          HelpFormatter.DEFAULT_DESC_PAD, null, false);
+      return null;
+    }
+    return cmd;
+  }
+
+  public static class CsvBlurCombineSequenceFileInputFormat extends CombineFileInputFormat<Writable, Text> {
+
+    @Override
+    public RecordReader<Writable, Text> createRecordReader(InputSplit split, TaskAttemptContext
context)
+        throws IOException {
+      return new SequenceFileRecordReader<Writable, Text>();
+    }
 
-    boolean waitForCompletion = job.waitForCompletion(true);
-    System.exit(waitForCompletion ? 0 : 1);
   }
-  
 }
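
A note on the combine support added above: when both 'S' and 'C' are given, setupJob swaps in CsvBlurCombineSequenceFileInputFormat and applies the supplied split bounds, so many small sequence files are grouped into fewer map tasks. A sketch of the equivalent direct calls on an already-configured Job, using the 1GB/2.5GB example bounds from the 'C' option's help text:

    job.setInputFormatClass(CsvBlurDriver.CsvBlurCombineSequenceFileInputFormat.class);
    // Minimum and maximum combined split size, in bytes per mapper (~1GB and ~2.5GB).
    CombineFileInputFormat.setMinInputSplitSize(job, 1000000000L);
    CombineFileInputFormat.setMaxInputSplitSize(job, 2500000000L);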

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dd067453/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java
deleted file mode 100644
index fbe4253..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriverFamilyPerInput.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-public class CsvBlurDriverFamilyPerInput {
-
-  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException,
-      BlurException, TException {
-    Configuration configuration = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-    if (otherArgs.length < 4) {
-      System.err
-          .println("Usage: csvindexer <thrift controller connection str> <tablename>
<column family definitions> <family=input path> ...");
-      System.exit(2);
-    }
-    int c = 0;
-    final String controllerConnectionStr = otherArgs[c++];
-    final String tableName = otherArgs[c++];
-    final String columnDefs = otherArgs[c++];
-
-    final Iface client = BlurClient.getClient(controllerConnectionStr);
-    TableDescriptor tableDescriptor = client.describe(tableName);
-
-    Job job = new Job(configuration, "Blur indexer [" + tableName + "] Mulitple Inputs");
-    job.setJarByClass(CsvBlurDriverFamilyPerInput.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    CsvBlurMapper.setColumns(job, columnDefs);
-    CsvBlurMapper.setFamilyNotInFile(job, true);
-
-    for (int i = c; i < otherArgs.length; i++) {
-      final String input = otherArgs[c++];
-      int indexOf = input.indexOf('=');
-      String family = input.substring(0, indexOf);
-      String pathStr = input.substring(indexOf + 1);
-      FileInputFormat.addInputPath(job, new Path(pathStr));
-      CsvBlurMapper.addFamilyPath(job, family, new Path(pathStr));
-    }
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-
-    boolean waitForCompletion = job.waitForCompletion(true);
-    System.exit(waitForCompletion ? 0 : 1);
-  }
-}
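
The family-per-input driver deleted here is replaced by the new 'I' option above, whose form is (family hdfs://namenode/input/in1); internally each path is registered the same way the old driver did it. A one-line sketch, with an illustrative family name and path:

    CsvBlurMapper.addFamilyPath(job, "family1", new Path("hdfs://namenode/input/in1"));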

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dd067453/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
index 8de2e98..645af5d 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
@@ -49,7 +49,7 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
 
   private static final String UTF_8 = "UTF-8";
   public static final String BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA = "blur.csv.auto.generate.record.id.as.hash.of.data";
-  public static final String BLUR_CSV_FAMILYISNOTINFILE = "blur.csv.familyisnotinfile";
+  public static final String BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA = "blur.csv.auto.generate.row.id.as.hash.of.data";
   public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES = "blur.csv.family.path.mappings.families";
   public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX = "blur.csv.family.path.mappings.family.";
   public static final String BLUR_CSV_SEPARATOR_BASE64 = "blur.csv.separator.base64";
@@ -63,6 +63,7 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
   private String _familyFromPath;
   private boolean _autoGenerateRecordIdAsHashOfData;
   private MessageDigest _digest;
+  private boolean _autoGenerateRowIdAsHashOfData;
 
   /**
    * Add a mapping for a family to a path. This is to be used when an entire
@@ -85,20 +86,6 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
   }
 
   /**
-   * Sets the property familyIsNotInFile so that the parser know that the family
-   * is not to be parsed. Is to be used in conjunction with the addFamilyPath
-   * method.
-   * 
-   * @param job
-   *          the job to setup.
-   * @param familyIsNotInFile
-   *          boolean.
-   */
-  public static void setFamilyNotInFile(Job job, boolean familyIsNotInFile) {
-    setFamilyNotInFile(job.getConfiguration(), familyIsNotInFile);
-  }
-
-  /**
    * Add a mapping for a family to a path. This is to be used when an entire
    * path is to be processed as a single family and the data itself does not
    * contain the family.<br/>
@@ -115,31 +102,56 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
    *          the path.
    */
   public static void addFamilyPath(Configuration configuration, String family, Path path) {
-    Collection<String> mappings = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
-    if (mappings == null) {
-      mappings = new TreeSet<String>();
+    append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES, family);
+    append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family, path.toString());
+  }
+
+  private static void append(Configuration configuration, String name, String value) {
+    Collection<String> set = configuration.getStringCollection(name);
+    if (set == null) {
+      set = new TreeSet<String>();
     }
-    mappings.add(family);
-    configuration.setStrings(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES, mappings.toArray(new String[mappings.size()]));
-    configuration.set(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family, path.toString());
+    set.add(value);
+    configuration.setStrings(name, set.toArray(new String[set.size()]));
+  }
+
+  /**
+   * If set to true the record id will be automatically generated as a hash of
+   * the data that the record contains.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param autoGenerateRecordIdAsHashOfData
+   *          boolean.
+   */
+  public static void setAutoGenerateRecordIdAsHashOfData(Job job, boolean autoGenerateRecordIdAsHashOfData) {
+    setAutoGenerateRecordIdAsHashOfData(job.getConfiguration(), autoGenerateRecordIdAsHashOfData);
   }
 
   /**
-   * Sets the property familyIsNotInFile so that the parser know that the family
-   * is not to be parsed. Is to be used in conjunction with the addFamilyPath
-   * method.
+   * If set to true the record id will be automatically generated as a hash of
+   * the data that the record contains.
    * 
    * @param configuration
    *          the configuration to setup.
-   * @param familyIsNotInFile
+   * @param autoGenerateRecordIdAsHashOfData
    *          boolean.
    */
-  public static void setFamilyNotInFile(Configuration configuration, boolean familyIsNotInFile) {
-    configuration.setBoolean(BLUR_CSV_FAMILYISNOTINFILE, familyIsNotInFile);
+  public static void setAutoGenerateRecordIdAsHashOfData(Configuration configuration,
+      boolean autoGenerateRecordIdAsHashOfData) {
+    configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, autoGenerateRecordIdAsHashOfData);
   }
 
-  public static boolean isFamilyNotInFile(Configuration configuration) {
-    return configuration.getBoolean(BLUR_CSV_FAMILYISNOTINFILE, false);
+  /**
+   * Gets whether or not to generate a record id for the record based on the
+   * data.
+   * 
+   * @param configuration
+   *          the configuration.
+   * @return boolean.
+   */
+  public static boolean isAutoGenerateRecordIdAsHashOfData(Configuration configuration) {
+    return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, false);
   }
 
   /**
@@ -151,8 +163,8 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
    * @param autoGenerateRecordIdAsHashOfData
    *          boolean.
    */
-  public static void setAutoGenerateRecordIdAsHashOfData(Job job, boolean autoGenerateRecordIdAsHashOfData) {
-    setAutoGenerateRecordIdAsHashOfData(job.getConfiguration(), autoGenerateRecordIdAsHashOfData);
+  public static void setAutoGenerateRowIdAsHashOfData(Job job, boolean autoGenerateRowIdAsHashOfData) {
+    setAutoGenerateRowIdAsHashOfData(job.getConfiguration(), autoGenerateRowIdAsHashOfData);
   }
 
   /**
@@ -164,9 +176,8 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
    * @param autoGenerateRecordIdAsHashOfData
    *          boolean.
    */
-  public static void setAutoGenerateRecordIdAsHashOfData(Configuration configuration,
-      boolean autoGenerateRecordIdAsHashOfData) {
-    configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, autoGenerateRecordIdAsHashOfData);
+  public static void setAutoGenerateRowIdAsHashOfData(Configuration configuration, boolean autoGenerateRowIdAsHashOfData) {
+    configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, autoGenerateRowIdAsHashOfData);
   }
 
   /**
@@ -177,8 +188,8 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
    *          the configuration.
    * @return boolean.
    */
-  public static boolean isAutoGenerateRecordIdAsHashOfData(Configuration configuration) {
-    return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, false);
+  public static boolean isAutoGenerateRowIdAsHashOfData(Configuration configuration) {
+    return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, false);
   }
 
   /**
@@ -269,6 +280,19 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
     configuration.setStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family, columns);
   }
 
+  public static Collection<String> getFamilyNames(Configuration configuration) {
+    return configuration.getStringCollection(BLUR_CSV_FAMILIES);
+  }
+
+  public static Map<String, List<String>> getFamilyAndColumnNameMap(Configuration
configuration) {
+    Map<String, List<String>> columnNameMap = new HashMap<String, List<String>>();
+    for (String family : getFamilyNames(configuration)) {
+      String[] columnsNames = configuration.getStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family);
+      columnNameMap.put(family, Arrays.asList(columnsNames));
+    }
+    return columnNameMap;
+  }
+
   /**
    * Sets the separator of the file, by default it is ",".
    * 
@@ -302,32 +326,29 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
     super.setup(context);
     Configuration configuration = context.getConfiguration();
     _autoGenerateRecordIdAsHashOfData = isAutoGenerateRecordIdAsHashOfData(configuration);
-    if (_autoGenerateRecordIdAsHashOfData) {
+    _autoGenerateRowIdAsHashOfData = isAutoGenerateRowIdAsHashOfData(configuration);
+    if (_autoGenerateRecordIdAsHashOfData || _autoGenerateRowIdAsHashOfData) {
       try {
         _digest = MessageDigest.getInstance("MD5");
       } catch (NoSuchAlgorithmException e) {
         throw new IOException(e);
       }
     }
-    Collection<String> familyNames = configuration.getStringCollection(BLUR_CSV_FAMILIES);
-    _columnNameMap = new HashMap<String, List<String>>();
-    for (String family : familyNames) {
-      String[] columnsNames = configuration.getStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family);
-      _columnNameMap.put(family, Arrays.asList(columnsNames));
-    }
+    _columnNameMap = getFamilyAndColumnNameMap(configuration);
     _separator = new String(Base64.decodeBase64(configuration.get(BLUR_CSV_SEPARATOR_BASE64, _separator)), UTF_8);
     _splitter = Splitter.on(_separator);
-    _familyNotInFile = isFamilyNotInFile(configuration);
-    if (_familyNotInFile) {
-      Path fileCurrentlyProcessing = getCurrentFile(context);
-      Collection<String> families = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
-      for (String family : families) {
-        String pathStr = configuration.get(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family);
+    Path fileCurrentlyProcessing = getCurrentFile(context);
+    Collection<String> families = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
+    OUTER: for (String family : families) {
+      Collection<String> pathStrCollection = configuration
+          .getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family);
+      for (String pathStr : pathStrCollection) {
         Path path = new Path(pathStr);
         path = path.makeQualified(path.getFileSystem(configuration));
         if (isParent(path, fileCurrentlyProcessing)) {
           _familyFromPath = family;
-          break;
+          _familyNotInFile = true;
+          break OUTER;
         }
       }
     }
@@ -362,28 +383,37 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
     Iterable<String> split = _splitter.split(str);
     List<String> list = toList(split);
 
-    if (list.size() < 3) {
-      throw new IOException("Record [" + str + "] too short.");
-    }
-    int column = 0;
-    record.setRowId(list.get(column++));
-    int offset = 2;
-    if (!_autoGenerateRecordIdAsHashOfData) {
-      record.setRecordId(list.get(column++));
+    int offset = 0;
+    boolean gen = false;
+    if (!_autoGenerateRowIdAsHashOfData) {
+      record.setRowId(list.get(offset++));
     } else {
-      offset--;
       _digest.reset();
       byte[] bs = value.getBytes();
       int length = value.getLength();
       _digest.update(bs, 0, length);
-      record.setRecordId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
+      record.setRowId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
+      gen = true;
+    }
+
+    if (!_autoGenerateRecordIdAsHashOfData) {
+      record.setRecordId(list.get(offset++));
+    } else {
+      if (gen) {
+        record.setRecordId(record.getRowId());
+      } else {
+        _digest.reset();
+        byte[] bs = value.getBytes();
+        int length = value.getLength();
+        _digest.update(bs, 0, length);
+        record.setRecordId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
+      }
     }
     String family;
     if (_familyNotInFile) {
       family = _familyFromPath;
     } else {
-      family = list.get(column++);
-      offset++;
+      family = list.get(offset++);
     }
     record.setFamily(family);
 
@@ -409,7 +439,6 @@ public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
               + getColumnNames(columnNames) + "].");
         }
       }
-
     }
 
     for (int i = 0; i < columnNames.size(); i++) {
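
For reference, the id auto-generation wired in above hashes the raw CSV line with MD5 and encodes the digest in base 36 (Character.MAX_RADIX). A standalone sketch of the same computation; the input string is taken from the new mapper test below, and the variable names are illustrative:

    import java.math.BigInteger;
    import java.security.MessageDigest;

    MessageDigest digest = MessageDigest.getInstance("MD5");
    byte[] bytes = "value1,value2".getBytes("UTF-8");
    digest.update(bytes, 0, bytes.length);
    // Yields "5q0tme15ph3h5pns8sv3u5wy2", the id the new mapper test expects for this input.
    String id = new BigInteger(digest.digest()).toString(Character.MAX_RADIX);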

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dd067453/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
new file mode 100644
index 0000000..c294875
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
@@ -0,0 +1,98 @@
+package org.apache.blur.mapreduce.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+public class CsvBlurDriverTest {
+
+  protected String tableUri = "file:///tmp/tmppath";
+  protected int shardCount = 13;
+
+  @Test
+  public void testCsvBlurDriverTestFail1() throws Exception {
+    Configuration configuration = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return null;
+      }
+    };
+    assertNull(CsvBlurDriver.setupJob(configuration, controllerPool, new String[] {}));
+  }
+
+  @Test
+  public void testCsvBlurDriverTest() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
+        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals(tableDescriptor.getName(), "table1");
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+  }
+  
+  @Test
+  public void testCsvBlurDriverTest2() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
+        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
+        "-S", "-C", "1000000", "2000000");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals(tableDescriptor.getName(), "table1");
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+  }
+
+  protected Iface getMockIface() {
+    InvocationHandler handler = new InvocationHandler() {
+
+      @Override
+      public Object invoke(Object o, Method method, Object[] args) throws Throwable {
+        if (method.getName().equals("describe")) {
+          TableDescriptor tableDescriptor = new TableDescriptor();
+          tableDescriptor.setName((String) args[0]);
+          tableDescriptor.setTableUri(tableUri);
+          tableDescriptor.setShardCount(shardCount);
+          return tableDescriptor;
+        }
+        throw new RuntimeException("not implemented.");
+      }
+    };
+    return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class }, handler);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dd067453/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
index b653a07..47aa8e5 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
@@ -52,7 +52,6 @@ public class CsvBlurMapperTest {
   @Test
   public void testMapperFamilyPerPath() {
     Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setFamilyNotInFile(configuration, true);
     CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
     CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
     _mapper.setFamilyFromPath("cf1");
@@ -67,7 +66,6 @@ public class CsvBlurMapperTest {
   public void testMapperAutoGenerateRecordId() {
     Configuration configuration = _mapDriver.getConfiguration();
     CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setFamilyNotInFile(configuration, true);
     CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
     CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
     _mapper.setFamilyFromPath("cf1");
@@ -77,5 +75,34 @@ public class CsvBlurMapperTest {
         .addColumn("col1", "value1").addColumn("col2", "value2"));
     _mapDriver.runTest();
   }
+  
+  @Test
+  public void testMapperAutoGenerateRowId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("record1,value1,value2"));
+    _mapDriver.withOutput(new Text("-50b4uzohynr7j7s9pve7ytz66"), new BlurMutate(MUTATE_TYPE.REPLACE,
"-50b4uzohynr7j7s9pve7ytz66", "record1", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
+  
+  @Test
+  public void testMapperAutoGenerateRowIdAndRecordId() {
+    Configuration configuration = _mapDriver.getConfiguration();
+    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
+    _mapper.setFamilyFromPath("cf1");
+
+    _mapDriver.withInput(new LongWritable(), new Text("value1,value2"));
+    _mapDriver.withOutput(new Text("5q0tme15ph3h5pns8sv3u5wy2"), new BlurMutate(MUTATE_TYPE.REPLACE,
"5q0tme15ph3h5pns8sv3u5wy2", "5q0tme15ph3h5pns8sv3u5wy2", "cf1")
+        .addColumn("col1", "value1").addColumn("col2", "value2"));
+    _mapDriver.runTest();
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/dd067453/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c967057..a745495 100644
--- a/pom.xml
+++ b/pom.xml
@@ -179,7 +179,7 @@ under the License.
 				<activeByDefault>true</activeByDefault>
 			</activation>
 			<properties>
-				<hadoop.version>1.1.2</hadoop.version>
+				<hadoop.version>1.2.1</hadoop.version>
 			</properties>
 		</profile>
 		

