incubator-blur-commits mailing list archives

From cr...@apache.org
Subject [25/50] [abbrv] Reconfiguring projects again, some of the unit tests for hadoop2 now run successfully.
Date Sun, 18 May 2014 21:42:00 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
new file mode 100644
index 0000000..5ee26eb
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This utility code was taken from HBase to locate classes and the jar files
+ * to add to the MapReduce Job.
+ */
+public class BlurMapReduceUtil {
+
+  private final static Log LOG = LogFactory.getLog(BlurMapReduceUtil.class);
+
+  /**
+   * Add the Blur dependency jars as well as jars for any of the configured job
+   * classes to the job configuration, so that JobClient will ship them to the
+   * cluster and add them to the DistributedCache.
+   */
+  public static void addDependencyJars(Job job) throws IOException {
+    try {
+      addDependencyJars(job.getConfiguration(), org.apache.zookeeper.ZooKeeper.class, job.getMapOutputKeyClass(),
+          job.getMapOutputValueClass(), job.getInputFormatClass(), job.getOutputKeyClass(), job.getOutputValueClass(),
+          job.getOutputFormatClass(), job.getPartitionerClass(), job.getCombinerClass());
+      addAllJarsInBlurLib(job.getConfiguration());
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Adds all the jars in the same path as the blur jar files.
+   * @param conf
+   * @throws IOException
+   */
+  public static void addAllJarsInBlurLib(Configuration conf) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Set<String> jars = new HashSet<String>();
+    jars.addAll(conf.getStringCollection("tmpjars"));
+
+    String property = System.getProperty("java.class.path");
+    String[] files = property.split("\\:");
+
+    String blurLibPath = getPath("blur-", files);
+    if (blurLibPath == null) {
+      return;
+    }
+    List<String> pathes = getPathes(blurLibPath, files);
+    for (String pathStr : pathes) {
+      Path path = new Path(pathStr);
+      if (!localFs.exists(path)) {
+        LOG.warn("Could not validate jar file " + path);
+        continue;
+      }
+      jars.add(path.makeQualified(localFs).toString());
+    }
+    if (jars.isEmpty()) {
+      return;
+    }
+    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
+  }
+
+  private static List<String> getPathes(String path, String[] files) {
+    List<String> pathes = new ArrayList<String>();
+    for (String file : files) {
+      if (file.startsWith(path)) {
+        pathes.add(file);
+      }
+    }
+    return pathes;
+  }
+
+  private static String getPath(String startsWith, String[] files) {
+    for (String file : files) {
+      int lastIndexOf = file.lastIndexOf('/');
+      String fileName = file.substring(lastIndexOf + 1);
+      if (fileName.startsWith(startsWith)) {
+        return file.substring(0, lastIndexOf);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Add the jars containing the given classes to the job's configuration such
+   * that JobClient will ship them to the cluster and add them to the
+   * DistributedCache.
+   */
+  public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Set<String> jars = new HashSet<String>();
+    // Add jars that are already in the tmpjars variable
+    jars.addAll(conf.getStringCollection("tmpjars"));
+
+    // Add jars containing the specified classes
+    for (Class<?> clazz : classes) {
+      if (clazz == null) {
+        continue;
+      }
+
+      String pathStr = findOrCreateJar(clazz);
+      if (pathStr == null) {
+        LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
+        continue;
+      }
+      Path path = new Path(pathStr);
+      if (!localFs.exists(path)) {
+        LOG.warn("Could not validate jar file " + path + " for class " + clazz);
+        continue;
+      }
+      jars.add(path.makeQualified(localFs).toString());
+    }
+    if (jars.isEmpty()) {
+      return;
+    }
+
+    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
+  }
+
+  /**
+   * If org.apache.hadoop.util.JarFinder is available (0.23+ hadoop), finds the
+   * Jar for a class or creates it if it doesn't exist. If the class is in a
+   * directory in the classpath, it creates a Jar on the fly with the contents
+   * of the directory and returns the path to that Jar. If a Jar is created, it
+   * is created in the system temporary directory.
+   * 
+   * Otherwise, returns an existing jar that contains a class of the same name.
+   * 
+   * @param my_class
+   *          the class to find.
+   * @return a jar file that contains the class, or null.
+   * @throws IOException
+   */
+  private static String findOrCreateJar(Class<?> my_class) throws IOException {
+    try {
+      Class<?> jarFinder = Class.forName("org.apache.hadoop.util.JarFinder");
+      // hadoop-0.23 has a JarFinder class that will create the jar
+      // if it doesn't exist. Note that this is needed to run the mapreduce
+      // unit tests post-0.23, because mapreduce v2 requires the relevant jars
+      // to be in the mr cluster to do output, split, etc. At unit test time,
+      // the hbase jars do not exist, so we need to create some. Note that we
+      // can safely fall back to findContainingJars for pre-0.23 mapreduce.
+      Method m = jarFinder.getMethod("getJar", Class.class);
+      return (String) m.invoke(null, my_class);
+    } catch (InvocationTargetException ite) {
+      // function was properly called, but threw its own exception
+      throw new IOException(ite.getCause());
+    } catch (Exception e) {
+      // ignore all other exceptions. related to reflection failure
+    }
+
+    LOG.debug("New JarFinder: org.apache.hadoop.util.JarFinder.getJar " + "not available.  Using old findContainingJar");
+    return findContainingJar(my_class);
+  }
+
+  /**
+   * Find a jar that contains a class of the same name, if any. It will return a
+   * jar file, even if that is not the first thing on the class path that has a
+   * class with the same name.
+   * 
+   * This is shamelessly copied from JobConf
+   * 
+   * @param my_class
+   *          the class to find.
+   * @return a jar file that contains the class, or null.
+   * @throws IOException
+   */
+  private static String findContainingJar(Class<?> my_class) {
+    ClassLoader loader = my_class.getClassLoader();
+    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+    try {
+      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
+        URL url = itr.nextElement();
+        if ("jar".equals(url.getProtocol())) {
+          String toReturn = url.getPath();
+          if (toReturn.startsWith("file:")) {
+            toReturn = toReturn.substring("file:".length());
+          }
+          // URLDecoder is a misnamed class, since it actually decodes
+          // x-www-form-urlencoded MIME type rather than actual
+          // URL encoding (which the file path has). Therefore it would
+          // decode +s to ' 's which is incorrect (spaces are actually
+          // either unencoded or encoded as "%20"). Replace +s first, so
+          // that they are kept sacred during the decoding process.
+          toReturn = toReturn.replaceAll("\\+", "%2B");
+          toReturn = URLDecoder.decode(toReturn, "UTF-8");
+          return toReturn.replaceAll("!.*$", "");
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return null;
+  }
+}
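
For orientation, a minimal sketch of how a driver might call this utility when assembling a job; the driver class name is a placeholder, and BlurOutputFormat.setupJob(...) further below in this commit performs the same calls itself:

    Configuration conf = new Configuration();
    Job job = new Job(conf, "blur index");                         // Hadoop 1 style Job construction
    job.setJarByClass(MyDriver.class);                             // placeholder driver class
    job.setMapperClass(CsvBlurMapper.class);
    BlurMapReduceUtil.addDependencyJars(job);                      // ships the ZooKeeper jar plus the jars
                                                                   // of the configured job classes via "tmpjars"
    BlurMapReduceUtil.addAllJarsInBlurLib(job.getConfiguration()); // plus everything next to the blur-* jars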

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
new file mode 100644
index 0000000..36d7f4f
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
@@ -0,0 +1,178 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * {@link BlurMutate} carries the {@link Record}s bound for the {@link Row} for
+ * indexing. If this mutate represents a delete of the {@link Row} the recordId
+ * of the {@link BlurRecord} is ignored.
+ */
+public class BlurMutate implements Writable {
+
+  /**
+   * The {@link MUTATE_TYPE} controls the mutating of the {@link Row}. DELETE
+   * indicates that the {@link Row} is to be deleted. REPLACE indicates that the
+   * group of mutates are to replace the existing {@link Row}.
+   * 
+   * If both a DELETE and a REPLACE exist for a single {@link Row} in the
+   * {@link BlurOutputFormat} then the {@link Row} will be replaced not just
+   * deleted.
+   */
+  public enum MUTATE_TYPE {
+    /* ADD(0), UPDATE(1), */DELETE(2), REPLACE(3);
+    private int _value;
+
+    private MUTATE_TYPE(int value) {
+      _value = value;
+    }
+
+    public int getValue() {
+      return _value;
+    }
+
+    public MUTATE_TYPE find(int value) {
+      switch (value) {
+      // @TODO Updates through MR are going to be disabled
+      // case 0:
+      // return ADD;
+      // case 1:
+      // return UPDATE;
+      case 2:
+        return DELETE;
+      case 3:
+        return REPLACE;
+      default:
+        throw new RuntimeException("Value [" + value + "] not found.");
+      }
+    }
+  }
+
+  private MUTATE_TYPE _mutateType = MUTATE_TYPE.REPLACE;
+  private BlurRecord _record = new BlurRecord();
+
+  public BlurMutate() {
+
+  }
+
+  public BlurMutate(MUTATE_TYPE type, BlurRecord record) {
+    _mutateType = type;
+    _record = record;
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+    _record.setRecordId(recordId);
+  }
+
+  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId, String family) {
+    _mutateType = type;
+    _record.setRowId(rowId);
+    _record.setRecordId(recordId);
+    _record.setFamily(family);
+  }
+
+  public BlurMutate addColumn(BlurColumn column) {
+    _record.addColumn(column);
+    return this;
+  }
+
+  public BlurMutate addColumn(String name, String value) {
+    return addColumn(new BlurColumn(name, value));
+  }
+
+  public BlurRecord getRecord() {
+    return _record;
+  }
+
+  public void setRecord(BlurRecord record) {
+    _record = record;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeVInt(out, _mutateType.getValue());
+    _record.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _mutateType = _mutateType.find(IOUtil.readVInt(in));
+    _record.readFields(in);
+  }
+
+  public MUTATE_TYPE getMutateType() {
+    return _mutateType;
+  }
+
+  public BlurMutate setMutateType(MUTATE_TYPE mutateType) {
+    _mutateType = mutateType;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "BlurMutate [mutateType=" + _mutateType + ", record=" + _record + "]";
+  }
+
+  public BlurMutate setFamily(String family) {
+    _record.setFamily(family);
+    return this;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_mutateType == null) ? 0 : _mutateType.hashCode());
+    result = prime * result + ((_record == null) ? 0 : _record.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurMutate other = (BlurMutate) obj;
+    if (_mutateType != other._mutateType)
+      return false;
+    if (_record == null) {
+      if (other._record != null)
+        return false;
+    } else if (!_record.equals(other._record))
+      return false;
+    return true;
+  }
+
+}
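
As a hedged illustration of how a mapper might emit these mutates (the identifiers and column values below are invented, and the output types simply mirror the Text/BlurMutate pair that BlurOutputFormat expects):

    // Inside a Mapper<LongWritable, Text, Text, BlurMutate>, in map(...):
    BlurMutate mutate = new BlurMutate(BlurMutate.MUTATE_TYPE.REPLACE, "row-1", "record-1", "cf1");
    mutate.addColumn("col1", "value1");
    context.write(new Text("row-1"), mutate);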

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
new file mode 100644
index 0000000..6b485a9
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
@@ -0,0 +1,96 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.mapred.AbstractOutputCommitter;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+
+public class BlurOutputCommitter extends AbstractOutputCommitter {
+
+  private static final Log LOG = LogFactory.getLog(BlurOutputCommitter.class);
+
+  private Path _newIndex;
+  private Configuration _configuration;
+  private TaskAttemptID _taskAttemptID;
+  private Path _indexPath;
+  private final boolean _runTaskCommit;
+  private TableDescriptor _tableDescriptor;
+
+  public BlurOutputCommitter() {
+    _runTaskCommit = true;
+  }
+
+  public BlurOutputCommitter(boolean isMap, int numberOfReducers) {
+    _runTaskCommit = !(isMap && numberOfReducers != 0);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+    return _runTaskCommit;
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    setup(context);
+    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
+    if (fileSystem.exists(_newIndex) && !fileSystem.isFile(_newIndex)) {
+      Path dst = new Path(_indexPath, _taskAttemptID.toString() + ".task_complete");
+      LOG.info("Committing [{0}] to [{1}]", _newIndex, dst);
+      fileSystem.rename(_newIndex, dst);
+    } else {
+      throw new IOException("Path [" + _newIndex + "] does not exist, can not commit.");
+    }
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    setup(context);
+    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
+    LOG.info("abortTask - Deleting [{0}]", _newIndex);
+    fileSystem.delete(_newIndex, true);
+  }
+
+  private void setup(TaskAttemptContext context) throws IOException {
+    _configuration = context.getConfiguration();
+    _tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
+    int shardCount = _tableDescriptor.getShardCount();
+    int attemptId = context.getTaskAttemptID().getTaskID().getId();
+    int shardId = attemptId % shardCount;
+    _taskAttemptID = context.getTaskAttemptID();
+    Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
+    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
+    _indexPath = new Path(tableOutput, shardName);
+    _newIndex = new Path(_indexPath, _taskAttemptID.toString() + ".tmp");
+  }
+
+}
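
A small worked example of the shard assignment done in setup() above, with assumed numbers:

    int shardCount = 16;                   // tableDescriptor.getShardCount(), assumed value
    int attemptId = 21;                    // context.getTaskAttemptID().getTaskID().getId(), assumed value
    int shardId = attemptId % shardCount;  // 21 % 16 = 5
    // The task writes to <tableOutput>/<shard 5 directory>/<attempt id>.tmp and, on commit,
    // that directory is renamed to <attempt id>.task_complete inside the same shard path.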

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
new file mode 100644
index 0000000..7bbc567
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -0,0 +1,338 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * {@link BlurOutputFormat} is used to index data and deliver the indexes to
+ * the proper Blur table for searching. A typical usage of this class would be
+ * as follows.<br/>
+ * <br/>
+ * 
+ * <br/>
+ * {@link Iface} client = {@link BlurClient}.getClient("controller1:40010");<br/>
+ * <br/>
+ * TableDescriptor tableDescriptor = client.describe(tableName);<br/>
+ * <br/>
+ * Job job = new Job(jobConf, "blur index");<br/>
+ * job.setJarByClass(BlurOutputFormatTest.class);<br/>
+ * job.setMapperClass(CsvBlurMapper.class);<br/>
+ * job.setInputFormatClass(TextInputFormat.class);<br/>
+ * <br/>
+ * FileInputFormat.addInputPath(job, new Path(input));<br/>
+ * CsvBlurMapper.addColumns(job, "cf1", "col");<br/>
+ * <br/>
+ * BlurOutputFormat.setupJob(job, tableDescriptor);<br/>
+ * BlurOutputFormat.setIndexLocally(job, true);<br/>
+ * BlurOutputFormat.setOptimizeInFlight(job, false);<br/>
+ * <br/>
+ * job.waitForCompletion(true);<br/>
+ * 
+ */
+public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
+
+  public static final String BLUR_OUTPUT_REDUCER_MULTIPLIER = "blur.output.reducer.multiplier";
+  public static final String BLUR_OUTPUT_OPTIMIZEINFLIGHT = "blur.output.optimizeinflight";
+  public static final String BLUR_OUTPUT_INDEXLOCALLY = "blur.output.indexlocally";
+  public static final String BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE = "blur.output.max.document.buffer.size";
+  public static final String BLUR_TABLE_DESCRIPTOR = "blur.table.descriptor";
+  public static final String BLUR_OUTPUT_PATH = "blur.output.path";
+
+  private static final String MAPRED_OUTPUT_COMMITTER_CLASS = "mapred.output.committer.class";
+  private static ThreadLocal<Progressable> _progressable = new ThreadLocal<Progressable>();
+  private static ThreadLocal<GetCounter> _getCounter = new ThreadLocal<GetCounter>();
+
+  public static void setProgressable(Progressable progressable) {
+    _progressable.set(progressable);
+  }
+
+  public static Progressable getProgressable() {
+    return _progressable.get();
+  }
+
+  public static void setGetCounter(GetCounter getCounter) {
+    _getCounter.set(getCounter);
+  }
+
+  public static GetCounter getGetCounter() {
+    return _getCounter.get();
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+    CheckOutputSpecs.checkOutputSpecs(context.getConfiguration(), context.getNumReduceTasks());
+  }
+
+  @Override
+  public RecordWriter<Text, BlurMutate> getRecordWriter(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    int id = context.getTaskAttemptID().getTaskID().getId();
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    final GenericBlurRecordWriter writer = new GenericBlurRecordWriter(context.getConfiguration(), id,
+        taskAttemptID.toString() + ".tmp");
+    return new RecordWriter<Text, BlurMutate>() {
+
+      @Override
+      public void write(Text key, BlurMutate value) throws IOException, InterruptedException {
+        writer.write(key, value);
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+        writer.close();
+      }
+    };
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+    return new BlurOutputCommitter(context.getTaskAttemptID().isMap(), context.getNumReduceTasks());
+  }
+
+  public static TableDescriptor getTableDescriptor(Configuration configuration) throws IOException {
+    String tableDesStr = configuration.get(BLUR_TABLE_DESCRIPTOR);
+    if (tableDesStr == null) {
+      return null;
+    }
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(tableDesStr.getBytes());
+    TIOStreamTransport transport = new TIOStreamTransport(inputStream);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    TableDescriptor descriptor = new TableDescriptor();
+    try {
+      descriptor.read(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    transport.close();
+    return descriptor;
+  }
+
+  /**
+   * This will multiply the number of reducers for this job. For example if the
+   * table has 256 shards the normal number of reducers is 256. However if the
+   * reducer multiplier is set to 4 then the number of reducers will be 1024 and
+   * each shard will get 4 new segments instead of the normal 1.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param multiple
+   *          the multiple to use.
+   * @throws IOException
+   */
+  public static void setReducerMultiplier(Job job, int multiple) throws IOException {
+    TableDescriptor tableDescriptor = getTableDescriptor(job.getConfiguration());
+    if (tableDescriptor == null) {
+      throw new IOException("setTableDescriptor needs to be called first.");
+    }
+    job.setNumReduceTasks(tableDescriptor.getShardCount() * multiple);
+    Configuration configuration = job.getConfiguration();
+    configuration.setInt(BLUR_OUTPUT_REDUCER_MULTIPLIER, multiple);
+  }
+
+  public static int getReducerMultiplier(Configuration configuration) {
+    return configuration.getInt(BLUR_OUTPUT_REDUCER_MULTIPLIER, 1);
+  }
+
+  /**
+   * Sets the {@link TableDescriptor} for this job.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param tableDescriptor
+   *          the {@link TableDescriptor}.
+   * @throws IOException
+   */
+  public static void setTableDescriptor(Job job, TableDescriptor tableDescriptor) throws IOException {
+    setTableDescriptor(job.getConfiguration(), tableDescriptor);
+  }
+
+  /**
+   * Sets the {@link TableDescriptor} for this job.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param tableDescriptor
+   *          the {@link TableDescriptor}.
+   * @throws IOException
+   */
+  public static void setTableDescriptor(Configuration configuration, TableDescriptor tableDescriptor)
+      throws IOException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    TIOStreamTransport transport = new TIOStreamTransport(outputStream);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      tableDescriptor.write(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    transport.close();
+    configuration.set(BLUR_TABLE_DESCRIPTOR, new String(outputStream.toByteArray()));
+    setOutputPath(configuration, new Path(tableDescriptor.getTableUri()));
+  }
+
+  /**
+   * Sets the maximum number of documents that the buffer will hold in memory
+   * before overflowing to disk. By default this is 1000 which will probably be
+   * very low for most systems.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param maxDocumentBufferSize
+   *          the maxDocumentBufferSize.
+   */
+  public static void setMaxDocumentBufferSize(Job job, int maxDocumentBufferSize) {
+    setMaxDocumentBufferSize(job.getConfiguration(), maxDocumentBufferSize);
+  }
+
+  /**
+   * Sets the maximum number of documents that the buffer will hold in memory
+   * before overflowing to disk. By default this is 1000 which will probably be
+   * very low for most systems.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param maxDocumentBufferSize
+   *          the maxDocumentBufferSize.
+   */
+  public static void setMaxDocumentBufferSize(Configuration configuration, int maxDocumentBufferSize) {
+    configuration.setInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, maxDocumentBufferSize);
+  }
+
+  public static int getMaxDocumentBufferSize(Configuration configuration) {
+    return configuration.getInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, 1000);
+  }
+
+  public static void setOutputPath(Job job, Path path) {
+    setOutputPath(job.getConfiguration(), path);
+  }
+
+  public static void setOutputPath(Configuration configuration, Path path) {
+    configuration.set(BLUR_OUTPUT_PATH, path.toString());
+    configuration.set(MAPRED_OUTPUT_COMMITTER_CLASS, BlurOutputCommitter.class.getName());
+  }
+
+  public static Path getOutputPath(Configuration configuration) {
+    return new Path(configuration.get(BLUR_OUTPUT_PATH));
+  }
+
+  /**
+   * Enabled by default, this will enable local indexing on the machine where
+   * the task is running. Then, when the {@link RecordWriter} closes, the index
+   * is copied to the remote destination in HDFS.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setIndexLocally(Job job, boolean b) {
+    setIndexLocally(job.getConfiguration(), b);
+  }
+
+  /**
+   * Enabled by default, this will enable local indexing on the machine where
+   * the task is running. Then, when the {@link RecordWriter} closes, the index
+   * is copied to the remote destination in HDFS.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setIndexLocally(Configuration configuration, boolean b) {
+    configuration.setBoolean(BLUR_OUTPUT_INDEXLOCALLY, b);
+  }
+
+  public static boolean isIndexLocally(Configuration configuration) {
+    return configuration.getBoolean(BLUR_OUTPUT_INDEXLOCALLY, true);
+  }
+
+  /**
+   * Enabled by default, this will optimize the index while copying from the
+   * local index to the remote destination in HDFS. Used in conjunction with
+   * setIndexLocally.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setOptimizeInFlight(Job job, boolean b) {
+    setOptimizeInFlight(job.getConfiguration(), b);
+  }
+
+  /**
+   * Enabled by default, this will optimize the index while copying from the
+   * local index to the remote destination in HDFS. Used in conjunction with
+   * setIndexLocally.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param b
+   *          true to enable, false to disable.
+   */
+  public static void setOptimizeInFlight(Configuration configuration, boolean b) {
+    configuration.setBoolean(BLUR_OUTPUT_OPTIMIZEINFLIGHT, b);
+  }
+
+  public static boolean isOptimizeInFlight(Configuration configuration) {
+    return configuration.getBoolean(BLUR_OUTPUT_OPTIMIZEINFLIGHT, true);
+  }
+
+  /**
+   * Sets up the output portion of the map reduce job. This affects the map side
+   * of the job as well as the reduce side.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param tableDescriptor
+   *          the table descriptor to write the output of the indexing job.
+   * @throws IOException
+   */
+  public static void setupJob(Job job, TableDescriptor tableDescriptor) throws IOException {
+    job.setReducerClass(DefaultBlurReducer.class);
+    job.setNumReduceTasks(tableDescriptor.getShardCount());
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(BlurMutate.class);
+    job.setOutputFormatClass(BlurOutputFormat.class);
+    setTableDescriptor(job, tableDescriptor);
+    BlurMapReduceUtil.addDependencyJars(job);
+    BlurMapReduceUtil.addAllJarsInBlurLib(job.getConfiguration());
+  }
+
+}
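
The class Javadoc above already sketches a complete driver; as an extra hedged illustration of the reducer multiplier described in setReducerMultiplier, assuming a 256-shard table:

    BlurOutputFormat.setupJob(job, tableDescriptor);  // sets the reducer count to the shard count (256)
    BlurOutputFormat.setReducerMultiplier(job, 4);    // reducer count becomes 256 * 4 = 1024, so each
                                                      // shard receives 4 new segments instead of 1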

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
new file mode 100644
index 0000000..7c12a76
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
@@ -0,0 +1,178 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.utils.ReaderBlurRecord;
+import org.apache.hadoop.io.Writable;
+
+public class BlurRecord implements Writable, ReaderBlurRecord {
+
+  private String _rowId;
+  private String _recordId;
+  private String _family;
+
+  private List<BlurColumn> _columns = new ArrayList<BlurColumn>();
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _rowId = IOUtil.readString(in);
+    _recordId = IOUtil.readString(in);
+    _family = IOUtil.readString(in);
+    int size = IOUtil.readVInt(in);
+    _columns.clear();
+    for (int i = 0; i < size; i++) {
+      BlurColumn column = new BlurColumn();
+      column.readFields(in);
+      _columns.add(column);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeString(out, _rowId);
+    IOUtil.writeString(out, _recordId);
+    IOUtil.writeString(out, _family);
+    IOUtil.writeVInt(out, _columns.size());
+    for (BlurColumn column : _columns) {
+      column.write(out);
+    }
+  }
+
+  public String getRowId() {
+    return _rowId;
+  }
+
+  public void setRowId(String rowId) {
+    this._rowId = rowId;
+  }
+
+  public String getRecordId() {
+    return _recordId;
+  }
+
+  public void setRecordId(String recordId) {
+    this._recordId = recordId;
+  }
+
+  public String getFamily() {
+    return _family;
+  }
+
+  public void setFamily(String family) {
+    this._family = family;
+  }
+
+  public List<BlurColumn> getColumns() {
+    return _columns;
+  }
+
+  public void setColumns(List<BlurColumn> columns) {
+    this._columns = columns;
+  }
+
+  public void clearColumns() {
+    _columns.clear();
+  }
+
+  public void addColumn(BlurColumn column) {
+    _columns.add(column);
+  }
+
+  public void addColumn(String name, String value) {
+    BlurColumn blurColumn = new BlurColumn();
+    blurColumn.setName(name);
+    blurColumn.setValue(value);
+    addColumn(blurColumn);
+  }
+
+  @Override
+  public void setRecordIdStr(String value) {
+    setRecordId(value);
+  }
+
+  @Override
+  public void setFamilyStr(String family) {
+    setFamily(family);
+  }
+
+  public void reset() {
+    clearColumns();
+    _rowId = null;
+    _recordId = null;
+    _family = null;
+  }
+
+  @Override
+  public void setRowIdStr(String rowId) {
+    setRowId(rowId);
+  }
+
+  @Override
+  public String toString() {
+    return "{rowId=" + _rowId + ", recordId=" + _recordId + ", family=" + _family + ", columns=" + _columns + "}";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_columns == null) ? 0 : _columns.hashCode());
+    result = prime * result + ((_family == null) ? 0 : _family.hashCode());
+    result = prime * result + ((_recordId == null) ? 0 : _recordId.hashCode());
+    result = prime * result + ((_rowId == null) ? 0 : _rowId.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurRecord other = (BlurRecord) obj;
+    if (_columns == null) {
+      if (other._columns != null)
+        return false;
+    } else if (!_columns.equals(other._columns))
+      return false;
+    if (_family == null) {
+      if (other._family != null)
+        return false;
+    } else if (!_family.equals(other._family))
+      return false;
+    if (_recordId == null) {
+      if (other._recordId != null)
+        return false;
+    } else if (!_recordId.equals(other._recordId))
+      return false;
+    if (_rowId == null) {
+      if (other._rowId != null)
+        return false;
+    } else if (!_rowId.equals(other._rowId))
+      return false;
+    return true;
+  }
+
+}
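
A minimal round-trip sketch of the Writable contract implemented above (field values are invented and the streams are the usual java.io in-memory buffers):

    BlurRecord record = new BlurRecord();
    record.setRowId("row-1");
    record.setRecordId("record-1");
    record.setFamily("cf1");
    record.addColumn("col1", "value1");

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    record.write(new DataOutputStream(buffer));

    BlurRecord copy = new BlurRecord();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    // copy.equals(record) should now be true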

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
new file mode 100644
index 0000000..5f4fec6
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
@@ -0,0 +1,90 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+
+public abstract class BlurRecordReader extends RecordReader<Text, BlurRecord> {
+
+//  private IndexReader reader;
+//  private Directory directory;
+//  private int startingDocId;
+//  private int endingDocId;
+//  private int position;
+//  private Text rowid = new Text();
+//  private BlurRecord record = new BlurRecord();
+//
+//  @Override
+//  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+//    BlurInputSplit blurSplit = (BlurInputSplit) split;
+//    Path path = blurSplit.getIndexPath();
+//    String segmentName = blurSplit.getSegmentName();
+//    startingDocId = blurSplit.getStartingDocId();
+//    endingDocId = blurSplit.getEndingDocId();
+//    directory = new HdfsDirectory(context.getConfiguration(), path);
+//
+//    IndexCommit commit = Utils.findLatest(directory);
+//    reader = Utils.openSegmentReader(directory, commit, segmentName, Utils.getTermInfosIndexDivisor(context.getConfiguration()));
+//    int maxDoc = reader.maxDoc();
+//    if (endingDocId >= maxDoc) {
+//      endingDocId = maxDoc - 1;
+//    }
+//    position = startingDocId - 1;
+//  }
+//
+//  @Override
+//  public boolean nextKeyValue() throws IOException, InterruptedException {
+//    do {
+//      position++;
+//      if (position > endingDocId) {
+//        return false;
+//      }
+//    } while (reader.isDeleted(position));
+//    readDocument();
+//    return true;
+//  }
+//
+//  private void readDocument() throws CorruptIndexException, IOException {
+//    Document document = reader.document(position);
+//    record.reset();
+//    rowid.set(RowDocumentUtil.readRecord(document, record));
+//  }
+//
+//  @Override
+//  public Text getCurrentKey() throws IOException, InterruptedException {
+//    return rowid;
+//  }
+//
+//  @Override
+//  public BlurRecord getCurrentValue() throws IOException, InterruptedException {
+//    return record;
+//  }
+//
+//  @Override
+//  public float getProgress() throws IOException, InterruptedException {
+//    int total = endingDocId - startingDocId;
+//    return (float) position / (float) total;
+//  }
+//
+//  @Override
+//  public void close() throws IOException {
+//    reader.close();
+//    directory.close();
+//  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
new file mode 100644
index 0000000..6bbaad4
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class CheckOutputSpecs {
+  
+  public static void checkOutputSpecs(Configuration config, int reducers) throws IOException, InterruptedException {
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(config);
+    if (tableDescriptor == null) {
+      throw new IOException("setTableDescriptor needs to be called first.");
+    }
+    int shardCount = tableDescriptor.getShardCount();
+    FileSystem fileSystem = BlurOutputFormat.getOutputPath(config).getFileSystem(config);
+    Path tablePath = new Path(tableDescriptor.getTableUri());
+    if (fileSystem.exists(tablePath)) {
+      BlurUtil.validateShardCount(shardCount, fileSystem, tablePath);
+    } else {
+      throw new IOException("Table path [ " + tablePath + " ] doesn't exist for table [ " + tableDescriptor.getName()
+          + " ].");
+    }
+    BlurUtil.validateWritableDirectory(fileSystem, tablePath);
+    int reducerMultiplier = BlurOutputFormat.getReducerMultiplier(config);
+    int validNumberOfReducers = reducerMultiplier * shardCount;
+    if (reducers > 0 && reducers != validNumberOfReducers) {
+      throw new IllegalArgumentException("Invalid number of reducers [ " + reducers + " ]."
+          + " Number of Reducers should be [ " + validNumberOfReducers + " ].");
+    }
+  }
+  
+}
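
In short, a job passes this check only when its reducer count equals shardCount * reducerMultiplier (and the table path already exists). A hedged illustration with assumed numbers:

    // Assumed: the table exists at its tableUri and has 128 shards, multiplier left at its default of 1.
    // job.setNumReduceTasks(128) -> checkOutputSpecs passes
    // job.setNumReduceTasks(200) -> IllegalArgumentException
    // After BlurOutputFormat.setReducerMultiplier(job, 4), only 512 reducers would be accepted.
    CheckOutputSpecs.checkOutputSpecs(job.getConfiguration(), job.getNumReduceTasks());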

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
new file mode 100644
index 0000000..a79541c
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
@@ -0,0 +1,128 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * Decorator of Directory to capture the copy rate of a directory copy.
+ */
+public class CopyRateDirectory extends Directory {
+
+  private final Directory _directory;
+  private final RateCounter _copyRateCounter;
+
+  public CopyRateDirectory(Directory dir, RateCounter copyRateCounter) {
+    _directory = dir;
+    _copyRateCounter = copyRateCounter;
+  }
+
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    return wrap(_directory.createOutput(name, context));
+  }
+
+  private IndexOutput wrap(IndexOutput output) {
+    return new CopyRateIndexOutput(output, _copyRateCounter);
+  }
+
+  static class CopyRateIndexOutput extends IndexOutput {
+
+    private final IndexOutput _indexOutput;
+    private final RateCounter _copyRateCounter;
+
+    public CopyRateIndexOutput(IndexOutput output, RateCounter copyRateCounter) {
+      _indexOutput = output;
+      _copyRateCounter = copyRateCounter;
+    }
+
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+      _indexOutput.copyBytes(input, numBytes);
+      if (_copyRateCounter != null) {
+        _copyRateCounter.mark(numBytes);
+      }
+    }
+
+    public void writeByte(byte b) throws IOException {
+      _indexOutput.writeByte(b);
+      if (_copyRateCounter != null) {
+        _copyRateCounter.mark();
+      }
+    }
+
+    public void flush() throws IOException {
+      _indexOutput.flush();
+    }
+
+    public void close() throws IOException {
+      _indexOutput.close();
+    }
+
+    public long getFilePointer() {
+      return _indexOutput.getFilePointer();
+    }
+
+    @SuppressWarnings("deprecation")
+    public void seek(long pos) throws IOException {
+      _indexOutput.seek(pos);
+    }
+
+    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      _indexOutput.writeBytes(b, offset, length);
+      _copyRateCounter.mark(length);
+    }
+
+    public long length() throws IOException {
+      return _indexOutput.length();
+    }
+  }
+
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  public void deleteFile(String name) throws IOException {
+    _directory.deleteFile(name);
+  }
+
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    return _directory.openInput(name, context);
+  }
+
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
new file mode 100644
index 0000000..6830e32
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
@@ -0,0 +1,409 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import com.google.common.base.Splitter;
+
+@SuppressWarnings("static-access")
+public class CsvBlurDriver {
+
+  public static final String CSVLOADER = "csvloader";
+  public static final String MAPRED_COMPRESS_MAP_OUTPUT = "mapred.compress.map.output";
+  public static final String MAPRED_MAP_OUTPUT_COMPRESSION_CODEC = "mapred.map.output.compression.codec";
+  public static final int DEFAULT_WIDTH = 100;
+  public static final String HEADER = "The \"" + CSVLOADER +
+      "\" command is used to load delimited data into a Blur table.\nThe required options are \"-c\", \"-t\", \"-d\". The " +
+      "standard format for the contents of a file is: \"rowid,recordid,family,col1,col2,...\". However there are " +
+      "several options; for example, the rowid and recordid can be generated based on the data in the record via the " +
+      "\"-A\" and \"-a\" options. The family can be assigned based on the path via the \"-I\" option. The column " +
+      "name order can be mapped via the \"-d\" option. Also you can set the input " +
+      "format to sequence files via the \"-S\" option or leave the default of text files.";
+
+  enum COMPRESSION {
+    SNAPPY(SnappyCodec.class), GZIP(GzipCodec.class), BZIP(BZip2Codec.class), DEFAULT(DefaultCodec.class);
+
+    private final String className;
+
+    private COMPRESSION(Class<? extends CompressionCodec> clazz) {
+      className = clazz.getName();
+    }
+
+    public String getClassName() {
+      return className;
+    }
+  }
+
+  interface ControllerPool {
+    Iface getClient(String controllerConnectionStr);
+  }
+
+  public static void main(String... args) throws Exception {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    Job job = setupJob(configuration, new ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return BlurClient.getClient(controllerConnectionStr);
+      }
+    }, otherArgs);
+    if (job == null) {
+      System.exit(1);
+    }
+
+    boolean waitForCompletion = job.waitForCompletion(true);
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+
+  public static Job setupJob(Configuration configuration, ControllerPool controllerPool, String... otherArgs)
+      throws Exception {
+    CommandLine cmd = parse(otherArgs);
+    if (cmd == null) {
+      return null;
+    }
+
+    final String controllerConnectionStr = cmd.getOptionValue("c");
+    final String tableName = cmd.getOptionValue("t");
+
+    final Iface client = controllerPool.getClient(controllerConnectionStr);
+    TableDescriptor tableDescriptor = client.describe(tableName);
+
+    Job job = new Job(configuration, "Blur indexer [" + tableName + "]");
+    job.setJarByClass(CsvBlurDriver.class);
+    job.setMapperClass(CsvBlurMapper.class);
+
+    if (cmd.hasOption("p")) {
+      job.getConfiguration().set(MAPRED_COMPRESS_MAP_OUTPUT, "true");
+      String codecStr = cmd.getOptionValue("p");
+      COMPRESSION compression;
+      try {
+        compression = COMPRESSION.valueOf(codecStr.trim().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        compression = null;
+      }
+      if (compression == null) {
+        job.getConfiguration().set(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC, codecStr.trim());
+      } else {
+        job.getConfiguration().set(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC, compression.getClassName());
+      }
+    }
+    if (cmd.hasOption("a")) {
+      CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(job, true);
+    }
+    if (cmd.hasOption("A")) {
+      CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(job, true);
+    }
+    if (cmd.hasOption("S")) {
+      job.setInputFormatClass(SequenceFileInputFormat.class);
+    } else {
+      job.setInputFormatClass(TextInputFormat.class);
+    }
+
+    if (cmd.hasOption("C")) {
+      if (cmd.hasOption("S")) {
+        String[] optionValues = cmd.getOptionValues("C");
+        job.setInputFormatClass(CsvBlurCombineSequenceFileInputFormat.class);
+        CombineFileInputFormat.setMinInputSplitSize(job, Long.parseLong(optionValues[0]));
+        CombineFileInputFormat.setMaxInputSplitSize(job, Long.parseLong(optionValues[1]));
+      } else {
+        System.err.println("'C' can only be used with option 'S'");
+        return null;
+      }
+    }
+
+    if (cmd.hasOption("i")) {
+      for (String input : cmd.getOptionValues("i")) {
+        Path path = new Path(input);
+        Set<Path> pathSet = recursivelyGetPathsContainingFiles(path, job.getConfiguration());
+        if (pathSet.isEmpty()) {
+          FileInputFormat.addInputPath(job, path);
+        } else {
+          for (Path p : pathSet) {
+            FileInputFormat.addInputPath(job, p);
+          }
+        }
+      }
+    }
+    // processing the 'I' option
+    if (cmd.hasOption("I")) {
+      if (cmd.hasOption("C")) {
+        System.err.println("The 'I' and 'C' options cannot be used together.");
+        return null;
+      }
+      Option[] options = cmd.getOptions();
+      for (Option option : options) {
+        if (option.getOpt().equals("I")) {
+          String[] values = option.getValues();
+          if (values.length < 2) {
+            System.err.println("'I' parameter missing minimum args of (family path*)");
+            return null;
+          }
+          for (String p : getSubArray(values, 1)) {
+            Path path = new Path(p);
+            CsvBlurMapper.addFamilyPath(job, values[0], path);
+            FileInputFormat.addInputPath(job, path);
+          }
+        }
+      }
+    }
+
+    if (cmd.hasOption("s")) {
+      CsvBlurMapper.setSeparator(job, StringEscapeUtils.unescapeJava(cmd.getOptionValue("s")));
+    }
+    if (cmd.hasOption("o")) {
+      BlurOutputFormat.setOptimizeInFlight(job, false);
+    }
+    if (cmd.hasOption("l")) {
+      BlurOutputFormat.setIndexLocally(job, false);
+    }
+    if (cmd.hasOption("b")) {
+      int maxDocumentBufferSize = Integer.parseInt(cmd.getOptionValue("b"));
+      BlurOutputFormat.setMaxDocumentBufferSize(job, maxDocumentBufferSize);
+    }
+    if (cmd.hasOption("r")) {
+      int reducerMultiplier = Integer.parseInt(cmd.getOptionValue("r"));
+      BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
+    }
+    // processing the 'd' option
+    Option[] options = cmd.getOptions();
+    for (Option option : options) {
+      if (option.getOpt().equals("d")) {
+        String[] values = option.getValues();
+        if (values.length < 2) {
+          System.err.println("'d' parameter missing minimum args of (family columname*)");
+          return null;
+        }
+        CsvBlurMapper.addColumns(job, values[0], getSubArray(values, 1));
+      }
+    }
+    BlurOutputFormat.setupJob(job, tableDescriptor);
+    BlurMapReduceUtil.addDependencyJars(job.getConfiguration(), Splitter.class);
+    return job;
+  }
+
+  private static String[] getSubArray(String[] array, int starting) {
+    String[] result = new String[array.length - starting];
+    System.arraycopy(array, starting, result, 0, result.length);
+    return result;
+  }
+
+  // Recursively walks the given path and returns the set of directories that directly contain files.
+  private static Set<Path> recursivelyGetPathsContainingFiles(Path path, Configuration configuration)
+      throws IOException {
+    Set<Path> pathSet = new HashSet<Path>();
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus status : listStatus) {
+      if (status.isDir()) {
+        pathSet.addAll(recursivelyGetPathsContainingFiles(status.getPath(), configuration));
+      } else {
+        pathSet.add(status.getPath().getParent());
+      }
+    }
+    return pathSet;
+  }
+
+  private static CommandLine parse(String... otherArgs) throws ParseException {
+    Options options = new Options();
+    options.addOption(OptionBuilder.withArgName("controller*").hasArgs().isRequired(true)
+        .withDescription("* Thrift controller connection string. (host1:40010 host2:40010 ...)").create("c"));
+    options.addOption(OptionBuilder.withArgName("tablename").hasArg().isRequired(true)
+        .withDescription("* Blur table name.").create("t"));
+    options.addOption(OptionBuilder.withArgName("family column*").hasArgs().isRequired(true)
+        .withDescription("* Define the mapping of fields in the CSV file to column names. (family col1 col2 col3 ...)")
+        .create("d"));
+    options.addOption(OptionBuilder
+        .withArgName("delimiter")
+        .hasArg()
+        .withDescription(
+            "The file delimiter to be used. (default value ',')  NOTE: For special "
+                + "charactors like the default hadoop separator of ASCII value 1, you can use standard "
+                + "java escaping (\\u0001)").create("s"));
+    options.addOption(OptionBuilder.withArgName("path*").hasArg()
+        .withDescription("The directory to index, the family name is assumed to BE present in the file contents. (hdfs://namenode/input/in1)").create("i"));
+    options.addOption(OptionBuilder.withArgName("family path*").hasArgs()
+        .withDescription("The directory to index with a family name, the family name is assumed to NOT be present in the file contents. (family hdfs://namenode/input/in1)").create("I"));
+    options
+        .addOption(OptionBuilder
+            .withArgName("auto generate record ids")
+            .withDescription(
+                "No Record Ids - Automatically generate record ids for each record based on a MD5 has of the data within the record.")
+            .create("a"));
+    options
+        .addOption(OptionBuilder
+            .withArgName("auto generate row ids")
+            .withDescription(
+                "No Row Ids - Automatically generate row ids for each record based on a MD5 has of the data within the record.")
+            .create("A"));
+    options.addOption(OptionBuilder.withArgName("disable optimize indexes during copy")
+        .withDescription("Disable optimize indexes during copy, this has very little overhead. (enabled by default)")
+        .create("o"));
+    options.addOption(OptionBuilder
+        .withArgName("disable index locally")
+        .withDescription(
+            "Disable the use storage local on the server that is running the reducing "
+                + "task and copy to Blur table once complete. (enabled by default)").create("l"));
+    options.addOption(OptionBuilder.withArgName("sequence files inputs")
+        .withDescription("The input files are sequence files.").create("S"));
+    options.addOption(OptionBuilder
+        .withArgName("size")
+        .hasArg()
+        .withDescription(
+            "The maximum number of Lucene documents to buffer in the reducer for a single "
+                + "row before spilling over to disk. (default 1000)").create("b"));
+    options.addOption(OptionBuilder
+        .withArgName("multiplier")
+        .hasArg()
+        .withDescription(
+            "The reducer multipler allows for an increase in the number of reducers per "
+                + "shard in the given table.  For example if the table has 128 shards and the "
+                + "reducer multiplier is 4 the total number of reducers will be 512, 4 reducers "
+                + "per shard. (default 1)").create("r"));
+    options.addOption(OptionBuilder
+        .withArgName("minimum maximum")
+        .hasArgs(2)
+        .withDescription(
+            "Enables a combine file input to help deal with many small files as the input. Provide "
+                + "the minimum and maximum size per mapper.  For a minimum of 1GB and a maximum of "
+                + "2.5GB: (1000000000 2500000000)").create("C"));
+    options.addOption(OptionBuilder
+        .withArgName("codec")
+        .hasArgs(1)
+        .withDescription(
+            "Sets the compression codec for the map compress output setting. (SNAPPY,GZIP,BZIP,DEFAULT, or classname)")
+        .create("p"));
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = null;
+    try {
+      cmd = parser.parse(options, otherArgs);
+    } catch (ParseException e) {
+      System.err.println(e.getMessage());
+      HelpFormatter formatter = new HelpFormatter();
+      PrintWriter pw = new PrintWriter(System.err, true);
+      formatter.printHelp(pw, DEFAULT_WIDTH, CSVLOADER, HEADER, options, HelpFormatter.DEFAULT_LEFT_PAD,
+          HelpFormatter.DEFAULT_DESC_PAD, null, false);
+      return null;
+    }
+
+    if (!(cmd.hasOption("I") || cmd.hasOption("i"))) {
+      System.err.println("Missing input directory, see options 'i' and 'I'.");
+      HelpFormatter formatter = new HelpFormatter();
+      PrintWriter pw = new PrintWriter(System.err, true);
+      formatter.printHelp(pw, DEFAULT_WIDTH, CSVLOADER, HEADER, options, HelpFormatter.DEFAULT_LEFT_PAD,
+          HelpFormatter.DEFAULT_DESC_PAD, null, false);
+      return null;
+    }
+    return cmd;
+  }
+
+  public static class CsvBlurCombineSequenceFileInputFormat extends CombineFileInputFormat<Writable, Text> {
+
+    private static class SequenceFileRecordReaderWrapper extends RecordReader<Writable, Text> {
+
+      private final RecordReader<Writable, Text> delegate;
+      private final FileSplit fileSplit;
+
+      // Constructed reflectively by CombineFileRecordReader, hence the unused warning suppression.
+      @SuppressWarnings("unused")
+      public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context, Integer index)
+          throws IOException {
+        fileSplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index),
+            split.getLocations());
+        delegate = new SequenceFileInputFormat<Writable, Text>().createRecordReader(fileSplit, context);
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        return delegate.getProgress();
+      }
+
+      @Override
+      public Writable getCurrentKey() throws IOException, InterruptedException {
+        return delegate.getCurrentKey();
+      }
+
+      @Override
+      public Text getCurrentValue() throws IOException, InterruptedException {
+        return delegate.getCurrentValue();
+      }
+
+      @Override
+      public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+        delegate.initialize(fileSplit, context);
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        return delegate.nextKeyValue();
+      }
+
+      @Override
+      public void close() throws IOException {
+        delegate.close();
+      }
+    }
+
+    @Override
+    public RecordReader<Writable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
+        throws IOException {
+      return new CombineFileRecordReader<Writable, Text>((CombineFileSplit) split, context,
+          SequenceFileRecordReaderWrapper.class);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/7afca43b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
new file mode 100644
index 0000000..8f59e31
--- /dev/null
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
@@ -0,0 +1,487 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.google.common.base.Splitter;
+
+/**
+ * This mapper parses a standard CSV file into {@link BlurMutate} objects. Use the
+ * static addColumns and setSeparator methods to configure the class.
+ */
+public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
+
+  public static final String UTF_8 = "UTF-8";
+  public static final String BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA = "blur.csv.auto.generate.record.id.as.hash.of.data";
+  public static final String BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA = "blur.csv.auto.generate.row.id.as.hash.of.data";
+  public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES = "blur.csv.family.path.mappings.families";
+  public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX = "blur.csv.family.path.mappings.family.";
+  public static final String BLUR_CSV_SEPARATOR_BASE64 = "blur.csv.separator.base64";
+  public static final String BLUR_CSV_FAMILY_COLUMN_PREFIX = "blur.csv.family.";
+  public static final String BLUR_CSV_FAMILIES = "blur.csv.families";
+  public static final String HIVE_NULL = "\\N";
+
+  protected Map<String, List<String>> _columnNameMap;
+  protected String _separator = Base64.encodeBase64String(",".getBytes());
+  protected Splitter _splitter;
+  protected boolean _familyNotInFile;
+  protected String _familyFromPath;
+  protected boolean _autoGenerateRecordIdAsHashOfData;
+  protected MessageDigest _digest;
+  protected boolean _autoGenerateRowIdAsHashOfData;
+
+  /**
+   * Add a mapping for a family to a path. This is to be used when an entire
+   * path is to be processed as a single family and the data itself does not
+   * contain the family.<br/>
+   * <br/>
+   * 
+   * NOTE: the familyNotInFile property must be set before this method can be
+   * called.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param family
+   *          the family.
+   * @param path
+   *          the path.
+   */
+  public static void addFamilyPath(Job job, String family, Path path) {
+    addFamilyPath(job.getConfiguration(), family, path);
+  }
+
+  /**
+   * Add a mapping for a family to a path. This is to be used when an entire
+   * path is to be processed as a single family and the data itself does not
+   * contain the family.<br/>
+   * <br/>
+   * 
+   * NOTE: the familyNotInFile property must be set before this method can be
+   * called.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param family
+   *          the family.
+   * @param path
+   *          the path.
+   */
+  public static void addFamilyPath(Configuration configuration, String family, Path path) {
+    append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES, family);
+    append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family, path.toString());
+  }
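+
+  // For example (illustrative family and path, not taken from this patch), a driver could map an
+  // entire input directory to the "cf1" family before registering it as an input path, mirroring
+  // what CsvBlurDriver does for its 'I' option:
+  //
+  //   CsvBlurMapper.addFamilyPath(job, "cf1", new Path("hdfs://namenode/input/cf1"));
+  //   FileInputFormat.addInputPath(job, new Path("hdfs://namenode/input/cf1"));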
+
+  protected static void append(Configuration configuration, String name, String value) {
+    Collection<String> set = configuration.getStringCollection(name);
+    if (set == null) {
+      set = new TreeSet<String>();
+    }
+    set.add(value);
+    configuration.setStrings(name, set.toArray(new String[set.size()]));
+  }
+
+  /**
+   * If set to true the record id will be automatically generated as a hash of
+   * the data that the record contains.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param autoGenerateRecordIdAsHashOfData
+   *          boolean.
+   */
+  public static void setAutoGenerateRecordIdAsHashOfData(Job job, boolean autoGenerateRecordIdAsHashOfData) {
+    setAutoGenerateRecordIdAsHashOfData(job.getConfiguration(), autoGenerateRecordIdAsHashOfData);
+  }
+
+  /**
+   * If set to true the record id will be automatically generated as a hash of
+   * the data that the record contains.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param autoGenerateRecordIdAsHashOfData
+   *          boolean.
+   */
+  public static void setAutoGenerateRecordIdAsHashOfData(Configuration configuration,
+      boolean autoGenerateRecordIdAsHashOfData) {
+    configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, autoGenerateRecordIdAsHashOfData);
+  }
+
+  /**
+   * Gets whether or not to generate a recordid for the record based on the
+   * data.
+   * 
+   * @param configuration
+   *          the configuration.
+   * @return boolean.
+   */
+  public static boolean isAutoGenerateRecordIdAsHashOfData(Configuration configuration) {
+    return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, false);
+  }
+
+  /**
+   * If set to true the row id will be automatically generated as a hash of
+   * the data that the record contains.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param autoGenerateRowIdAsHashOfData
+   *          boolean.
+   */
+  public static void setAutoGenerateRowIdAsHashOfData(Job job, boolean autoGenerateRowIdAsHashOfData) {
+    setAutoGenerateRowIdAsHashOfData(job.getConfiguration(), autoGenerateRowIdAsHashOfData);
+  }
+
+  /**
+   * If set to true the row id will be automatically generated as a hash of
+   * the data that the record contains.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param autoGenerateRowIdAsHashOfData
+   *          boolean.
+   */
+  public static void setAutoGenerateRowIdAsHashOfData(Configuration configuration, boolean autoGenerateRowIdAsHashOfData) {
+    configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, autoGenerateRowIdAsHashOfData);
+  }
+
+  /**
+   * Gets whether or not to generate a rowid for the row based on the
+   * data.
+   * 
+   * @param configuration
+   *          the configuration.
+   * @return boolean.
+   */
+  public static boolean isAutoGenerateRowIdAsHashOfData(Configuration configuration) {
+    return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, false);
+  }
+
+  /**
+   * Sets all the family and column definitions.
+   * 
+   * @param job
+   *          the job to setup.
+   * @param strDefinition
+   *          the string definition. <br/>
+   * <br/>
+   *          Example:<br/>
+   *          "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
+   *          Where "cf1" is a family name that contains columns "col1", "col2"
+   *          and "col3" and a second family of "cf2" with columns "col1",
+   *          "col2", and "col3".
+   */
+  public static void setColumns(Job job, String strDefinition) {
+    setColumns(job.getConfiguration(), strDefinition);
+  }
+
+  /**
+   * Sets all the family and column definitions.
+   * 
+   * @param configuration
+   *          the configuration to setup.
+   * @param strDefinition
+   *          the string definition. <br/>
+   * <br/>
+   *          Example:<br/>
+   *          "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
+   *          Where "cf1" is a family name that contains columns "col1", "col2"
+   *          and "col3" and a second family of "cf2" with columns "col1",
+   *          "col2", and "col3".
+   */
+  public static void setColumns(Configuration configuration, String strDefinition) {
+    Iterable<String> familyDefs = Splitter.on('|').split(strDefinition);
+    for (String familyDef : familyDefs) {
+      int indexOf = familyDef.indexOf(':');
+      if (indexOf < 0) {
+        throwMalformedDefinition(strDefinition);
+      }
+      String family = familyDef.substring(0, indexOf);
+      Iterable<String> cols = Splitter.on(',').split(familyDef.substring(indexOf + 1));
+      List<String> colnames = new ArrayList<String>();
+      for (String columnName : cols) {
+        colnames.add(columnName);
+      }
+      if (family.trim().isEmpty() || colnames.isEmpty()) {
+        throwMalformedDefinition(strDefinition);
+      }
+      addColumns(configuration, family, colnames.toArray(new String[colnames.size()]));
+    }
+  }
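+
+  // A minimal sketch of the definition string (family and column names below are made up):
+  // "cf1:col1,col2,col3|cf2:col1,col2" registers col1, col2 and col3 under family cf1 and
+  // col1, col2 under cf2, which is equivalent to calling
+  // addColumns(conf, "cf1", "col1", "col2", "col3") and addColumns(conf, "cf2", "col1", "col2").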
+
+  protected static void throwMalformedDefinition(String strDefinition) {
+    throw new RuntimeException("Family and column definition string not valid [" + strDefinition
+        + "] should look like \"family1:colname1,colname2|family2:colname1,colname2,colname3\"");
+  }
+
+  /**
+   * Adds the column layout for the given family.
+   * 
+   * @param job
+   *          the job to apply the layout.
+   * @param family
+   *          the family name.
+   * @param columns
+   *          the column names.
+   */
+  public static void addColumns(Job job, String family, String... columns) {
+    addColumns(job.getConfiguration(), family, columns);
+  }
+
+  /**
+   * Adds the column layout for the given family.
+   * 
+   * @param configuration
+   *          the configuration to apply the layout.
+   * @param family
+   *          the family name.
+   * @param columns
+   *          the column names.
+   */
+  public static void addColumns(Configuration configuration, String family, String... columns) {
+    Collection<String> families = new TreeSet<String>(configuration.getStringCollection(BLUR_CSV_FAMILIES));
+    families.add(family);
+    configuration.setStrings(BLUR_CSV_FAMILIES, families.toArray(new String[] {}));
+    configuration.setStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family, columns);
+  }
+
+  public static Collection<String> getFamilyNames(Configuration configuration) {
+    return configuration.getStringCollection(BLUR_CSV_FAMILIES);
+  }
+
+  public static Map<String, List<String>> getFamilyAndColumnNameMap(Configuration configuration) {
+    Map<String, List<String>> columnNameMap = new HashMap<String, List<String>>();
+    for (String family : getFamilyNames(configuration)) {
+      String[] columnsNames = configuration.getStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family);
+      columnNameMap.put(family, Arrays.asList(columnsNames));
+    }
+    return columnNameMap;
+  }
+
+  /**
+   * Sets the separator of the file; by default it is ",".
+   * 
+   * @param job
+   *          the job to apply the separator change.
+   * @param separator
+   *          the separator.
+   */
+  public static void setSeparator(Job job, String separator) {
+    setSeparator(job.getConfiguration(), separator);
+  }
+
+  /**
+   * Sets the separator of the file; by default it is ",".
+   * 
+   * @param configuration
+   *          the configuration to apply the separator change.
+   * @param separator
+   *          the separator.
+   */
+  public static void setSeparator(Configuration configuration, String separator) {
+    try {
+      configuration.set(BLUR_CSV_SEPARATOR_BASE64, Base64.encodeBase64String(separator.getBytes(UTF_8)));
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
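+
+  // Driver-side configuration sketch (the job name, family, columns, and separator are
+  // illustrative values, not part of this patch):
+  //
+  //   Job job = new Job(new Configuration(), "csv import");
+  //   job.setMapperClass(CsvBlurMapper.class);
+  //   CsvBlurMapper.addColumns(job, "cf1", "col1", "col2");
+  //   CsvBlurMapper.setSeparator(job, "\t");   // tab-delimited input instead of the default ","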
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration configuration = context.getConfiguration();
+    _autoGenerateRecordIdAsHashOfData = isAutoGenerateRecordIdAsHashOfData(configuration);
+    _autoGenerateRowIdAsHashOfData = isAutoGenerateRowIdAsHashOfData(configuration);
+    if (_autoGenerateRecordIdAsHashOfData || _autoGenerateRowIdAsHashOfData) {
+      try {
+        _digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new IOException(e);
+      }
+    }
+    _columnNameMap = getFamilyAndColumnNameMap(configuration);
+    _separator = new String(Base64.decodeBase64(configuration.get(BLUR_CSV_SEPARATOR_BASE64, _separator)), UTF_8);
+    _splitter = Splitter.on(_separator);
+    Path fileCurrentlyProcessing = getCurrentFile(context);
+    Collection<String> families = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
+    OUTER: for (String family : families) {
+      Collection<String> pathStrCollection = configuration
+          .getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family);
+      for (String pathStr : pathStrCollection) {
+        Path path = new Path(pathStr);
+        path = path.makeQualified(path.getFileSystem(configuration));
+        if (isParent(path, fileCurrentlyProcessing)) {
+          _familyFromPath = family;
+          _familyNotInFile = true;
+          break OUTER;
+        }
+      }
+    }
+  }
+
+  protected boolean isParent(Path possibleParent, Path child) {
+    if (child == null) {
+      return false;
+    }
+    if (possibleParent.equals(child.getParent())) {
+      return true;
+    }
+    return isParent(possibleParent, child.getParent());
+  }
+
+  protected Path getCurrentFile(Context context) throws IOException {
+    InputSplit split = context.getInputSplit();
+    if (split != null && split instanceof FileSplit) {
+      FileSplit inputSplit = (FileSplit) split;
+      Path path = inputSplit.getPath();
+      return path.makeQualified(path.getFileSystem(context.getConfiguration()));
+    }
+    return null;
+  }
+
+  @Override
+  protected void map(Writable k, Text value, Context context) throws IOException, InterruptedException {
+    BlurRecord record = _mutate.getRecord();
+    record.clearColumns();
+    String str = value.toString();
+
+    Iterable<String> split = _splitter.split(str);
+    List<String> list = toList(split);
+
+    int offset = 0;
+    boolean gen = false;
+    if (!_autoGenerateRowIdAsHashOfData) {
+      record.setRowId(list.get(offset++));
+    } else {
+      _digest.reset();
+      byte[] bs = value.getBytes();
+      int length = value.getLength();
+      _digest.update(bs, 0, length);
+      record.setRowId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
+      gen = true;
+    }
+
+    if (!_autoGenerateRecordIdAsHashOfData) {
+      record.setRecordId(list.get(offset++));
+    } else {
+      if (gen) {
+        record.setRecordId(record.getRowId());
+      } else {
+        _digest.reset();
+        byte[] bs = value.getBytes();
+        int length = value.getLength();
+        _digest.update(bs, 0, length);
+        record.setRecordId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
+      }
+    }
+    String family;
+    if (_familyNotInFile) {
+      family = _familyFromPath;
+    } else {
+      family = list.get(offset++);
+    }
+    record.setFamily(family);
+
+    List<String> columnNames = _columnNameMap.get(family);
+    if (columnNames == null) {
+      throw new IOException("Family [" + family + "] is missing in the definition.");
+    }
+    if (list.size() - offset != columnNames.size()) {
+
+      String options = "";
+
+      if (!_autoGenerateRowIdAsHashOfData) {
+        options += "rowid,";
+      }
+      if (!_autoGenerateRecordIdAsHashOfData) {
+        options += "recordid,";
+      }
+      if (!_familyNotInFile) {
+        options += "family,";
+      }
+      String msg = "Record [" + str + "] does not match defined record [" + options + getColumnNames(columnNames)
+          + "].";
+      throw new IOException(msg);
+    }
+
+    for (int i = 0; i < columnNames.size(); i++) {
+      String val = handleHiveNulls(list.get(i + offset));
+      if (val != null) {
+        record.addColumn(columnNames.get(i), val);
+        _columnCounter.increment(1);
+      }
+    }
+    _key.set(record.getRowId());
+    _mutate.setMutateType(MUTATE_TYPE.REPLACE);
+    context.write(_key, _mutate);
+    _recordCounter.increment(1);
+    context.progress();
+  }
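+
+  // Worked example (values are illustrative): with addColumns(conf, "cf1", "col1", "col2") and no
+  // auto-generated ids, the input line "row1,rec1,cf1,a,b" becomes a REPLACE mutate for row "row1",
+  // record "rec1", family "cf1", with columns col1=a and col2=b. A value of "\N" is treated as a
+  // Hive null and that column is skipped.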
+
+  protected String handleHiveNulls(String value) {
+    if (value.equals(HIVE_NULL)) {
+      return null;
+    }
+    return value;
+  }
+
+  public void setFamilyFromPath(String familyFromPath) {
+    this._familyFromPath = familyFromPath;
+  }
+
+  protected String getColumnNames(List<String> columnNames) {
+    StringBuilder builder = new StringBuilder();
+    for (String c : columnNames) {
+      if (builder.length() != 0) {
+        builder.append(',');
+      }
+      builder.append(c);
+    }
+    return builder.toString();
+  }
+
+  protected List<String> toList(Iterable<String> split) {
+    List<String> lst = new ArrayList<String>();
+    for (String s : split) {
+      lst.add(s);
+    }
+    return lst;
+  }
+
+}

