incubator-blur-commits mailing list archives

From cr...@apache.org
Subject [21/50] [abbrv] Blur MR projects restructured.
Date Sun, 18 May 2014 21:41:56 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
new file mode 100644
index 0000000..6830e32
--- /dev/null
+++ b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
@@ -0,0 +1,409 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import com.google.common.base.Splitter;
+
+@SuppressWarnings("static-access")
+public class CsvBlurDriver {
+
+  public static final String CSVLOADER = "csvloader";
+  public static final String MAPRED_COMPRESS_MAP_OUTPUT = "mapred.compress.map.output";
+  public static final String MAPRED_MAP_OUTPUT_COMPRESSION_CODEC = "mapred.map.output.compression.codec";
+  public static final int DEFAULT_WIDTH = 100;
+  public static final String HEADER = "The \"" + CSVLOADER +
+      "\" command is used to load delimited files into a Blur table.\nThe required options are \"-c\", \"-t\", \"-d\". The " +
+      "standard format for the contents of a file is: \"rowid,recordid,family,col1,col2,...\". However, there are " +
+      "several options; for example, the rowid and recordid can be generated from the data in the record via the " +
+      "\"-A\" and \"-a\" options. The family can be assigned based on the path via the \"-I\" option. The column " +
+      "name order can be mapped via the \"-d\" option. You can also set the input " +
+      "format to sequence files via the \"-S\" option, or leave the default of text files.";
+
+  enum COMPRESSION {
+    SNAPPY(SnappyCodec.class), GZIP(GzipCodec.class), BZIP(BZip2Codec.class), DEFAULT(DefaultCodec.class);
+
+    private final String className;
+
+    private COMPRESSION(Class<? extends CompressionCodec> clazz) {
+      className = clazz.getName();
+    }
+
+    public String getClassName() {
+      return className;
+    }
+  }
+
+  interface ControllerPool {
+    Iface getClient(String controllerConnectionStr);
+  }
+
+  public static void main(String... args) throws Exception {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    Job job = setupJob(configuration, new ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return BlurClient.getClient(controllerConnectionStr);
+      }
+    }, otherArgs);
+    if (job == null) {
+      System.exit(1);
+    }
+
+    boolean waitForCompletion = job.waitForCompletion(true);
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+
+  public static Job setupJob(Configuration configuration, ControllerPool controllerPool, String... otherArgs)
+      throws Exception {
+    CommandLine cmd = parse(otherArgs);
+    if (cmd == null) {
+      return null;
+    }
+
+    final String controllerConnectionStr = cmd.getOptionValue("c");
+    final String tableName = cmd.getOptionValue("t");
+
+    final Iface client = controllerPool.getClient(controllerConnectionStr);
+    TableDescriptor tableDescriptor = client.describe(tableName);
+
+    Job job = new Job(configuration, "Blur indexer [" + tableName + "]");
+    job.setJarByClass(CsvBlurDriver.class);
+    job.setMapperClass(CsvBlurMapper.class);
+
+    if (cmd.hasOption("p")) {
+      job.getConfiguration().set(MAPRED_COMPRESS_MAP_OUTPUT, "true");
+      String codecStr = cmd.getOptionValue("p");
+      COMPRESSION compression;
+      try {
+        compression = COMPRESSION.valueOf(codecStr.trim().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        compression = null;
+      }
+      if (compression == null) {
+        job.getConfiguration().set(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC, codecStr.trim());
+      } else {
+        job.getConfiguration().set(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC, compression.getClassName());
+      }
+    }
+    if (cmd.hasOption("a")) {
+      CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(job, true);
+    }
+    if (cmd.hasOption("A")) {
+      CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(job, true);
+    }
+    if (cmd.hasOption("S")) {
+      job.setInputFormatClass(SequenceFileInputFormat.class);
+    } else {
+      job.setInputFormatClass(TextInputFormat.class);
+    }
+
+    if (cmd.hasOption("C")) {
+      if (cmd.hasOption("S")) {
+        String[] optionValues = cmd.getOptionValues("C");
+        job.setInputFormatClass(CsvBlurCombineSequenceFileInputFormat.class);
+        CombineFileInputFormat.setMinInputSplitSize(job, Long.parseLong(optionValues[0]));
+        CombineFileInputFormat.setMaxInputSplitSize(job, Long.parseLong(optionValues[1]));
+      } else {
+        System.err.println("'C' can only be used with option 'S'");
+        return null;
+      }
+    }
+
+    if (cmd.hasOption("i")) {
+      for (String input : cmd.getOptionValues("i")) {
+        Path path = new Path(input);
+        Set<Path> pathSet = recursivelyGetPathsContainingFiles(path, job.getConfiguration());
+        if (pathSet.isEmpty()) {
+          FileInputFormat.addInputPath(job, path);
+        } else {
+          for (Path p : pathSet) {
+            FileInputFormat.addInputPath(job, p);
+          }
+        }
+      }
+    }
+    // processing the 'I' option
+    if (cmd.hasOption("I")) {
+      if (cmd.hasOption("C")) {
+        System.err.println("The 'I' and 'C' options cannot be used together.");
+        return null;
+      }
+      Option[] options = cmd.getOptions();
+      for (Option option : options) {
+        if (option.getOpt().equals("I")) {
+          String[] values = option.getValues();
+          if (values.length < 2) {
+            System.err.println("'I' parameter missing minimum args of (family path*)");
+            return null;
+          }
+          for (String p : getSubArray(values, 1)) {
+            Path path = new Path(p);
+            CsvBlurMapper.addFamilyPath(job, values[0], path);
+            FileInputFormat.addInputPath(job, path);
+          }
+        }
+      }
+    }
+
+    if (cmd.hasOption("s")) {
+      CsvBlurMapper.setSeparator(job, StringEscapeUtils.unescapeJava(cmd.getOptionValue("s")));
+    }
+    if (cmd.hasOption("o")) {
+      BlurOutputFormat.setOptimizeInFlight(job, false);
+    }
+    if (cmd.hasOption("l")) {
+      BlurOutputFormat.setIndexLocally(job, false);
+    }
+    if (cmd.hasOption("b")) {
+      int maxDocumentBufferSize = Integer.parseInt(cmd.getOptionValue("b"));
+      BlurOutputFormat.setMaxDocumentBufferSize(job, maxDocumentBufferSize);
+    }
+    if (cmd.hasOption("r")) {
+      int reducerMultiplier = Integer.parseInt(cmd.getOptionValue("r"));
+      BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
+    }
+    // processing the 'd' option
+    Option[] options = cmd.getOptions();
+    for (Option option : options) {
+      if (option.getOpt().equals("d")) {
+        String[] values = option.getValues();
+        if (values.length < 2) {
+          System.err.println("'d' parameter missing minimum args of (family columname*)");
+          return null;
+        }
+        CsvBlurMapper.addColumns(job, values[0], getSubArray(values, 1));
+      }
+    }
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurMapReduceUtil.addDependencyJars(job.getConfiguration(), Splitter.class);
+    return job;
+  }
+
+  private static String[] getSubArray(String[] array, int starting) {
+    String[] result = new String[array.length - starting];
+    System.arraycopy(array, starting, result, 0, result.length);
+    return result;
+  }
+
+  private static Set<Path> recursivelyGetPathsContainingFiles(Path path, Configuration configuration)
+      throws IOException {
+    Set<Path> pathSet = new HashSet<Path>();
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus status : listStatus) {
+      if (status.isDir()) {
+        pathSet.addAll(recursivelyGetPathsContainingFiles(status.getPath(), configuration));
+      } else {
+        pathSet.add(status.getPath().getParent());
+      }
+    }
+    return pathSet;
+  }
+
+  private static CommandLine parse(String... otherArgs) throws ParseException {
+    Options options = new Options();
+    options.addOption(OptionBuilder.withArgName("controller*").hasArgs().isRequired(true)
+        .withDescription("* Thrift controller connection string. (host1:40010 host2:40010 ...)").create("c"));
+    options.addOption(OptionBuilder.withArgName("tablename").hasArg().isRequired(true)
+        .withDescription("* Blur table name.").create("t"));
+    options.addOption(OptionBuilder.withArgName("family column*").hasArgs().isRequired(true)
+        .withDescription("* Define the mapping of fields in the CSV file to column names. (family col1 col2 col3 ...)")
+        .create("d"));
+    options.addOption(OptionBuilder
+        .withArgName("delimiter")
+        .hasArg()
+        .withDescription(
+            "The file delimiter to be used. (default value ',')  NOTE: For special "
+                + "charactors like the default hadoop separator of ASCII value 1, you can use standard "
+                + "java escaping (\\u0001)").create("s"));
+    options.addOption(OptionBuilder.withArgName("path*").hasArg()
+        .withDescription("The directory to index, the family name is assumed to BE present in the file contents. (hdfs://namenode/input/in1)").create("i"));
+    options.addOption(OptionBuilder.withArgName("family path*").hasArgs()
+        .withDescription("The directory to index with a family name, the family name is assumed to NOT be present in the file contents. (family hdfs://namenode/input/in1)").create("I"));
+    options
+        .addOption(OptionBuilder
+            .withArgName("auto generate record ids")
+            .withDescription(
+                "No Record Ids - Automatically generate record ids for each record based on a MD5 has of the data within the record.")
+            .create("a"));
+    options
+        .addOption(OptionBuilder
+            .withArgName("auto generate row ids")
+            .withDescription(
+                "No Row Ids - Automatically generate row ids for each record based on a MD5 has of the data within the record.")
+            .create("A"));
+    options.addOption(OptionBuilder.withArgName("disable optimize indexes during copy")
+        .withDescription("Disable optimize indexes during copy, this has very little overhead. (enabled by default)")
+        .create("o"));
+    options.addOption(OptionBuilder
+        .withArgName("disable index locally")
+        .withDescription(
+            "Disable the use storage local on the server that is running the reducing "
+                + "task and copy to Blur table once complete. (enabled by default)").create("l"));
+    options.addOption(OptionBuilder.withArgName("sequence files inputs")
+        .withDescription("The input files are sequence files.").create("S"));
+    options.addOption(OptionBuilder
+        .withArgName("size")
+        .hasArg()
+        .withDescription(
+            "The maximum number of Lucene documents to buffer in the reducer for a single "
+                + "row before spilling over to disk. (default 1000)").create("b"));
+    options.addOption(OptionBuilder
+        .withArgName("multiplier")
+        .hasArg()
+        .withDescription(
+            "The reducer multipler allows for an increase in the number of reducers per "
+                + "shard in the given table.  For example if the table has 128 shards and the "
+                + "reducer multiplier is 4 the total number of reducers will be 512, 4 reducers "
+                + "per shard. (default 1)").create("r"));
+    options.addOption(OptionBuilder
+        .withArgName("minimum maximum")
+        .hasArgs(2)
+        .withDescription(
+            "Enables a combine file input to help deal with many small files as the input. Provide "
+                + "the minimum and maximum size per mapper.  For a minimum of 1GB and a maximum of "
+                + "2.5GB: (1000000000 2500000000)").create("C"));
+    options.addOption(OptionBuilder
+        .withArgName("codec")
+        .hasArgs(1)
+        .withDescription(
+            "Sets the compression codec for the map compress output setting. (SNAPPY,GZIP,BZIP,DEFAULT, or classname)")
+        .create("p"));
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = null;
+    try {
+      cmd = parser.parse(options, otherArgs);
+    } catch (ParseException e) {
+      System.err.println(e.getMessage());
+      HelpFormatter formatter = new HelpFormatter();
+      PrintWriter pw = new PrintWriter(System.err, true);
+      formatter.printHelp(pw, DEFAULT_WIDTH, CSVLOADER, HEADER, options, HelpFormatter.DEFAULT_LEFT_PAD,
+          HelpFormatter.DEFAULT_DESC_PAD, null, false);
+      return null;
+    }
+
+    if (!(cmd.hasOption("I") || cmd.hasOption("i"))) {
+      System.err.println("Missing input directory, see options 'i' and 'I'.");
+      HelpFormatter formatter = new HelpFormatter();
+      PrintWriter pw = new PrintWriter(System.err, true);
+      formatter.printHelp(pw, DEFAULT_WIDTH, CSVLOADER, HEADER, options, HelpFormatter.DEFAULT_LEFT_PAD,
+          HelpFormatter.DEFAULT_DESC_PAD, null, false);
+      return null;
+    }
+    return cmd;
+  }
+
+  public static class CsvBlurCombineSequenceFileInputFormat extends CombineFileInputFormat<Writable, Text> {
+
+    private static class SequenceFileRecordReaderWrapper extends RecordReader<Writable, Text> {
+
+      private final RecordReader<Writable, Text> delegate;
+      private final FileSplit fileSplit;
+
+      @SuppressWarnings("unused")
+      public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context, Integer index)
+          throws IOException {
+        fileSplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index),
+            split.getLocations());
+        delegate = new SequenceFileInputFormat<Writable, Text>().createRecordReader(fileSplit, context);
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        return delegate.getProgress();
+      }
+
+      @Override
+      public Writable getCurrentKey() throws IOException, InterruptedException {
+        return delegate.getCurrentKey();
+      }
+
+      @Override
+      public Text getCurrentValue() throws IOException, InterruptedException {
+        return delegate.getCurrentValue();
+      }
+
+      @Override
+      public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+        delegate.initialize(fileSplit, context);
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        return delegate.nextKeyValue();
+      }
+
+      @Override
+      public void close() throws IOException {
+        delegate.close();
+      }
+    }
+
+    @Override
+    public RecordReader<Writable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
+        throws IOException {
+      return new CombineFileRecordReader<Writable, Text>((CombineFileSplit) split, context,
+          SequenceFileRecordReaderWrapper.class);
+    }
+  }
+
+}
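
For reference, a minimal sketch of driving the loader programmatically, mirroring what main() above does; the controller address, table name, family/column names, and input path are placeholders, and the example class is assumed to live in the org.apache.blur.mapreduce.lib package so it can see the package-private ControllerPool interface:

    package org.apache.blur.mapreduce.lib;

    import org.apache.blur.thrift.BlurClient;
    import org.apache.blur.thrift.generated.Blur.Iface;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CsvLoaderExample {
      public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Build the job the same way CsvBlurDriver.main() does, supplying a client pool.
        Job job = CsvBlurDriver.setupJob(configuration, new CsvBlurDriver.ControllerPool() {
          @Override
          public Iface getClient(String controllerConnectionStr) {
            return BlurClient.getClient(controllerConnectionStr);
          }
        }, "-c", "controller1:40010",          // placeholder controller address
           "-t", "table1",                     // placeholder table name
           "-d", "cf1", "col1", "col2",        // family and column layout
           "-i", "hdfs://namenode/input/in1"); // placeholder input directory
        if (job == null) {
          System.exit(1);
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }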

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
new file mode 100644
index 0000000..8f59e31
--- /dev/null
+++ b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
@@ -0,0 +1,487 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.google.common.base.Splitter;
+
+/**
+ * This will parse a standard CSV file into a {@link BlurMutate} object. Use the
+ * static addColumns and setSeparator methods to configure the class.
+ */
+public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
+
+  public static final String UTF_8 = "UTF-8";
+  public static final String BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA = "blur.csv.auto.generate.record.id.as.hash.of.data";
+  public static final String BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA = "blur.csv.auto.generate.row.id.as.hash.of.data";
+  public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES = "blur.csv.family.path.mappings.families";
+  public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX = "blur.csv.family.path.mappings.family.";
+  public static final String BLUR_CSV_SEPARATOR_BASE64 = "blur.csv.separator.base64";
+  public static final String BLUR_CSV_FAMILY_COLUMN_PREFIX = "blur.csv.family.";
+  public static final String BLUR_CSV_FAMILIES = "blur.csv.families";
+  public static final String HIVE_NULL = "\\N";
+
+  protected Map<String, List<String>> _columnNameMap;
+  protected String _separator = Base64.encodeBase64String(",".getBytes());
+  protected Splitter _splitter;
+  protected boolean _familyNotInFile;
+  protected String _familyFromPath;
+  protected boolean _autoGenerateRecordIdAsHashOfData;
+  protected MessageDigest _digest;
+  protected boolean _autoGenerateRowIdAsHashOfData;
+
+  /**
+   * Add a mapping for a family to a path. This is to be used when an entire
+   * path is to be processed as a single family and the data itself does not
+   * contain the family.<br/>
+   * <br/>
+   * 
+   * NOTE: the familyNotInFile property must be set before this method can be
+   * called.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param family
+   *          the family.
+   * @param path
+   *          the path.
+   */
+  public static void addFamilyPath(Job job, String family, Path path) {
+    addFamilyPath(job.getConfiguration(), family, path);
+  }
+
+  /**
+   * Add a mapping for a family to a path. This is to be used when an entire
+   * path is to be processed as a single family and the data itself does not
+   * contain the family.<br/>
+   * <br/>
+   * 
+   * NOTE: the familyNotInFile property must be set before this method can be
+   * called.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param family
+   *          the family.
+   * @param path
+   *          the path.
+   */
+  public static void addFamilyPath(Configuration configuration, String family, Path path) {
+    append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES, family);
+    append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family, path.toString());
+  }
+
+  protected static void append(Configuration configuration, String name, String value) {
+    Collection<String> set = configuration.getStringCollection(name);
+    if (set == null) {
+      set = new TreeSet<String>();
+    }
+    set.add(value);
+    configuration.setStrings(name, set.toArray(new String[set.size()]));
+  }
+
+  /**
+   * If set to true the record id will be automatically generated as a hash of
+   * the data that the record contains.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param autoGenerateRecordIdAsHashOfData
+   *          boolean.
+   */
+  public static void setAutoGenerateRecordIdAsHashOfData(Job job, boolean autoGenerateRecordIdAsHashOfData) {
+    setAutoGenerateRecordIdAsHashOfData(job.getConfiguration(), autoGenerateRecordIdAsHashOfData);
+  }
+
+  /**
+   * If set to true the record id will be automatically generated as a hash of
+   * the data that the record contains.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param autoGenerateRecordIdAsHashOfData
+   *          boolean.
+   */
+  public static void setAutoGenerateRecordIdAsHashOfData(Configuration configuration,
+      boolean autoGenerateRecordIdAsHashOfData) {
+    configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, autoGenerateRecordIdAsHashOfData);
+  }
+
+  /**
+   * Gets whether or not to generate a recordid for the record based on the
+   * data.
+   * 
+   * @param configuration
+   *          the configuration.
+   * @return boolean.
+   */
+  public static boolean isAutoGenerateRecordIdAsHashOfData(Configuration configuration) {
+    return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, false);
+  }
+
+  /**
+   * If set to true the row id will be automatically generated as a hash of
+   * the data that the record contains.
+   *
+   * @param job
+   *          the job to setup.
+   * @param autoGenerateRowIdAsHashOfData
+   *          boolean.
+   */
+  public static void setAutoGenerateRowIdAsHashOfData(Job job, boolean autoGenerateRowIdAsHashOfData) {
+    setAutoGenerateRowIdAsHashOfData(job.getConfiguration(), autoGenerateRowIdAsHashOfData);
+  }
+
+  /**
+   * If set to true the row id will be automatically generated as a hash of
+   * the data that the record contains.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param autoGenerateRowIdAsHashOfData
+   *          boolean.
+   */
+  public static void setAutoGenerateRowIdAsHashOfData(Configuration configuration, boolean autoGenerateRowIdAsHashOfData) {
+    configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, autoGenerateRowIdAsHashOfData);
+  }
+
+  /**
+   * Gets whether or not to generate a rowid for the record based on the
+   * data.
+   * 
+   * @param configuration
+   *          the configuration.
+   * @return boolean.
+   */
+  public static boolean isAutoGenerateRowIdAsHashOfData(Configuration configuration) {
+    return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, false);
+  }
+
+  /**
+   * Sets all the family and column definitions.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param strDefinition
+   *          the string definition. <br/>
+   * <br/>
+   *          Example:<br/>
+   *          "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
+   *          Where "cf1" is a family name that contains columns "col1", "col2"
+   *          and "col3" and a second family of "cf2" with columns "col1",
+   *          "col2", and "col3".
+   */
+  public static void setColumns(Job job, String strDefinition) {
+    setColumns(job.getConfiguration(), strDefinition);
+  }
+
+  /**
+   * Sets all the family and column definitions.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param strDefinition
+   *          the string definition. <br/>
+   * <br/>
+   *          Example:<br/>
+   *          "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
+   *          Where "cf1" is a family name that contains columns "col1", "col2"
+   *          and "col3" and a second family of "cf2" with columns "col1",
+   *          "col2", and "col3".
+   */
+  public static void setColumns(Configuration configuration, String strDefinition) {
+    Iterable<String> familyDefs = Splitter.on('|').split(strDefinition);
+    for (String familyDef : familyDefs) {
+      int indexOf = familyDef.indexOf(':');
+      if (indexOf < 0) {
+        throwMalformedDefinition(strDefinition);
+      }
+      String family = familyDef.substring(0, indexOf);
+      Iterable<String> cols = Splitter.on(',').split(familyDef.substring(indexOf + 1));
+      List<String> colnames = new ArrayList<String>();
+      for (String columnName : cols) {
+        colnames.add(columnName);
+      }
+      if (family.trim().isEmpty() || colnames.isEmpty()) {
+        throwMalformedDefinition(strDefinition);
+      }
+      addColumns(configuration, family, colnames.toArray(new String[colnames.size()]));
+    }
+  }
+
+  protected static void throwMalformedDefinition(String strDefinition) {
+    throw new RuntimeException("Family and column definition string not valid [" + strDefinition
+        + "] should look like \"family1:colname1,colname2|family2:colname1,colname2,colname3\"");
+  }
+
+  /**
+   * Adds the column layout for the given family.
+   * 
+   * @param job
+   *          the job to apply the layout.
+   * @param family
+   *          the family name.
+   * @param columns
+   *          the column names.
+   */
+  public static void addColumns(Job job, String family, String... columns) {
+    addColumns(job.getConfiguration(), family, columns);
+  }
+
+  /**
+   * Adds the column layout for the given family.
+   * 
+   * @param configuration
+   *          the configuration to apply the layout.
+   * @param family
+   *          the family name.
+   * @param columns
+   *          the column names.
+   */
+  public static void addColumns(Configuration configuration, String family, String... columns) {
+    Collection<String> families = new TreeSet<String>(configuration.getStringCollection(BLUR_CSV_FAMILIES));
+    families.add(family);
+    configuration.setStrings(BLUR_CSV_FAMILIES, families.toArray(new String[] {}));
+    configuration.setStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family, columns);
+  }
+
+  public static Collection<String> getFamilyNames(Configuration configuration) {
+    return configuration.getStringCollection(BLUR_CSV_FAMILIES);
+  }
+
+  public static Map<String, List<String>> getFamilyAndColumnNameMap(Configuration configuration) {
+    Map<String, List<String>> columnNameMap = new HashMap<String, List<String>>();
+    for (String family : getFamilyNames(configuration)) {
+      String[] columnsNames = configuration.getStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family);
+      columnNameMap.put(family, Arrays.asList(columnsNames));
+    }
+    return columnNameMap;
+  }
+
+  /**
+   * Sets the separator of the file; by default it is ",".
+   * 
+   * @param job
+   *          the job to apply the separator change.
+   * @param separator
+   *          the separator.
+   */
+  public static void setSeparator(Job job, String separator) {
+    setSeparator(job.getConfiguration(), separator);
+  }
+
+  /**
+   * Sets the separator of the file; by default it is ",".
+   * 
+   * @param configuration
+   *          the configuration to apply the separator change.
+   * @param separator
+   *          the separator.
+   */
+  public static void setSeparator(Configuration configuration, String separator) {
+    try {
+      configuration.set(BLUR_CSV_SEPARATOR_BASE64, Base64.encodeBase64String(separator.getBytes(UTF_8)));
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration configuration = context.getConfiguration();
+    _autoGenerateRecordIdAsHashOfData = isAutoGenerateRecordIdAsHashOfData(configuration);
+    _autoGenerateRowIdAsHashOfData = isAutoGenerateRowIdAsHashOfData(configuration);
+    if (_autoGenerateRecordIdAsHashOfData || _autoGenerateRowIdAsHashOfData) {
+      try {
+        _digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new IOException(e);
+      }
+    }
+    _columnNameMap = getFamilyAndColumnNameMap(configuration);
+    _separator = new String(Base64.decodeBase64(configuration.get(BLUR_CSV_SEPARATOR_BASE64, _separator)), UTF_8);
+    _splitter = Splitter.on(_separator);
+    Path fileCurrentlyProcessing = getCurrentFile(context);
+    Collection<String> families = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
+    OUTER: for (String family : families) {
+      Collection<String> pathStrCollection = configuration
+          .getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family);
+      for (String pathStr : pathStrCollection) {
+        Path path = new Path(pathStr);
+        path = path.makeQualified(path.getFileSystem(configuration));
+        if (isParent(path, fileCurrentlyProcessing)) {
+          _familyFromPath = family;
+          _familyNotInFile = true;
+          break OUTER;
+        }
+      }
+    }
+  }
+
+  protected boolean isParent(Path possibleParent, Path child) {
+    if (child == null) {
+      return false;
+    }
+    if (possibleParent.equals(child.getParent())) {
+      return true;
+    }
+    return isParent(possibleParent, child.getParent());
+  }
+
+  protected Path getCurrentFile(Context context) throws IOException {
+    InputSplit split = context.getInputSplit();
+    if (split != null && split instanceof FileSplit) {
+      FileSplit inputSplit = (FileSplit) split;
+      Path path = inputSplit.getPath();
+      return path.makeQualified(path.getFileSystem(context.getConfiguration()));
+    }
+    return null;
+  }
+
+  @Override
+  protected void map(Writable k, Text value, Context context) throws IOException, InterruptedException {
+    BlurRecord record = _mutate.getRecord();
+    record.clearColumns();
+    String str = value.toString();
+
+    Iterable<String> split = _splitter.split(str);
+    List<String> list = toList(split);
+
+    int offset = 0;
+    boolean gen = false;
+    if (!_autoGenerateRowIdAsHashOfData) {
+      record.setRowId(list.get(offset++));
+    } else {
+      _digest.reset();
+      byte[] bs = value.getBytes();
+      int length = value.getLength();
+      _digest.update(bs, 0, length);
+      record.setRowId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
+      gen = true;
+    }
+
+    if (!_autoGenerateRecordIdAsHashOfData) {
+      record.setRecordId(list.get(offset++));
+    } else {
+      if (gen) {
+        record.setRecordId(record.getRowId());
+      } else {
+        _digest.reset();
+        byte[] bs = value.getBytes();
+        int length = value.getLength();
+        _digest.update(bs, 0, length);
+        record.setRecordId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
+      }
+    }
+    String family;
+    if (_familyNotInFile) {
+      family = _familyFromPath;
+    } else {
+      family = list.get(offset++);
+    }
+    record.setFamily(family);
+
+    List<String> columnNames = _columnNameMap.get(family);
+    if (columnNames == null) {
+      throw new IOException("Family [" + family + "] is missing in the definition.");
+    }
+    if (list.size() - offset != columnNames.size()) {
+
+      String options = "";
+
+      if (!_autoGenerateRowIdAsHashOfData) {
+        options += "rowid,";
+      }
+      if (!_autoGenerateRecordIdAsHashOfData) {
+        options += "recordid,";
+      }
+      if (!_familyNotInFile) {
+        options += "family,";
+      }
+      String msg = "Record [" + str + "] does not match defined record [" + options + getColumnNames(columnNames)
+          + "].";
+      throw new IOException(msg);
+    }
+
+    for (int i = 0; i < columnNames.size(); i++) {
+      String val = handleHiveNulls(list.get(i + offset));
+      if (val != null) {
+        record.addColumn(columnNames.get(i), val);
+        _columnCounter.increment(1);
+      }
+    }
+    _key.set(record.getRowId());
+    _mutate.setMutateType(MUTATE_TYPE.REPLACE);
+    context.write(_key, _mutate);
+    _recordCounter.increment(1);
+    context.progress();
+  }
+
+  protected String handleHiveNulls(String value) {
+    if (value.equals(HIVE_NULL)) {
+      return null;
+    }
+    return value;
+  }
+
+  public void setFamilyFromPath(String familyFromPath) {
+    this._familyFromPath = familyFromPath;
+  }
+
+  protected String getColumnNames(List<String> columnNames) {
+    StringBuilder builder = new StringBuilder();
+    for (String c : columnNames) {
+      if (builder.length() != 0) {
+        builder.append(',');
+      }
+      builder.append(c);
+    }
+    return builder.toString();
+  }
+
+  protected List<String> toList(Iterable<String> split) {
+    List<String> lst = new ArrayList<String>();
+    for (String s : split) {
+      lst.add(s);
+    }
+    return lst;
+  }
+
+}
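
For reference, a minimal sketch of wiring the mapper into a job with the "family:col1,col2|family2:col1" definition format described in the setColumns javadoc above; the family, column, and separator values are placeholders:

    import org.apache.blur.mapreduce.lib.CsvBlurMapper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CsvMapperConfigExample {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "csv mapper config example");
        job.setMapperClass(CsvBlurMapper.class);
        // Family "cf1" has columns col1..col3, family "cf2" has col1 and col2.
        CsvBlurMapper.setColumns(job, "cf1:col1,col2,col3|cf2:col1,col2");
        // Switch from the default "," separator to a tab.
        CsvBlurMapper.setSeparator(job, "\t");
      }
    }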

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
new file mode 100644
index 0000000..99f2ac5
--- /dev/null
+++ b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
@@ -0,0 +1,89 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * This class is to be used in conjunction with {@link BlurOutputFormat}.<br/>
+ * <br/>
+ * 
+ * Here is a basic example of how to use both the {@link BlurOutputFormat} and
+ * the {@link DefaultBlurReducer} together to build indexes.<br/>
+ * <br/>
+ * 
+ * Once this job has successfully completed, the indexes will be imported by the
+ * running shard servers and placed online. This is a polling mechanism in the
+ * shard servers; by default they poll every 10 seconds.<br/>
+ * <br/>
+ * 
+ * <pre>
+ * Job job = new Job(conf, "blur index");
+ * job.setJarByClass(BlurOutputFormatTest.class);
+ * job.setMapperClass(CsvBlurMapper.class);
+ * job.setReducerClass(DefaultBlurReducer.class);
+ * job.setNumReduceTasks(1);
+ * job.setInputFormatClass(TrackingTextInputFormat.class);
+ * job.setOutputKeyClass(Text.class);
+ * job.setOutputValueClass(BlurMutate.class);
+ * job.setOutputFormatClass(BlurOutputFormat.class);
+ * 
+ * FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+ * CsvBlurMapper.addColumns(job, "cf1", "col");
+ * 
+ * TableDescriptor tableDescriptor = new TableDescriptor();
+ * tableDescriptor.setShardCount(1);
+ * tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
+ * tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/out").toString());
+ * BlurOutputFormat.setTableDescriptor(job, tableDescriptor);
+ * </pre>
+ */
+public class DefaultBlurReducer extends Reducer<Writable, BlurMutate, Writable, BlurMutate> {
+
+  @Override
+  protected void setup(final Context context) throws IOException, InterruptedException {
+    BlurOutputFormat.setProgressable(context);
+    BlurOutputFormat.setGetCounter(new GetCounter() {
+      @Override
+      public Counter getCounter(Enum<?> counterName) {
+        return context.getCounter(counterName);
+      }
+    });
+  }
+
+  @Override
+  protected void reduce(Writable key, Iterable<BlurMutate> values, Context context) throws IOException,
+      InterruptedException {
+    Text textKey = getTextKey(key);
+    for (BlurMutate value : values) {
+      context.write(textKey, value);
+    }
+  }
+
+  protected Text getTextKey(Writable key) {
+    if (key instanceof Text) {
+      return (Text) key;
+    }
+    throw new IllegalArgumentException("Key is not of type Text, you will need to "
+        + "override DefaultBlurReducer and implement \"getTextKey\" method.");
+  }
+}
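
For reference, a minimal sketch of the override that the getTextKey error message above points to, for jobs whose map output key is not already a Text; the class name and the toString-based conversion are hypothetical:

    import org.apache.blur.mapreduce.lib.DefaultBlurReducer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class StringKeyBlurReducer extends DefaultBlurReducer {
      @Override
      protected Text getTextKey(Writable key) {
        // Hypothetical conversion: use the key's string form as the row id text.
        return new Text(key.toString());
      }
    }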

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
new file mode 100644
index 0000000..e138cd5
--- /dev/null
+++ b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.blur.analysis.FieldManager;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.LuceneVersionConstant;
+import org.apache.blur.lucene.codec.Blur022Codec;
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.NoLockFactory;
+
+public class GenericBlurRecordWriter {
+
+  private static final Log LOG = LogFactory.getLog(GenericBlurRecordWriter.class);
+  private static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
+
+  private final Text _prevKey = new Text();
+  private final Map<String, List<Field>> _documents = new TreeMap<String, List<Field>>();
+  private final IndexWriter _writer;
+  private final FieldManager _fieldManager;
+  private final Directory _finalDir;
+  private final Directory _localDir;
+  private final File _localPath;
+  private final int _maxDocumentBufferSize;
+  private final IndexWriterConfig _conf;
+  private final IndexWriterConfig _overFlowConf;
+  private final Path _newIndex;
+  private final boolean _indexLocally;
+  private final boolean _optimizeInFlight;
+  private Counter _columnCount;
+  private Counter _fieldCount;
+  private Counter _recordCount;
+  private Counter _rowCount;
+  private Counter _recordDuplicateCount;
+  private Counter _rowOverFlowCount;
+  private Counter _rowDeleteCount;
+  private RateCounter _recordRateCounter;
+  private RateCounter _rowRateCounter;
+  private RateCounter _copyRateCounter;
+  private boolean _countersSetup = false;
+  private IndexWriter _localTmpWriter;
+  private boolean _usingLocalTmpindex;
+  private File _localTmpPath;
+  private ProgressableDirectory _localTmpDir;
+  private String _deletedRowId;
+
+  public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName) throws IOException {
+
+    _indexLocally = BlurOutputFormat.isIndexLocally(configuration);
+    _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(configuration);
+
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    int shardCount = tableDescriptor.getShardCount();
+    int shardId = attemptId % shardCount;
+
+    _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(configuration);
+    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
+    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+    Path indexPath = new Path(tableOutput, shardName);
+    _newIndex = new Path(indexPath, tmpDirName);
+    _finalDir = new ProgressableDirectory(new HdfsDirectory(configuration, _newIndex), getProgressable());
+    _finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
+
+    TableContext tableContext = TableContext.create(tableDescriptor);
+    _fieldManager = tableContext.getFieldManager();
+    Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
+
+    _conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, analyzer);
+    _conf.setCodec(new Blur022Codec());
+    _conf.setSimilarity(tableContext.getSimilarity());
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+
+    _overFlowConf = _conf.clone();
+    _overFlowConf.setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES);
+
+    if (_indexLocally) {
+      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+      _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+      _localDir = new ProgressableDirectory(FSDirectory.open(_localPath), getProgressable());
+      _writer = new IndexWriter(_localDir, _conf.clone());
+    } else {
+      _localPath = null;
+      _localDir = null;
+      _writer = new IndexWriter(_finalDir, _conf.clone());
+    }
+  }
+
+  private Progressable getProgressable() {
+    return new Progressable() {
+      @Override
+      public void progress() {
+        Progressable progressable = BlurOutputFormat.getProgressable();
+        if (progressable != null) {
+          progressable.progress();
+        }
+      }
+    };
+  }
+
+  public void write(Text key, BlurMutate value) throws IOException {
+    if (!_countersSetup) {
+      setupCounter();
+      _countersSetup = true;
+    }
+    if (!_prevKey.equals(key)) {
+      flush();
+      _prevKey.set(key);
+    }
+    add(value);
+  }
+
+  private void setupCounter() {
+    GetCounter getCounter = BlurOutputFormat.getGetCounter();
+    _fieldCount = getCounter.getCounter(BlurCounters.LUCENE_FIELD_COUNT);
+    _columnCount = getCounter.getCounter(BlurCounters.COLUMN_COUNT);
+    _recordCount = getCounter.getCounter(BlurCounters.RECORD_COUNT);
+    _recordDuplicateCount = getCounter.getCounter(BlurCounters.RECORD_DUPLICATE_COUNT);
+    _rowCount = getCounter.getCounter(BlurCounters.ROW_COUNT);
+    _rowDeleteCount = getCounter.getCounter(BlurCounters.ROW_DELETE_COUNT);
+    _rowOverFlowCount = getCounter.getCounter(BlurCounters.ROW_OVERFLOW_COUNT);
+    _recordRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.RECORD_RATE));
+    _rowRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.ROW_RATE));
+    _copyRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.COPY_RATE));
+  }
+
+  private void add(BlurMutate value) throws IOException {
+    BlurRecord blurRecord = value.getRecord();
+    Record record = getRecord(blurRecord);
+    String recordId = record.getRecordId();
+    if (value.getMutateType() == MUTATE_TYPE.DELETE) {
+      _deletedRowId = blurRecord.getRowId();
+      return;
+    }
+    if (_countersSetup) {
+      _columnCount.increment(record.getColumns().size());
+    }
+    List<Field> document = RowDocumentUtil.getDoc(_fieldManager, blurRecord.getRowId(), record);
+    List<Field> dup = _documents.put(recordId, document);
+    if (_countersSetup) {
+      if (dup != null) {
+        _recordDuplicateCount.increment(1);
+      } else {
+        _fieldCount.increment(document.size());
+        _recordCount.increment(1);
+      }
+    }
+    flushToTmpIndexIfNeeded();
+  }
+
+  private void flushToTmpIndexIfNeeded() throws IOException {
+    if (_documents.size() > _maxDocumentBufferSize) {
+      flushToTmpIndex();
+    }
+  }
+
+  private void flushToTmpIndex() throws IOException {
+    if (_documents.isEmpty()) {
+      return;
+    }
+    _usingLocalTmpindex = true;
+    if (_localTmpWriter == null) {
+      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
+      _localTmpPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
+      _localTmpDir = new ProgressableDirectory(FSDirectory.open(_localTmpPath), BlurOutputFormat.getProgressable());
+      _localTmpWriter = new IndexWriter(_localTmpDir, _overFlowConf.clone());
+      // The local tmp writer has merging disabled, so the first document in is
+      // going to be doc 0. Therefore the first document added becomes the prime
+      // doc.
+      List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
+      docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+      _localTmpWriter.addDocuments(docs);
+    } else {
+      _localTmpWriter.addDocuments(_documents.values());
+    }
+    _documents.clear();
+  }
+
+  private void resetLocalTmp() {
+    _usingLocalTmpindex = false;
+    _localTmpWriter = null;
+    _localTmpDir = null;
+    rm(_localTmpPath);
+    _localTmpPath = null;
+  }
+
+  private Record getRecord(BlurRecord value) {
+    Record record = new Record();
+    record.setRecordId(value.getRecordId());
+    record.setFamily(value.getFamily());
+    for (BlurColumn col : value.getColumns()) {
+      record.addToColumns(new Column(col.getName(), col.getValue()));
+    }
+    return record;
+  }
+
+  private void flush() throws CorruptIndexException, IOException {
+    if (_usingLocalTmpindex) {
+      // Since we have already flushed to disk, we do not need to index the
+      // delete.
+      flushToTmpIndex();
+      _localTmpWriter.close(false);
+      DirectoryReader reader = DirectoryReader.open(_localTmpDir);
+      if (_countersSetup) {
+        _recordRateCounter.mark(reader.numDocs());
+      }
+      _writer.addIndexes(reader);
+      reader.close();
+      resetLocalTmp();
+      if (_countersSetup) {
+        _rowOverFlowCount.increment(1);
+      }
+    } else {
+      if (_documents.isEmpty()) {
+        if (_deletedRowId != null) {
+          _writer.addDocument(getDeleteDoc());
+          if (_countersSetup) {
+            _rowDeleteCount.increment(1);
+          }
+        }
+      } else {
+        List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
+        docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+        _writer.addDocuments(docs);
+        if (_countersSetup) {
+          _recordRateCounter.mark(_documents.size());
+        }
+        _documents.clear();
+      }
+    }
+    _deletedRowId = null;
+    if (_countersSetup) {
+      _rowRateCounter.mark();
+      _rowCount.increment(1);
+    }
+  }
+
+  private Document getDeleteDoc() {
+    Document document = new Document();
+    document.add(new StringField(BlurConstants.ROW_ID, _deletedRowId, Store.NO));
+    document.add(new StringField(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE, Store.NO));
+    return document;
+  }
+
+  public void close() throws IOException {
+    flush();
+    _writer.close();
+    if (_countersSetup) {
+      _recordRateCounter.close();
+      _rowRateCounter.close();
+    }
+    if (_indexLocally) {
+      if (_optimizeInFlight) {
+        copyAndOptimizeInFlightDir();
+      } else {
+        copyDir();
+      }
+    }
+    if (_countersSetup) {
+      _copyRateCounter.close();
+    }
+  }
+
+  private void copyAndOptimizeInFlightDir() throws IOException {
+    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
+    copyRateDirectory.setLockFactory(NoLockFactory.getNoLockFactory());
+    DirectoryReader reader = DirectoryReader.open(_localDir);
+    IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone());
+    writer.addIndexes(reader);
+    writer.close();
+    rm(_localPath);
+  }
+
+  private void copyDir() throws IOException {
+    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
+    String[] fileNames = _localDir.listAll();
+    for (String fileName : fileNames) {
+      LOG.info("Copying [{0}] to [{1}]", fileName, _newIndex);
+      _localDir.copy(copyRateDirectory, fileName, fileName, IOContext.DEFAULT);
+    }
+    rm(_localPath);
+  }
+
+  private void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
new file mode 100644
index 0000000..6814ac2
--- /dev/null
+++ b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
@@ -0,0 +1,29 @@
+package org.apache.blur.mapreduce.lib;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Looks up a MapReduce {@link Counter} by its enum name.
+ */
+public interface GetCounter {
+  
+  Counter getCounter(Enum<?> counterName);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
new file mode 100644
index 0000000..46c030f
--- /dev/null
+++ b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
@@ -0,0 +1,58 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class IOUtil {
+
+  public static final String UTF_8 = "UTF-8";
+
+  public static String readString(DataInput input) throws IOException {
+    int length = readVInt(input);
+    byte[] buffer = new byte[length];
+    input.readFully(buffer);
+    return new String(buffer, UTF_8);
+  }
+
+  public static void writeString(DataOutput output, String s) throws IOException {
+    byte[] bs = s.getBytes(UTF_8);
+    writeVInt(output, bs.length);
+    output.write(bs);
+  }
+
+  public static int readVInt(DataInput input) throws IOException {
+    byte b = input.readByte();
+    int i = b & 0x7F;
+    for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+      b = input.readByte();
+      i |= (b & 0x7F) << shift;
+    }
+    return i;
+  }
+
+  public static void writeVInt(DataOutput output, int i) throws IOException {
+    while ((i & ~0x7F) != 0) {
+      output.writeByte((byte) ((i & 0x7f) | 0x80));
+      i >>>= 7;
+    }
+    output.writeByte((byte) i);
+  }
+
+}
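
The VInt methods above implement the usual varint scheme: seven data bits per byte, least-significant group first, with the high bit marking a continuation byte, so values below 128 take one byte and values below 16,384 take two. A small round-trip sketch using standard java.io streams (the sample values are arbitrary):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.blur.mapreduce.lib.IOUtil;

    public class IOUtilExample {

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);

        IOUtil.writeString(out, "rowid-12345"); // VInt length, then UTF-8 bytes
        IOUtil.writeVInt(out, 300);             // encodes as two bytes: 0xAC 0x02
        out.close();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
        System.out.println(IOUtil.readString(in)); // rowid-12345
        System.out.println(IOUtil.readVInt(in));   // 300
      }
    }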

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
new file mode 100644
index 0000000..004e1fa
--- /dev/null
+++ b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
@@ -0,0 +1,289 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+/**
+ * The {@link ProgressableDirectory} reports progress while Lucene is blocked
+ * on a merge. This prevents the task from being killed for failing to report
+ * progress during a long-running, blocking merge.
+ */
+public class ProgressableDirectory extends Directory {
+
+  private static final Log LOG = LogFactory.getLog(ProgressableDirectory.class);
+
+  private final Directory _directory;
+  private final Progressable _progressable;
+
+  public ProgressableDirectory(Directory directory, Progressable progressable) {
+    _directory = directory;
+    if (progressable == null) {
+      LOG.warn("Progressable is null.");
+      _progressable = new Progressable() {
+        @Override
+        public void progress() {
+
+        }
+      };
+    } else {
+      _progressable = progressable;
+    }
+  }
+
+  @Override
+  public void clearLock(String name) throws IOException {
+    _directory.clearLock(name);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+  private IndexInput wrapInput(String name, IndexInput openInput) {
+    return new ProgressableIndexInput(name, openInput, 16384, _progressable);
+  }
+
+  private IndexOutput wrapOutput(IndexOutput createOutput) {
+    return new ProgressableIndexOutput(createOutput, _progressable);
+  }
+
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    return wrapOutput(_directory.createOutput(name, context));
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    _directory.deleteFile(name);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return _directory.equals(obj);
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  @Override
+  public LockFactory getLockFactory() {
+    return _directory.getLockFactory();
+  }
+
+  @Override
+  public String getLockID() {
+    return _directory.getLockID();
+  }
+
+  @Override
+  public int hashCode() {
+    return _directory.hashCode();
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  @Override
+  public Lock makeLock(String name) {
+    return _directory.makeLock(name);
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    return wrapInput(name, _directory.openInput(name, context));
+  }
+
+  @Override
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _directory.setLockFactory(lockFactory);
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  @Override
+  public String toString() {
+    return _directory.toString();
+  }
+
+  @SuppressWarnings("deprecation")
+  static class ProgressableIndexOutput extends IndexOutput {
+
+    private Progressable _progressable;
+    private IndexOutput _indexOutput;
+
+    public ProgressableIndexOutput(IndexOutput indexOutput, Progressable progressable) {
+      _indexOutput = indexOutput;
+      _progressable = progressable;
+    }
+
+    @Override
+    public void close() throws IOException {
+      _indexOutput.close();
+      _progressable.progress();
+    }
+
+    @Override
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+      _indexOutput.copyBytes(input, numBytes);
+      _progressable.progress();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      _indexOutput.flush();
+      _progressable.progress();
+    }
+
+    @Override
+    public long getFilePointer() {
+      return _indexOutput.getFilePointer();
+    }
+
+    @Override
+    public long length() throws IOException {
+      return _indexOutput.length();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      _indexOutput.seek(pos);
+      _progressable.progress();
+    }
+
+    @Override
+    public void setLength(long length) throws IOException {
+      _indexOutput.setLength(length);
+      _progressable.progress();
+    }
+
+    @Override
+    public String toString() {
+      return _indexOutput.toString();
+    }
+
+    @Override
+    public void writeByte(byte b) throws IOException {
+      _indexOutput.writeByte(b);
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      _indexOutput.writeBytes(b, offset, length);
+      _progressable.progress();
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int length) throws IOException {
+      _indexOutput.writeBytes(b, length);
+      _progressable.progress();
+    }
+
+    @Override
+    public void writeInt(int i) throws IOException {
+      _indexOutput.writeInt(i);
+    }
+
+    @Override
+    public void writeLong(long i) throws IOException {
+      _indexOutput.writeLong(i);
+    }
+
+    @Override
+    public void writeString(String s) throws IOException {
+      _indexOutput.writeString(s);
+    }
+
+    @Override
+    public void writeStringStringMap(Map<String, String> map) throws IOException {
+      _indexOutput.writeStringStringMap(map);
+    }
+
+  }
+
+  static class ProgressableIndexInput extends BufferedIndexInput {
+
+    private IndexInput _indexInput;
+    private final long _length;
+    private Progressable _progressable;
+
+    ProgressableIndexInput(String name, IndexInput indexInput, int buffer, Progressable progressable) {
+      super("ProgressableIndexInput(" + indexInput.toString() + ")", buffer);
+      _indexInput = indexInput;
+      _length = indexInput.length();
+      _progressable = progressable;
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      long filePointer = getFilePointer();
+      if (filePointer != _indexInput.getFilePointer()) {
+        _indexInput.seek(filePointer);
+      }
+      _indexInput.readBytes(b, offset, length);
+      _progressable.progress();
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+      _indexInput.close();
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+
+    @Override
+    public ProgressableIndexInput clone() {
+      ProgressableIndexInput clone = (ProgressableIndexInput) super.clone();
+      clone._indexInput = (IndexInput) _indexInput.clone();
+      return clone;
+    }
+  }
+}
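
In use, the wrapper goes around whatever Directory the job writes its local index into, with the task supplying the Progressable (in the mapreduce API the task context itself can normally serve as the Progressable). A minimal sketch; the ProgressableDirectoryExample helper and the FSDirectory path are assumptions for illustration.

    import java.io.File;
    import java.io.IOException;

    import org.apache.blur.mapreduce.lib.ProgressableDirectory;
    import org.apache.hadoop.util.Progressable;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class ProgressableDirectoryExample {

      // Wraps a local index directory so long, blocking merges still report progress.
      public static Directory wrap(File indexPath, Progressable progressable) throws IOException {
        Directory dir = FSDirectory.open(indexPath);
        return new ProgressableDirectory(dir, progressable);
      }
    }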

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
new file mode 100644
index 0000000..694759e
--- /dev/null
+++ b/blur-mapred-common/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
@@ -0,0 +1,64 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.mapreduce.Counter;
+
+/**
+ * Turns a standard Hadoop counter into a rate counter whose value is the
+ * number of marked events per second.
+ */
+public class RateCounter {
+
+  private final Counter _counter;
+  private final long _reportTime;
+  private final long _rateTime;
+  private long _lastReport;
+  private long _count = 0;
+
+  public RateCounter(Counter counter) {
+    this(counter, TimeUnit.SECONDS, 1);
+  }
+
+  public RateCounter(Counter counter, TimeUnit unit, long reportTime) {
+    _counter = counter;
+    _lastReport = System.nanoTime();
+    _reportTime = unit.toNanos(reportTime);
+    _rateTime = unit.toSeconds(reportTime);
+  }
+
+  public void mark() {
+    mark(1L);
+  }
+
+  public void mark(long n) {
+    long now = System.nanoTime();
+    if (_lastReport + _reportTime < now) {
+      long rate = _count / _rateTime;
+      _counter.setValue(rate);
+      _lastReport = System.nanoTime();
+      _count = 0;
+    }
+    _count += n;
+  }
+
+  public void close() {
+    _counter.setValue(0);
+  }
+
+}
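
Typical usage is to create the RateCounter from a Hadoop Counter, call mark(n) as work completes, and close it when finished so the reported rate returns to zero. A short sketch; the COPY enum and the stream-copy loop are assumptions for illustration.

    package org.apache.blur.mapreduce.lib;

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class RateCounterExample {

      // Hypothetical counter enum, for illustration only.
      enum COPY { BYTES_PER_SECOND }

      public static void copy(InputStream in, OutputStream out, GetCounter getCounter) throws IOException {
        RateCounter rate = new RateCounter(getCounter.getCounter(COPY.BYTES_PER_SECOND));
        byte[] buffer = new byte[8192];
        int read;
        while ((read = in.read(buffer)) != -1) {
          out.write(buffer, 0, read);
          rate.mark(read); // counter reports bytes copied per second
        }
        rate.close(); // resets the reported rate to zero
      }
    }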

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-hadoop1/pom.xml
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/pom.xml b/blur-mapred-hadoop1/pom.xml
new file mode 100644
index 0000000..3bc54f7
--- /dev/null
+++ b/blur-mapred-hadoop1/pom.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.blur</groupId>
+		<artifactId>blur</artifactId>
+		<version>0.2.2-incubating-SNAPSHOT</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+	<groupId>org.apache.blur</groupId>
+	<artifactId>blur-mapred-hadoop1</artifactId>
+	<version>${projectVersion}</version>
+	<packaging>jar</packaging>
+	<name>Blur Map Reduce Hadoop1</name>
+	<description>The Blur Map Reduce Hadoop1 module contains the test suite for Hadoop1.</description>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-mapred-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-util</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>${log4j.version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>javax.mail</groupId>
+					<artifactId>mail</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>javax.jms</groupId>
+					<artifactId>jms</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+	</dependencies>
+
+	<repositories>
+		<repository>
+			<id>libdir</id>
+			<url>file://${basedir}/../lib</url>
+		</repository>
+	</repositories>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<argLine>-XX:+UseConcMarkSweepGC -Xmx1g -Xms1g</argLine>
+					<forkCount>2</forkCount>
+					<forkMode>always</forkMode>
+					<reuseForks>false</reuseForks>
+					<systemPropertyVariables>
+						<blur.tmp.dir>${project.build.directory}/target/tmp</blur.tmp.dir>
+					</systemPropertyVariables>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+	
+	
+	<profiles>
+		<profile>
+			<id>hadoop-1x</id>
+			<activation>
+				<property>
+					<name>hadoop1</name>
+				</property>
+			</activation>
+			<properties>
+				<projectVersion>${project.parent.version}-hadoop1</projectVersion>
+			</properties>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-test</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.mrunit</groupId>
+					<artifactId>mrunit</artifactId>
+					<version>${mrunit.version}</version>
+					<classifier>hadoop1</classifier>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+</project>
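
Note that the hadoop-1x profile above is activated by the presence of the hadoop1 property rather than by default, so running this module's tests against Hadoop 1 means supplying that property on the command line (for example, mvn test -Dhadoop1); the projectVersion property the profile defines is what gives the built artifact its -hadoop1 version suffix.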

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
new file mode 100644
index 0000000..c14e86e
--- /dev/null
+++ b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
@@ -0,0 +1,229 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.thrift.generated.TableStats;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.GCWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurOutputFormatMiniClusterTest {
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem fileSystem;
+  private static MiniMRCluster mr;
+  private static Path TEST_ROOT_DIR;
+  private static JobConf jobConf;
+  private static MiniCluster miniCluster;
+  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
+  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir",
+      "./target/tmp_BlurOutputFormatMiniClusterTest"));
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    GCWatcher.init(0.60);
+    LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
+    File testDirectory = new File(TMPDIR, "blur-cluster-test").getAbsoluteFile();
+    testDirectory.mkdirs();
+
+    Path directory = new Path(testDirectory.getPath());
+    FsPermission dirPermissions = localFS.getFileStatus(directory).getPermission();
+    FsAction userAction = dirPermissions.getUserAction();
+    FsAction groupAction = dirPermissions.getGroupAction();
+    FsAction otherAction = dirPermissions.getOtherAction();
+
+    StringBuilder builder = new StringBuilder();
+    builder.append(userAction.ordinal());
+    builder.append(groupAction.ordinal());
+    builder.append(otherAction.ordinal());
+    String dirPermissionNum = builder.toString();
+    System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
+    testDirectory.delete();
+    miniCluster = new MiniCluster();
+    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true);
+
+    // System.setProperty("test.build.data",
+    // "./target/BlurOutputFormatTest/data");
+    // TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+    // "target/tmp/BlurOutputFormatTest_tmp"));
+    TEST_ROOT_DIR = new Path(miniCluster.getFileSystemUri().toString() + "/blur_test");
+    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
+    try {
+      fileSystem = TEST_ROOT_DIR.getFileSystem(conf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+    mr = new MiniMRCluster(1, miniCluster.getFileSystemUri().toString(), 1);
+    jobConf = mr.createJobConf();
+    BufferStore.initNewBuffer(128, 128 * 128);
+  }
+
+  @AfterClass
+  public static void teardown() {
+    if (mr != null) {
+      mr.shutdown();
+    }
+    miniCluster.shutdownBlurCluster();
+    rm(new File("build"));
+  }
+
+  private static void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+  @Before
+  public void setup() {
+    TableContext.clear();
+  }
+
+  @Test
+  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException, BlurException,
+      TException {
+    fileSystem.delete(inDir, true);
+    String tableName = "testBlurOutputFormat";
+    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
+    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
+
+    Job job = new Job(jobConf, "blur index");
+    job.setJarByClass(BlurOutputFormatMiniClusterTest.class);
+    job.setMapperClass(CsvBlurMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    String tableUri = new Path(TEST_ROOT_DIR + "/blur/" + tableName).toString();
+    CsvBlurMapper.addColumns(job, "cf1", "col");
+
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(1);
+    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setName(tableName);
+
+    Iface client = getClient();
+    client.createTable(tableDescriptor);
+
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path tablePath = new Path(tableUri);
+    Path shardPath = new Path(tablePath, BlurUtil.getShardName(0));
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath);
+    assertEquals(3, listStatus.length);
+    System.out.println("======" + listStatus.length);
+    for (FileStatus fileStatus : listStatus) {
+      System.out.println(fileStatus.getPath());
+    }
+
+    assertTrue(job.waitForCompletion(true));
+    Counters ctrs = job.getCounters();
+    System.out.println("Counters: " + ctrs);
+
+    while (true) {
+      TableStats tableStats = client.tableStats(tableName);
+      System.out.println(tableStats);
+      if (tableStats.getRowCount() > 0) {
+        break;
+      }
+      Thread.sleep(5000);
+    }
+
+    assertTrue(fileSystem.exists(tablePath));
+    assertFalse(fileSystem.isFile(tablePath));
+
+    FileStatus[] listStatusAfter = fileSystem.listStatus(shardPath);
+
+    assertEquals(11, listStatusAfter.length);
+
+  }
+
+  private Iface getClient() {
+    return BlurClient.getClient(miniCluster.getControllerConnectionStr());
+  }
+
+  public static String readFile(String name) throws IOException {
+    DataInputStream f = fileSystem.open(new Path(TEST_ROOT_DIR + "/" + name));
+    BufferedReader b = new BufferedReader(new InputStreamReader(f));
+    StringBuilder result = new StringBuilder();
+    String line = b.readLine();
+    while (line != null) {
+      result.append(line);
+      result.append('\n');
+      line = b.readLine();
+    }
+    b.close();
+    return result.toString();
+  }
+
+  private Path writeRecordsFile(String name, int startingRowId, int numberOfRows, int startRecordId,
+      int numberOfRecords, String family) throws IOException {
+    // "1,1,cf1,val1"
+    Path file = new Path(TEST_ROOT_DIR + "/" + name);
+    fileSystem.delete(file, false);
+    DataOutputStream f = fileSystem.create(file);
+    PrintWriter writer = new PrintWriter(f);
+    for (int row = 0; row < numberOfRows; row++) {
+      for (int record = 0; record < numberOfRecords; record++) {
+        writer.println(getRecord(row + startingRowId, record + startRecordId, family));
+      }
+    }
+    writer.close();
+    return file;
+  }
+
+  private String getRecord(int rowId, int recordId, String family) {
+    return rowId + "," + recordId + "," + family + ",valuetoindex";
+  }
+}
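
For reference, each line written by writeRecordsFile is one CSV record in the rowid,recordid,family,value layout noted in the inline comment, which is what CsvBlurMapper is configured for via addColumns(job, "cf1", "col"); with the arguments used in this test, in/part1 contains the single line 1,1,cf1,valuetoindex and in/part2 contains 1,2,cf1,valuetoindex.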

