incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [03/11] Blur MR projects restructured.
Date Thu, 01 May 2014 20:49:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
deleted file mode 100644
index 037edec..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BaseBlurMapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * Base mapper class for Blur map reduce jobs.
- * 
- * @param <KEY>
- *          the input key type.
- * @param <VALUE>
- *          the input value type.
- */
-public abstract class BaseBlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, Text, BlurMutate> {
-  protected BlurMutate _mutate;
-  protected Text _key;
-  protected Counter _recordCounter;
-  protected Counter _columnCounter;
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    _mutate = new BlurMutate();
-    _mutate.setRecord(new BlurRecord());
-    _key = new Text();
-    _recordCounter = context.getCounter(BlurCounters.RECORD_COUNT);
-    _columnCounter = context.getCounter(BlurCounters.COLUMN_COUNT);
-  }
-
-  @Override
-  protected abstract void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException;
-
-}
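
For orientation before the remaining deletions: a subclass of the removed BaseBlurMapper only had to implement map() and could reuse the objects allocated once in setup(). The sketch below is illustrative only; the class name, input types, and row-id derivation are assumptions, not part of this commit.

import java.io.IOException;

import org.apache.blur.mapreduce.lib.BaseBlurMapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class ExampleBlurMapper extends BaseBlurMapper<LongWritable, Text> {

  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // Reuse the single record/key allocated in setup() for every input line.
    _mutate.getRecord().reset();
    _mutate.getRecord().setRowId(key.toString());
    _mutate.getRecord().setRecordId(key.toString());
    _mutate.getRecord().setFamily("example");
    _mutate.getRecord().addColumn("body", value.toString());
    _key.set(key.toString());
    context.write(_key, _mutate);
    _recordCounter.increment(1);
    _columnCounter.increment(1);
  }
}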

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
deleted file mode 100644
index d32a3bd..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurColumn.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-public class BlurColumn implements Writable {
-
-  private String name;
-  private String value;
-
-  public BlurColumn() {
-  }
-
-  public BlurColumn(String name, String value) {
-    this.name = name;
-    this.value = value;
-  }
-
-  public boolean hasNull() {
-    return name == null || value == null;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    name = IOUtil.readString(in);
-    value = IOUtil.readString(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    IOUtil.writeString(out, name);
-    IOUtil.writeString(out, value);
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  public void setValue(String value) {
-    this.value = value;
-  }
-
-  @Override
-  public String toString() {
-    return "{name=" + name + ", value=" + value + "}";
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((name == null) ? 0 : name.hashCode());
-    result = prime * result + ((value == null) ? 0 : value.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    BlurColumn other = (BlurColumn) obj;
-    if (name == null) {
-      if (other.name != null)
-        return false;
-    } else if (!name.equals(other.name))
-      return false;
-    if (value == null) {
-      if (other.value != null)
-        return false;
-    } else if (!value.equals(other.value))
-      return false;
-    return true;
-  }
-
-}
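
The Writable contract above is easiest to see as a round trip. A small sketch; the class name, stream handling, and column values are illustrative:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.blur.mapreduce.lib.BlurColumn;

public class BlurColumnRoundTrip {
  public static void main(String[] args) throws IOException {
    BlurColumn original = new BlurColumn("col1", "value1");

    // write() serializes name then value through IOUtil.writeString().
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    // readFields() restores both fields into a fresh instance.
    BlurColumn copy = new BlurColumn();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    // equals()/hashCode() are value-based, so the copy compares equal.
    System.out.println(original.equals(copy)); // true
  }
}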

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
deleted file mode 100644
index 0691dce..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurCounters.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * The enum class used for all the internal counters during map reduce jobs.
- */
-public enum BlurCounters {
-  RECORD_COUNT, LUCENE_FIELD_COUNT, ROW_COUNT, RECORD_RATE, COPY_RATE, ROW_RATE, RECORD_DUPLICATE_COUNT, ROW_OVERFLOW_COUNT, ROW_DELETE_COUNT, COLUMN_COUNT
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
deleted file mode 100644
index 5ee26eb..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMapReduceUtil.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.blur.mapreduce.lib;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * This utility code was taken from HBase to locate classes and the jar files
- * to add to the MapReduce Job.
- */
-public class BlurMapReduceUtil {
-
-  private final static Log LOG = LogFactory.getLog(BlurMapReduceUtil.class);
-
-  /**
-   * Add the Blur dependency jars as well as jars for any of the configured job
-   * classes to the job configuration, so that JobClient will ship them to the
-   * cluster and add them to the DistributedCache.
-   */
-  public static void addDependencyJars(Job job) throws IOException {
-    try {
-      addDependencyJars(job.getConfiguration(), org.apache.zookeeper.ZooKeeper.class, job.getMapOutputKeyClass(),
-          job.getMapOutputValueClass(), job.getInputFormatClass(), job.getOutputKeyClass(), job.getOutputValueClass(),
-          job.getOutputFormatClass(), job.getPartitionerClass(), job.getCombinerClass());
-      addAllJarsInBlurLib(job.getConfiguration());
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Adds all the jars in the same path as the blur jar files.
-   * 
-   * @param conf
-   *          the {@link Configuration} to update.
-   * @throws IOException
-   */
-  public static void addAllJarsInBlurLib(Configuration conf) throws IOException {
-    FileSystem localFs = FileSystem.getLocal(conf);
-    Set<String> jars = new HashSet<String>();
-    jars.addAll(conf.getStringCollection("tmpjars"));
-
-    String property = System.getProperty("java.class.path");
-    String[] files = property.split("\\:");
-
-    String blurLibPath = getPath("blur-", files);
-    if (blurLibPath == null) {
-      return;
-    }
-    List<String> pathes = getPathes(blurLibPath, files);
-    for (String pathStr : pathes) {
-      Path path = new Path(pathStr);
-      if (!localFs.exists(path)) {
-        LOG.warn("Could not validate jar file " + path);
-        continue;
-      }
-      jars.add(path.makeQualified(localFs).toString());
-    }
-    if (jars.isEmpty()) {
-      return;
-    }
-    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
-  }
-
-  private static List<String> getPathes(String path, String[] files) {
-    List<String> pathes = new ArrayList<String>();
-    for (String file : files) {
-      if (file.startsWith(path)) {
-        pathes.add(file);
-      }
-    }
-    return pathes;
-  }
-
-  private static String getPath(String startsWith, String[] files) {
-    for (String file : files) {
-      int lastIndexOf = file.lastIndexOf('/');
-      String fileName = file.substring(lastIndexOf + 1);
-      if (fileName.startsWith(startsWith)) {
-        return file.substring(0, lastIndexOf);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Add the jars containing the given classes to the job's configuration such
-   * that JobClient will ship them to the cluster and add them to the
-   * DistributedCache.
-   */
-  public static void addDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
-    FileSystem localFs = FileSystem.getLocal(conf);
-    Set<String> jars = new HashSet<String>();
-    // Add jars that are already in the tmpjars variable
-    jars.addAll(conf.getStringCollection("tmpjars"));
-
-    // Add jars containing the specified classes
-    for (Class<?> clazz : classes) {
-      if (clazz == null) {
-        continue;
-      }
-
-      String pathStr = findOrCreateJar(clazz);
-      if (pathStr == null) {
-        LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
-        continue;
-      }
-      Path path = new Path(pathStr);
-      if (!localFs.exists(path)) {
-        LOG.warn("Could not validate jar file " + path + " for class " + clazz);
-        continue;
-      }
-      jars.add(path.makeQualified(localFs).toString());
-    }
-    if (jars.isEmpty()) {
-      return;
-    }
-
-    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
-  }
-
-  /**
-   * If org.apache.hadoop.util.JarFinder is available (0.23+ hadoop), finds the
-   * Jar for a class or creates it if it doesn't exist. If the class is in a
-   * directory in the classpath, it creates a Jar on the fly with the contents
-   * of the directory and returns the path to that Jar. If a Jar is created, it
-   * is created in the system temporary directory.
-   * 
-   * Otherwise, returns an existing jar that contains a class of the same name.
-   * 
-   * @param my_class
-   *          the class to find.
-   * @return a jar file that contains the class, or null.
-   * @throws IOException
-   */
-  private static String findOrCreateJar(Class<?> my_class) throws IOException {
-    try {
-      Class<?> jarFinder = Class.forName("org.apache.hadoop.util.JarFinder");
-      // hadoop-0.23 has a JarFinder class that will create the jar
-      // if it doesn't exist. Note that this is needed to run the mapreduce
-      // unit tests post-0.23, because mapreduce v2 requires the relevant jars
-      // to be in the mr cluster to do output, split, etc. At unit test time,
-      // the hbase jars do not exist, so we need to create some. Note that we
-      // can safely fall back to findContainingJars for pre-0.23 mapreduce.
-      Method m = jarFinder.getMethod("getJar", Class.class);
-      return (String) m.invoke(null, my_class);
-    } catch (InvocationTargetException ite) {
-      // function was properly called, but threw its own exception
-      throw new IOException(ite.getCause());
-    } catch (Exception e) {
-      // ignore all other exceptions. related to reflection failure
-    }
-
-    LOG.debug("New JarFinder: org.apache.hadoop.util.JarFinder.getJar " + "not available.  Using old findContainingJar");
-    return findContainingJar(my_class);
-  }
-
-  /**
-   * Find a jar that contains a class of the same name, if any. It will return a
-   * jar file, even if that is not the first thing on the class path that has a
-   * class with the same name.
-   * 
-   * This is shamelessly copied from JobConf
-   * 
-   * @param my_class
-   *          the class to find.
-   * @return a jar file that contains the class, or null.
-   * @throws IOException
-   */
-  private static String findContainingJar(Class<?> my_class) {
-    ClassLoader loader = my_class.getClassLoader();
-    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
-    try {
-      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
-        URL url = itr.nextElement();
-        if ("jar".equals(url.getProtocol())) {
-          String toReturn = url.getPath();
-          if (toReturn.startsWith("file:")) {
-            toReturn = toReturn.substring("file:".length());
-          }
-          // URLDecoder is a misnamed class, since it actually decodes
-          // x-www-form-urlencoded MIME type rather than actual
-          // URL encoding (which the file path has). Therefore it would
-          // decode +s to ' 's which is incorrect (spaces are actually
-          // either unencoded or encoded as "%20"). Replace +s first, so
-          // that they are kept sacred during the decoding process.
-          toReturn = toReturn.replaceAll("\\+", "%2B");
-          toReturn = URLDecoder.decode(toReturn, "UTF-8");
-          return toReturn.replaceAll("!.*$", "");
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return null;
-  }
-}
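
In a driver, the two public entry points above are typically called back to back, which is exactly what BlurOutputFormat.setupJob() does further down in this patch. A minimal sketch; the job name and driver class are assumptions:

import org.apache.blur.mapreduce.lib.BlurMapReduceUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DependencyJarExample {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "ship blur jars");
    job.setJarByClass(DependencyJarExample.class);
    // Adds the jars of the job's configured classes, plus everything that
    // sits next to the blur-* jars on the local classpath, to "tmpjars".
    BlurMapReduceUtil.addDependencyJars(job);
    BlurMapReduceUtil.addAllJarsInBlurLib(job.getConfiguration());
  }
}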

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
deleted file mode 100644
index 36d7f4f..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurMutate.java
+++ /dev/null
@@ -1,178 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.hadoop.io.Writable;
-
-/**
- * {@link BlurMutate} carries the {@link Record}s bound for the {@link Row} for
- * indexing. If this mutate represents a delete of the {@link Row}, the
- * recordId of the {@link BlurRecord} is ignored.
- */
-public class BlurMutate implements Writable {
-
-  /**
-   * The {@link MUTATE_TYPE} controls the mutating of the {@link Row}. DELETE
-   * indicates that the {@link Row} is to be deleted. REPLACE indicates that the
-   * group of mutates are to replace the existing {@link Row}.
-   * 
-   * If both a DELETE and a REPLACE exist for a single {@link Row} in the
-   * {@link BlurOutputFormat}, then the {@link Row} will be replaced, not just
-   * deleted.
-   */
-  public enum MUTATE_TYPE {
-    /* ADD(0), UPDATE(1), */DELETE(2), REPLACE(3);
-    private int _value;
-
-    private MUTATE_TYPE(int value) {
-      _value = value;
-    }
-
-    public int getValue() {
-      return _value;
-    }
-
-    public MUTATE_TYPE find(int value) {
-      switch (value) {
-      // @TODO Updates through MR are going to be disabled
-      // case 0:
-      // return ADD;
-      // case 1:
-      // return UPDATE;
-      case 2:
-        return DELETE;
-      case 3:
-        return REPLACE;
-      default:
-        throw new RuntimeException("Value [" + value + "] not found.");
-      }
-    }
-  }
-
-  private MUTATE_TYPE _mutateType = MUTATE_TYPE.REPLACE;
-  private BlurRecord _record = new BlurRecord();
-
-  public BlurMutate() {
-
-  }
-
-  public BlurMutate(MUTATE_TYPE type, BlurRecord record) {
-    _mutateType = type;
-    _record = record;
-  }
-
-  public BlurMutate(MUTATE_TYPE type, String rowId) {
-    _mutateType = type;
-    _record.setRowId(rowId);
-  }
-
-  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId) {
-    _mutateType = type;
-    _record.setRowId(rowId);
-    _record.setRecordId(recordId);
-  }
-
-  public BlurMutate(MUTATE_TYPE type, String rowId, String recordId, String family) {
-    _mutateType = type;
-    _record.setRowId(rowId);
-    _record.setRecordId(recordId);
-    _record.setFamily(family);
-  }
-
-  public BlurMutate addColumn(BlurColumn column) {
-    _record.addColumn(column);
-    return this;
-  }
-
-  public BlurMutate addColumn(String name, String value) {
-    return addColumn(new BlurColumn(name, value));
-  }
-
-  public BlurRecord getRecord() {
-    return _record;
-  }
-
-  public void setRecord(BlurRecord record) {
-    _record = record;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    IOUtil.writeVInt(out, _mutateType.getValue());
-    _record.write(out);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    _mutateType = _mutateType.find(IOUtil.readVInt(in));
-    _record.readFields(in);
-  }
-
-  public MUTATE_TYPE getMutateType() {
-    return _mutateType;
-  }
-
-  public BlurMutate setMutateType(MUTATE_TYPE mutateType) {
-    _mutateType = mutateType;
-    return this;
-  }
-
-  @Override
-  public String toString() {
-    return "BlurMutate [mutateType=" + _mutateType + ", record=" + _record + "]";
-  }
-
-  public BlurMutate setFamily(String family) {
-    _record.setFamily(family);
-    return this;
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((_mutateType == null) ? 0 : _mutateType.hashCode());
-    result = prime * result + ((_record == null) ? 0 : _record.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    BlurMutate other = (BlurMutate) obj;
-    if (_mutateType != other._mutateType)
-      return false;
-    if (_record == null) {
-      if (other._record != null)
-        return false;
-    } else if (!_record.equals(other._record))
-      return false;
-    return true;
-  }
-
-}
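
Constructing a mutate with the fluent API above is a one-liner per record. A sketch; the row, record, family, and column names are illustrative:

import org.apache.blur.mapreduce.lib.BlurMutate;
import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;

public class BlurMutateExample {
  public static void main(String[] args) {
    // A REPLACE mutate carrying one record with two columns; the type
    // defaults to REPLACE, so it is spelled out here only for clarity.
    BlurMutate mutate = new BlurMutate(MUTATE_TYPE.REPLACE, "row-1", "record-1", "cf1");
    mutate.addColumn("col1", "value1").addColumn("col2", "value2");
    System.out.println(mutate);
  }
}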

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
deleted file mode 100644
index 6b485a9..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.mapred.AbstractOutputCommitter;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-
-public class BlurOutputCommitter extends AbstractOutputCommitter {
-
-  private static final Log LOG = LogFactory.getLog(BlurOutputCommitter.class);
-
-  private Path _newIndex;
-  private Configuration _configuration;
-  private TaskAttemptID _taskAttemptID;
-  private Path _indexPath;
-  private final boolean _runTaskCommit;
-  private TableDescriptor _tableDescriptor;
-
-  public BlurOutputCommitter() {
-    _runTaskCommit = true;
-  }
-
-  public BlurOutputCommitter(boolean isMap, int numberOfReducers) {
-    _runTaskCommit = !(isMap && numberOfReducers != 0);
-  }
-
-  @Override
-  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
-    return _runTaskCommit;
-  }
-
-  @Override
-  public void setupTask(TaskAttemptContext context) throws IOException {
-
-  }
-
-  @Override
-  public void commitTask(TaskAttemptContext context) throws IOException {
-    setup(context);
-    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
-    if (fileSystem.exists(_newIndex) && !fileSystem.isFile(_newIndex)) {
-      Path dst = new Path(_indexPath, _taskAttemptID.toString() + ".task_complete");
-      LOG.info("Committing [{0}] to [{1}]", _newIndex, dst);
-      fileSystem.rename(_newIndex, dst);
-    } else {
-      throw new IOException("Path [" + _newIndex + "] does not exist, can not commit.");
-    }
-  }
-
-  @Override
-  public void abortTask(TaskAttemptContext context) throws IOException {
-    setup(context);
-    FileSystem fileSystem = _newIndex.getFileSystem(_configuration);
-    LOG.info("abortTask - Deleting [{0}]", _newIndex);
-    fileSystem.delete(_newIndex, true);
-  }
-
-  private void setup(TaskAttemptContext context) throws IOException {
-    _configuration = context.getConfiguration();
-    _tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
-    int shardCount = _tableDescriptor.getShardCount();
-    int attemptId = context.getTaskAttemptID().getTaskID().getId();
-    int shardId = attemptId % shardCount;
-    _taskAttemptID = context.getTaskAttemptID();
-    Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
-    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
-    _indexPath = new Path(tableOutput, shardName);
-    _newIndex = new Path(_indexPath, _taskAttemptID.toString() + ".tmp");
-  }
-
-}
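
The setup() method above assigns each task attempt to a shard round-robin. Illustrated with assumed numbers:

public class ShardAssignmentExample {
  public static void main(String[] args) {
    // Mirrors BlurOutputCommitter.setup(): with a 4-shard table, task 6
    // commits into the shard directory named by BlurUtil.getShardName(...)
    // for shard 2, because 6 % 4 == 2.
    int shardCount = 4; // _tableDescriptor.getShardCount()
    int attemptId = 6;  // context.getTaskAttemptID().getTaskID().getId()
    int shardId = attemptId % shardCount;
    System.out.println(shardId); // 2
  }
}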

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
deleted file mode 100644
index 7bbc567..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ /dev/null
@@ -1,338 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TJSONProtocol;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TIOStreamTransport;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.Progressable;
-
-/**
- * {@link BlurOutputFormat} is used to index data and deliver the indexes to
- * the proper Blur table for searching. A typical usage of this class would be
- * as follows.<br/>
- * <br/>
- * 
- * <br/>
- * {@link Iface} client = {@link BlurClient}.getClient("controller1:40010");<br/>
- * <br/>
- * TableDescriptor tableDescriptor = client.describe(tableName);<br/>
- * <br/>
- * Job job = new Job(jobConf, "blur index");<br/>
- * job.setJarByClass(BlurOutputFormatTest.class);<br/>
- * job.setMapperClass(CsvBlurMapper.class);<br/>
- * job.setInputFormatClass(TextInputFormat.class);<br/>
- * <br/>
- * FileInputFormat.addInputPath(job, new Path(input));<br/>
- * CsvBlurMapper.addColumns(job, "cf1", "col");<br/>
- * <br/>
- * BlurOutputFormat.setupJob(job, tableDescriptor);<br/>
- * BlurOutputFormat.setIndexLocally(job, true);<br/>
- * BlurOutputFormat.setOptimizeInFlight(job, false);<br/>
- * <br/>
- * job.waitForCompletion(true);<br/>
- * 
- */
-public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
-
-  public static final String BLUR_OUTPUT_REDUCER_MULTIPLIER = "blur.output.reducer.multiplier";
-  public static final String BLUR_OUTPUT_OPTIMIZEINFLIGHT = "blur.output.optimizeinflight";
-  public static final String BLUR_OUTPUT_INDEXLOCALLY = "blur.output.indexlocally";
-  public static final String BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE = "blur.output.max.document.buffer.size";
-  public static final String BLUR_TABLE_DESCRIPTOR = "blur.table.descriptor";
-  public static final String BLUR_OUTPUT_PATH = "blur.output.path";
-
-  private static final String MAPRED_OUTPUT_COMMITTER_CLASS = "mapred.output.committer.class";
-  private static ThreadLocal<Progressable> _progressable = new ThreadLocal<Progressable>();
-  private static ThreadLocal<GetCounter> _getCounter = new ThreadLocal<GetCounter>();
-
-  public static void setProgressable(Progressable progressable) {
-    _progressable.set(progressable);
-  }
-
-  public static Progressable getProgressable() {
-    return _progressable.get();
-  }
-
-  public static void setGetCounter(GetCounter getCounter) {
-    _getCounter.set(getCounter);
-  }
-
-  public static GetCounter getGetCounter() {
-    return _getCounter.get();
-  }
-
-  @Override
-  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-    CheckOutputSpecs.checkOutputSpecs(context.getConfiguration(), context.getNumReduceTasks());
-  }
-
-  @Override
-  public RecordWriter<Text, BlurMutate> getRecordWriter(TaskAttemptContext context) throws IOException,
-      InterruptedException {
-    int id = context.getTaskAttemptID().getTaskID().getId();
-    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
-    final GenericBlurRecordWriter writer = new GenericBlurRecordWriter(context.getConfiguration(), id,
-        taskAttemptID.toString() + ".tmp");
-    return new RecordWriter<Text, BlurMutate>() {
-
-      @Override
-      public void write(Text key, BlurMutate value) throws IOException, InterruptedException {
-        writer.write(key, value);
-      }
-
-      @Override
-      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-        writer.close();
-      }
-    };
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-    return new BlurOutputCommitter(context.getTaskAttemptID().isMap(), context.getNumReduceTasks());
-  }
-
-  public static TableDescriptor getTableDescriptor(Configuration configuration) throws IOException {
-    String tableDesStr = configuration.get(BLUR_TABLE_DESCRIPTOR);
-    if (tableDesStr == null) {
-      return null;
-    }
-    ByteArrayInputStream inputStream = new ByteArrayInputStream(tableDesStr.getBytes());
-    TIOStreamTransport transport = new TIOStreamTransport(inputStream);
-    TJSONProtocol protocol = new TJSONProtocol(transport);
-    TableDescriptor descriptor = new TableDescriptor();
-    try {
-      descriptor.read(protocol);
-    } catch (TException e) {
-      throw new IOException(e);
-    }
-    transport.close();
-    return descriptor;
-  }
-
-  /**
-   * This will multiply the number of reducers for this job. For example, if the
-   * table has 256 shards, the normal number of reducers is 256. However, if the
-   * reducer multiplier is set to 4, then the number of reducers will be 1024 and
-   * each shard will get 4 new segments instead of the normal 1.
-   * 
-   * @param job
-   *          the job to setup.
-   * @param multiple
-   *          the multiple to use.
-   * @throws IOException
-   */
-  public static void setReducerMultiplier(Job job, int multiple) throws IOException {
-    TableDescriptor tableDescriptor = getTableDescriptor(job.getConfiguration());
-    if (tableDescriptor == null) {
-      throw new IOException("setTableDescriptor needs to be called first.");
-    }
-    job.setNumReduceTasks(tableDescriptor.getShardCount() * multiple);
-    Configuration configuration = job.getConfiguration();
-    configuration.setInt(BLUR_OUTPUT_REDUCER_MULTIPLIER, multiple);
-  }
-
-  public static int getReducerMultiplier(Configuration configuration) {
-    return configuration.getInt(BLUR_OUTPUT_REDUCER_MULTIPLIER, 1);
-  }
-
-  /**
-   * Sets the {@link TableDescriptor} for this job.
-   * 
-   * @param job
-   *          the job to setup.
-   * @param tableDescriptor
-   *          the {@link TableDescriptor}.
-   * @throws IOException
-   */
-  public static void setTableDescriptor(Job job, TableDescriptor tableDescriptor) throws IOException {
-    setTableDescriptor(job.getConfiguration(), tableDescriptor);
-  }
-
-  /**
-   * Sets the {@link TableDescriptor} for this job.
-   * 
-   * @param configuration
-   *          the configuration to setup.
-   * @param tableDescriptor
-   *          the {@link TableDescriptor}.
-   * @throws IOException
-   */
-  public static void setTableDescriptor(Configuration configuration, TableDescriptor tableDescriptor)
-      throws IOException {
-    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    TIOStreamTransport transport = new TIOStreamTransport(outputStream);
-    TJSONProtocol protocol = new TJSONProtocol(transport);
-    try {
-      tableDescriptor.write(protocol);
-    } catch (TException e) {
-      throw new IOException(e);
-    }
-    transport.close();
-    configuration.set(BLUR_TABLE_DESCRIPTOR, new String(outputStream.toByteArray()));
-    setOutputPath(configuration, new Path(tableDescriptor.getTableUri()));
-  }
-
-  /**
-   * Sets the maximum number of documents that the buffer will hold in memory
-   * before overflowing to disk. By default this is 1000, which will probably be
-   * very low for most systems.
-   * 
-   * @param job
-   *          the job to setup.
-   * @param maxDocumentBufferSize
-   *          the maxDocumentBufferSize.
-   */
-  public static void setMaxDocumentBufferSize(Job job, int maxDocumentBufferSize) {
-    setMaxDocumentBufferSize(job.getConfiguration(), maxDocumentBufferSize);
-  }
-
-  /**
-   * Sets the maximum number of documents that the buffer will hold in memory
-   * before overflowing to disk. By default this is 1000, which will probably be
-   * very low for most systems.
-   * 
-   * @param configuration
-   *          the configuration to setup.
-   * @param maxDocumentBufferSize
-   *          the maxDocumentBufferSize.
-   */
-  public static void setMaxDocumentBufferSize(Configuration configuration, int maxDocumentBufferSize) {
-    configuration.setInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, maxDocumentBufferSize);
-  }
-
-  public static int getMaxDocumentBufferSize(Configuration configuration) {
-    return configuration.getInt(BLUR_OUTPUT_MAX_DOCUMENT_BUFFER_SIZE, 1000);
-  }
-
-  public static void setOutputPath(Job job, Path path) {
-    setOutputPath(job.getConfiguration(), path);
-  }
-
-  public static void setOutputPath(Configuration configuration, Path path) {
-    configuration.set(BLUR_OUTPUT_PATH, path.toString());
-    configuration.set(MAPRED_OUTPUT_COMMITTER_CLASS, BlurOutputCommitter.class.getName());
-  }
-
-  public static Path getOutputPath(Configuration configuration) {
-    return new Path(configuration.get(BLUR_OUTPUT_PATH));
-  }
-
-  /**
-   * Enabled by default, this will enable local indexing on the machine where
-   * the task is running. Then, when the {@link RecordWriter} closes, the index
-   * is copied to the remote destination in HDFS.
-   * 
-   * @param job
-   *          the job to setup.
-   * @param b
-   *          true to enable, false to disable.
-   */
-  public static void setIndexLocally(Job job, boolean b) {
-    setIndexLocally(job.getConfiguration(), b);
-  }
-
-  /**
-   * Enabled by default, this will enable local indexing on the machine where
-   * the task is running. Then, when the {@link RecordWriter} closes, the index
-   * is copied to the remote destination in HDFS.
-   * 
-   * @param configuration
-   *          the configuration to setup.
-   * @param b
-   *          true to enable, false to disable.
-   */
-  public static void setIndexLocally(Configuration configuration, boolean b) {
-    configuration.setBoolean(BLUR_OUTPUT_INDEXLOCALLY, b);
-  }
-
-  public static boolean isIndexLocally(Configuration configuration) {
-    return configuration.getBoolean(BLUR_OUTPUT_INDEXLOCALLY, true);
-  }
-
-  /**
-   * Enabled by default, this will optimize the index while copying from the
-   * local index to the remote destination in HDFS. Used in conjunction with
-   * setIndexLocally.
-   * 
-   * @param job
-   *          the job to setup.
-   * @param b
-   *          true to enable, false to disable.
-   */
-  public static void setOptimizeInFlight(Job job, boolean b) {
-    setOptimizeInFlight(job.getConfiguration(), b);
-  }
-
-  /**
-   * Enabled by default, this will optimize the index while copying from the
-   * local index to the remote destination in HDFS. Used in conjunction with
-   * setIndexLocally.
-   * 
-   * @param configuration
-   *          the configuration to setup.
-   * @param b
-   *          true to enable, false to disable.
-   */
-  public static void setOptimizeInFlight(Configuration configuration, boolean b) {
-    configuration.setBoolean(BLUR_OUTPUT_OPTIMIZEINFLIGHT, b);
-  }
-
-  public static boolean isOptimizeInFlight(Configuration configuration) {
-    return configuration.getBoolean(BLUR_OUTPUT_OPTIMIZEINFLIGHT, true);
-  }
-
-  /**
-   * Sets up the output portion of the map reduce job. Note that this also
-   * affects the map side of a map and reduce job.
-   * 
-   * @param job
-   *          the job to setup.
-   * @param tableDescriptor
-   *          the table descriptor to write the output of the indexing job.
-   * @throws IOException
-   */
-  public static void setupJob(Job job, TableDescriptor tableDescriptor) throws IOException {
-    job.setReducerClass(DefaultBlurReducer.class);
-    job.setNumReduceTasks(tableDescriptor.getShardCount());
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(BlurMutate.class);
-    job.setOutputFormatClass(BlurOutputFormat.class);
-    setTableDescriptor(job, tableDescriptor);
-    BlurMapReduceUtil.addDependencyJars(job);
-    BlurMapReduceUtil.addAllJarsInBlurLib(job.getConfiguration());
-  }
-
-}
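
The <br/>-formatted usage example in the class javadoc above, rendered as a compilable driver; the controller address, table name, and input path are assumed values:

import org.apache.blur.mapreduce.lib.BlurOutputFormat;
import org.apache.blur.mapreduce.lib.CsvBlurMapper;
import org.apache.blur.thrift.BlurClient;
import org.apache.blur.thrift.generated.Blur.Iface;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class BlurIndexDriver {
  public static void main(String[] args) throws Exception {
    Iface client = BlurClient.getClient("controller1:40010");
    TableDescriptor tableDescriptor = client.describe("testtable");

    Job job = new Job(new Configuration(), "blur index");
    job.setJarByClass(BlurIndexDriver.class);
    job.setMapperClass(CsvBlurMapper.class);
    job.setInputFormatClass(TextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path("hdfs://namenode/input"));
    CsvBlurMapper.addColumns(job, "cf1", "col");

    // setupJob() wires in DefaultBlurReducer, one reducer per shard, the
    // Text/BlurMutate output types, and ships the dependency jars.
    BlurOutputFormat.setupJob(job, tableDescriptor);
    BlurOutputFormat.setIndexLocally(job, true);
    BlurOutputFormat.setOptimizeInFlight(job, false);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}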

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
deleted file mode 100644
index 7c12a76..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
+++ /dev/null
@@ -1,178 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.blur.utils.ReaderBlurRecord;
-import org.apache.hadoop.io.Writable;
-
-public class BlurRecord implements Writable, ReaderBlurRecord {
-
-  private String _rowId;
-  private String _recordId;
-  private String _family;
-
-  private List<BlurColumn> _columns = new ArrayList<BlurColumn>();
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    _rowId = IOUtil.readString(in);
-    _recordId = IOUtil.readString(in);
-    _family = IOUtil.readString(in);
-    int size = IOUtil.readVInt(in);
-    _columns.clear();
-    for (int i = 0; i < size; i++) {
-      BlurColumn column = new BlurColumn();
-      column.readFields(in);
-      _columns.add(column);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    IOUtil.writeString(out, _rowId);
-    IOUtil.writeString(out, _recordId);
-    IOUtil.writeString(out, _family);
-    IOUtil.writeVInt(out, _columns.size());
-    for (BlurColumn column : _columns) {
-      column.write(out);
-    }
-  }
-
-  public String getRowId() {
-    return _rowId;
-  }
-
-  public void setRowId(String rowId) {
-    this._rowId = rowId;
-  }
-
-  public String getRecordId() {
-    return _recordId;
-  }
-
-  public void setRecordId(String recordId) {
-    this._recordId = recordId;
-  }
-
-  public String getFamily() {
-    return _family;
-  }
-
-  public void setFamily(String family) {
-    this._family = family;
-  }
-
-  public List<BlurColumn> getColumns() {
-    return _columns;
-  }
-
-  public void setColumns(List<BlurColumn> columns) {
-    this._columns = columns;
-  }
-
-  public void clearColumns() {
-    _columns.clear();
-  }
-
-  public void addColumn(BlurColumn column) {
-    _columns.add(column);
-  }
-
-  public void addColumn(String name, String value) {
-    BlurColumn blurColumn = new BlurColumn();
-    blurColumn.setName(name);
-    blurColumn.setValue(value);
-    addColumn(blurColumn);
-  }
-
-  @Override
-  public void setRecordIdStr(String value) {
-    setRecordId(value);
-  }
-
-  @Override
-  public void setFamilyStr(String family) {
-    setFamily(family);
-  }
-
-  public void reset() {
-    clearColumns();
-    _rowId = null;
-    _recordId = null;
-    _family = null;
-  }
-
-  @Override
-  public void setRowIdStr(String rowId) {
-    setRowId(rowId);
-  }
-
-  @Override
-  public String toString() {
-    return "{rowId=" + _rowId + ", recordId=" + _recordId + ", family=" + _family + ", columns=" + _columns + "}";
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((_columns == null) ? 0 : _columns.hashCode());
-    result = prime * result + ((_family == null) ? 0 : _family.hashCode());
-    result = prime * result + ((_recordId == null) ? 0 : _recordId.hashCode());
-    result = prime * result + ((_rowId == null) ? 0 : _rowId.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    BlurRecord other = (BlurRecord) obj;
-    if (_columns == null) {
-      if (other._columns != null)
-        return false;
-    } else if (!_columns.equals(other._columns))
-      return false;
-    if (_family == null) {
-      if (other._family != null)
-        return false;
-    } else if (!_family.equals(other._family))
-      return false;
-    if (_recordId == null) {
-      if (other._recordId != null)
-        return false;
-    } else if (!_recordId.equals(other._recordId))
-      return false;
-    if (_rowId == null) {
-      if (other._rowId != null)
-        return false;
-    } else if (!_rowId.equals(other._rowId))
-      return false;
-    return true;
-  }
-
-}
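
The reset() method above exists so one BlurRecord instance can be reused across records, which is how the BaseBlurMapper removed earlier in this patch used it. A sketch under assumed input lines and column names:

import java.util.Arrays;
import java.util.List;

import org.apache.blur.mapreduce.lib.BlurRecord;

public class RecordReuseExample {
  public static void main(String[] args) {
    List<String> lines = Arrays.asList("row-1\tvalue1", "row-2\tvalue2");
    BlurRecord record = new BlurRecord();
    for (String line : lines) {
      // reset() clears the columns and nulls the ids so the same
      // instance can carry the next record.
      record.reset();
      String[] parts = line.split("\t");
      record.setRowId(parts[0]);
      record.setRecordId(parts[0]);
      record.setFamily("cf1");
      record.addColumn("col1", parts[1]);
      System.out.println(record);
    }
  }
}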

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
deleted file mode 100644
index 5f4fec6..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordReader;
-
-
-public abstract class BlurRecordReader extends RecordReader<Text, BlurRecord> {
-
-//  private IndexReader reader;
-//  private Directory directory;
-//  private int startingDocId;
-//  private int endingDocId;
-//  private int position;
-//  private Text rowid = new Text();
-//  private BlurRecord record = new BlurRecord();
-//
-//  @Override
-//  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-//    BlurInputSplit blurSplit = (BlurInputSplit) split;
-//    Path path = blurSplit.getIndexPath();
-//    String segmentName = blurSplit.getSegmentName();
-//    startingDocId = blurSplit.getStartingDocId();
-//    endingDocId = blurSplit.getEndingDocId();
-//    directory = new HdfsDirectory(context.getConfiguration(), path);
-//
-//    IndexCommit commit = Utils.findLatest(directory);
-//    reader = Utils.openSegmentReader(directory, commit, segmentName, Utils.getTermInfosIndexDivisor(context.getConfiguration()));
-//    int maxDoc = reader.maxDoc();
-//    if (endingDocId >= maxDoc) {
-//      endingDocId = maxDoc - 1;
-//    }
-//    position = startingDocId - 1;
-//  }
-//
-//  @Override
-//  public boolean nextKeyValue() throws IOException, InterruptedException {
-//    do {
-//      position++;
-//      if (position > endingDocId) {
-//        return false;
-//      }
-//    } while (reader.isDeleted(position));
-//    readDocument();
-//    return true;
-//  }
-//
-//  private void readDocument() throws CorruptIndexException, IOException {
-//    Document document = reader.document(position);
-//    record.reset();
-//    rowid.set(RowDocumentUtil.readRecord(document, record));
-//  }
-//
-//  @Override
-//  public Text getCurrentKey() throws IOException, InterruptedException {
-//    return rowid;
-//  }
-//
-//  @Override
-//  public BlurRecord getCurrentValue() throws IOException, InterruptedException {
-//    return record;
-//  }
-//
-//  @Override
-//  public float getProgress() throws IOException, InterruptedException {
-//    int total = endingDocId - startingDocId;
-//    return (float) position / (float) total;
-//  }
-//
-//  @Override
-//  public void close() throws IOException {
-//    reader.close();
-//    directory.close();
-//  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
deleted file mode 100644
index 6bbaad4..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib;
-
-import java.io.IOException;
-
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class CheckOutputSpecs {
-  
-  public static void checkOutputSpecs(Configuration config, int reducers) throws IOException, InterruptedException {
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(config);
-    if (tableDescriptor == null) {
-      throw new IOException("setTableDescriptor needs to be called first.");
-    }
-    int shardCount = tableDescriptor.getShardCount();
-    FileSystem fileSystem = BlurOutputFormat.getOutputPath(config).getFileSystem(config);
-    Path tablePath = new Path(tableDescriptor.getTableUri());
-    if (fileSystem.exists(tablePath)) {
-      BlurUtil.validateShardCount(shardCount, fileSystem, tablePath);
-    } else {
-      throw new IOException("Table path [ " + tablePath + " ] doesn't exist for table [ " + tableDescriptor.getName()
-          + " ].");
-    }
-    BlurUtil.validateWritableDirectory(fileSystem, tablePath);
-    int reducerMultiplier = BlurOutputFormat.getReducerMultiplier(config);
-    int validNumberOfReducers = reducerMultiplier * shardCount;
-    if (reducers > 0 && reducers != validNumberOfReducers) {
-      throw new IllegalArgumentException("Invalid number of reducers [ " + reducers + " ]."
-          + " Number of Reducers should be [ " + validNumberOfReducers + " ].");
-    }
-  }
-  
-}
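
The reducer validation above reduces to one multiplication; worked through with the numbers used in the setReducerMultiplier javadoc earlier in this patch:

public class ReducerCountExample {
  public static void main(String[] args) {
    // Mirrors CheckOutputSpecs: a 256-shard table with multiplier 4
    // accepts exactly 1024 reducers; any other non-zero count throws.
    int shardCount = 256;       // tableDescriptor.getShardCount()
    int reducerMultiplier = 4;  // BlurOutputFormat.getReducerMultiplier(config)
    int validNumberOfReducers = reducerMultiplier * shardCount;
    System.out.println(validNumberOfReducers); // 1024
  }
}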

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
deleted file mode 100644
index a79541c..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CopyRateDirectory.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-
-/**
- * Decorator of Directory to capture the copy rate of a directory copy.
- */
-public class CopyRateDirectory extends Directory {
-
-  private final Directory _directory;
-  private final RateCounter _copyRateCounter;
-
-  public CopyRateDirectory(Directory dir, RateCounter copyRateCounter) {
-    _directory = dir;
-    _copyRateCounter = copyRateCounter;
-  }
-
-  public IndexOutput createOutput(String name, IOContext context) throws IOException {
-    return wrap(_directory.createOutput(name, context));
-  }
-
-  private IndexOutput wrap(IndexOutput output) {
-    return new CopyRateIndexOutput(output, _copyRateCounter);
-  }
-
-  static class CopyRateIndexOutput extends IndexOutput {
-
-    private final IndexOutput _indexOutput;
-    private final RateCounter _copyRateCounter;
-
-    public CopyRateIndexOutput(IndexOutput output, RateCounter copyRateCounter) {
-      _indexOutput = output;
-      _copyRateCounter = copyRateCounter;
-    }
-
-    public void copyBytes(DataInput input, long numBytes) throws IOException {
-      _indexOutput.copyBytes(input, numBytes);
-      if (_copyRateCounter != null) {
-        _copyRateCounter.mark(numBytes);
-      }
-    }
-
-    public void writeByte(byte b) throws IOException {
-      _indexOutput.writeByte(b);
-      if (_copyRateCounter != null) {
-        _copyRateCounter.mark();
-      }
-    }
-
-    public void flush() throws IOException {
-      _indexOutput.flush();
-    }
-
-    public void close() throws IOException {
-      _indexOutput.close();
-    }
-
-    public long getFilePointer() {
-      return _indexOutput.getFilePointer();
-    }
-
-    @SuppressWarnings("deprecation")
-    public void seek(long pos) throws IOException {
-      _indexOutput.seek(pos);
-    }
-
-    public void writeBytes(byte[] b, int offset, int length) throws IOException {
-      _indexOutput.writeBytes(b, offset, length);
-      if (_copyRateCounter != null) {
-        _copyRateCounter.mark(length);
-      }
-    }
-
-    public long length() throws IOException {
-      return _indexOutput.length();
-    }
-  }
-
-  public String[] listAll() throws IOException {
-    return _directory.listAll();
-  }
-
-  public boolean fileExists(String name) throws IOException {
-    return _directory.fileExists(name);
-  }
-
-  public void deleteFile(String name) throws IOException {
-    _directory.deleteFile(name);
-  }
-
-  public long fileLength(String name) throws IOException {
-    return _directory.fileLength(name);
-  }
-
-  public void sync(Collection<String> names) throws IOException {
-    _directory.sync(names);
-  }
-
-  public IndexInput openInput(String name, IOContext context) throws IOException {
-    return _directory.openInput(name, context);
-  }
-
-  public void close() throws IOException {
-    _directory.close();
-  }
-
-}

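For context, a minimal sketch of how the removed CopyRateDirectory decorator can meter a file copy. It assumes only the RateCounter API used in the class above (mark() and mark(long)); the helper name and file names are illustrative, not part of the original source.

    // Hypothetical helper: copy one file into a metered destination so the
    // copied byte count is marked on the supplied RateCounter.
    static void copyFileWithRate(Directory src, Directory dest, String name,
        RateCounter copyRate) throws IOException {
      Directory metered = new CopyRateDirectory(dest, copyRate);
      IndexOutput out = metered.createOutput(name, IOContext.DEFAULT);
      try {
        IndexInput in = src.openInput(name, IOContext.READONCE);
        try {
          // copyBytes marks numBytes on the counter (see CopyRateIndexOutput above)
          out.copyBytes(in, in.length());
        } finally {
          in.close();
        }
      } finally {
        out.close();
      }
    }
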
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
deleted file mode 100644
index 6830e32..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
+++ /dev/null
@@ -1,409 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-
-import com.google.common.base.Splitter;
-
-@SuppressWarnings("static-access")
-public class CsvBlurDriver {
-
-  public static final String CSVLOADER = "csvloader";
-  public static final String MAPRED_COMPRESS_MAP_OUTPUT = "mapred.compress.map.output";
-  public static final String MAPRED_MAP_OUTPUT_COMPRESSION_CODEC = "mapred.map.output.compression.codec";
-  public static final int DEFAULT_WIDTH = 100;
-  public static final String HEADER = "The \"" + CSVLOADER +
-      "\" command is used to load delimited files into a Blur table.\nThe required options are \"-c\", \"-t\", and \"-d\". The " +
-      "standard format for the contents of a file is: \"rowid,recordid,family,col1,col2,...\". However there are " +
-      "several options; for example, the rowid and recordid can be generated based on the data in the record via the " +
-      "\"-A\" and \"-a\" options. The family can be assigned based on the path via the \"-I\" option. The column " +
-      "name order can be mapped via the \"-d\" option. You can also set the input " +
-      "format to sequence files via the \"-S\" option, or leave the default of text files.";
-
-  enum COMPRESSION {
-    SNAPPY(SnappyCodec.class), GZIP(GzipCodec.class), BZIP(BZip2Codec.class), DEFAULT(DefaultCodec.class);
-
-    private final String className;
-
-    private COMPRESSION(Class<? extends CompressionCodec> clazz) {
-      className = clazz.getName();
-    }
-
-    public String getClassName() {
-      return className;
-    }
-  }
-
-  interface ControllerPool {
-    Iface getClient(String controllerConnectionStr);
-  }
-
-  public static void main(String... args) throws Exception {
-    Configuration configuration = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-    Job job = setupJob(configuration, new ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return BlurClient.getClient(controllerConnectionStr);
-      }
-    }, otherArgs);
-    if (job == null) {
-      System.exit(1);
-    }
-
-    boolean waitForCompletion = job.waitForCompletion(true);
-    System.exit(waitForCompletion ? 0 : 1);
-  }
-
-  public static Job setupJob(Configuration configuration, ControllerPool controllerPool, String... otherArgs)
-      throws Exception {
-    CommandLine cmd = parse(otherArgs);
-    if (cmd == null) {
-      return null;
-    }
-
-    final String controllerConnectionStr = cmd.getOptionValue("c");
-    final String tableName = cmd.getOptionValue("t");
-
-    final Iface client = controllerPool.getClient(controllerConnectionStr);
-    TableDescriptor tableDescriptor = client.describe(tableName);
-
-    Job job = new Job(configuration, "Blur indexer [" + tableName + "]");
-    job.setJarByClass(CsvBlurDriver.class);
-    job.setMapperClass(CsvBlurMapper.class);
-
-    if (cmd.hasOption("p")) {
-      job.getConfiguration().set(MAPRED_COMPRESS_MAP_OUTPUT, "true");
-      String codecStr = cmd.getOptionValue("p");
-      COMPRESSION compression;
-      try {
-        compression = COMPRESSION.valueOf(codecStr.trim().toUpperCase());
-      } catch (IllegalArgumentException e) {
-        compression = null;
-      }
-      if (compression == null) {
-        job.getConfiguration().set(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC, codecStr.trim());
-      } else {
-        job.getConfiguration().set(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC, compression.getClassName());
-      }
-    }
-    if (cmd.hasOption("a")) {
-      CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(job, true);
-    }
-    if (cmd.hasOption("A")) {
-      CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(job, true);
-    }
-    if (cmd.hasOption("S")) {
-      job.setInputFormatClass(SequenceFileInputFormat.class);
-    } else {
-      job.setInputFormatClass(TextInputFormat.class);
-    }
-
-    if (cmd.hasOption("C")) {
-      if (cmd.hasOption("S")) {
-        String[] optionValues = cmd.getOptionValues("C");
-        job.setInputFormatClass(CsvBlurCombineSequenceFileInputFormat.class);
-        CombineFileInputFormat.setMinInputSplitSize(job, Long.parseLong(optionValues[0]));
-        CombineFileInputFormat.setMaxInputSplitSize(job, Long.parseLong(optionValues[1]));
-      } else {
-        System.err.println("'C' can only be used with option 'S'");
-        return null;
-      }
-    }
-
-    if (cmd.hasOption("i")) {
-      for (String input : cmd.getOptionValues("i")) {
-        Path path = new Path(input);
-        Set<Path> pathSet = recursivelyGetPathsContainingFiles(path, job.getConfiguration());
-        if (pathSet.isEmpty()) {
-          FileInputFormat.addInputPath(job, path);
-        } else {
-          for (Path p : pathSet) {
-            FileInputFormat.addInputPath(job, p);
-          }
-        }
-      }
-    }
-    // processing the 'I' option
-    if (cmd.hasOption("I")) {
-      if (cmd.hasOption("C")) {
-        System.err.println("The 'I' and 'C' options cannot be used together.");
-        return null;
-      }
-      Option[] options = cmd.getOptions();
-      for (Option option : options) {
-        if (option.getOpt().equals("I")) {
-          String[] values = option.getValues();
-          if (values.length < 2) {
-            System.err.println("'I' parameter missing minimum args of (family path*)");
-            return null;
-          }
-          for (String p : getSubArray(values, 1)) {
-            Path path = new Path(p);
-            CsvBlurMapper.addFamilyPath(job, values[0], path);
-            FileInputFormat.addInputPath(job, path);
-          }
-        }
-      }
-    }
-
-    if (cmd.hasOption("s")) {
-      CsvBlurMapper.setSeparator(job, StringEscapeUtils.unescapeJava(cmd.getOptionValue("s")));
-    }
-    if (cmd.hasOption("o")) {
-      BlurOutputFormat.setOptimizeInFlight(job, false);
-    }
-    if (cmd.hasOption("l")) {
-      BlurOutputFormat.setIndexLocally(job, false);
-    }
-    if (cmd.hasOption("b")) {
-      int maxDocumentBufferSize = Integer.parseInt(cmd.getOptionValue("b"));
-      BlurOutputFormat.setMaxDocumentBufferSize(job, maxDocumentBufferSize);
-    }
-    if (cmd.hasOption("r")) {
-      int reducerMultiplier = Integer.parseInt(cmd.getOptionValue("r"));
-      BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
-    }
-    // processing the 'd' option
-    Option[] options = cmd.getOptions();
-    for (Option option : options) {
-      if (option.getOpt().equals("d")) {
-        String[] values = option.getValues();
-        if (values.length < 2) {
-          System.err.println("'d' parameter missing minimum args of (family column*)");
-          return null;
-        }
-        CsvBlurMapper.addColumns(job, values[0], getSubArray(values, 1));
-      }
-    }
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    BlurMapReduceUtil.addDependencyJars(job.getConfiguration(), Splitter.class);
-    return job;
-  }
-
-  private static String[] getSubArray(String[] array, int starting) {
-    String[] result = new String[array.length - starting];
-    System.arraycopy(array, starting, result, 0, result.length);
-    return result;
-  }
-
-  private static Set<Path> recursivelyGetPathsContainingFiles(Path path, Configuration configuration)
-      throws IOException {
-    Set<Path> pathSet = new HashSet<Path>();
-    FileSystem fileSystem = path.getFileSystem(configuration);
-    FileStatus[] listStatus = fileSystem.listStatus(path);
-    for (FileStatus status : listStatus) {
-      if (status.isDir()) {
-        pathSet.addAll(recursivelyGetPathsContainingFiles(status.getPath(), configuration));
-      } else {
-        pathSet.add(status.getPath().getParent());
-      }
-    }
-    return pathSet;
-  }
-
-  private static CommandLine parse(String... otherArgs) throws ParseException {
-    Options options = new Options();
-    options.addOption(OptionBuilder.withArgName("controller*").hasArgs().isRequired(true)
-        .withDescription("* Thrift controller connection string. (host1:40010 host2:40010 ...)").create("c"));
-    options.addOption(OptionBuilder.withArgName("tablename").hasArg().isRequired(true)
-        .withDescription("* Blur table name.").create("t"));
-    options.addOption(OptionBuilder.withArgName("family column*").hasArgs().isRequired(true)
-        .withDescription("* Define the mapping of fields in the CSV file to column names. (family col1 col2 col3 ...)")
-        .create("d"));
-    options.addOption(OptionBuilder
-        .withArgName("delimiter")
-        .hasArg()
-        .withDescription(
-            "The file delimiter to be used. (default value ',')  NOTE: For special "
-                + "characters like the default Hadoop separator of ASCII value 1, you can use standard "
-                + "Java escaping (\\u0001).").create("s"));
-    options.addOption(OptionBuilder.withArgName("path*").hasArg()
-        .withDescription("The directory to index; the family name is assumed to BE present in the file contents. (hdfs://namenode/input/in1)").create("i"));
-    options.addOption(OptionBuilder.withArgName("family path*").hasArgs()
-        .withDescription("The directory to index with a family name; the family name is assumed to NOT be present in the file contents. (family hdfs://namenode/input/in1)").create("I"));
-    options
-        .addOption(OptionBuilder
-            .withArgName("auto generate record ids")
-            .withDescription(
-                "No Record Ids - Automatically generate record ids for each record based on an MD5 hash of the data within the record.")
-            .create("a"));
-    options
-        .addOption(OptionBuilder
-            .withArgName("auto generate row ids")
-            .withDescription(
-                "No Row Ids - Automatically generate row ids for each record based on an MD5 hash of the data within the record.")
-            .create("A"));
-    options.addOption(OptionBuilder.withArgName("disable optimize indexes during copy")
-        .withDescription("Disable optimizing the indexes during the copy; this has very little overhead. (enabled by default)")
-        .create("o"));
-    options.addOption(OptionBuilder
-        .withArgName("disable index locally")
-        .withDescription(
-            "Disable the use of storage local to the server that is running the reduce "
-                + "task and the copy to the Blur table once complete. (enabled by default)").create("l"));
-    options.addOption(OptionBuilder.withArgName("sequence files inputs")
-        .withDescription("The input files are sequence files.").create("S"));
-    options.addOption(OptionBuilder
-        .withArgName("size")
-        .hasArg()
-        .withDescription(
-            "The maximum number of Lucene documents to buffer in the reducer for a single "
-                + "row before spilling over to disk. (default 1000)").create("b"));
-    options.addOption(OptionBuilder
-        .withArgName("multiplier")
-        .hasArg()
-        .withDescription(
-            "The reducer multiplier allows for an increase in the number of reducers per "
-                + "shard in the given table.  For example, if the table has 128 shards and the "
-                + "reducer multiplier is 4, the total number of reducers will be 512, 4 reducers "
-                + "per shard. (default 1)").create("r"));
-    options.addOption(OptionBuilder
-        .withArgName("minimum maximum")
-        .hasArgs(2)
-        .withDescription(
-            "Enables a combine file input to help deal with many small files as the input. Provide "
-                + "the minimum and maximum size per mapper.  For a minimum of 1GB and a maximum of "
-                + "2.5GB: (1000000000 2500000000)").create("C"));
-    options.addOption(OptionBuilder
-        .withArgName("codec")
-        .hasArgs(1)
-        .withDescription(
-            "Sets the compression codec for the map compress output setting. (SNAPPY, GZIP, BZIP, DEFAULT, or classname)")
-        .create("p"));
-
-    CommandLineParser parser = new PosixParser();
-    CommandLine cmd = null;
-    try {
-      cmd = parser.parse(options, otherArgs);
-    } catch (ParseException e) {
-      System.err.println(e.getMessage());
-      HelpFormatter formatter = new HelpFormatter();
-      PrintWriter pw = new PrintWriter(System.err, true);
-      formatter.printHelp(pw, DEFAULT_WIDTH, CSVLOADER, HEADER, options, HelpFormatter.DEFAULT_LEFT_PAD,
-          HelpFormatter.DEFAULT_DESC_PAD, null, false);
-      return null;
-    }
-
-    if (!(cmd.hasOption("I") || cmd.hasOption("i"))) {
-      System.err.println("Missing input directory, see options 'i' and 'I'.");
-      HelpFormatter formatter = new HelpFormatter();
-      PrintWriter pw = new PrintWriter(System.err, true);
-      formatter.printHelp(pw, DEFAULT_WIDTH, CSVLOADER, HEADER, options, HelpFormatter.DEFAULT_LEFT_PAD,
-          HelpFormatter.DEFAULT_DESC_PAD, null, false);
-      return null;
-    }
-    return cmd;
-  }
-
-  public static class CsvBlurCombineSequenceFileInputFormat extends CombineFileInputFormat<Writable, Text> {
-
-    private static class SequenceFileRecordReaderWrapper extends RecordReader<Writable, Text> {
-
-      private final RecordReader<Writable, Text> delegate;
-      private final FileSplit fileSplit;
-
-      @SuppressWarnings("unused")
-      public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context, Integer index)
-          throws IOException {
-        fileSplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index),
-            split.getLocations());
-        delegate = new SequenceFileInputFormat<Writable, Text>().createRecordReader(fileSplit, context);
-      }
-
-      @Override
-      public float getProgress() throws IOException, InterruptedException {
-        return delegate.getProgress();
-      }
-
-      @Override
-      public Writable getCurrentKey() throws IOException, InterruptedException {
-        return delegate.getCurrentKey();
-      }
-
-      @Override
-      public Text getCurrentValue() throws IOException, InterruptedException {
-        return delegate.getCurrentValue();
-      }
-
-      @Override
-      public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-        delegate.initialize(fileSplit, context);
-      }
-
-      @Override
-      public boolean nextKeyValue() throws IOException, InterruptedException {
-        return delegate.nextKeyValue();
-      }
-
-      @Override
-      public void close() throws IOException {
-        delegate.close();
-      }
-    }
-
-    @Override
-    public RecordReader<Writable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
-        throws IOException {
-      return new CombineFileRecordReader<Writable, Text>((CombineFileSplit) split, context,
-          SequenceFileRecordReaderWrapper.class);
-    }
-  }
-
-}
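
For reference, the removed CsvBlurDriver exposed setupJob for programmatic use as well as the command line. Below is a minimal sketch mirroring main() above; the controller connection string, table name, column mapping, and input path are placeholder values, and since ControllerPool is package-private the caller is assumed to live in org.apache.blur.mapreduce.lib.

    // Hypothetical programmatic invocation of the removed driver.
    public static void runCsvLoad() throws Exception {
      Configuration configuration = new Configuration();
      Job job = CsvBlurDriver.setupJob(configuration, new CsvBlurDriver.ControllerPool() {
        @Override
        public Iface getClient(String controllerConnectionStr) {
          return BlurClient.getClient(controllerConnectionStr);
        }
      }, "-c", "host1:40010", "-t", "table1",
          "-d", "fam0", "col1", "col2",
          "-i", "hdfs://namenode/input/in1");
      if (job == null) {
        // bad or missing arguments; usage was printed to stderr by parse()
        System.exit(1);
      }
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }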

