incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [02/11] Blur MR projects restructured.
Date Thu, 01 May 2014 20:49:01 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
deleted file mode 100644
index 8f59e31..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
+++ /dev/null
@@ -1,487 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigInteger;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-
-import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-import com.google.common.base.Splitter;
-
-/**
- * This will parse a standard CSV file into a {@link BlurMutate} object. Use
- * the static addColumns and setSeparator methods to configure the class.
- */
-public class CsvBlurMapper extends BaseBlurMapper<Writable, Text> {
-
-  public static final String UTF_8 = "UTF-8";
-  public static final String BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA = "blur.csv.auto.generate.record.id.as.hash.of.data";
-  public static final String BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA = "blur.csv.auto.generate.row.id.as.hash.of.data";
-  public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES = "blur.csv.family.path.mappings.families";
-  public static final String BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX = "blur.csv.family.path.mappings.family.";
-  public static final String BLUR_CSV_SEPARATOR_BASE64 = "blur.csv.separator.base64";
-  public static final String BLUR_CSV_FAMILY_COLUMN_PREFIX = "blur.csv.family.";
-  public static final String BLUR_CSV_FAMILIES = "blur.csv.families";
-  public static final String HIVE_NULL = "\\N";
-
-  protected Map<String, List<String>> _columnNameMap;
-  protected String _separator = Base64.encodeBase64String(",".getBytes());
-  protected Splitter _splitter;
-  protected boolean _familyNotInFile;
-  protected String _familyFromPath;
-  protected boolean _autoGenerateRecordIdAsHashOfData;
-  protected MessageDigest _digest;
-  protected boolean _autoGenerateRowIdAsHashOfData;
-
-  /**
-   * Add a mapping for a family to a path. This is to be used when an entire
-   * path is to be processed as a single family and the data itself does not
-   * contain the family.<br/>
-   * <br/>
-   * 
-   * NOTE: the familyNotInFile property must be set before this method can be
-   * called.
-   * 
-   * @param job
-   *          the job to setup.
-   * @param family
-   *          the family.
-   * @param path
-   *          the path.
-   */
-  public static void addFamilyPath(Job job, String family, Path path) {
-    addFamilyPath(job.getConfiguration(), family, path);
-  }
-
-  /**
-   * Add a mapping for a family to a path. This is to be used when an entire
-   * path is to be processed as a single family and the data itself does not
-   * contain the family.<br/>
-   * <br/>
-   * 
-   * NOTE: the familyNotInFile property must be set before this method can be
-   * called.
-   * 
-   * @param configuration
-   *          the configuration to setup.
-   * @param family
-   *          the family.
-   * @param path
-   *          the path.
-   */
-  public static void addFamilyPath(Configuration configuration, String family, Path path) {
-    append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES, family);
-    append(configuration, BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family, path.toString());
-  }
-
-  protected static void append(Configuration configuration, String name, String value) {
-    Collection<String> set = configuration.getStringCollection(name);
-    if (set == null) {
-      set = new TreeSet<String>();
-    }
-    set.add(value);
-    configuration.setStrings(name, set.toArray(new String[set.size()]));
-  }
-
-  /**
-   * If set to true the record id will be automatically generated as a hash of
-   * the data that the record contains.
-   * 
-   * @param job
-   *          the job to setup.
-   * @param autoGenerateRecordIdAsHashOfData
-   *          boolean.
-   */
-  public static void setAutoGenerateRecordIdAsHashOfData(Job job, boolean autoGenerateRecordIdAsHashOfData) {
-    setAutoGenerateRecordIdAsHashOfData(job.getConfiguration(), autoGenerateRecordIdAsHashOfData);
-  }
-
-  /**
-   * If set to true the record id will be automatically generated as a hash of
-   * the data that the record contains.
-   * 
-   * @param configuration
-   *          the configuration to setup.
-   * @param autoGenerateRecordIdAsHashOfData
-   *          boolean.
-   */
-  public static void setAutoGenerateRecordIdAsHashOfData(Configuration configuration,
-      boolean autoGenerateRecordIdAsHashOfData) {
-    configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, autoGenerateRecordIdAsHashOfData);
-  }
-
-  /**
-   * Gets whether or not to generate a record id for the record based on the
-   * data.
-   * 
-   * @param configuration
-   *          the configuration.
-   * @return boolean.
-   */
-  public static boolean isAutoGenerateRecordIdAsHashOfData(Configuration configuration) {
-    return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_RECORD_ID_AS_HASH_OF_DATA, false);
-  }
-
-  /**
-   * If set to true the row id will be automatically generated as a hash of
-   * the data that the record contains.
-   * 
-   * @param job
-   *          the job to setup.
-   * @param autoGenerateRowIdAsHashOfData
-   *          boolean.
-   */
-  public static void setAutoGenerateRowIdAsHashOfData(Job job, boolean autoGenerateRowIdAsHashOfData) {
-    setAutoGenerateRowIdAsHashOfData(job.getConfiguration(), autoGenerateRowIdAsHashOfData);
-  }
-
-  /**
-   * If set to true the row id will be automatically generated as a hash of
-   * the data that the record contains.
-   * 
-   * @param configuration
-   *          the configuration to setup.
-   * @param autoGenerateRowIdAsHashOfData
-   *          boolean.
-   */
-  public static void setAutoGenerateRowIdAsHashOfData(Configuration configuration, boolean autoGenerateRowIdAsHashOfData) {
-    configuration.setBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, autoGenerateRowIdAsHashOfData);
-  }
-
-  /**
-   * Gets whether or not to generate a row id for the record based on the
-   * data.
-   * 
-   * @param configuration
-   *          the configuration.
-   * @return boolean.
-   */
-  public static boolean isAutoGenerateRowIdAsHashOfData(Configuration configuration) {
-    return configuration.getBoolean(BLUR_CSV_AUTO_GENERATE_ROW_ID_AS_HASH_OF_DATA, false);
-  }
-
-  /**
-   * Sets all the family and column definitions.
-   * 
-   * @param job
-   *          the job to setup.
-   * @param strDefinition
-   *          the string definition. <br/>
-   * <br/>
-   *          Example:<br/>
-   *          "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
-   *          Where "cf1" is a family name that contains columns "col1", "col2"
-   *          and "col3" and a second family of "cf2" with columns "col1",
-   *          "col2", and "col3".
-   */
-  public static void setColumns(Job job, String strDefinition) {
-    setColumns(job.getConfiguration(), strDefinition);
-  }
-
-  /**
-   * Sets all the family and column definitions.
-   * 
-   * @param configuration
-   *          the configuration to setup.
-   * @param strDefinition
-   *          the string definition. <br/>
-   * <br/>
-   *          Example:<br/>
-   *          "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
-   *          Where "cf1" is a family name that contains columns "col1", "col2"
-   *          and "col3" and a second family of "cf2" with columns "col1",
-   *          "col2", and "col3".
-   */
-  public static void setColumns(Configuration configuration, String strDefinition) {
-    Iterable<String> familyDefs = Splitter.on('|').split(strDefinition);
-    for (String familyDef : familyDefs) {
-      int indexOf = familyDef.indexOf(':');
-      if (indexOf < 0) {
-        throwMalformedDefinition(strDefinition);
-      }
-      String family = familyDef.substring(0, indexOf);
-      Iterable<String> cols = Splitter.on(',').split(familyDef.substring(indexOf + 1));
-      List<String> colnames = new ArrayList<String>();
-      for (String columnName : cols) {
-        colnames.add(columnName);
-      }
-      if (family.trim().isEmpty() || colnames.isEmpty()) {
-        throwMalformedDefinition(strDefinition);
-      }
-      addColumns(configuration, family, colnames.toArray(new String[colnames.size()]));
-    }
-  }
-
-  protected static void throwMalformedDefinition(String strDefinition) {
-    throw new RuntimeException("Family and column definition string not valid [" + strDefinition
-        + "] should look like \"family1:colname1,colname2|family2:colname1,colname2,colname3\"");
-  }
-
-  /**
-   * Adds the column layout for the given family.
-   * 
-   * @param job
-   *          the job to apply the layout.
-   * @param family
-   *          the family name.
-   * @param columns
-   *          the column names.
-   */
-  public static void addColumns(Job job, String family, String... columns) {
-    addColumns(job.getConfiguration(), family, columns);
-  }
-
-  /**
-   * Adds the column layout for the given family.
-   * 
-   * @param configuration
-   *          the configuration to apply the layout.
-   * @param family
-   *          the family name.
-   * @param columns
-   *          the column names.
-   */
-  public static void addColumns(Configuration configuration, String family, String... columns) {
-    Collection<String> families = new TreeSet<String>(configuration.getStringCollection(BLUR_CSV_FAMILIES));
-    families.add(family);
-    configuration.setStrings(BLUR_CSV_FAMILIES, families.toArray(new String[] {}));
-    configuration.setStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family, columns);
-  }
-
-  public static Collection<String> getFamilyNames(Configuration configuration) {
-    return configuration.getStringCollection(BLUR_CSV_FAMILIES);
-  }
-
-  public static Map<String, List<String>> getFamilyAndColumnNameMap(Configuration configuration) {
-    Map<String, List<String>> columnNameMap = new HashMap<String, List<String>>();
-    for (String family : getFamilyNames(configuration)) {
-      String[] columnsNames = configuration.getStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family);
-      columnNameMap.put(family, Arrays.asList(columnsNames));
-    }
-    return columnNameMap;
-  }
-
-  /**
-   * Sets the separator of the file. By default it is ",".
-   * 
-   * @param job
-   *          the job to apply the separator change.
-   * @param separator
-   *          the separator.
-   */
-  public static void setSeparator(Job job, String separator) {
-    setSeparator(job.getConfiguration(), separator);
-  }
-
-  /**
-   * Sets the separator of the file. By default it is ",".
-   * 
-   * @param configuration
-   *          the configuration to apply the separator change.
-   * @param separator
-   *          the separator.
-   */
-  public static void setSeparator(Configuration configuration, String separator) {
-    try {
-      configuration.set(BLUR_CSV_SEPARATOR_BASE64, Base64.encodeBase64String(separator.getBytes(UTF_8)));
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    super.setup(context);
-    Configuration configuration = context.getConfiguration();
-    _autoGenerateRecordIdAsHashOfData = isAutoGenerateRecordIdAsHashOfData(configuration);
-    _autoGenerateRowIdAsHashOfData = isAutoGenerateRowIdAsHashOfData(configuration);
-    if (_autoGenerateRecordIdAsHashOfData || _autoGenerateRowIdAsHashOfData) {
-      try {
-        _digest = MessageDigest.getInstance("MD5");
-      } catch (NoSuchAlgorithmException e) {
-        throw new IOException(e);
-      }
-    }
-    _columnNameMap = getFamilyAndColumnNameMap(configuration);
-    _separator = new String(Base64.decodeBase64(configuration.get(BLUR_CSV_SEPARATOR_BASE64, _separator)), UTF_8);
-    _splitter = Splitter.on(_separator);
-    Path fileCurrentlyProcessing = getCurrentFile(context);
-    Collection<String> families = configuration.getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILIES);
-    OUTER: for (String family : families) {
-      Collection<String> pathStrCollection = configuration
-          .getStringCollection(BLUR_CSV_FAMILY_PATH_MAPPINGS_FAMILY_PREFIX + family);
-      for (String pathStr : pathStrCollection) {
-        Path path = new Path(pathStr);
-        path = path.makeQualified(path.getFileSystem(configuration));
-        if (isParent(path, fileCurrentlyProcessing)) {
-          _familyFromPath = family;
-          _familyNotInFile = true;
-          break OUTER;
-        }
-      }
-    }
-  }
-
-  protected boolean isParent(Path possibleParent, Path child) {
-    if (child == null) {
-      return false;
-    }
-    if (possibleParent.equals(child.getParent())) {
-      return true;
-    }
-    return isParent(possibleParent, child.getParent());
-  }
-
-  protected Path getCurrentFile(Context context) throws IOException {
-    InputSplit split = context.getInputSplit();
-    if (split instanceof FileSplit) { // instanceof is false for null
-      FileSplit inputSplit = (FileSplit) split;
-      Path path = inputSplit.getPath();
-      return path.makeQualified(path.getFileSystem(context.getConfiguration()));
-    }
-    return null;
-  }
-
-  @Override
-  protected void map(Writable k, Text value, Context context) throws IOException, InterruptedException {
-    BlurRecord record = _mutate.getRecord();
-    record.clearColumns();
-    String str = value.toString();
-
-    Iterable<String> split = _splitter.split(str);
-    List<String> list = toList(split);
-
-    int offset = 0;
-    boolean gen = false;
-    if (!_autoGenerateRowIdAsHashOfData) {
-      record.setRowId(list.get(offset++));
-    } else {
-      _digest.reset();
-      byte[] bs = value.getBytes();
-      int length = value.getLength();
-      _digest.update(bs, 0, length);
-      record.setRowId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
-      gen = true;
-    }
-
-    if (!_autoGenerateRecordIdAsHashOfData) {
-      record.setRecordId(list.get(offset++));
-    } else {
-      if (gen) {
-        record.setRecordId(record.getRowId());
-      } else {
-        _digest.reset();
-        byte[] bs = value.getBytes();
-        int length = value.getLength();
-        _digest.update(bs, 0, length);
-        record.setRecordId(new BigInteger(_digest.digest()).toString(Character.MAX_RADIX));
-      }
-    }
-    String family;
-    if (_familyNotInFile) {
-      family = _familyFromPath;
-    } else {
-      family = list.get(offset++);
-    }
-    record.setFamily(family);
-
-    List<String> columnNames = _columnNameMap.get(family);
-    if (columnNames == null) {
-      throw new IOException("Family [" + family + "] is missing in the definition.");
-    }
-    if (list.size() - offset != columnNames.size()) {
-
-      String options = "";
-
-      if (!_autoGenerateRowIdAsHashOfData) {
-        options += "rowid,";
-      }
-      if (!_autoGenerateRecordIdAsHashOfData) {
-        options += "recordid,";
-      }
-      if (!_familyNotInFile) {
-        options += "family,";
-      }
-      String msg = "Record [" + str + "] does not match defined record [" + options + getColumnNames(columnNames)
-          + "].";
-      throw new IOException(msg);
-    }
-
-    for (int i = 0; i < columnNames.size(); i++) {
-      String val = handleHiveNulls(list.get(i + offset));
-      if (val != null) {
-        record.addColumn(columnNames.get(i), val);
-        _columnCounter.increment(1);
-      }
-    }
-    _key.set(record.getRowId());
-    _mutate.setMutateType(MUTATE_TYPE.REPLACE);
-    context.write(_key, _mutate);
-    _recordCounter.increment(1);
-    context.progress();
-  }
-
-  protected String handleHiveNulls(String value) {
-    if (value.equals(HIVE_NULL)) {
-      return null;
-    }
-    return value;
-  }
-
-  public void setFamilyFromPath(String familyFromPath) {
-    this._familyFromPath = familyFromPath;
-  }
-
-  protected String getColumnNames(List<String> columnNames) {
-    StringBuilder builder = new StringBuilder();
-    for (String c : columnNames) {
-      if (builder.length() != 0) {
-        builder.append(',');
-      }
-      builder.append(c);
-    }
-    return builder.toString();
-  }
-
-  protected List<String> toList(Iterable<String> split) {
-    List<String> lst = new ArrayList<String>();
-    for (String s : split) {
-      lst.add(s);
-    }
-    return lst;
-  }
-
-}
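
Taken together, the static setters above make the mapper fully configurable from the driver. A minimal driver sketch follows; the table layout, separator choice, and class name are hypothetical placeholders for illustration, not part of this commit:

import org.apache.blur.mapreduce.lib.CsvBlurMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CsvBlurMapperConfigSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "csv to blur");
    job.setMapperClass(CsvBlurMapper.class);

    // Family "cf1" with columns "col1" and "col2"; equivalent to
    // CsvBlurMapper.setColumns(job, "cf1:col1,col2").
    CsvBlurMapper.addColumns(job, "cf1", "col1", "col2");

    // Tab-separated input instead of the default ",". The separator is
    // stored Base64-encoded so any character survives the Configuration.
    CsvBlurMapper.setSeparator(job, "\t");

    // Derive row and record ids from an MD5 hash of each line instead of
    // reading them from the first two columns.
    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(job, true);
    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(job, true);
  }
}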

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
deleted file mode 100644
index 99f2ac5..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- * This class is to be used in conjunction with {@link BlurOutputFormat}.
- * <br/><br/>
- * 
- * Here is a basic example of how to use the {@link BlurOutputFormat} and
- * the {@link DefaultBlurReducer} together to build indexes.
- * <br/><br/>
- * 
- * Once this job has successfully completed, the indexes will be imported by
- * the running shard servers and placed online. This is a polling mechanism
- * in the shard servers; by default they poll every 10 seconds.
- * <br/><br/>
- * 
- * <pre>
- * Job job = new Job(conf, "blur index");
- * job.setJarByClass(BlurOutputFormatTest.class);
- * job.setMapperClass(CsvBlurMapper.class);
- * job.setReducerClass(DefaultBlurReducer.class);
- * job.setNumReduceTasks(1);
- * job.setInputFormatClass(TrackingTextInputFormat.class);
- * job.setOutputKeyClass(Text.class);
- * job.setOutputValueClass(BlurMutate.class);
- * job.setOutputFormatClass(BlurOutputFormat.class);
- * 
- * FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
- * CsvBlurMapper.addColumns(job, "cf1", "col");
- * 
- * TableDescriptor tableDescriptor = new TableDescriptor();
- * tableDescriptor.setShardCount(1);
- * tableDescriptor.setAnalyzerDefinition(new AnalyzerDefinition());
- * tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/out").toString());
- * BlurOutputFormat.setTableDescriptor(job, tableDescriptor);
- * </pre>
- */
-public class DefaultBlurReducer extends Reducer<Writable, BlurMutate, Writable, BlurMutate> {
-
-  @Override
-  protected void setup(final Context context) throws IOException, InterruptedException {
-    BlurOutputFormat.setProgressable(context);
-    BlurOutputFormat.setGetCounter(new GetCounter() {
-      @Override
-      public Counter getCounter(Enum<?> counterName) {
-        return context.getCounter(counterName);
-      }
-    });
-  }
-
-  @Override
-  protected void reduce(Writable key, Iterable<BlurMutate> values, Context context) throws IOException,
-      InterruptedException {
-    Text textKey = getTextKey(key);
-    for (BlurMutate value : values) {
-      context.write(textKey, value);
-    }
-  }
-
-  protected Text getTextKey(Writable key) {
-    if (key instanceof Text) {
-      return (Text) key;
-    }
-    throw new IllegalArgumentException("Key is not of type Text, you will need to "
-        + "override DefaultBlurReducer and implement \"getTextKey\" method.");
-  }
-}
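
The getTextKey hook above is the intended extension point when the shuffle key is not a Text, as the exception message suggests. A hedged sketch of such an override; the subclass name and the toString-based conversion are assumptions for illustration:

import org.apache.blur.mapreduce.lib.DefaultBlurReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class StringKeyBlurReducer extends DefaultBlurReducer {

  private final Text _textKey = new Text();

  // Convert any Writable key to Text via toString(); whether that string
  // is a sensible row id depends on the key type.
  @Override
  protected Text getTextKey(Writable key) {
    if (key instanceof Text) {
      return (Text) key;
    }
    _textKey.set(key.toString());
    return _textKey;
  }
}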

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
deleted file mode 100644
index e138cd5..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GenericBlurRecordWriter.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.blur.analysis.FieldManager;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.lucene.LuceneVersionConstant;
-import org.apache.blur.lucene.codec.Blur022Codec;
-import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.blur.utils.RowDocumentUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.util.Progressable;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.NoMergePolicy;
-import org.apache.lucene.index.TieredMergePolicy;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.NoLockFactory;
-
-public class GenericBlurRecordWriter {
-
-  private static final Log LOG = LogFactory.getLog(GenericBlurRecordWriter.class);
-  private static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
-
-  private final Text _prevKey = new Text();
-  private final Map<String, List<Field>> _documents = new TreeMap<String, List<Field>>();
-  private final IndexWriter _writer;
-  private final FieldManager _fieldManager;
-  private final Directory _finalDir;
-  private final Directory _localDir;
-  private final File _localPath;
-  private final int _maxDocumentBufferSize;
-  private final IndexWriterConfig _conf;
-  private final IndexWriterConfig _overFlowConf;
-  private final Path _newIndex;
-  private final boolean _indexLocally;
-  private final boolean _optimizeInFlight;
-  private Counter _columnCount;
-  private Counter _fieldCount;
-  private Counter _recordCount;
-  private Counter _rowCount;
-  private Counter _recordDuplicateCount;
-  private Counter _rowOverFlowCount;
-  private Counter _rowDeleteCount;
-  private RateCounter _recordRateCounter;
-  private RateCounter _rowRateCounter;
-  private RateCounter _copyRateCounter;
-  private boolean _countersSetup = false;
-  private IndexWriter _localTmpWriter;
-  private boolean _usingLocalTmpindex;
-  private File _localTmpPath;
-  private ProgressableDirectory _localTmpDir;
-  private String _deletedRowId;
-
-  public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName) throws IOException {
-
-    _indexLocally = BlurOutputFormat.isIndexLocally(configuration);
-    _optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(configuration);
-
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    int shardCount = tableDescriptor.getShardCount();
-    int shardId = attemptId % shardCount;
-
-    _maxDocumentBufferSize = BlurOutputFormat.getMaxDocumentBufferSize(configuration);
-    Path tableOutput = BlurOutputFormat.getOutputPath(configuration);
-    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
-    Path indexPath = new Path(tableOutput, shardName);
-    _newIndex = new Path(indexPath, tmpDirName);
-    _finalDir = new ProgressableDirectory(new HdfsDirectory(configuration, _newIndex), getProgressable());
-    _finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
-
-    TableContext tableContext = TableContext.create(tableDescriptor);
-    _fieldManager = tableContext.getFieldManager();
-    Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
-
-    _conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, analyzer);
-    _conf.setCodec(new Blur022Codec());
-    _conf.setSimilarity(tableContext.getSimilarity());
-    TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
-    mergePolicy.setUseCompoundFile(false);
-
-    _overFlowConf = _conf.clone();
-    _overFlowConf.setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES);
-
-    if (_indexLocally) {
-      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
-      _localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
-      _localDir = new ProgressableDirectory(FSDirectory.open(_localPath), getProgressable());
-      _writer = new IndexWriter(_localDir, _conf.clone());
-    } else {
-      _localPath = null;
-      _localDir = null;
-      _writer = new IndexWriter(_finalDir, _conf.clone());
-    }
-  }
-
-  private Progressable getProgressable() {
-    return new Progressable() {
-      @Override
-      public void progress() {
-        Progressable progressable = BlurOutputFormat.getProgressable();
-        if (progressable != null) {
-          progressable.progress();
-        }
-      }
-    };
-  }
-
-  public void write(Text key, BlurMutate value) throws IOException {
-    if (!_countersSetup) {
-      setupCounter();
-      _countersSetup = true;
-    }
-    if (!_prevKey.equals(key)) {
-      flush();
-      _prevKey.set(key);
-    }
-    add(value);
-  }
-
-  private void setupCounter() {
-    GetCounter getCounter = BlurOutputFormat.getGetCounter();
-    _fieldCount = getCounter.getCounter(BlurCounters.LUCENE_FIELD_COUNT);
-    _columnCount = getCounter.getCounter(BlurCounters.COLUMN_COUNT);
-    _recordCount = getCounter.getCounter(BlurCounters.RECORD_COUNT);
-    _recordDuplicateCount = getCounter.getCounter(BlurCounters.RECORD_DUPLICATE_COUNT);
-    _rowCount = getCounter.getCounter(BlurCounters.ROW_COUNT);
-    _rowDeleteCount = getCounter.getCounter(BlurCounters.ROW_DELETE_COUNT);
-    _rowOverFlowCount = getCounter.getCounter(BlurCounters.ROW_OVERFLOW_COUNT);
-    _recordRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.RECORD_RATE));
-    _rowRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.ROW_RATE));
-    _copyRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.COPY_RATE));
-  }
-
-  private void add(BlurMutate value) throws IOException {
-    BlurRecord blurRecord = value.getRecord();
-    Record record = getRecord(blurRecord);
-    String recordId = record.getRecordId();
-    if (value.getMutateType() == MUTATE_TYPE.DELETE) {
-      _deletedRowId = blurRecord.getRowId();
-      return;
-    }
-    if (_countersSetup) {
-      _columnCount.increment(record.getColumns().size());
-    }
-    List<Field> document = RowDocumentUtil.getDoc(_fieldManager, blurRecord.getRowId(), record);
-    List<Field> dup = _documents.put(recordId, document);
-    if (_countersSetup) {
-      if (dup != null) {
-        _recordDuplicateCount.increment(1);
-      } else {
-        _fieldCount.increment(document.size());
-        _recordCount.increment(1);
-      }
-    }
-    flushToTmpIndexIfNeeded();
-  }
-
-  private void flushToTmpIndexIfNeeded() throws IOException {
-    if (_documents.size() > _maxDocumentBufferSize) {
-      flushToTmpIndex();
-    }
-  }
-
-  private void flushToTmpIndex() throws IOException {
-    if (_documents.isEmpty()) {
-      return;
-    }
-    _usingLocalTmpindex = true;
-    if (_localTmpWriter == null) {
-      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
-      _localTmpPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
-      _localTmpDir = new ProgressableDirectory(FSDirectory.open(_localTmpPath), BlurOutputFormat.getProgressable());
-      _localTmpWriter = new IndexWriter(_localTmpDir, _overFlowConf.clone());
-      // The local tmp writer has merging disabled so the first document in is
-      // going to be doc 0.
-      // Therefore the first document added is the prime doc
-      List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
-      docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
-      _localTmpWriter.addDocuments(docs);
-    } else {
-      _localTmpWriter.addDocuments(_documents.values());
-    }
-    _documents.clear();
-  }
-
-  private void resetLocalTmp() {
-    _usingLocalTmpindex = false;
-    _localTmpWriter = null;
-    _localTmpDir = null;
-    rm(_localTmpPath);
-    _localTmpPath = null;
-  }
-
-  private Record getRecord(BlurRecord value) {
-    Record record = new Record();
-    record.setRecordId(value.getRecordId());
-    record.setFamily(value.getFamily());
-    for (BlurColumn col : value.getColumns()) {
-      record.addToColumns(new Column(col.getName(), col.getValue()));
-    }
-    return record;
-  }
-
-  private void flush() throws CorruptIndexException, IOException {
-    if (_usingLocalTmpindex) {
-      // since we have flushed to disk then we do not need to index the
-      // delete.
-      flushToTmpIndex();
-      _localTmpWriter.close(false);
-      DirectoryReader reader = DirectoryReader.open(_localTmpDir);
-      if (_countersSetup) {
-        _recordRateCounter.mark(reader.numDocs());
-      }
-      _writer.addIndexes(reader);
-      reader.close();
-      resetLocalTmp();
-      if (_countersSetup) {
-        _rowOverFlowCount.increment(1);
-      }
-    } else {
-      if (_documents.isEmpty()) {
-        if (_deletedRowId != null) {
-          _writer.addDocument(getDeleteDoc());
-          if (_countersSetup) {
-            _rowDeleteCount.increment(1);
-          }
-        }
-      } else {
-        List<List<Field>> docs = new ArrayList<List<Field>>(_documents.values());
-        docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
-        _writer.addDocuments(docs);
-        if (_countersSetup) {
-          _recordRateCounter.mark(_documents.size());
-        }
-        _documents.clear();
-      }
-    }
-    _deletedRowId = null;
-    if (_countersSetup) {
-      _rowRateCounter.mark();
-      _rowCount.increment(1);
-    }
-  }
-
-  private Document getDeleteDoc() {
-    Document document = new Document();
-    document.add(new StringField(BlurConstants.ROW_ID, _deletedRowId, Store.NO));
-    document.add(new StringField(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE, Store.NO));
-    return document;
-  }
-
-  public void close() throws IOException {
-    flush();
-    _writer.close();
-    if (_countersSetup) {
-      _recordRateCounter.close();
-      _rowRateCounter.close();
-    }
-    if (_indexLocally) {
-      if (_optimizeInFlight) {
-        copyAndOptimizeInFlightDir();
-      } else {
-        copyDir();
-      }
-    }
-    if (_countersSetup) {
-      _copyRateCounter.close();
-    }
-  }
-
-  private void copyAndOptimizeInFlightDir() throws IOException {
-    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
-    copyRateDirectory.setLockFactory(NoLockFactory.getNoLockFactory());
-    DirectoryReader reader = DirectoryReader.open(_localDir);
-    IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone());
-    writer.addIndexes(reader);
-    writer.close();
-    rm(_localPath);
-  }
-
-  private void copyDir() throws IOException {
-    CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
-    String[] fileNames = _localDir.listAll();
-    for (String fileName : fileNames) {
-      LOG.info("Copying [{0}] to [{1}]", fileName, _newIndex);
-      _localDir.copy(copyRateDirectory, fileName, fileName, IOContext.DEFAULT);
-    }
-    rm(_localPath);
-  }
-
-  private void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-}
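
The writer above buffers one row's records at a time: write() flushes whenever the row key changes, flushToTmpIndexIfNeeded() spills oversized rows to a local temporary index, and the first record of each row is tagged as the prime doc. A hypothetical harness showing the expected call pattern; it assumes BlurOutputFormat.setupJob, setProgressable, and setGetCounter were already invoked (as DefaultBlurReducer.setup does above), since the constructor reads the table descriptor and the first write() resolves counters through BlurOutputFormat:

import java.io.IOException;
import java.util.Map;

import org.apache.blur.mapreduce.lib.BlurMutate;
import org.apache.blur.mapreduce.lib.GenericBlurRecordWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

public class RecordWriterSketch {
  // Mutates must arrive grouped by row key; a key change triggers flush(),
  // which commits the buffered row (or a spilled tmp index) to the shard.
  static void writeRows(Configuration configuration, int attemptId,
      Iterable<Map.Entry<Text, BlurMutate>> mutatesGroupedByRow) throws IOException {
    GenericBlurRecordWriter writer =
        new GenericBlurRecordWriter(configuration, attemptId, "attempt-" + attemptId);
    try {
      for (Map.Entry<Text, BlurMutate> entry : mutatesGroupedByRow) {
        writer.write(entry.getKey(), entry.getValue());
      }
    } finally {
      writer.close();
    }
  }
}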

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
deleted file mode 100644
index 6814ac2..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/GetCounter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-import org.apache.hadoop.mapreduce.Counter;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Indirection for obtaining a Hadoop {@link Counter} for a given counter
- * enum, decoupling callers from the MapReduce task context.
- */
-public interface GetCounter {
-  
-  Counter getCounter(Enum<?> counterName);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
deleted file mode 100644
index 46c030f..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class IOUtil {
-
-  public static final String UTF_8 = "UTF-8";
-
-  public static String readString(DataInput input) throws IOException {
-    int length = readVInt(input);
-    byte[] buffer = new byte[length];
-    input.readFully(buffer);
-    return new String(buffer, UTF_8);
-  }
-
-  public static void writeString(DataOutput output, String s) throws IOException {
-    byte[] bs = s.getBytes(UTF_8);
-    writeVInt(output, bs.length);
-    output.write(bs);
-  }
-
-  public static int readVInt(DataInput input) throws IOException {
-    byte b = input.readByte();
-    int i = b & 0x7F;
-    for (int shift = 7; (b & 0x80) != 0; shift += 7) {
-      b = input.readByte();
-      i |= (b & 0x7F) << shift;
-    }
-    return i;
-  }
-
-  public static void writeVInt(DataOutput output, int i) throws IOException {
-    while ((i & ~0x7F) != 0) {
-      output.writeByte((byte) ((i & 0x7f) | 0x80));
-      i >>>= 7;
-    }
-    output.writeByte((byte) i);
-  }
-
-}
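
writeVInt stores seven data bits per byte and uses the high bit as a continuation flag, so small lengths cost a single byte; readVInt reverses this, and the string helpers layer UTF-8 on top. A quick round trip using only java.io, with a value chosen to show a two-byte VInt:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.blur.mapreduce.lib.IOUtil;

public class IOUtilRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);

    // 300 encodes as 0xAC 0x02: the low seven bits with the continuation
    // bit set, then the remaining bits.
    IOUtil.writeVInt(out, 300);
    IOUtil.writeString(out, "row-1");

    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(IOUtil.readVInt(in));   // 300
    System.out.println(IOUtil.readString(in)); // row-1
  }
}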

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
deleted file mode 100644
index 004e1fa..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
+++ /dev/null
@@ -1,289 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.hadoop.util.Progressable;
-import org.apache.lucene.store.BufferedIndexInput;
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-
-/**
- * The {@link ProgressableDirectory} allows progress to be reported while
- * Lucene is blocked in a merge. Without it, the task could be killed for
- * failing to report progress during a long blocked merge.
- */
-public class ProgressableDirectory extends Directory {
-
-  private static final Log LOG = LogFactory.getLog(ProgressableDirectory.class);
-
-  private final Directory _directory;
-  private final Progressable _progressable;
-
-  public ProgressableDirectory(Directory directory, Progressable progressable) {
-    _directory = directory;
-    if (progressable == null) {
-      LOG.warn("Progressable is null.");
-      _progressable = new Progressable() {
-        @Override
-        public void progress() {
-
-        }
-      };
-    } else {
-      _progressable = progressable;
-    }
-  }
-
-  @Override
-  public void clearLock(String name) throws IOException {
-    _directory.clearLock(name);
-  }
-
-  @Override
-  public void close() throws IOException {
-    _directory.close();
-  }
-
-  private IndexInput wrapInput(String name, IndexInput openInput) {
-    return new ProgressableIndexInput(name, openInput, 16384, _progressable);
-  }
-
-  private IndexOutput wrapOutput(IndexOutput createOutput) {
-    return new ProgressableIndexOutput(createOutput, _progressable);
-  }
-
-  public IndexOutput createOutput(String name, IOContext context) throws IOException {
-    return wrapOutput(_directory.createOutput(name, context));
-  }
-
-  @Override
-  public void deleteFile(String name) throws IOException {
-    _directory.deleteFile(name);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    return _directory.equals(obj);
-  }
-
-  @Override
-  public boolean fileExists(String name) throws IOException {
-    return _directory.fileExists(name);
-  }
-
-  @Override
-  public long fileLength(String name) throws IOException {
-    return _directory.fileLength(name);
-  }
-
-  @Override
-  public LockFactory getLockFactory() {
-    return _directory.getLockFactory();
-  }
-
-  @Override
-  public String getLockID() {
-    return _directory.getLockID();
-  }
-
-  @Override
-  public int hashCode() {
-    return _directory.hashCode();
-  }
-
-  @Override
-  public String[] listAll() throws IOException {
-    return _directory.listAll();
-  }
-
-  @Override
-  public Lock makeLock(String name) {
-    return _directory.makeLock(name);
-  }
-
-  @Override
-  public IndexInput openInput(String name, IOContext context) throws IOException {
-    return wrapInput(name, _directory.openInput(name, context));
-  }
-
-  @Override
-  public void setLockFactory(LockFactory lockFactory) throws IOException {
-    _directory.setLockFactory(lockFactory);
-  }
-
-  @Override
-  public void sync(Collection<String> names) throws IOException {
-    _directory.sync(names);
-  }
-
-  @Override
-  public String toString() {
-    return _directory.toString();
-  }
-
-  @SuppressWarnings("deprecation")
-  static class ProgressableIndexOutput extends IndexOutput {
-
-    private Progressable _progressable;
-    private IndexOutput _indexOutput;
-
-    public ProgressableIndexOutput(IndexOutput indexOutput, Progressable progressable) {
-      _indexOutput = indexOutput;
-      _progressable = progressable;
-    }
-
-    @Override
-    public void close() throws IOException {
-      _indexOutput.close();
-      _progressable.progress();
-    }
-
-    @Override
-    public void copyBytes(DataInput input, long numBytes) throws IOException {
-      _indexOutput.copyBytes(input, numBytes);
-      _progressable.progress();
-    }
-
-    @Override
-    public void flush() throws IOException {
-      _indexOutput.flush();
-      _progressable.progress();
-    }
-
-    @Override
-    public long getFilePointer() {
-      return _indexOutput.getFilePointer();
-    }
-
-    @Override
-    public long length() throws IOException {
-      return _indexOutput.length();
-    }
-
-    @Override
-    public void seek(long pos) throws IOException {
-      _indexOutput.seek(pos);
-      _progressable.progress();
-    }
-
-    @Override
-    public void setLength(long length) throws IOException {
-      _indexOutput.setLength(length);
-      _progressable.progress();
-    }
-
-    @Override
-    public String toString() {
-      return _indexOutput.toString();
-    }
-
-    @Override
-    public void writeByte(byte b) throws IOException {
-      _indexOutput.writeByte(b);
-    }
-
-    @Override
-    public void writeBytes(byte[] b, int offset, int length) throws IOException {
-      _indexOutput.writeBytes(b, offset, length);
-      _progressable.progress();
-    }
-
-    @Override
-    public void writeBytes(byte[] b, int length) throws IOException {
-      _indexOutput.writeBytes(b, length);
-      _progressable.progress();
-    }
-
-    @Override
-    public void writeInt(int i) throws IOException {
-      _indexOutput.writeInt(i);
-    }
-
-    @Override
-    public void writeLong(long i) throws IOException {
-      _indexOutput.writeLong(i);
-    }
-
-    @Override
-    public void writeString(String s) throws IOException {
-      _indexOutput.writeString(s);
-    }
-
-    @Override
-    public void writeStringStringMap(Map<String, String> map) throws IOException {
-      _indexOutput.writeStringStringMap(map);
-    }
-
-  }
-
-  static class ProgressableIndexInput extends BufferedIndexInput {
-
-    private IndexInput _indexInput;
-    private final long _length;
-    private Progressable _progressable;
-
-    ProgressableIndexInput(String name, IndexInput indexInput, int buffer, Progressable progressable) {
-      super("ProgressableIndexInput(" + indexInput.toString() + ")", buffer);
-      _indexInput = indexInput;
-      _length = indexInput.length();
-      _progressable = progressable;
-    }
-
-    @Override
-    protected void readInternal(byte[] b, int offset, int length) throws IOException {
-      long filePointer = getFilePointer();
-      if (filePointer != _indexInput.getFilePointer()) {
-        _indexInput.seek(filePointer);
-      }
-      _indexInput.readBytes(b, offset, length);
-      _progressable.progress();
-    }
-
-    @Override
-    protected void seekInternal(long pos) throws IOException {
-
-    }
-
-    @Override
-    public void close() throws IOException {
-      _indexInput.close();
-    }
-
-    @Override
-    public long length() {
-      return _length;
-    }
-
-    @Override
-    public ProgressableIndexInput clone() {
-      ProgressableIndexInput clone = (ProgressableIndexInput) super.clone();
-      clone._indexInput = (IndexInput) _indexInput.clone();
-      return clone;
-    }
-  }
-}
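
Any Lucene Directory can be wrapped this way; the wrapper forwards every call and pings the Progressable on each read and write. A sketch under the assumption that the caller has a Hadoop task context to report through (the path and helper name are placeholders):

import java.io.File;
import java.io.IOException;

import org.apache.blur.mapreduce.lib.ProgressableDirectory;
import org.apache.hadoop.util.Progressable;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class ProgressableDirectorySketch {
  static Directory wrap(File indexPath, final Progressable context) throws IOException {
    // Every read and write on the returned Directory now calls
    // context.progress(), keeping the task alive during long merges.
    return new ProgressableDirectory(FSDirectory.open(indexPath), context);
  }
}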

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
deleted file mode 100644
index 694759e..0000000
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/RateCounter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.mapreduce.Counter;
-
-/**
- * Turns a standard Hadoop counter into a rate counter.
- */
-public class RateCounter {
-
-  private final Counter _counter;
-  private final long _reportTime;
-  private final long _rateTime;
-  private long _lastReport;
-  private long _count = 0;
-
-  public RateCounter(Counter counter) {
-    this(counter, TimeUnit.SECONDS, 1);
-  }
-
-  public RateCounter(Counter counter, TimeUnit unit, long reportTime) {
-    _counter = counter;
-    _lastReport = System.nanoTime();
-    _reportTime = unit.toNanos(reportTime);
-    _rateTime = unit.toSeconds(reportTime);
-  }
-
-  public void mark() {
-    mark(1L);
-  }
-
-  public void mark(long n) {
-    long now = System.nanoTime();
-    if (_lastReport + _reportTime < now) {
-      long rate = _count / _rateTime;
-      _counter.setValue(rate);
-      _lastReport = System.nanoTime();
-      _count = 0;
-    }
-    _count += n;
-  }
-
-  public void close() {
-    _counter.setValue(0);
-  }
-
-}
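
In use, mark() is called once per unit of work; the counter's value is periodically overwritten with the observed per-second rate and zeroed on close(), so no stale rate is left behind. A hypothetical sketch:

import org.apache.blur.mapreduce.lib.RateCounter;
import org.apache.hadoop.mapreduce.Counter;

public class RateCounterSketch {
  // Reports records/second into the given Hadoop counter while iterating.
  static void index(Counter recordRate, Iterable<String> records) {
    RateCounter rate = new RateCounter(recordRate);
    for (String record : records) {
      // ... do the real per-record work here ...
      rate.mark();
    }
    rate.close();
  }
}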

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
deleted file mode 100644
index c14e86e..0000000
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
+++ /dev/null
@@ -1,229 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.*;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-
-import org.apache.blur.MiniCluster;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.store.buffer.BufferStore;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.blur.utils.GCWatcher;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class BlurOutputFormatMiniClusterTest {
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem fileSystem;
-  private static MiniMRCluster mr;
-  private static Path TEST_ROOT_DIR;
-  private static JobConf jobConf;
-  private static MiniCluster miniCluster;
-  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
-  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir",
-      "./target/tmp_BlurOutputFormatMiniClusterTest"));
-
-  @BeforeClass
-  public static void setupTest() throws Exception {
-    GCWatcher.init(0.60);
-    LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
-    File testDirectory = new File(TMPDIR, "blur-cluster-test").getAbsoluteFile();
-    testDirectory.mkdirs();
-
-    Path directory = new Path(testDirectory.getPath());
-    FsPermission dirPermissions = localFS.getFileStatus(directory).getPermission();
-    FsAction userAction = dirPermissions.getUserAction();
-    FsAction groupAction = dirPermissions.getGroupAction();
-    FsAction otherAction = dirPermissions.getOtherAction();
-
-    StringBuilder builder = new StringBuilder();
-    builder.append(userAction.ordinal());
-    builder.append(groupAction.ordinal());
-    builder.append(otherAction.ordinal());
-    String dirPermissionNum = builder.toString();
-    System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
-    testDirectory.delete();
-    miniCluster = new MiniCluster();
-    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true);
-
-    // System.setProperty("test.build.data",
-    // "./target/BlurOutputFormatTest/data");
-    // TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
-    // "target/tmp/BlurOutputFormatTest_tmp"));
-    TEST_ROOT_DIR = new Path(miniCluster.getFileSystemUri().toString() + "/blur_test");
-    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
-    try {
-      fileSystem = TEST_ROOT_DIR.getFileSystem(conf);
-    } catch (IOException io) {
-      throw new RuntimeException("problem getting local fs", io);
-    }
-    mr = new MiniMRCluster(1, miniCluster.getFileSystemUri().toString(), 1);
-    jobConf = mr.createJobConf();
-    BufferStore.initNewBuffer(128, 128 * 128);
-  }
-
-  @AfterClass
-  public static void teardown() {
-    if (mr != null) {
-      mr.shutdown();
-    }
-    miniCluster.shutdownBlurCluster();
-    rm(new File("build"));
-  }
-
-  private static void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Before
-  public void setup() {
-    TableContext.clear();
-  }
-
-  @Test
-  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException, BlurException,
-      TException {
-    fileSystem.delete(inDir, true);
-    String tableName = "testBlurOutputFormat";
-    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
-    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
-
-    Job job = new Job(jobConf, "blur index");
-    job.setJarByClass(BlurOutputFormatMiniClusterTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/blur/" + tableName).toString();
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
-    tableDescriptor.setName(tableName);
-
-    Iface client = getClient();
-    client.createTable(tableDescriptor);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path tablePath = new Path(tableUri);
-    Path shardPath = new Path(tablePath, BlurUtil.getShardName(0));
-    FileStatus[] listStatus = fileSystem.listStatus(shardPath);
-    assertEquals(3, listStatus.length);
-    System.out.println("======" + listStatus.length);
-    for (FileStatus fileStatus : listStatus) {
-      System.out.println(fileStatus.getPath());
-    }
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    while (true) {
-      TableStats tableStats = client.tableStats(tableName);
-      System.out.println(tableStats);
-      if (tableStats.getRowCount() > 0) {
-        break;
-      }
-      Thread.sleep(5000);
-    }
-
-    assertTrue(fileSystem.exists(tablePath));
-    assertFalse(fileSystem.isFile(tablePath));
-
-    FileStatus[] listStatusAfter = fileSystem.listStatus(shardPath);
-
-    assertEquals(11, listStatusAfter.length);
-
-  }
-
-  private Iface getClient() {
-    return BlurClient.getClient(miniCluster.getControllerConnectionStr());
-  }
-
-  public static String readFile(String name) throws IOException {
-    DataInputStream f = fileSystem.open(new Path(TEST_ROOT_DIR + "/" + name));
-    BufferedReader b = new BufferedReader(new InputStreamReader(f));
-    StringBuilder result = new StringBuilder();
-    String line = b.readLine();
-    while (line != null) {
-      result.append(line);
-      result.append('\n');
-      line = b.readLine();
-    }
-    b.close();
-    return result.toString();
-  }
-
-  private Path writeRecordsFile(String name, int startingRowId, int numberOfRows, int startRecordId,
-      int numberOfRecords, String family) throws IOException {
-    // "1,1,cf1,val1"
-    Path file = new Path(TEST_ROOT_DIR + "/" + name);
-    fileSystem.delete(file, false);
-    DataOutputStream f = fileSystem.create(file);
-    PrintWriter writer = new PrintWriter(f);
-    for (int row = 0; row < numberOfRows; row++) {
-      for (int record = 0; record < numberOfRecords; record++) {
-        writer.println(getRecord(row + startingRowId, record + startRecordId, family));
-      }
-    }
-    writer.close();
-    return file;
-  }
-
-  private String getRecord(int rowId, int recordId, String family) {
-    return rowId + "," + recordId + "," + family + ",valuetoindex";
-  }
-}
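
The mini-cluster test above reduces to a small wiring pattern: point a TextInputFormat job at CSV input, register the family/column mapping on CsvBlurMapper, and hand the Job plus a TableDescriptor to BlurOutputFormat.setupJob. The sketch below restates that pattern outside the test harness; the driver class name, table name, and paths are hypothetical placeholders, not values taken from this commit.

// A minimal, self-contained sketch of the indexing-job wiring the test
// above exercises. IndexDriver, "testTable", and all paths are
// hypothetical placeholders.
import org.apache.blur.mapreduce.lib.BlurOutputFormat;
import org.apache.blur.mapreduce.lib.CsvBlurMapper;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class IndexDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "blur index");
    job.setJarByClass(IndexDriver.class);
    job.setMapperClass(CsvBlurMapper.class);
    job.setInputFormatClass(TextInputFormat.class);

    // CSV input plus the family/column mapping the mapper parses against.
    FileInputFormat.addInputPath(job, new Path("hdfs:///blur_test/in"));
    CsvBlurMapper.addColumns(job, "cf1", "col");

    // Describe the target table; setupJob wires the output format and the
    // table metadata onto the job.
    TableDescriptor tableDescriptor = new TableDescriptor();
    tableDescriptor.setShardCount(1);
    tableDescriptor.setTableUri("hdfs:///blur_test/blur/testTable");
    tableDescriptor.setName("testTable");
    BlurOutputFormat.setupJob(job, tableDescriptor);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}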

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
deleted file mode 100644
index 811dba5..0000000
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ /dev/null
@@ -1,427 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.util.Collection;
-import java.util.TreeSet;
-
-import org.apache.blur.server.TableContext;
-import org.apache.blur.store.buffer.BufferStore;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TestMapperReducerCleanup.TrackingTextInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.lucene.index.DirectoryReader;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class BlurOutputFormatTest {
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem localFs;
-  private static MiniMRCluster mr;
-  private static Path TEST_ROOT_DIR;
-  private static JobConf jobConf;
-  private Path outDir = new Path(TEST_ROOT_DIR + "/out");
-  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
-
-  @BeforeClass
-  public static void setupTest() throws Exception {
-    System.setProperty("test.build.data", "./target/BlurOutputFormatTest/data");
-    TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", "target/tmp/BlurOutputFormatTest_tmp"));
-    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
-    try {
-      localFs = FileSystem.getLocal(conf);
-    } catch (IOException io) {
-      throw new RuntimeException("problem getting local fs", io);
-    }
-    mr = new MiniMRCluster(1, "file:///", 1);
-    jobConf = mr.createJobConf();
-    BufferStore.initNewBuffer(128, 128 * 128);
-  }
-
-  @AfterClass
-  public static void teardown() {
-    if (mr != null) {
-      mr.shutdown();
-    }
-    rm(new File("build"));
-  }
-
-  private static void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Before
-  public void setup() {
-    TableContext.clear();
-  }
-
-  @Test
-  public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
-    localFs.delete(inDir, true);
-    localFs.delete(outDir, true);
-    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
-    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
-
-    Job job = new Job(jobConf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 1);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    Path path = new Path(tableUri, BlurUtil.getShardName(0));
-    Collection<Path> committedTasks = getCommittedTasks(path);
-    assertEquals(1, committedTasks.size());
-    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, committedTasks.iterator().next()));
-    assertEquals(2, reader.numDocs());
-    reader.close();
-  }
-
-  private Collection<Path> getCommittedTasks(Path path) throws IOException {
-    Collection<Path> result = new TreeSet<Path>();
-    FileSystem fileSystem = path.getFileSystem(jobConf);
-    FileStatus[] listStatus = fileSystem.listStatus(path);
-    for (FileStatus fileStatus : listStatus) {
-      Path p = fileStatus.getPath();
-      if (fileStatus.isDir() && p.getName().endsWith(".commit")) {
-        result.add(p);
-      }
-    }
-    return result;
-  }
-
-  @Test
-  public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException, ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
-
-    Job job = new Job(jobConf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TrackingTextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 1);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    BlurOutputFormat.setIndexLocally(job, true);
-    BlurOutputFormat.setOptimizeInFlight(job, false);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    Path path = new Path(tableUri, BlurUtil.getShardName(0));
-    Collection<Path> committedTasks = getCommittedTasks(path);
-    assertEquals(1, committedTasks.size());
-
-    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, committedTasks.iterator().next()));
-    assertEquals(80000, reader.numDocs());
-    reader.close();
-  }
-
-  @Test
-  public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
-
-    Job job = new Job(jobConf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TrackingTextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(2);
-    tableDescriptor.setTableUri(tableUri);
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 2);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    BlurOutputFormat.setIndexLocally(job, false);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    long total = 0;
-    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(tableUri, BlurUtil.getShardName(i));
-      Collection<Path> committedTasks = getCommittedTasks(path);
-      assertEquals(1, committedTasks.size());
-
-      DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, committedTasks.iterator().next()));
-      total += reader.numDocs();
-      reader.close();
-    }
-    assertEquals(80000, total);
-
-  }
-
-  @Test
-  public void testBlurOutputFormatOverFlowMultipleReducersWithReduceMultiplierTest() throws IOException,
-      InterruptedException, ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
-
-    Job job = new Job(jobConf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TrackingTextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(7);
-    tableDescriptor.setTableUri(tableUri);
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 7);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    int multiple = 2;
-    BlurOutputFormat.setReducerMultiplier(job, multiple);
-
-    assertTrue(job.waitForCompletion(true));
-    Counters ctrs = job.getCounters();
-    System.out.println("Counters: " + ctrs);
-
-    long total = 0;
-    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(tableUri, BlurUtil.getShardName(i));
-      Collection<Path> committedTasks = getCommittedTasks(path);
-      assertTrue(multiple >= committedTasks.size());
-      for (Path p : committedTasks) {
-        DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, p));
-        total += reader.numDocs();
-        reader.close();
-      }
-    }
-    assertEquals(80000, total);
-
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testBlurOutputFormatValidateReducerCount() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
-    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
-
-    Job job = new Job(jobConf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TrackingTextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 1);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    BlurOutputFormat.setReducerMultiplier(job, 2);
-    job.setNumReduceTasks(4);
-    job.submit();
-
-  }
-
-  // @TODO this test fails sometimes due to issues in the MR MiniCluster
-  // @Test
-  public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 5000, 2000, 100, "cf1"); // 100 * 5000 =
-                                                             // 500,000
-
-    Job job = new Job(jobConf, "blur index");
-    job.setJarByClass(BlurOutputFormatTest.class);
-    job.setMapperClass(CsvBlurMapper.class);
-    job.setInputFormatClass(TrackingTextInputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
-    CsvBlurMapper.addColumns(job, "cf1", "col");
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(2);
-    tableDescriptor.setTableUri(tableUri);
-    tableDescriptor.setName("test");
-
-    createShardDirectories(outDir, 2);
-
-    BlurOutputFormat.setupJob(job, tableDescriptor);
-    BlurOutputFormat.setIndexLocally(job, false);
-
-    job.submit();
-    boolean killCalled = false;
-    while (!job.isComplete()) {
-      Thread.sleep(1000);
-      System.out.printf("Killed [" + killCalled + "] Map [%f] Reduce [%f]%n", job.mapProgress() * 100,
-          job.reduceProgress() * 100);
-      if (job.reduceProgress() > 0.7 && !killCalled) {
-        job.killJob();
-        killCalled = true;
-      }
-    }
-
-    assertFalse(job.isSuccessful());
-
-    for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(tableUri, BlurUtil.getShardName(i));
-      FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      assertEquals(toString(listStatus), 0, listStatus.length);
-    }
-  }
-
-  private String toString(FileStatus[] listStatus) {
-    if (listStatus == null || listStatus.length == 0) {
-      return "";
-    }
-    String s = "";
-    for (FileStatus fileStatus : listStatus) {
-      if (s.length() > 0) {
-        s += ",";
-      }
-      s += fileStatus.getPath();
-    }
-    return s;
-  }
-
-  public static String readFile(String name) throws IOException {
-    DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
-    BufferedReader b = new BufferedReader(new InputStreamReader(f));
-    StringBuilder result = new StringBuilder();
-    String line = b.readLine();
-    while (line != null) {
-      result.append(line);
-      result.append('\n');
-      line = b.readLine();
-    }
-    b.close();
-    return result.toString();
-  }
-
-  private Path writeRecordsFile(String name, int startingRowId, int numberOfRows, int startRecordId,
-      int numberOfRecords, String family) throws IOException {
-    // "1,1,cf1,val1"
-    Path file = new Path(TEST_ROOT_DIR + "/" + name);
-    localFs.delete(file, false);
-    DataOutputStream f = localFs.create(file);
-    PrintWriter writer = new PrintWriter(f);
-    for (int row = 0; row < numberOfRows; row++) {
-      for (int record = 0; record < numberOfRecords; record++) {
-        writer.println(getRecord(row + startingRowId, record + startRecordId, family));
-      }
-    }
-    writer.close();
-    return file;
-  }
-
-  private void createShardDirectories(Path outDir, int shardCount) throws IOException {
-    localFs.mkdirs(outDir);
-    for (int i = 0; i < shardCount; i++) {
-      localFs.mkdirs(new Path(outDir, BlurUtil.getShardName(i)));
-    }
-  }
-
-  private String getRecord(int rowId, int recordId, String family) {
-    return rowId + "," + recordId + "," + family + ",valuetoindex";
-  }
-}
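
The multiplier tests above pin down a reducer-sizing contract worth stating once: reducer capacity scales with shard count times the configured multiplier, each shard directory may end up with at most `multiplier` committed task directories, and manually forcing an unrelated reducer count is rejected at submit time. A fragment, assuming job and tableDescriptor are configured as in the tests (shard count 7, CsvBlurMapper input already wired):

// Reducer sizing as exercised by the tests above; job and tableDescriptor
// are assumed to be set up exactly as in those tests.
BlurOutputFormat.setupJob(job, tableDescriptor);
BlurOutputFormat.setReducerMultiplier(job, 2);
// Per shard, the tests accept at most `multiplier` committed task
// directories: assertTrue(multiple >= committedTasks.size()).
// Overriding the reducer count by hand conflicts with the multiplier:
// job.setNumReduceTasks(4);  // submit() then throws IllegalArgumentException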

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
deleted file mode 100644
index ec3239e..0000000
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.Test;
-
-public class CsvBlurDriverTest {
-
-  protected String tableUri = "file:///tmp/tmppath";
-  protected int shardCount = 13;
-
-  @Test
-  public void testCsvBlurDriverTestFail1() throws Exception {
-    Configuration configuration = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return null;
-      }
-    };
-    assertNull(CsvBlurDriver.setupJob(configuration, controllerPool, new String[] {}));
-  }
-
-  @Test
-  public void testCsvBlurDriverTest() throws Exception {
-    Configuration configurationSetup = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return getMockIface();
-      }
-    };
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
-        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2");
-    assertNotNull(job);
-    Configuration configuration = job.getConfiguration();
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    assertEquals("table1", tableDescriptor.getName());
-    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
-    assertEquals(2, inputs.size());
-    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
-    assertEquals(2, familyAndColumnNameMap.size());
-  }
-
-  @Test
-  public void testCsvBlurDriverTest2() throws Exception {
-    Configuration configurationSetup = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return getMockIface();
-      }
-    };
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
-        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
-        "-S", "-C", "1000000", "2000000");
-    assertNotNull(job);
-    Configuration configuration = job.getConfiguration();
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    assertEquals("table1", tableDescriptor.getName());
-    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
-    assertEquals(2, inputs.size());
-    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
-    assertEquals(2, familyAndColumnNameMap.size());
-  }
-
-  @Test
-  public void testCsvBlurDriverTest3() throws Exception {
-    Configuration configurationSetup = new Configuration();
-    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
-      @Override
-      public Iface getClient(String controllerConnectionStr) {
-        return getMockIface();
-      }
-    };
-    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010", "-d", "family1", "col1",
-        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1", "-i", "file:///tmp/test2",
-        "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
-    assertNotNull(job);
-    Configuration configuration = job.getConfiguration();
-    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
-    assertEquals("table1", tableDescriptor.getName());
-    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
-    assertEquals(2, inputs.size());
-    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
-    assertEquals(2, familyAndColumnNameMap.size());
-    assertEquals("true", configuration.get(CsvBlurDriver.MAPRED_COMPRESS_MAP_OUTPUT));
-    assertEquals(SnappyCodec.class.getName(), configuration.get(CsvBlurDriver.MAPRED_MAP_OUTPUT_COMPRESSION_CODEC));
-  }
-
-  protected Iface getMockIface() {
-    InvocationHandler handler = new InvocationHandler() {
-
-      @Override
-      public Object invoke(Object o, Method method, Object[] args) throws Throwable {
-        if (method.getName().equals("describe")) {
-          TableDescriptor tableDescriptor = new TableDescriptor();
-          tableDescriptor.setName((String) args[0]);
-          tableDescriptor.setTableUri(tableUri);
-          tableDescriptor.setShardCount(shardCount);
-          return tableDescriptor;
-        }
-        throw new RuntimeException("not implemented.");
-      }
-    };
-    return (Iface) Proxy.newProxyInstance(Iface.class.getClassLoader(), new Class[] { Iface.class }, handler);
-  }
-
-}
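
The driver tests above document CsvBlurDriver's argument pattern by example: "-c" names the controller, each "-d" block lists a family followed by its columns, "-t" names the table (resolved through the client's describe() call), "-i" adds an input path, and "-p SNAPPY" turns on Snappy compression for intermediate map output. A hedged sketch of that call; the host, family, table, and path values are illustrative, and controllerPool is any CsvBlurDriver.ControllerPool (the tests use a reflection-based mock). The "-S"/"-C" options take numeric arguments in the tests but their semantics are not shown in this diff, so they are omitted here.

// Illustrative invocation only; values are placeholders, not from the diff.
Job job = CsvBlurDriver.setupJob(new Configuration(), controllerPool,
    "-c", "controller1:40010",         // controller connection string
    "-d", "family1", "col1", "col2",   // repeatable: a family and its columns
    "-t", "table1",                    // target Blur table, fetched via describe()
    "-i", "file:///tmp/input",         // repeatable input path
    "-p", "SNAPPY");                   // Snappy-compress intermediate map output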

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b8851cac/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
deleted file mode 100644
index 47aa8e5..0000000
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CsvBlurMapperTest {
-
-  private MapDriver<Writable, Text, Text, BlurMutate> _mapDriver;
-  private CsvBlurMapper _mapper;
-
-  @Before
-  public void setUp() throws IOException {
-    _mapper = new CsvBlurMapper();
-    _mapDriver = MapDriver.newMapDriver(_mapper);
-  }
-
-  @Test
-  public void testMapperWithFamilyInData() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
-    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-
-  @Test
-  public void testMapperFamilyPerPath() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,value1,value2"));
-    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-  
-  @Test
-  public void testMapperAutoGenerateRecordId() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("rowid1,value1,value2"));
-    _mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "-25nqln3n2vb4cayex9y9tpxx3", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-  
-  @Test
-  public void testMapperAutoGenerateRowId() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("record1,value1,value2"));
-    _mapDriver.withOutput(new Text("-50b4uzohynr7j7s9pve7ytz66"), new BlurMutate(MUTATE_TYPE.REPLACE, "-50b4uzohynr7j7s9pve7ytz66", "record1", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-  
-  @Test
-  public void testMapperAutoGenerateRowIdAndRecordId() {
-    Configuration configuration = _mapDriver.getConfiguration();
-    CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setAutoGenerateRowIdAsHashOfData(configuration, true);
-    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
-    CsvBlurMapper.addFamilyPath(configuration, "cf1", new Path("/"));
-    _mapper.setFamilyFromPath("cf1");
-
-    _mapDriver.withInput(new LongWritable(), new Text("value1,value2"));
-    _mapDriver.withOutput(new Text("5q0tme15ph3h5pns8sv3u5wy2"), new BlurMutate(MUTATE_TYPE.REPLACE, "5q0tme15ph3h5pns8sv3u5wy2", "5q0tme15ph3h5pns8sv3u5wy2", "cf1")
-        .addColumn("col1", "value1").addColumn("col2", "value2"));
-    _mapDriver.runTest();
-  }
-
-}
\ No newline at end of file
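
These mapper tests all hinge on the column-definition string accepted by CsvBlurMapper.setColumns: families are separated by '|', and each family's comma-separated columns follow a ':'. When a family is bound to an input path, the family field is omitted from each CSV line. A short sketch, with the path value being a placeholder (the tests simply use "/"):

// Column-definition string format exercised by the tests above.
Configuration conf = new Configuration();
CsvBlurMapper.setColumns(conf, "cf1:col1,col2|cf2:col1,col2,col3");
// With a family bound to an input path, lines like
// "rowid1,record1,value1,value2" carry no family field and map into cf1.
CsvBlurMapper.addFamilyPath(conf, "cf1", new Path("/data/cf1"));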

