incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [25/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BufferedDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BufferedDirectory.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BufferedDirectory.java
new file mode 100644
index 0000000..6a12983
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BufferedDirectory.java
@@ -0,0 +1,150 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+/**
+ * A {@link Directory} decorator that routes every read through a
+ * {@link BufferedIndexInput} sized by the buffer value supplied at
+ * construction. All other operations are delegated unchanged to the
+ * wrapped directory.
+ */
+public class BufferedDirectory extends Directory {
+
+  private Directory _directory; // delegate for all operations
+  private int _buffer;          // buffer size (bytes) used for every openInput
+
+  public BufferedDirectory(Directory directory, int buffer) {
+    _directory = directory;
+    _buffer = buffer;
+  }
+
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+  public IndexOutput createOutput(String name) throws IOException {
+    return _directory.createOutput(name);
+  }
+
+  public void deleteFile(String name) throws IOException {
+    _directory.deleteFile(name);
+  }
+
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  @SuppressWarnings("deprecation")
+  public long fileModified(String name) throws IOException {
+    return _directory.fileModified(name);
+  }
+
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  // NOTE(review): the caller-requested bufferSize is ignored here; the
+  // directory-wide _buffer value is always used instead — confirm intent.
+  public IndexInput openInput(String name, int bufferSize) throws IOException {
+    return openInput(name);
+  }
+
+  public IndexInput openInput(String name) throws IOException {
+    return new BigBufferIndexInput(name, _directory.openInput(name), _buffer);
+  }
+
+  @SuppressWarnings("deprecation")
+  public void touchFile(String name) throws IOException {
+    _directory.touchFile(name);
+  }
+
+  /**
+   * Wraps a raw {@link IndexInput} in a {@link BufferedIndexInput} so reads
+   * from the delegate happen in chunks of the configured buffer size.
+   */
+  public static class BigBufferIndexInput extends BufferedIndexInput {
+
+    private IndexInput _input;
+    private long _length; // cached at construction
+
+    public BigBufferIndexInput(String name, IndexInput input, int buffer) {
+      super(name, buffer);
+      _input = input;
+      _length = input.length();
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      // Position the delegate explicitly before each buffer refill.
+      _input.seek(getFilePointer());
+      _input.readBytes(b, offset, length);
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+      // No-op: readInternal() seeks the delegate itself on every read.
+    }
+
+    @Override
+    public void close() throws IOException {
+      _input.close();
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+
+    @Override
+    public Object clone() {
+      // Clone the delegate too so each clone keeps an independent file pointer.
+      BigBufferIndexInput clone = (BigBufferIndexInput) super.clone();
+      clone._input = (IndexInput) _input.clone();
+      return clone;
+    }
+  }
+
+  public void clearLock(String name) throws IOException {
+    _directory.clearLock(name);
+  }
+
+  public LockFactory getLockFactory() {
+    return _directory.getLockFactory();
+  }
+
+  public String getLockID() {
+    return _directory.getLockID();
+  }
+
+  public Lock makeLock(String name) {
+    return _directory.makeLock(name);
+  }
+
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _directory.setLockFactory(lockFactory);
+  }
+
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  @SuppressWarnings("deprecation")
+  public void sync(String name) throws IOException {
+    _directory.sync(name);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/IOUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/IOUtil.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/IOUtil.java
new file mode 100644
index 0000000..6acb009
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/IOUtil.java
@@ -0,0 +1,58 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Minimal binary I/O helpers: UTF-8 strings and variable-length ints
+ * (7 data bits per byte, least-significant group first, high bit set on
+ * every byte except the last).
+ */
+public class IOUtil {
+
+  public static final String UTF_8 = "UTF-8";
+
+  /** Reads a VInt byte count followed by that many UTF-8 bytes. */
+  public static String readString(DataInput input) throws IOException {
+    int length = readVInt(input);
+    byte[] buffer = new byte[length];
+    input.readFully(buffer);
+    return new String(buffer, UTF_8);
+  }
+
+  /** Writes the UTF-8 byte length as a VInt, then the bytes themselves. */
+  public static void writeString(DataOutput output, String s) throws IOException {
+    byte[] bs = s.getBytes(UTF_8);
+    writeVInt(output, bs.length);
+    output.write(bs);
+  }
+
+  /** Decodes a VInt: take the low 7 bits of each byte, continue while bit 7 is set. */
+  public static int readVInt(DataInput input) throws IOException {
+    byte b = input.readByte();
+    int i = b & 0x7F;
+    for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+      b = input.readByte();
+      i |= (b & 0x7F) << shift;
+    }
+    return i;
+  }
+
+  /** Encodes i in 7-bit groups, least significant first; bit 7 marks continuation. */
+  public static void writeVInt(DataOutput output, int i) throws IOException {
+    while ((i & ~0x7F) != 0) {
+      output.writeByte((byte) ((i & 0x7f) | 0x80));
+      i >>>= 7; // unsigned shift so negative values also terminate
+    }
+    output.writeByte((byte) i);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/ProgressableDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/ProgressableDirectory.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/ProgressableDirectory.java
new file mode 100644
index 0000000..456ae6b
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/ProgressableDirectory.java
@@ -0,0 +1,273 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+
+/**
+ * A {@link Directory} decorator for use inside Hadoop tasks: inputs and
+ * outputs are wrapped so that I/O reports to a {@link Progressable},
+ * keeping the task from being timed out during long index reads/writes.
+ * All other calls are delegated unchanged to the wrapped directory.
+ */
+public class ProgressableDirectory extends Directory {
+
+  private Directory _directory;       // delegate for all operations
+  private Progressable _progressable; // receives progress() callbacks during I/O
+
+  public ProgressableDirectory(Directory directory, Progressable progressable) {
+    _directory = directory;
+    _progressable = progressable;
+  }
+
+  public void clearLock(String name) throws IOException {
+    _directory.clearLock(name);
+  }
+
+  public void close() throws IOException {
+    _directory.close();
+  }
+
+  public void copy(Directory to, String src, String dest) throws IOException {
+    _directory.copy(to, src, dest);
+  }
+
+  // Wrap with a 16K buffered input that reports progress on each refill.
+  private IndexInput wrapInput(String name, IndexInput openInput) {
+    return new ProgressableIndexInput(name, openInput, 16384, _progressable);
+  }
+
+  private IndexOutput wrapOutput(IndexOutput createOutput) {
+    return new ProgressableIndexOutput(createOutput, _progressable);
+  }
+
+  public IndexOutput createOutput(String name) throws IOException {
+    return wrapOutput(_directory.createOutput(name));
+  }
+
+  public void deleteFile(String name) throws IOException {
+    _directory.deleteFile(name);
+  }
+
+  public boolean equals(Object obj) {
+    return _directory.equals(obj);
+  }
+
+  public boolean fileExists(String name) throws IOException {
+    return _directory.fileExists(name);
+  }
+
+  public long fileLength(String name) throws IOException {
+    return _directory.fileLength(name);
+  }
+
+  @SuppressWarnings("deprecation")
+  public long fileModified(String name) throws IOException {
+    return _directory.fileModified(name);
+  }
+
+  public LockFactory getLockFactory() {
+    return _directory.getLockFactory();
+  }
+
+  public String getLockID() {
+    return _directory.getLockID();
+  }
+
+  public int hashCode() {
+    return _directory.hashCode();
+  }
+
+  public String[] listAll() throws IOException {
+    return _directory.listAll();
+  }
+
+  public Lock makeLock(String name) {
+    return _directory.makeLock(name);
+  }
+
+  public IndexInput openInput(String name, int bufferSize) throws IOException {
+    return wrapInput(name, _directory.openInput(name, bufferSize));
+  }
+
+  public IndexInput openInput(String name) throws IOException {
+    return wrapInput(name, _directory.openInput(name));
+  }
+
+  public void setLockFactory(LockFactory lockFactory) throws IOException {
+    _directory.setLockFactory(lockFactory);
+  }
+
+  public void sync(Collection<String> names) throws IOException {
+    _directory.sync(names);
+  }
+
+  @SuppressWarnings("deprecation")
+  public void sync(String name) throws IOException {
+    _directory.sync(name);
+  }
+
+  public String toString() {
+    return _directory.toString();
+  }
+
+  @SuppressWarnings("deprecation")
+  public void touchFile(String name) throws IOException {
+    _directory.touchFile(name);
+  }
+
+  /**
+   * Delegating {@link IndexOutput} that calls {@link Progressable#progress()}
+   * after bulk operations. NOTE(review): single-value writes (byte/int/long/
+   * String/map) do not report progress — presumably to avoid per-call
+   * overhead; confirm this is intentional.
+   */
+  static class ProgressableIndexOutput extends IndexOutput {
+
+    private Progressable _progressable;
+    private IndexOutput _indexOutput;
+
+    public ProgressableIndexOutput(IndexOutput indexOutput, Progressable progressable) {
+      _indexOutput = indexOutput;
+      _progressable = progressable;
+    }
+
+    public void close() throws IOException {
+      _indexOutput.close();
+      _progressable.progress();
+    }
+
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+      _indexOutput.copyBytes(input, numBytes);
+      _progressable.progress();
+    }
+
+    public void flush() throws IOException {
+      _indexOutput.flush();
+      _progressable.progress();
+    }
+
+    public long getFilePointer() {
+      return _indexOutput.getFilePointer();
+    }
+
+    public long length() throws IOException {
+      return _indexOutput.length();
+    }
+
+    public void seek(long pos) throws IOException {
+      _indexOutput.seek(pos);
+      _progressable.progress();
+    }
+
+    public void setLength(long length) throws IOException {
+      _indexOutput.setLength(length);
+      _progressable.progress();
+    }
+
+    public String toString() {
+      return _indexOutput.toString();
+    }
+
+    public void writeByte(byte b) throws IOException {
+      _indexOutput.writeByte(b);
+    }
+
+    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+      _indexOutput.writeBytes(b, offset, length);
+      _progressable.progress();
+    }
+
+    public void writeBytes(byte[] b, int length) throws IOException {
+      _indexOutput.writeBytes(b, length);
+      _progressable.progress();
+    }
+
+    @SuppressWarnings("deprecation")
+    public void writeChars(char[] s, int start, int length) throws IOException {
+      _indexOutput.writeChars(s, start, length);
+      _progressable.progress();
+    }
+
+    @SuppressWarnings("deprecation")
+    public void writeChars(String s, int start, int length) throws IOException {
+      _indexOutput.writeChars(s, start, length);
+      _progressable.progress();
+    }
+
+    public void writeInt(int i) throws IOException {
+      _indexOutput.writeInt(i);
+    }
+
+    public void writeLong(long i) throws IOException {
+      _indexOutput.writeLong(i);
+    }
+
+    public void writeString(String s) throws IOException {
+      _indexOutput.writeString(s);
+    }
+
+    public void writeStringStringMap(Map<String, String> map) throws IOException {
+      _indexOutput.writeStringStringMap(map);
+    }
+
+  }
+
+  /**
+   * Buffered {@link IndexInput} that reports progress after each buffer refill.
+   */
+  static class ProgressableIndexInput extends BufferedIndexInput {
+
+    private IndexInput _indexInput;
+    private final long _length; // cached at construction
+    private Progressable _progressable;
+
+    ProgressableIndexInput(String name, IndexInput indexInput, int buffer, Progressable progressable) {
+      super(name, buffer);
+      _indexInput = indexInput;
+      _length = indexInput.length();
+      _progressable = progressable;
+    }
+
+    @Override
+    protected void readInternal(byte[] b, int offset, int length) throws IOException {
+      // Only seek the delegate when its position has drifted from ours.
+      long filePointer = getFilePointer();
+      if (filePointer != _indexInput.getFilePointer()) {
+        _indexInput.seek(filePointer);
+      }
+      _indexInput.readBytes(b, offset, length);
+      _progressable.progress();
+    }
+
+    @Override
+    protected void seekInternal(long pos) throws IOException {
+      // No-op: readInternal() repositions the delegate lazily.
+    }
+
+    @Override
+    public void close() throws IOException {
+      _indexInput.close();
+    }
+
+    @Override
+    public long length() {
+      return _length;
+    }
+
+    @Override
+    public Object clone() {
+      // Clone the delegate so clones keep independent file pointers.
+      ProgressableIndexInput clone = (ProgressableIndexInput) super.clone();
+      clone._indexInput = (IndexInput) _indexInput.clone();
+      return clone;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/SpinLock.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/SpinLock.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/SpinLock.java
new file mode 100644
index 0000000..4a150f8
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/SpinLock.java
@@ -0,0 +1,112 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.util.Progressable;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Throttles concurrent copy operations using ZooKeeper. The data stored at
+ * {@code _path} holds the maximum number of concurrent copies; each caller
+ * creates an ephemeral-sequential child under that path and polls until its
+ * own node is among the first {@code _maxCopies} children (sorted order).
+ */
+public class SpinLock {
+
+  private static final Log LOG = LogFactory.getLog(SpinLock.class);
+  private ZooKeeper _zooKeeper;
+  private String _path;      // parent znode; its data is the copy limit
+  private int _maxCopies;    // Integer.MAX_VALUE means "no limit configured"
+  private String _name;      // child-node name prefix for this caller
+  private long _delay = TimeUnit.SECONDS.toMillis(30); // poll interval
+  private Progressable _progressable;
+
+  // Ad-hoc manual test entry point.
+  // NOTE(review): passes a null Context; copyLock() dereferences the context
+  // when the lock is contended, which would NPE in that case.
+  public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
+    Progressable progressable = new Progressable() {
+      @Override
+      public void progress() {
+        System.out.println("go");
+      }
+    };
+    String zkConnectionStr = "localhost";
+    SpinLock lock = new SpinLock(progressable, zkConnectionStr, "test", "/test-spin-lock");
+    lock.copyLock(null);
+  }
+
+  public SpinLock(Progressable progressable, String zkConnectionStr, String name, String path) throws IOException, KeeperException, InterruptedException {
+    _path = path;
+    _name = name;
+    _progressable = progressable;
+    _zooKeeper = new ZooKeeper(zkConnectionStr, 60000, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // No watch handling needed; copyLock() polls instead of watching.
+      }
+    });
+    checkMaxCopies();
+  }
+
+  /** Re-reads the copy limit from ZooKeeper; unlimited if the node is absent. */
+  private void checkMaxCopies() throws KeeperException, InterruptedException {
+    Stat stat = _zooKeeper.exists(_path, false);
+    if (stat == null) {
+      LOG.warn("Path [{0}] not set no limit on copies.", _path);
+      _maxCopies = Integer.MAX_VALUE;
+    } else {
+      // NOTE(review): new String(data) uses the platform default charset;
+      // fine for ASCII digits but confirm how the value is written.
+      byte[] data = _zooKeeper.getData(_path, false, stat);
+      _maxCopies = Integer.parseInt(new String(data));
+    }
+  }
+
+  /**
+   * Blocks until this caller is within the allowed number of concurrent
+   * copies; returns immediately when no limit is configured.
+   * NOTE(review): InterruptedException is wrapped in RuntimeException
+   * without restoring the thread's interrupt status.
+   */
+  public void copyLock(@SuppressWarnings("rawtypes") Context context) {
+    if (_maxCopies == Integer.MAX_VALUE) {
+      return;
+    }
+    try {
+      String newpath = _zooKeeper.create(_path + "/" + _name, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+      while (true) {
+        _progressable.progress();
+        checkMaxCopies(); // limit may be changed at runtime
+        List<String> children = new ArrayList<String>(_zooKeeper.getChildren(_path, false));
+        Collections.sort(children);
+        // Lock acquired if our node sorts within the first _maxCopies slots.
+        for (int i = 0; i < _maxCopies && i < children.size(); i++) {
+          if (newpath.equals(_path + "/" + children.get(i))) {
+            return;
+          }
+        }
+        LOG.info("Waiting for copy lock");
+        context.setStatus("Waiting for copy lock");
+        Thread.sleep(_delay);
+      }
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java
new file mode 100644
index 0000000..29eba4e
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerRebuild.java
@@ -0,0 +1,79 @@
+package org.apache.blur.mapreduce.example;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.BlurTask;
+import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.ColumnDefinition;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+
+/**
+ * Example driver that rebuilds the "test-table" Blur index from scratch via
+ * a MapReduce job. Usage: blurindexer &lt;in&gt; &lt;out&gt;.
+ */
+public class BlurExampleIndexerRebuild {
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    if (otherArgs.length != 2) {
+      System.err.println("Usage: blurindexer <in> <out>");
+      System.exit(2);
+    }
+
+    // Analyze every column with StandardAnalyzer by default.
+    AnalyzerDefinition ad = new AnalyzerDefinition();
+    ad.defaultDefinition = new ColumnDefinition(StandardAnalyzer.class.getName(), true, null);
+
+    // Hard-coded descriptor for the example table (single shard, local URI).
+    TableDescriptor descriptor = new TableDescriptor();
+    descriptor.analyzerDefinition = ad;
+    descriptor.compressionBlockSize = 32768;
+    descriptor.compressionClass = DefaultCodec.class.getName();
+    descriptor.isEnabled = true;
+    descriptor.name = "test-table";
+    descriptor.shardCount = 1;
+    descriptor.cluster = "default";
+    descriptor.tableUri = "./blur-testing";
+
+    BlurTask blurTask = new BlurTask();
+    blurTask.setTableDescriptor(descriptor);
+    blurTask.setIndexingType(INDEXING_TYPE.REBUILD);
+    blurTask.setOptimize(false);
+    Job job = blurTask.configureJob(configuration);
+    job.setJarByClass(BlurExampleIndexerRebuild.class);
+    job.setMapperClass(BlurExampleMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+    // Timestamped output dir so repeated runs do not collide.
+    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
+    long s = System.currentTimeMillis();
+    boolean waitForCompletion = job.waitForCompletion(true);
+    long e = System.currentTimeMillis();
+    System.out.println("Completed in [" + (e - s) + " ms]");
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java
new file mode 100644
index 0000000..6ffe837
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleIndexerUpdate.java
@@ -0,0 +1,68 @@
+package org.apache.blur.mapreduce.example;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.mapreduce.BlurTask;
+import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
+import org.apache.blur.thrift.generated.AnalyzerDefinition;
+import org.apache.blur.thrift.generated.ColumnDefinition;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+
+/**
+ * Example driver that updates the existing "test-table" Blur index via a
+ * MapReduce job, reading the live table descriptor from ZooKeeper at
+ * localhost. Usage: blurindexer &lt;in&gt; &lt;out&gt;.
+ */
+public class BlurExampleIndexerUpdate {
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration configuration = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
+    if (otherArgs.length != 2) {
+      System.err.println("Usage: blurindexer <in> <out>");
+      System.exit(2);
+    }
+
+    // NOTE(review): ad is built but never attached to the descriptor below;
+    // the descriptor (and its analyzer) comes from the cluster status.
+    AnalyzerDefinition ad = new AnalyzerDefinition();
+    ad.defaultDefinition = new ColumnDefinition(StandardAnalyzer.class.getName(), true, null);
+
+    // Fetch the current descriptor for the table from ZooKeeper.
+    ZookeeperClusterStatus status = new ZookeeperClusterStatus("localhost");
+    TableDescriptor descriptor = status.getTableDescriptor(false, "default", "test-table");
+
+    BlurTask blurTask = new BlurTask();
+    blurTask.setTableDescriptor(descriptor);
+    blurTask.setIndexingType(INDEXING_TYPE.UPDATE);
+    Job job = blurTask.configureJob(configuration);
+    job.setJarByClass(BlurExampleIndexerUpdate.class);
+    job.setMapperClass(BlurExampleMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+    // Timestamped output dir so repeated runs do not collide.
+    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
+    boolean waitForCompletion = job.waitForCompletion(true);
+    System.exit(waitForCompletion ? 0 : 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java
new file mode 100644
index 0000000..20b86d2
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/example/BlurExampleMapper.java
@@ -0,0 +1,51 @@
+package org.apache.blur.mapreduce.example;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.blur.mapreduce.BlurMapper;
+import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.blur.mapreduce.BlurMutate.MUTATE_TYPE;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Example mapper: turns each tab-separated text line into a Blur record with
+ * random row/record ids, columns c0..cN in family "cf1", and emits it as an
+ * ADD mutation keyed by the row id.
+ */
+public class BlurExampleMapper extends BlurMapper<LongWritable, Text> {
+
+  @Override
+  protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
+    // Reuse the shared mutate/record instances inherited from BlurMapper.
+    BlurRecord record = _mutate.getRecord();
+    record.clearColumns();
+    String str = value.toString();
+    String[] split = str.split("\\t");
+    // Random ids: every input line becomes its own new row.
+    record.setRowId(UUID.randomUUID().toString());
+    record.setRecordId(UUID.randomUUID().toString());
+    record.setFamily("cf1");
+    for (int i = 0; i < split.length; i++) {
+      record.addColumn("c" + i, split[i]);
+      _fieldCounter.increment(1);
+    }
+    // NOTE(review): getBytes() uses the platform default charset; confirm
+    // UTF-8 is intended for row-id keys.
+    byte[] bs = record.getRowId().getBytes();
+    _key.set(bs, 0, bs.length);
+    _mutate.setMutateType(MUTATE_TYPE.ADD);
+    context.write(_key, _mutate);
+    _recordCounter.increment(1);
+    context.progress(); // keep the task alive on large rows
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
new file mode 100644
index 0000000..da7a333
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
@@ -0,0 +1,87 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexReader;
+
+
+public class BlurInputFormat extends InputFormat<Text, BlurRecord> {
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+    List<?> splits = new ArrayList<Object>();
+    Path[] paths = FileInputFormat.getInputPaths(context);
+    for (Path path : paths) {
+      findAllSegments(context.getConfiguration(), path, splits);
+    }
+    return (List<InputSplit>) splits;
+  }
+
+  @Override
+  public RecordReader<Text, BlurRecord> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    BlurRecordReader blurRecordReader = new BlurRecordReader();
+    blurRecordReader.initialize(split, context);
+    return blurRecordReader;
+  }
+
+  public static void findAllSegments(Configuration configuration, Path path, List<?> splits) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    if (fileSystem.isFile(path)) {
+      return;
+    } else {
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus status : listStatus) {
+        Path p = status.getPath();
+        HdfsDirectory directory = new HdfsDirectory(p);
+        if (IndexReader.indexExists(directory)) {
+          addSplits(directory, splits);
+        } else {
+          findAllSegments(configuration, p, splits);
+        }
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void addSplits(HdfsDirectory directory, @SuppressWarnings("rawtypes") List splits) throws IOException {
+    IndexCommit commit = Utils.findLatest(directory);
+    List<String> segments = Utils.getSegments(directory, commit);
+    for (String segment : segments) {
+      BlurInputSplit split = new BlurInputSplit(directory.getHdfsDirPath(), segment, 0, Integer.MAX_VALUE);
+      splits.add(split);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java
new file mode 100644
index 0000000..7407ba4
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java
@@ -0,0 +1,157 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+public class BlurInputSplit extends InputSplit implements Writable, org.apache.hadoop.mapred.InputSplit {
+
+  private int _endingDocId;
+  private int _startingDocId;
+  private String _segmentName;
+  private Path _path;
+
+  public BlurInputSplit() {
+
+  }
+
+  public BlurInputSplit(Path path, String segmentName, int startingDocId, int endingDocId) {
+    _endingDocId = endingDocId;
+    _startingDocId = startingDocId;
+    _segmentName = segmentName;
+    _path = path;
+  }
+
+  @Override
+  public long getLength() {
+    return _endingDocId - _startingDocId;
+  }
+
+  @Override
+  public String[] getLocations() {
+    return new String[] {};
+  }
+
+  public Path getIndexPath() {
+    return _path;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  public int getStartingDocId() {
+    return _startingDocId;
+  }
+
+  public int getEndingDocId() {
+    return _endingDocId;
+  }
+
+  public void setEndingDocId(int endingDocId) {
+    _endingDocId = endingDocId;
+  }
+
+  public void setStartingDocId(int startingDocId) {
+    _startingDocId = startingDocId;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public void setPath(Path path) {
+    _path = path;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(_startingDocId);
+    out.writeInt(_endingDocId);
+    writeString(out, _segmentName);
+    writeString(out, _path.toUri().toString());
+  }
+
+  private void writeString(DataOutput out, String s) throws IOException {
+    byte[] bs = s.getBytes();
+    out.writeInt(bs.length);
+    out.write(bs);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _startingDocId = in.readInt();
+    _endingDocId = in.readInt();
+    _segmentName = readString(in);
+    _path = new Path(readString(in));
+  }
+
+  private String readString(DataInput in) throws IOException {
+    int length = in.readInt();
+    byte[] buf = new byte[length];
+    in.readFully(buf);
+    return new String(buf);
+  }
+
+  @Override
+  public String toString() {
+    return "path=" + _path + ", segmentName=" + _segmentName + ", startingDocId=" + _startingDocId + ", endingDocId=" + _endingDocId;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + _endingDocId;
+    result = prime * result + ((_path == null) ? 0 : _path.hashCode());
+    result = prime * result + ((_segmentName == null) ? 0 : _segmentName.hashCode());
+    result = prime * result + _startingDocId;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BlurInputSplit other = (BlurInputSplit) obj;
+    if (_endingDocId != other._endingDocId)
+      return false;
+    if (_path == null) {
+      if (other._path != null)
+        return false;
+    } else if (!_path.equals(other._path))
+      return false;
+    if (_segmentName == null) {
+      if (other._segmentName != null)
+        return false;
+    } else if (!_segmentName.equals(other._segmentName))
+      return false;
+    if (_startingDocId != other._startingDocId)
+      return false;
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
new file mode 100644
index 0000000..f5abf65
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputCommitter.java
@@ -0,0 +1,56 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class BlurOutputCommitter extends OutputCommitter {
+
+  public BlurOutputCommitter(TaskAttemptContext context) {
+
+  }
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskContext) throws IOException {
+
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskContext) throws IOException {
+
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskContext) throws IOException {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
new file mode 100644
index 0000000..b3fd106
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -0,0 +1,47 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+
+public class BlurOutputFormat extends OutputFormat<Text, BlurRecord> {
+
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+
+  }
+
+  @Override
+  public RecordWriter<Text, BlurRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+    return new BlurRecordWriter(context);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+    return new BlurOutputCommitter(context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
new file mode 100644
index 0000000..bca3637
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
@@ -0,0 +1,103 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.store.Directory;
+
+
+public class BlurRecordReader extends RecordReader<Text, BlurRecord> {
+
+  private IndexReader reader;
+  private Directory directory;
+  private int startingDocId;
+  private int endingDocId;
+  private int position;
+  private Text rowid = new Text();
+  private BlurRecord record = new BlurRecord();
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    BlurInputSplit blurSplit = (BlurInputSplit) split;
+    Path path = blurSplit.getIndexPath();
+    String segmentName = blurSplit.getSegmentName();
+    startingDocId = blurSplit.getStartingDocId();
+    endingDocId = blurSplit.getEndingDocId();
+    directory = new HdfsDirectory(path);
+
+    IndexCommit commit = Utils.findLatest(directory);
+    reader = Utils.openSegmentReader(directory, commit, segmentName, Utils.getTermInfosIndexDivisor(context.getConfiguration()));
+    int maxDoc = reader.maxDoc();
+    if (endingDocId >= maxDoc) {
+      endingDocId = maxDoc - 1;
+    }
+    position = startingDocId - 1;
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    do {
+      position++;
+      if (position > endingDocId) {
+        return false;
+      }
+    } while (reader.isDeleted(position));
+    readDocument();
+    return true;
+  }
+
+  private void readDocument() throws CorruptIndexException, IOException {
+    Document document = reader.document(position);
+    record.reset();
+    rowid.set(RowDocumentUtil.readRecord(document, record));
+  }
+
+  @Override
+  public Text getCurrentKey() throws IOException, InterruptedException {
+    return rowid;
+  }
+
+  @Override
+  public BlurRecord getCurrentValue() throws IOException, InterruptedException {
+    return record;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    int total = endingDocId - startingDocId;
+    return (float) position / (float) total;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+    directory.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
new file mode 100644
index 0000000..6b15543
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
@@ -0,0 +1,116 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.mapreduce.BlurColumn;
+import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.NoLockFactory;
+import org.apache.lucene.util.Version;
+
+
+public class BlurRecordWriter extends RecordWriter<Text, BlurRecord> {
+
+  private static Log LOG = LogFactory.getLog(BlurRecordWriter.class);
+
+  private Text prevKey = new Text();
+  private List<Document> documents = new ArrayList<Document>();
+  private IndexWriter writer;
+
+  public BlurRecordWriter(TaskAttemptContext context) throws IOException {
+    Configuration configuration = context.getConfiguration();
+    String outputPath = configuration.get("mapred.output.dir");
+    int id = context.getTaskAttemptID().getTaskID().getId();
+    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, id);
+    Path basePath = new Path(outputPath);
+    Path indexPath = new Path(basePath, shardName);
+
+    // @TODO
+    Analyzer analyzer = new KeywordAnalyzer();
+
+    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, analyzer);
+
+    // @TODO setup compressed directory, read compression codec from config,
+    // setup progressable dir, setup lock factory
+    Directory dir = new HdfsDirectory(indexPath);
+    dir.setLockFactory(NoLockFactory.getNoLockFactory());
+    writer = new IndexWriter(dir, conf);
+  }
+
+  @Override
+  public void write(Text key, BlurRecord value) throws IOException, InterruptedException {
+    if (!prevKey.equals(key)) {
+      flush();
+      prevKey.set(key);
+    }
+    add(value);
+  }
+
+  private void add(BlurRecord value) {
+    List<BlurColumn> columns = value.getColumns();
+    String family = value.getFamily();
+    Document document = new Document();
+    document.add(new Field(BlurConstants.ROW_ID, value.getRowId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
+    document.add(new Field(BlurConstants.RECORD_ID, value.getRecordId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
+    for (BlurColumn column : columns) {
+      document.add(convert(family, column));
+    }
+    documents.add(document);
+    LOG.error("Needs to use blur analyzer and field converter");
+  }
+
+  private Field convert(String family, BlurColumn column) {
+    return new Field(family + "." + column.getName(), column.getValue(), Store.YES, Index.ANALYZED_NO_NORMS);
+  }
+
+  private void flush() throws CorruptIndexException, IOException {
+    if (documents.isEmpty()) {
+      return;
+    }
+    writer.addDocuments(documents);
+    documents.clear();
+  }
+
+  @Override
+  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+    flush();
+    writer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java
new file mode 100644
index 0000000..421c791
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java
@@ -0,0 +1,72 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.store.Directory;
+
+public class Utils {
+
+  public static int getTermInfosIndexDivisor(Configuration conf) {
+    return 128;
+  }
+
+  public static IndexCommit findLatest(Directory dir) throws IOException {
+    Collection<IndexCommit> listCommits = IndexReader.listCommits(dir);
+    if (listCommits.size() == 1) {
+      return listCommits.iterator().next();
+    }
+    throw new RuntimeException("Multiple commit points not supported yet.");
+  }
+
+  public static List<String> getSegments(Directory dir, IndexCommit commit) throws CorruptIndexException, IOException {
+    SegmentInfos infos = new SegmentInfos();
+    infos.read(dir, commit.getSegmentsFileName());
+    List<String> result = new ArrayList<String>();
+    for (SegmentInfo info : infos) {
+      result.add(info.name);
+    }
+    return result;
+  }
+
+  public static IndexReader openSegmentReader(Directory directory, IndexCommit commit, String segmentName, int termInfosIndexDivisor) throws CorruptIndexException, IOException {
+    SegmentInfos infos = new SegmentInfos();
+    infos.read(directory, commit.getSegmentsFileName());
+    SegmentInfo segmentInfo = null;
+    for (SegmentInfo info : infos) {
+      if (segmentName.equals(info.name)) {
+        segmentInfo = info;
+        break;
+      }
+    }
+    if (segmentInfo == null) {
+      throw new RuntimeException("SegmentInfo for [" + segmentName + "] not found in directory [" + directory + "] for commit [" + commit + "]");
+    }
+    return SegmentReader.get(true, segmentInfo, termInfosIndexDivisor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapred/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapred/BlurInputFormatTest.java b/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapred/BlurInputFormatTest.java
deleted file mode 100644
index 0f5795e..0000000
--- a/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapred/BlurInputFormatTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package com.nearinfinity.blur.mapred;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.nearinfinity.blur.mapreduce.BlurRecord;
-import com.nearinfinity.blur.mapreduce.lib.BlurInputSplit;
-import com.nearinfinity.blur.utils.BlurConstants;
-import com.nearinfinity.blur.utils.BlurUtil;
-
-public class BlurInputFormatTest {
-
-  private Path indexPath = new Path("./tmp/test-indexes/oldapi");
-  private int numberOfShards = 13;
-  private int rowsPerIndex = 10;
-
-  @Before
-  public void setup() throws IOException {
-    com.nearinfinity.blur.mapreduce.lib.BlurInputFormatTest.buildTestIndexes(indexPath, numberOfShards, rowsPerIndex);
-  }
-
-  @Test
-  public void testGetSplits() throws IOException {
-    BlurInputFormat format = new BlurInputFormat();
-    JobConf job = new JobConf(new Configuration());
-    FileInputFormat.addInputPath(job, indexPath);
-    InputSplit[] splits = format.getSplits(job, -1);
-    for (int i = 0; i < splits.length; i++) {
-      BlurInputSplit split = (BlurInputSplit) splits[i];
-      Path path = new Path(indexPath, BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i));
-      FileSystem fileSystem = path.getFileSystem(job);
-      assertEquals(new BlurInputSplit(fileSystem.makeQualified(path), "_0", 0, Integer.MAX_VALUE), split);
-    }
-  }
-
-  @Test
-  public void testGetRecordReader() throws IOException {
-    BlurInputFormat format = new BlurInputFormat();
-    JobConf job = new JobConf(new Configuration());
-    FileInputFormat.addInputPath(job, indexPath);
-    InputSplit[] splits = format.getSplits(job, -1);
-    for (int i = 0; i < splits.length; i++) {
-      RecordReader<Text, BlurRecord> reader = format.getRecordReader(splits[i], job, Reporter.NULL);
-      Text key = reader.createKey();
-      BlurRecord value = reader.createValue();
-      while (reader.next(key, value)) {
-        System.out.println(reader.getProgress() + " " + key + " " + value);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/BlurTaskTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/BlurTaskTest.java b/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/BlurTaskTest.java
deleted file mode 100644
index 7683b7d..0000000
--- a/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/BlurTaskTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package com.nearinfinity.blur.mapreduce;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.File;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-import com.nearinfinity.blur.thrift.generated.TableDescriptor;
-
-import static org.junit.Assert.*;
-
-public class BlurTaskTest {
-
-  @Test
-  public void testGetNumReducersBadPath() {
-    BlurTask task = new BlurTask();
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setShardCount(5);
-    tableDescriptor.setTableUri("file:///tmp/blur34746545");
-    tableDescriptor.setName("blur34746545");
-    task.setTableDescriptor(tableDescriptor);
-    assertEquals(5, task.getNumReducers(new Configuration()));
-  }
-
-  @Test
-  public void testGetNumReducersValidPath() {
-    new File("/tmp/blurTestShards/shard-1/").mkdirs();
-    new File("/tmp/blurTestShards/shard-2/").mkdirs();
-    new File("/tmp/blurTestShards/shard-3/").mkdirs();
-    try {
-      BlurTask task = new BlurTask();
-      TableDescriptor tableDescriptor = new TableDescriptor();
-      tableDescriptor.setShardCount(5);
-      tableDescriptor.setTableUri("file:///tmp/blurTestShards");
-      tableDescriptor.setName("blurTestShards");
-      task.setTableDescriptor(tableDescriptor);
-      assertEquals(3, task.getNumReducers(new Configuration()));
-    } finally {
-      new File("/tmp/blurTestShards/shard-1/").delete();
-      new File("/tmp/blurTestShards/shard-2/").delete();
-      new File("/tmp/blurTestShards/shard-3/").delete();
-      new File("/tmp/blurTestShards/").delete();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/lib/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/lib/BlurInputFormatTest.java b/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/lib/BlurInputFormatTest.java
deleted file mode 100644
index 6d03fe9..0000000
--- a/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/lib/BlurInputFormatTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package com.nearinfinity.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.util.Version;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.nearinfinity.blur.analysis.BlurAnalyzer;
-import com.nearinfinity.blur.mapreduce.BlurRecord;
-import com.nearinfinity.blur.mapreduce.lib.BlurInputFormat;
-import com.nearinfinity.blur.mapreduce.lib.BlurInputSplit;
-import com.nearinfinity.blur.store.hdfs.HdfsDirectory;
-import com.nearinfinity.blur.thrift.generated.Column;
-import com.nearinfinity.blur.thrift.generated.Record;
-import com.nearinfinity.blur.thrift.generated.Row;
-import com.nearinfinity.blur.utils.BlurConstants;
-import com.nearinfinity.blur.utils.BlurUtil;
-import com.nearinfinity.blur.utils.RowIndexWriter;
-
-public class BlurInputFormatTest {
-
-  private Path indexPath = new Path("./tmp/test-indexes/newapi");
-  private int numberOfShards = 13;
-  private int rowsPerIndex = 10;
-
-  @Before
-  public void setup() throws IOException {
-    buildTestIndexes(indexPath, numberOfShards, rowsPerIndex);
-  }
-
-  public static void buildTestIndexes(Path indexPath, int numberOfShards, int rowsPerIndex) throws IOException {
-    Configuration configuration = new Configuration();
-    FileSystem fileSystem = indexPath.getFileSystem(configuration);
-    fileSystem.delete(indexPath, true);
-    for (int i = 0; i < numberOfShards; i++) {
-      String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i);
-      buildIndex(fileSystem, configuration, new Path(indexPath, shardName), rowsPerIndex);
-    }
-  }
-
-  public static void buildIndex(FileSystem fileSystem, Configuration configuration, Path path, int rowsPerIndex) throws IOException {
-    HdfsDirectory directory = new HdfsDirectory(path);
-    directory.setLockFactory(NoLockFactory.getNoLockFactory());
-    BlurAnalyzer analyzer = new BlurAnalyzer(new StandardAnalyzer(Version.LUCENE_35));
-    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, analyzer);
-    IndexWriter indexWriter = new IndexWriter(directory, conf);
-    RowIndexWriter writer = new RowIndexWriter(indexWriter, analyzer);
-    for (int i = 0; i < rowsPerIndex; i++) {
-      writer.add(false, genRow());
-    }
-    indexWriter.close();
-  }
-
-  public static Row genRow() {
-    Row row = new Row();
-    row.setId(UUID.randomUUID().toString());
-    for (int i = 0; i < 10; i++) {
-      row.addToRecords(genRecord());
-    }
-    return row;
-  }
-
-  public static Record genRecord() {
-    Record record = new Record();
-    record.setRecordId(UUID.randomUUID().toString());
-    record.setFamily("cf");
-    record.addToColumns(new Column("name", UUID.randomUUID().toString()));
-    return record;
-  }
-
-  @Test
-  public void testGetSplits() throws IOException, InterruptedException {
-    BlurInputFormat format = new BlurInputFormat();
-    Configuration conf = new Configuration();
-    Job job = new Job(conf);
-    FileInputFormat.addInputPath(job, indexPath);
-    JobID jobId = new JobID();
-    JobContext context = new JobContext(job.getConfiguration(), jobId);
-    List<InputSplit> list = format.getSplits(context);
-    for (int i = 0; i < list.size(); i++) {
-      BlurInputSplit split = (BlurInputSplit) list.get(i);
-      Path path = new Path(indexPath, BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i));
-      FileSystem fileSystem = path.getFileSystem(conf);
-      assertEquals(new BlurInputSplit(fileSystem.makeQualified(path), "_0", 0, Integer.MAX_VALUE), split);
-    }
-  }
-
-  @Test
-  public void testCreateRecordReader() throws IOException, InterruptedException {
-    BlurInputFormat format = new BlurInputFormat();
-    Configuration conf = new Configuration();
-    Job job = new Job(conf);
-    FileInputFormat.addInputPath(job, indexPath);
-    JobID jobId = new JobID();
-    JobContext context = new JobContext(job.getConfiguration(), jobId);
-    List<InputSplit> list = format.getSplits(context);
-    for (int i = 0; i < list.size(); i++) {
-      BlurInputSplit split = (BlurInputSplit) list.get(i);
-      TaskAttemptID taskId = new TaskAttemptID();
-      TaskAttemptContext taskContext = new TaskAttemptContext(conf, taskId);
-      RecordReader<Text, BlurRecord> reader = format.createRecordReader(split, taskContext);
-      while (reader.nextKeyValue()) {
-        System.out.println(reader.getProgress() + " " + reader.getCurrentKey() + " " + reader.getCurrentValue());
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordWriterTest.java b/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordWriterTest.java
deleted file mode 100644
index 2792900..0000000
--- a/src/blur-mapred/src/test/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordWriterTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.nearinfinity.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.lucene.index.IndexReader;
-import org.junit.Test;
-
-import com.nearinfinity.blur.mapreduce.BlurRecord;
-import com.nearinfinity.blur.store.hdfs.HdfsDirectory;
-import com.nearinfinity.blur.utils.BlurConstants;
-import com.nearinfinity.blur.utils.BlurUtil;
-
-public class BlurRecordWriterTest {
-
-  @Test
-  public void testBlurRecordWriter() throws IOException, InterruptedException {
-    JobID jobId = new JobID();
-    TaskID tId = new TaskID(jobId, false, 13);
-    TaskAttemptID taskId = new TaskAttemptID(tId, 0);
-    Configuration conf = new Configuration();
-    String pathStr = "./tmp/output-record-writer-test-newapi";
-    rm(new File(pathStr));
-    conf.set("mapred.output.dir", pathStr);
-    TaskAttemptContext context = new TaskAttemptContext(conf, taskId);
-    BlurRecordWriter writer = new BlurRecordWriter(context);
-
-    Text key = new Text();
-    BlurRecord value = new BlurRecord();
-
-    for (int i = 0; i < 10; i++) {
-      String rowId = UUID.randomUUID().toString();
-      key.set(rowId);
-      value.setFamily("cf");
-      value.setRowId(rowId);
-      value.setRecordId(UUID.randomUUID().toString());
-      value.addColumn("name", "value");
-      writer.write(key, value);
-    }
-
-    writer.close(context);
-
-    // assert index exists and has document
-
-    HdfsDirectory dir = new HdfsDirectory(new Path(pathStr, BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, 13)));
-    assertTrue(IndexReader.indexExists(dir));
-    IndexReader reader = IndexReader.open(dir);
-    assertEquals(10, reader.numDocs());
-  }
-
-  private void rm(File file) {
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/test/java/org/apache/blur/mapred/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapred/BlurInputFormatTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapred/BlurInputFormatTest.java
new file mode 100644
index 0000000..a183f39
--- /dev/null
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapred/BlurInputFormatTest.java
@@ -0,0 +1,82 @@
+package org.apache.blur.mapred;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.blur.mapred.BlurInputFormat;
+import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.blur.mapreduce.lib.BlurInputSplit;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class BlurInputFormatTest {
+
+  private Path indexPath = new Path("./tmp/test-indexes/oldapi");
+  private int numberOfShards = 13;
+  private int rowsPerIndex = 10;
+
+  @Before
+  public void setup() throws IOException {
+    org.apache.blur.mapreduce.lib.BlurInputFormatTest.buildTestIndexes(indexPath, numberOfShards, rowsPerIndex);
+  }
+
+  @Test
+  public void testGetSplits() throws IOException {
+    BlurInputFormat format = new BlurInputFormat();
+    JobConf job = new JobConf(new Configuration());
+    FileInputFormat.addInputPath(job, indexPath);
+    InputSplit[] splits = format.getSplits(job, -1);
+    for (int i = 0; i < splits.length; i++) {
+      BlurInputSplit split = (BlurInputSplit) splits[i];
+      Path path = new Path(indexPath, BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i));
+      FileSystem fileSystem = path.getFileSystem(job);
+      assertEquals(new BlurInputSplit(fileSystem.makeQualified(path), "_0", 0, Integer.MAX_VALUE), split);
+    }
+  }
+
+  @Test
+  public void testGetRecordReader() throws IOException {
+    BlurInputFormat format = new BlurInputFormat();
+    JobConf job = new JobConf(new Configuration());
+    FileInputFormat.addInputPath(job, indexPath);
+    InputSplit[] splits = format.getSplits(job, -1);
+    for (int i = 0; i < splits.length; i++) {
+      RecordReader<Text, BlurRecord> reader = format.getRecordReader(splits[i], job, Reporter.NULL);
+      Text key = reader.createKey();
+      BlurRecord value = reader.createValue();
+      while (reader.next(key, value)) {
+        System.out.println(reader.getProgress() + " " + key + " " + value);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java
new file mode 100644
index 0000000..2930545
--- /dev/null
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/BlurTaskTest.java
@@ -0,0 +1,62 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+
+import org.apache.blur.mapreduce.BlurTask;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+
+import static org.junit.Assert.*;
+
+public class BlurTaskTest {
+
+  @Test
+  public void testGetNumReducersBadPath() {
+    BlurTask task = new BlurTask();
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setShardCount(5);
+    tableDescriptor.setTableUri("file:///tmp/blur34746545");
+    tableDescriptor.setName("blur34746545");
+    task.setTableDescriptor(tableDescriptor);
+    assertEquals(5, task.getNumReducers(new Configuration()));
+  }
+
+  @Test
+  public void testGetNumReducersValidPath() {
+    new File("/tmp/blurTestShards/shard-1/").mkdirs();
+    new File("/tmp/blurTestShards/shard-2/").mkdirs();
+    new File("/tmp/blurTestShards/shard-3/").mkdirs();
+    try {
+      BlurTask task = new BlurTask();
+      TableDescriptor tableDescriptor = new TableDescriptor();
+      tableDescriptor.setShardCount(5);
+      tableDescriptor.setTableUri("file:///tmp/blurTestShards");
+      tableDescriptor.setName("blurTestShards");
+      task.setTableDescriptor(tableDescriptor);
+      assertEquals(3, task.getNumReducers(new Configuration()));
+    } finally {
+      new File("/tmp/blurTestShards/shard-1/").delete();
+      new File("/tmp/blurTestShards/shard-2/").delete();
+      new File("/tmp/blurTestShards/shard-3/").delete();
+      new File("/tmp/blurTestShards/").delete();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
new file mode 100644
index 0000000..36fb383
--- /dev/null
+++ b/src/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
@@ -0,0 +1,145 @@
+package org.apache.blur.mapreduce.lib;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.blur.mapreduce.lib.BlurInputFormat;
+import org.apache.blur.mapreduce.lib.BlurInputSplit;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.Row;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.RowIndexWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.NoLockFactory;
+import org.apache.lucene.util.Version;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class BlurInputFormatTest {
+
+  private Path indexPath = new Path("./tmp/test-indexes/newapi");
+  private int numberOfShards = 13;
+  private int rowsPerIndex = 10;
+
+  @Before
+  public void setup() throws IOException {
+    buildTestIndexes(indexPath, numberOfShards, rowsPerIndex);
+  }
+
+  public static void buildTestIndexes(Path indexPath, int numberOfShards, int rowsPerIndex) throws IOException {
+    Configuration configuration = new Configuration();
+    FileSystem fileSystem = indexPath.getFileSystem(configuration);
+    fileSystem.delete(indexPath, true);
+    for (int i = 0; i < numberOfShards; i++) {
+      String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i);
+      buildIndex(fileSystem, configuration, new Path(indexPath, shardName), rowsPerIndex);
+    }
+  }
+
+  public static void buildIndex(FileSystem fileSystem, Configuration configuration, Path path, int rowsPerIndex) throws IOException {
+    HdfsDirectory directory = new HdfsDirectory(path);
+    directory.setLockFactory(NoLockFactory.getNoLockFactory());
+    BlurAnalyzer analyzer = new BlurAnalyzer(new StandardAnalyzer(Version.LUCENE_35));
+    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, analyzer);
+    IndexWriter indexWriter = new IndexWriter(directory, conf);
+    RowIndexWriter writer = new RowIndexWriter(indexWriter, analyzer);
+    for (int i = 0; i < rowsPerIndex; i++) {
+      writer.add(false, genRow());
+    }
+    indexWriter.close();
+  }
+
+  public static Row genRow() {
+    Row row = new Row();
+    row.setId(UUID.randomUUID().toString());
+    for (int i = 0; i < 10; i++) {
+      row.addToRecords(genRecord());
+    }
+    return row;
+  }
+
+  public static Record genRecord() {
+    Record record = new Record();
+    record.setRecordId(UUID.randomUUID().toString());
+    record.setFamily("cf");
+    record.addToColumns(new Column("name", UUID.randomUUID().toString()));
+    return record;
+  }
+
+  @Test
+  public void testGetSplits() throws IOException, InterruptedException {
+    BlurInputFormat format = new BlurInputFormat();
+    Configuration conf = new Configuration();
+    Job job = new Job(conf);
+    FileInputFormat.addInputPath(job, indexPath);
+    JobID jobId = new JobID();
+    JobContext context = new JobContext(job.getConfiguration(), jobId);
+    List<InputSplit> list = format.getSplits(context);
+    for (int i = 0; i < list.size(); i++) {
+      BlurInputSplit split = (BlurInputSplit) list.get(i);
+      Path path = new Path(indexPath, BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, i));
+      FileSystem fileSystem = path.getFileSystem(conf);
+      assertEquals(new BlurInputSplit(fileSystem.makeQualified(path), "_0", 0, Integer.MAX_VALUE), split);
+    }
+  }
+
+  @Test
+  public void testCreateRecordReader() throws IOException, InterruptedException {
+    BlurInputFormat format = new BlurInputFormat();
+    Configuration conf = new Configuration();
+    Job job = new Job(conf);
+    FileInputFormat.addInputPath(job, indexPath);
+    JobID jobId = new JobID();
+    JobContext context = new JobContext(job.getConfiguration(), jobId);
+    List<InputSplit> list = format.getSplits(context);
+    for (int i = 0; i < list.size(); i++) {
+      BlurInputSplit split = (BlurInputSplit) list.get(i);
+      TaskAttemptID taskId = new TaskAttemptID();
+      TaskAttemptContext taskContext = new TaskAttemptContext(conf, taskId);
+      RecordReader<Text, BlurRecord> reader = format.createRecordReader(split, taskContext);
+      while (reader.nextKeyValue()) {
+        System.out.println(reader.getProgress() + " " + reader.getCurrentKey() + " " + reader.getCurrentValue());
+      }
+    }
+  }
+
+}


Mime
View raw message