incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [1/7] Removing old code, deprecating code that should not be used, and adding javadoc.
Date Thu, 16 May 2013 20:45:18 GMT
Updated Branches:
  refs/heads/0.1.5 6a9fde037 -> 66c804e88


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0cc4d72e/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
new file mode 100644
index 0000000..0eec9a7
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurMapper.java
@@ -0,0 +1,230 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.base.Splitter;
+
+/**
+ * This mapper parses a standard CSV file into {@link BlurMutate} objects. Use
+ * the static setColumns, addColumns, and setSeparator methods to configure it.
+ */
+public class CsvBlurMapper extends BaseBlurMapper<LongWritable, Text> {
+
+  public static final String BLUR_CSV_SEPARATOR = "blur.csv.separator";
+  public static final String BLUR_CSV_FAMILY_COLUMN_PREFIX = "blur.csv.family.";
+  public static final String BLUR_CSV_FAMILIES = "blur.csv.families";
+
+  private Map<String, List<String>> columnNameMap;
+  private String separator = ",";
+  private Splitter splitter;
+
+  /**
+   * Sets all the family and column definitions.
+   * 
+   * @param job
+ *          the job to set up.
+   * @param strDefinition
+   *          the string definition. <br/>
+   * <br/>
+   *          Example:<br/>
+   *          "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
+   *          Where "cf1" is a family name that contains columns "col1", "col2"
+   *          and "col3" and a second family of "cf2" with columns "col1",
+   *          "col2", and "col3".
+   */
+  public static void setColumns(Job job, String strDefinition) {
+    setColumns(job.getConfiguration(), strDefinition);
+  }
+
+  /**
+   * Sets all the family and column definitions.
+   * 
+   * @param configuration
+ *          the configuration to set up.
+   * @param strDefinition
+   *          the string definition. <br/>
+   * <br/>
+   *          Example:<br/>
+   *          "cf1:col1,col2,col3|cf2:col1,col2,col3"<br/>
+   *          Where "cf1" is a family name that contains columns "col1", "col2"
+   *          and "col3" and a second family of "cf2" with columns "col1",
+   *          "col2", and "col3".
+   */
+  public static void setColumns(Configuration configuration, String strDefinition) {
+    Iterable<String> familyDefs = Splitter.on('|').split(strDefinition);
+    for (String familyDef : familyDefs) {
+      int indexOf = familyDef.indexOf(':');
+      if (indexOf < 0) {
+        throwMalformedDefinition(strDefinition);
+      }
+      String family = familyDef.substring(0, indexOf);
+      Iterable<String> cols = Splitter.on(',').split(familyDef.substring(indexOf + 1));
+      List<String> colnames = new ArrayList<String>();
+      for (String columnName : cols) {
+        colnames.add(columnName);
+      }
+      if (family.trim().isEmpty() || colnames.isEmpty()) {
+        throwMalformedDefinition(strDefinition);
+      }
+      addColumns(configuration, family, colnames.toArray(new String[colnames.size()]));
+    }
+  }
+
+  private static void throwMalformedDefinition(String strDefinition) {
+    throw new RuntimeException("Family and column definition string not valid [" + strDefinition
+        + "] should look like \"family1:colname1,colname2|family2:colname1,colname2,colname3\"");
+  }
+
+  /**
+   * Adds the column layout for the given family.
+   * 
+   * @param job
+   *          the job to apply the layout.
+   * @param family
+   *          the family name.
+   * @param columns
+   *          the column names.
+   */
+  public static void addColumns(Job job, String family, String... columns) {
+    addColumns(job.getConfiguration(), family, columns);
+  }
+
+  /**
+   * Adds the column layout for the given family.
+   * 
+   * @param configuration
+   *          the configuration to apply the layout.
+   * @param family
+   *          the family name.
+   * @param columns
+   *          the column names.
+   */
+  public static void addColumns(Configuration configuration, String family, String... columns) {
+    Collection<String> families = new TreeSet<String>(configuration.getStringCollection(BLUR_CSV_FAMILIES));
+    families.add(family);
+    configuration.setStrings(BLUR_CSV_FAMILIES, families.toArray(new String[] {}));
+    configuration.setStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family, columns);
+  }
+
+  /**
+   * Sets the separator of the file; the default is ",".
+   * 
+   * @param job
+   *          the job to apply the separator change.
+   * @param separator
+   *          the separator.
+   */
+  public static void setSeparator(Job job, String separator) {
+    setSeparator(job.getConfiguration(), separator);
+  }
+
+  /**
+   * Sets the separator of the file; the default is ",".
+   * 
+   * @param configuration
+   *          the configuration to apply the separator change.
+   * @param separator
+   *          the separator.
+   */
+  public static void setSeparator(Configuration configuration, String separator) {
+    configuration.set(BLUR_CSV_SEPARATOR, separator);
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration configuration = context.getConfiguration();
+    Collection<String> familyNames = configuration.getStringCollection(BLUR_CSV_FAMILIES);
+    columnNameMap = new HashMap<String, List<String>>();
+    for (String family : familyNames) {
+      String[] columnNames = configuration.getStrings(BLUR_CSV_FAMILY_COLUMN_PREFIX + family);
+      columnNameMap.put(family, Arrays.asList(columnNames));
+    }
+    separator = configuration.get(BLUR_CSV_SEPARATOR, separator);
+    splitter = Splitter.on(separator);
+  }
+
+  @Override
+  protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
+    BlurRecord record = _mutate.getRecord();
+    record.clearColumns();
+    String str = value.toString();
+
+    Iterable<String> split = splitter.split(str);
+    List<String> list = toList(split);
+
+    if (list.size() < 3) {
+      throw new IOException("Record [" + str + "] too short.");
+    }
+
+    record.setRowId(list.get(0));
+    record.setRecordId(list.get(1));
+    String family = list.get(2);
+    record.setFamily(family);
+
+    List<String> columnNames = columnNameMap.get(family);
+    if (columnNames == null) {
+      throw new IOException("Family [" + family + "] is missing in the definition.");
+    }
+    if (list.size() - 3 != columnNames.size()) {
+      throw new IOException("Record [" + str + "] too short, does not match defined record
[rowid,recordid,family"
+          + getColumnNames(columnNames) + "].");
+    }
+
+    for (int i = 0; i < columnNames.size(); i++) {
+      record.addColumn(columnNames.get(i), list.get(i + 3));
+      _fieldCounter.increment(1);
+    }
+    _key.set(record.getRowId());
+    _mutate.setMutateType(MUTATE_TYPE.REPLACE);
+    context.write(_key, _mutate);
+    _recordCounter.increment(1);
+    context.progress();
+  }
+
+  private String getColumnNames(List<String> columnNames) {
+    StringBuilder builder = new StringBuilder();
+    for (String c : columnNames) {
+      builder.append(',').append(c);
+    }
+    return builder.toString();
+  }
+
+  private List<String> toList(Iterable<String> split) {
+    List<String> lst = new ArrayList<String>();
+    for (String s : split) {
+      lst.add(s);
+    }
+    return lst;
+  }
+
+}
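
For reference, here is a minimal driver sketch showing how the new mapper might
be wired into a job. Each input line is expected to contain the row id, record
id, and family, followed by the configured column values, separated by the
configured separator. The driver class, job name, and input path argument are
illustrative assumptions, not part of this patch:

import org.apache.blur.mapreduce.lib.BlurMutate;
import org.apache.blur.mapreduce.lib.CsvBlurMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class CsvLoadDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "csv-load"); // hypothetical job name
    job.setJarByClass(CsvLoadDriver.class);
    job.setMapperClass(CsvBlurMapper.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(Text.class);         // row id
    job.setMapOutputValueClass(BlurMutate.class); // parsed record mutation
    FileInputFormat.addInputPath(job, new Path(args[0])); // hypothetical CSV input path

    // Family "cf1" with two columns and family "cf2" with three.
    CsvBlurMapper.setColumns(job, "cf1:col1,col2|cf2:col1,col2,col3");
    // Tab-separated input instead of the default ",".
    CsvBlurMapper.setSeparator(job, "\t");

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}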

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0cc4d72e/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
index c89ca6b..826ba46 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/DefaultBlurReducer.java
@@ -18,7 +18,6 @@ package org.apache.blur.mapreduce.lib;
  */
 import java.io.IOException;
 
-import org.apache.blur.mapreduce.BlurMutate;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0cc4d72e/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
new file mode 100644
index 0000000..46c030f
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/IOUtil.java
@@ -0,0 +1,58 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Helpers for reading and writing length-prefixed UTF-8 strings and
+ * variable-length ints (seven bits per byte, high bit set when more follow).
+ */
+public class IOUtil {
+
+  public static final String UTF_8 = "UTF-8";
+
+  public static String readString(DataInput input) throws IOException {
+    int length = readVInt(input);
+    byte[] buffer = new byte[length];
+    input.readFully(buffer);
+    return new String(buffer, UTF_8);
+  }
+
+  public static void writeString(DataOutput output, String s) throws IOException {
+    byte[] bs = s.getBytes(UTF_8);
+    writeVInt(output, bs.length);
+    output.write(bs);
+  }
+
+  public static int readVInt(DataInput input) throws IOException {
+    byte b = input.readByte();
+    int i = b & 0x7F;
+    for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+      b = input.readByte();
+      i |= (b & 0x7F) << shift;
+    }
+    return i;
+  }
+
+  public static void writeVInt(DataOutput output, int i) throws IOException {
+    while ((i & ~0x7F) != 0) {
+      output.writeByte((byte) ((i & 0x7f) | 0x80));
+      i >>>= 7;
+    }
+    output.writeByte((byte) i);
+  }
+
+}
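
The VInt helpers above implement the usual base-128 varint: seven bits per
byte, low-order group first, with the high bit set when another byte follows,
and readString/writeString use it as a length prefix for UTF-8 bytes. A small
round-trip sketch over in-memory streams (the class name and values are
illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.blur.mapreduce.lib.IOUtil;

public class IOUtilRoundTrip {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    IOUtil.writeVInt(out, 300);      // encodes as two bytes: 0xAC 0x02
    IOUtil.writeString(out, "blur"); // VInt length, then UTF-8 bytes
    out.flush();

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(IOUtil.readVInt(in));   // 300
    System.out.println(IOUtil.readString(in)); // blur
  }
}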

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0cc4d72e/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
new file mode 100644
index 0000000..3ca6a35
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/ProgressableDirectory.java
@@ -0,0 +1,275 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+/**
+ * The {@link ProgressableDirectory} allows progress to be recorded while
+ * Lucene is blocked in a merge. This prevents the task from being killed for
+ * failing to report progress during the blocked merge.
+ */
+public class ProgressableDirectory extends Directory {
+
+  private Directory _directory;
+  private Progressable _progressable;
+
+  public ProgressableDirectory(Directory directory, Progressable progressable) {
+    _directory = directory;
+    _progressable = progressable;
+  }
+
+  @Override
+  public void clearLock(String name) throws IOException {
+    _directory.clearLock(name);
+  }
+
+  @Override
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+  private IndexInput wrapInput(String name, IndexInput openInput) {
+    return new ProgressableIndexInput(name, openInput, 16384, _progressable);
+  }
+
+  private IndexOutput wrapOutput(IndexOutput createOutput) {
+    return new ProgressableIndexOutput(createOutput, _progressable);
+  }
+
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    return wrapOutput(_directory.createOutput(name, context));
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    _directory.deleteFile(name);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return _directory.equals(obj);
+  }
+
+  @Override
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  @Override
+  public LockFactory getLockFactory() {
+    return _directory.getLockFactory();
+  }
+
+  @Override
+  public String getLockID() {
+    return _directory.getLockID();
+  }
+
+  @Override
+  public int hashCode() {
+    return _directory.hashCode();
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  @Override
+  public Lock makeLock(String name) {
+    return _directory.makeLock(name);
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    return wrapInput(name, _directory.openInput(name, context));
+  }
+
+  @Override
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _directory.setLockFactory(lockFactory);
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  @Override
+  public String toString() {
+    return _directory.toString();
+  }
+
+  @SuppressWarnings("deprecation")
+  static class ProgressableIndexOutput extends IndexOutput {
+
+    private Progressable _progressable;
+    private IndexOutput _indexOutput;
+
+    public ProgressableIndexOutput(IndexOutput indexOutput, Progressable progressable) {
+      _indexOutput = indexOutput;
+      _progressable = progressable;
+    }
+
+    @Override
+    public void close() throws IOException {
+      _indexOutput.close();
+      _progressable.progress();
+    }
+
+    @Override
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+      _indexOutput.copyBytes(input, numBytes);
+      _progressable.progress();
+    }
+
+    @Override
+    public void flush() throws IOException {
+      _indexOutput.flush();
+      _progressable.progress();
+    }
+
+    @Override
+    public long getFilePointer() {
+      return _indexOutput.getFilePointer();
+    }
+
+    @Override
+    public long length() throws IOException {
+      return _indexOutput.length();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      _indexOutput.seek(pos);
+      _progressable.progress();
+    }
+
+    @Override
+    public void setLength(long length) throws IOException {
+      _indexOutput.setLength(length);
+      _progressable.progress();
+    }
+
+    @Override
+    public String toString() {
+      return _indexOutput.toString();
+    }
+
+    @Override
+    public void writeByte(byte b) throws IOException {
+      _indexOutput.writeByte(b);
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      _indexOutput.writeBytes(b, offset, length);
+      _progressable.progress();
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int length) throws IOException {
+      _indexOutput.writeBytes(b, length);
+      _progressable.progress();
+    }
+
+    @Override
+    public void writeInt(int i) throws IOException {
+      _indexOutput.writeInt(i);
+    }
+
+    @Override
+    public void writeLong(long i) throws IOException {
+      _indexOutput.writeLong(i);
+    }
+
+    @Override
+    public void writeString(String s) throws IOException {
+      _indexOutput.writeString(s);
+    }
+
+    @Override
+    public void writeStringStringMap(Map<String, String> map) throws IOException {
+      _indexOutput.writeStringStringMap(map);
+    }
+
+  }
+
+  static class ProgressableIndexInput extends BufferedIndexInput {
+
+    private IndexInput _indexInput;
+    private final long _length;
+    private Progressable _progressable;
+
+    ProgressableIndexInput(String name, IndexInput indexInput, int buffer, Progressable progressable) {
+      super(name, buffer);
+      _indexInput = indexInput;
+      _length = indexInput.length();
+      _progressable = progressable;
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      long filePointer = getFilePointer();
+      if (filePointer != _indexInput.getFilePointer()) {
+        _indexInput.seek(filePointer);
+      }
+      _indexInput.readBytes(b, offset, length);
+      _progressable.progress();
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+      // Seeking is handled lazily: readInternal repositions the underlying input.
+    }
+
+    @Override
+    public void close() throws IOException {
+      _indexInput.close();
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+
+    @Override
+    public ProgressableIndexInput clone() {
+      ProgressableIndexInput clone = (ProgressableIndexInput) super.clone();
+      clone._indexInput = (IndexInput) _indexInput.clone();
+      return clone;
+    }
+  }
+}
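
To use the wrapper, a task wraps whatever Directory it writes with its own
Progressable (typically the task attempt context). A sketch, assuming the
HdfsDirectory(Path) constructor referenced elsewhere in this tree; the helper
class and method names are illustrative:

import java.io.IOException;

import org.apache.blur.mapreduce.lib.ProgressableDirectory;
import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Progressable;
import org.apache.lucene.store.Directory;

public class ShardDirectories {
  // Every read and write on the returned Directory reports progress, so the
  // task tracker sees activity even while Lucene blocks in a merge.
  static Directory openProgressable(Path shardPath, Progressable progressable) throws IOException {
    Directory directory = new HdfsDirectory(shardPath);
    return new ProgressableDirectory(directory, progressable);
  }
}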

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0cc4d72e/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java
deleted file mode 100644
index 36e0352..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.SegmentInfo;
-import org.apache.lucene.index.SegmentInfoPerCommit;
-import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.store.Directory;
-
-public class Utils {
-
-  public static int getTermInfosIndexDivisor(Configuration conf) {
-    return 128;
-  }
-
-  public static IndexCommit findLatest(Directory dir) throws IOException {
-    Collection<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
-    if (listCommits.size() == 1) {
-      return listCommits.iterator().next();
-    }
-    throw new RuntimeException("Multiple commit points not supported yet.");
-  }
-
-  public static List<String> getSegments(Directory dir, IndexCommit commit) throws CorruptIndexException, IOException {
-    SegmentInfos infos = new SegmentInfos();
-    infos.read(dir, commit.getSegmentsFileName());
-    List<String> result = new ArrayList<String>();
-    for (SegmentInfoPerCommit info : infos) {
-      result.add(info.info.name);
-    }
-    return result;
-  }
-
-//  public static IndexReader openSegmentReader(Directory directory, IndexCommit commit, String segmentName, int termInfosIndexDivisor) throws CorruptIndexException, IOException {
-//    SegmentInfos infos = new SegmentInfos();
-//    infos.read(directory, commit.getSegmentsFileName());
-//    SegmentInfo segmentInfo = null;
-//    for (SegmentInfoPerCommit info : infos) {
-//      if (segmentName.equals(info.info.name)) {
-//        segmentInfo = info.info;
-//        break;
-//      }
-//    }
-//    if (segmentInfo == null) {
-//      throw new RuntimeException("SegmentInfo for [" + segmentName + "] not found in directory
[" + directory + "] for commit [" + commit + "]");
-//    }
-//    return SegmentReader.get(true, segmentInfo, termInfosIndexDivisor);
-//  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0cc4d72e/src/blur-mapred/src/test/java/org/apache/blur/mapred/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapred/BlurInputFormatTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapred/BlurInputFormatTest.java
deleted file mode 100644
index f8f2482..0000000
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapred/BlurInputFormatTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package org.apache.blur.mapred;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-
-import org.apache.blur.mapred.BlurInputFormat;
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.blur.mapreduce.lib.BlurInputSplit;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public abstract class BlurInputFormatTest {
-
-//  private Path indexPath = new Path(TMPDIR, "./tmp/test-indexes/oldapi");
-//  private int numberOfShards = 13;
-//  private int rowsPerIndex = 10;
-//
-//  @Before
-//  public void setup() throws IOException {
-//    org.apache.blur.mapreduce.lib.BlurInputFormatTest.buildTestIndexes(indexPath, numberOfShards, rowsPerIndex);
-//  }
-//
-//  @Test
-//  public void testGetSplits() throws IOException {
-//    BlurInputFormat format = new BlurInputFormat();
-//    JobConf job = new JobConf(new Configuration());
-//    FileInputFormat.addInputPath(job, indexPath);
-//    InputSplit[] splits = format.getSplits(job, -1);
-//    for (int i = 0; i < splits.length; i++) {
-//      BlurInputSplit split = (BlurInputSplit) splits[i];
-//      Path path = new Path(indexPath, BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i));
-//      FileSystem fileSystem = path.getFileSystem(job);
-//      assertEquals(new BlurInputSplit(fileSystem.makeQualified(path), "_0", 0, Integer.MAX_VALUE), split);
-//    }
-//  }
-//
-//  @Test
-//  public void testGetRecordReader() throws IOException {
-//    BlurInputFormat format = new BlurInputFormat();
-//    JobConf job = new JobConf(new Configuration());
-//    FileInputFormat.addInputPath(job, indexPath);
-//    InputSplit[] splits = format.getSplits(job, -1);
-//    for (int i = 0; i < splits.length; i++) {
-//      RecordReader<Text, BlurRecord> reader = format.getRecordReader(splits[i], job, Reporter.NULL);
-//      Text key = reader.createKey();
-//      BlurRecord value = reader.createValue();
-//      while (reader.next(key, value)) {
-//        System.out.println(reader.getProgress() + " " + key + " " + value);
-//      }
-//    }
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0cc4d72e/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurReducerTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurReducerTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurReducerTest.java
deleted file mode 100644
index 08541c1..0000000
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurReducerTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.blur.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.blur.mapreduce.BlurMutate.MUTATE_TYPE;
-import org.apache.blur.mapreduce.csv.CsvBlurMapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
-import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BlurReducerTest {
-
-  MapDriver<LongWritable, Text, Text, BlurMutate> mapDriver;
-  ReduceDriver<Text, BlurMutate, Text, BlurMutate> reduceDriver;
-  MapReduceDriver<LongWritable, Text, Text, BlurMutate, Text, BlurMutate> mapReduceDriver;
-
-  @Before
-  public void setUp() throws IOException {
-    CsvBlurMapper mapper = new CsvBlurMapper();
-
-    mapDriver = MapDriver.newMapDriver(mapper);
-    Configuration configuration = mapDriver.getConfiguration();
-    CsvBlurMapper.addColumns(configuration, "cf1", "col1", "col2");
-
-    // Configuration configuration = new Configuration();
-    // BlurTask blurTask = new BlurTask();
-    // blurTask.configureJob(configuration);
-    // mapDriver.setConfiguration(configuration);
-    BlurReducer reducer = new BlurReducer();
-    reduceDriver = ReduceDriver.newReduceDriver(reducer);
-
-    mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
-
-  }
-
-  @Test
-  public void testMapper() {
-    mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
-    mapDriver.withOutput(new Text("rowid1"), new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1",
"record1", "cf1").addColumn("col1", "value1").addColumn("col2", "value2"));
-    mapDriver.runTest();
-  }
-
-  @Test
-  public void testReducer() {
-    // List<IntWritable> values = new ArrayList<IntWritable>();
-    // values.add(new IntWritable(1));
-    // values.add(new IntWritable(1));
-    // reduceDriver.withInput(new Text("6"), values);
-    // reduceDriver.withOutput(new Text("6"), new IntWritable(2));
-    // reduceDriver.runTest();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0cc4d72e/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java
index 1e05566..e04476e 100644
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java
@@ -16,21 +16,21 @@ package org.apache.blur.mapreduce;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import static org.junit.Assert.assertEquals;
+
 import java.io.File;
 
-import org.apache.blur.mapreduce.BlurTask;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
-
-import static org.junit.Assert.*;
-
+@SuppressWarnings("deprecation")
 public class BlurTaskTest {
   private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "/tmp"));
 
   @Test
   public void testGetNumReducersBadPath() {
+
     BlurTask task = new BlurTask();
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(5);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0cc4d72e/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
deleted file mode 100644
index 1d77c36..0000000
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.blur.analysis.BlurAnalyzer;
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.blur.mapreduce.lib.BlurInputFormat;
-import org.apache.blur.mapreduce.lib.BlurInputSplit;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.blur.utils.RowIndexWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.util.Version;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public abstract class BlurInputFormatTest {
-
-//  private Path indexPath = new Path(TMPDIR, "./tmp/test-indexes/newapi");
-//  private int numberOfShards = 13;
-//  private int rowsPerIndex = 10;
-//
-//  @Before
-//  public void setup() throws IOException {
-//    buildTestIndexes(indexPath, numberOfShards, rowsPerIndex);
-//  }
-//
-//  public static void buildTestIndexes(Path indexPath, int numberOfShards, int rowsPerIndex) throws IOException {
-//    Configuration configuration = new Configuration();
-//    FileSystem fileSystem = indexPath.getFileSystem(configuration);
-//    fileSystem.delete(indexPath, true);
-//    for (int i = 0; i < numberOfShards; i++) {
-//      String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i);
-//      buildIndex(fileSystem, configuration, new Path(indexPath, shardName), rowsPerIndex);
-//    }
-//  }
-//
-//  public static void buildIndex(FileSystem fileSystem, Configuration configuration, Path path, int rowsPerIndex) throws IOException {
-//    HdfsDirectory directory = new HdfsDirectory(path);
-//    directory.setLockFactory(NoLockFactory.getNoLockFactory());
-//    BlurAnalyzer analyzer = new BlurAnalyzer(new StandardAnalyzer(Version.LUCENE_35));
-//    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, analyzer);
-//    IndexWriter indexWriter = new IndexWriter(directory, conf);
-//    RowIndexWriter writer = new RowIndexWriter(indexWriter, analyzer);
-//    for (int i = 0; i < rowsPerIndex; i++) {
-//      writer.add(false, genRow());
-//    }
-//    indexWriter.close();
-//  }
-//
-//  public static Row genRow() {
-//    Row row = new Row();
-//    row.setId(UUID.randomUUID().toString());
-//    for (int i = 0; i < 10; i++) {
-//      row.addToRecords(genRecord());
-//    }
-//    return row;
-//  }
-//
-//  public static Record genRecord() {
-//    Record record = new Record();
-//    record.setRecordId(UUID.randomUUID().toString());
-//    record.setFamily("cf");
-//    record.addToColumns(new Column("name", UUID.randomUUID().toString()));
-//    return record;
-//  }
-//
-//  @Test
-//  public void testGetSplits() throws IOException, InterruptedException {
-//    BlurInputFormat format = new BlurInputFormat();
-//    Configuration conf = new Configuration();
-//    Job job = new Job(conf);
-//    FileInputFormat.addInputPath(job, indexPath);
-//    JobID jobId = new JobID();
-//    JobContext context = new JobContext(job.getConfiguration(), jobId);
-//    List<InputSplit> list = format.getSplits(context);
-//    for (int i = 0; i < list.size(); i++) {
-//      BlurInputSplit split = (BlurInputSplit) list.get(i);
-//      Path path = new Path(indexPath, BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i));
-//      FileSystem fileSystem = path.getFileSystem(conf);
-//      assertEquals(new BlurInputSplit(fileSystem.makeQualified(path), "_0", 0, Integer.MAX_VALUE), split);
-//    }
-//  }
-//
-//  @Test
-//  public void testCreateRecordReader() throws IOException, InterruptedException {
-//    BlurInputFormat format = new BlurInputFormat();
-//    Configuration conf = new Configuration();
-//    Job job = new Job(conf);
-//    FileInputFormat.addInputPath(job, indexPath);
-//    JobID jobId = new JobID();
-//    JobContext context = new JobContext(job.getConfiguration(), jobId);
-//    List<InputSplit> list = format.getSplits(context);
-//    for (int i = 0; i < list.size(); i++) {
-//      BlurInputSplit split = (BlurInputSplit) list.get(i);
-//      TaskAttemptID taskId = new TaskAttemptID();
-//      TaskAttemptContext taskContext = new TaskAttemptContext(conf, taskId);
-//      RecordReader<Text, BlurRecord> reader = format.createRecordReader(split, taskContext);
-//      while (reader.nextKeyValue()) {
-//        System.out.println(reader.getProgress() + " " + reader.getCurrentKey() + " " + reader.getCurrentValue());
-//      }
-//    }
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0cc4d72e/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index bb15969..1048eef 100644
--- a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -24,8 +24,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
-import org.apache.blur.mapreduce.BlurMutate;
-import org.apache.blur.mapreduce.csv.CsvBlurMapper;
 import org.apache.blur.thrift.generated.AnalyzerDefinition;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0cc4d72e/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
new file mode 100644
index 0000000..7633762
--- /dev/null
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurMapperTest.java
@@ -0,0 +1,52 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.lib.CsvBlurMapper;
+import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CsvBlurMapperTest {
+
+  MapDriver<LongWritable, Text, Text, BlurMutate> mapDriver;
+
+  @Before
+  public void setUp() throws IOException {
+    CsvBlurMapper mapper = new CsvBlurMapper();
+    mapDriver = MapDriver.newMapDriver(mapper);
+    Configuration configuration = mapDriver.getConfiguration();
+    CsvBlurMapper.setColumns(configuration, "cf1:col1,col2|cf2:col1,col2,col3");
+  }
+
+  @Test
+  public void testMapper() {
+    mapDriver.withInput(new LongWritable(), new Text("rowid1,record1,cf1,value1,value2"));
+    mapDriver.withOutput(
+        new Text("rowid1"),
+        new BlurMutate(MUTATE_TYPE.REPLACE, "rowid1", "record1", "cf1").addColumn("col1", "value1").addColumn("col2",
+            "value2"));
+    mapDriver.runTest();
+  }
+
+}
\ No newline at end of file

