incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/51] [abbrv] git commit: First cut at the inputformat for blur.
Date Tue, 11 Dec 2012 02:20:58 GMT
First cut at the inputformat for blur.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/beb05ce9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/beb05ce9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/beb05ce9

Branch: refs/heads/0.2-dev
Commit: beb05ce9bb1146c8cb60a1b0f807b33ed3827c9f
Parents: 5687219
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Dec 6 16:51:11 2012 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Dec 6 16:51:11 2012 -0500

----------------------------------------------------------------------
 bin/servers.sh                                     |   34 +++
 bin/start-server.sh                                |   43 ++++
 bin/stop-server.sh                                 |   38 ++++
 .../main/java/org/apache/blur/utils/BlurUtil.java  |   26 +++
 .../blur/hadoop/io/AddDocumentsWritable.java       |   33 +++
 .../apache/blur/hadoop/io/DocumentWritable.java    |   47 ++++
 .../org/apache/blur/hadoop/io/HadoopTransport.java |   94 ++++++++
 .../org/apache/blur/hadoop/io/MutateWritable.java  |    7 +
 .../apache/blur/hadoop/io/ScoreDocWritable.java    |   47 ++++
 .../apache/blur/mapreduce/lib/BlurInputFormat.java |   95 ++++----
 .../apache/blur/mapreduce/lib/BlurInputSplit.java  |  131 ++---------
 .../apache/blur/mapreduce/lib/BlurJobSetup.java    |   31 +++
 .../blur/mapreduce/lib/BlurOutputFormat.java       |    4 +-
 .../org/apache/blur/mapreduce/lib/BlurReader.java  |  173 +++++++++++++++
 .../blur/mapreduce/lib/BlurRecordReader.java       |   91 --------
 .../blur/mapreduce/lib/BlurRecordWriter.java       |  115 ----------
 .../org/apache/blur/mapreduce/lib/BlurWriter.java  |   71 ++++++
 .../java/org/apache/blur/mapreduce/lib/Utils.java  |   74 ------
 .../serializer/MatchAllDocsQueryWritable.java      |   36 +++
 .../apache/blur/lucene/serializer/QUERY_TYPE.java  |    8 +-
 .../blur/testsuite/AddDocumentsLuceneApiTable.java |    2 +
 .../blur/testsuite/SearchLuceneApiTable.java       |    2 +-
 22 files changed, 761 insertions(+), 441 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/bin/servers.sh
----------------------------------------------------------------------
diff --git a/bin/servers.sh b/bin/servers.sh
new file mode 100755
index 0000000..6b22678
--- /dev/null
+++ b/bin/servers.sh
@@ -0,0 +1,34 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/blur-config.sh
+
+export HOSTLIST="${BLUR_HOME_CONF}/shards"
+
+for shard in `cat "$HOSTLIST"|sed  "s/#.*$//;/^$/d"`; do
+ ssh $BLUR_SSH_OPTS $shard $"${@// /\\ }" \
+   2>&1 | sed "s/^/$shard: /" &
+ if [ "$BLUR_SHARD_SLEEP" != "" ]; then
+   sleep $BLUR_SHARD_SLEEP
+ fi
+done
+
+wait
+

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/bin/start-server.sh
----------------------------------------------------------------------
diff --git a/bin/start-server.sh b/bin/start-server.sh
new file mode 100755
index 0000000..d53fcbc
--- /dev/null
+++ b/bin/start-server.sh
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/blur-config.sh
+
+INSTANCE=0
+while [  $INSTANCE -lt $BLUR_NUMBER_OF_SERVER_INSTANCES_PER_MACHINE ]; do
+  PID_FILE=$BLUR_HOME/pids/blur-$INSTANCE.pid
+
+  if [ -f $PID_FILE ]; then
+    if kill -0 `cat $PID_FILE` > /dev/null 2>&1; then
+      echo Blur Server already running as process `cat $PID_FILE`.  Stop it first.
+      let INSTANCE=INSTANCE+1
+      continue
+    fi
+  fi
+
+  PROC_NAME=blur-server-$HOSTNAME-$INSTANCE
+  nohup "$JAVA_HOME"/bin/java -Dblur.name=$PROC_NAME -Djava.library.path=$JAVA_LIBRARY_PATH -Dblur-$INSTANCE $BLUR_JVM_OPTIONS -Dblur.logs.dir=$BLUR_LOGS -Dblur.log.file=$PROC_NAME.log -cp $BLUR_CLASSPATH org.apache.blur.thrift.ThriftBlurServer -s $INSTANCE > "$BLUR_LOGS/$PROC_NAME.out" 2>&1 < /dev/null &
+  echo $! > $PID_FILE
+  echo Blur Server [$INSTANCE] starting as process `cat $PID_FILE`.
+
+  let INSTANCE=INSTANCE+1 
+done
+
+

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/bin/stop-server.sh
----------------------------------------------------------------------
diff --git a/bin/stop-server.sh b/bin/stop-server.sh
new file mode 100755
index 0000000..42a882f
--- /dev/null
+++ b/bin/stop-server.sh
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/blur-config.sh
+
+INSTANCE=0
+while [  $INSTANCE -lt $BLUR_NUMBER_OF_SERVER_INSTANCES_PER_MACHINE ]; do
+  PID_FILE=$BLUR_HOME/pids/blur-$INSTANCE.pid
+
+  if [ -f $PID_FILE ]; then
+    if kill -0 `cat $PID_FILE` > /dev/null 2>&1; then
+      echo Stopping Blur Server [$INSTANCE] server with pid [`cat $PID_FILE`].
+      kill `cat $PID_FILE`
+    else
+      echo No Blur Server [$INSTANCE] server to stop
+    fi
+  else
+    echo No Blur Server [$INSTANCE] server to stop
+  fi
+  let INSTANCE=INSTANCE+1 
+done
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index b9cb4cc..1d8bea0 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -42,6 +42,7 @@ import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
 import org.apache.blur.metrics.BlurMetrics;
 import org.apache.blur.metrics.BlurMetrics.MethodCall;
 import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.QueryArgs;
 import org.apache.blur.thrift.generated.ScoreDoc;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -60,6 +61,7 @@ import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
 import org.apache.thrift.transport.TMemoryBuffer;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -517,4 +519,28 @@ public class BlurUtil {
     return docLocations;
   }
 
+  public static String toJson(TBase base) throws IOException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    TIOStreamTransport transport = new TIOStreamTransport(outputStream);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      base.write(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    transport.close();
+    return new String(outputStream.toByteArray());
+  }
+
+  public static void readJson(TBase base, String json) throws IOException {
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(json.getBytes());
+    TIOStreamTransport transport = new TIOStreamTransport(inputStream);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      base.read(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/AddDocumentsWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/AddDocumentsWritable.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/AddDocumentsWritable.java
new file mode 100644
index 0000000..dc35e1a
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/AddDocumentsWritable.java
@@ -0,0 +1,33 @@
+package org.apache.blur.hadoop.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+public class AddDocumentsWritable extends MutateWritable implements Writable {
+  
+  private List<DocumentWritable> documents = new ArrayList<DocumentWritable>();
+
+  public List<DocumentWritable> getDocuments() {
+    return documents;
+  }
+
+  public void setDocuments(List<DocumentWritable> documents) {
+    this.documents = documents;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DocumentWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DocumentWritable.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DocumentWritable.java
new file mode 100644
index 0000000..d9ebe4c
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/DocumentWritable.java
@@ -0,0 +1,47 @@
+package org.apache.blur.hadoop.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.blur.thrift.generated.Document;
+import org.apache.hadoop.io.Writable;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+public class DocumentWritable extends Document implements Writable {
+
+  public DocumentWritable() {
+
+  }
+
+  public DocumentWritable(Document doc) {
+    super(doc);
+  }
+
+  public DocumentWritable(DocumentWritable doc) {
+    super(doc);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    HadoopTransport transport = new HadoopTransport(out);
+    TCompactProtocol protocol = new TCompactProtocol(transport);
+    try {
+      this.write(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    HadoopTransport transport = new HadoopTransport(in);
+    TCompactProtocol protocol = new TCompactProtocol(transport);
+    try {
+      this.read(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/HadoopTransport.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/HadoopTransport.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/HadoopTransport.java
new file mode 100644
index 0000000..10ce10e
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/HadoopTransport.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.blur.hadoop.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class HadoopTransport extends TTransport {
+
+  private final InputStream in;
+  private final DataOutput out;
+
+  public HadoopTransport(final DataInput in) {
+    if (in instanceof InputStream) {
+      this.in = (InputStream) in;
+    } else {
+      this.in = new InputStream() {
+        @Override
+        public int read() throws IOException {
+          return in.readByte() & 0xff;
+        }
+      };
+    }
+    this.out = null;
+  }
+
+  public HadoopTransport(DataOutput out) {
+    this.in = null;
+    this.out = out;
+  }
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void open() throws TTransportException {
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    int bytesRead;
+    try {
+      bytesRead = in.read(buf, off, len);
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+    if (bytesRead < 0) {
+      throw new TTransportException(TTransportException.END_OF_FILE);
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    try {
+      out.write(buf, off, len);
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+  }
+
+  @Override
+  public void flush() throws TTransportException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/MutateWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/MutateWritable.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/MutateWritable.java
new file mode 100644
index 0000000..db39f38
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/MutateWritable.java
@@ -0,0 +1,7 @@
+package org.apache.blur.hadoop.io;
+
+import org.apache.hadoop.io.Writable;
+
+public abstract class MutateWritable implements Writable {
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/ScoreDocWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/ScoreDocWritable.java b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/ScoreDocWritable.java
new file mode 100644
index 0000000..037e953
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/hadoop/io/ScoreDocWritable.java
@@ -0,0 +1,47 @@
+package org.apache.blur.hadoop.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.blur.thrift.generated.ScoreDoc;
+import org.apache.hadoop.io.Writable;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+public class ScoreDocWritable extends ScoreDoc implements Writable {
+  
+  public ScoreDocWritable() {
+    
+  }
+  
+  public ScoreDocWritable(ScoreDoc doc) {
+    super(doc);
+  }
+  
+  public ScoreDocWritable(ScoreDocWritable doc) {
+    super(doc);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    HadoopTransport transport = new HadoopTransport(out);
+    TCompactProtocol protocol = new TCompactProtocol(transport);
+    try {
+      this.write(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    HadoopTransport transport = new HadoopTransport(in);
+    TCompactProtocol protocol = new TCompactProtocol(transport);
+    try {
+      this.read(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
index b3419dd..e69b6d2 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
@@ -16,56 +16,55 @@ package org.apache.blur.mapreduce.lib;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.hadoop.io.Text;
+import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_CONNECTION_STR;
+import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_TABLE_NAME;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.hadoop.io.DocumentWritable;
+import org.apache.blur.hadoop.io.ScoreDocWritable;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.TException;
+
+public class BlurInputFormat extends InputFormat<ScoreDocWritable, DocumentWritable> {
 
+  @Override
+  public RecordReader<ScoreDocWritable, DocumentWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    BlurReader blurReader = new BlurReader();
+    blurReader.initialize(split, context);
+    return blurReader;
+  }
 
-public abstract class BlurInputFormat extends InputFormat<Text, BlurRecord> {
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+    Configuration configuration = context.getConfiguration();
+    String connectionStr = configuration.get(BLUR_CONNECTION_STR);
+    String tableName = configuration.get(BLUR_TABLE_NAME);
+    Iface client = BlurClient.getClient(connectionStr);
+    try {
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      TableDescriptor describe = client.describe(tableName);
+      int shardCount = describe.getShardCount();
+      for (int i = 0; i < shardCount; i++) {
+        splits.add(new BlurInputSplit(i));
+      }
+      return splits;
+    } catch (BlurException e) {
+      throw new IOException(e);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
 
-//  @SuppressWarnings("unchecked")
-//  @Override
-//  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
-//    List<?> splits = new ArrayList<Object>();
-//    Path[] paths = FileInputFormat.getInputPaths(context);
-//    for (Path path : paths) {
-//      findAllSegments(context.getConfiguration(), path, splits);
-//    }
-//    return (List<InputSplit>) splits;
-//  }
-//
-//  @Override
-//  public RecordReader<Text, BlurRecord> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-//    BlurRecordReader blurRecordReader = new BlurRecordReader();
-//    blurRecordReader.initialize(split, context);
-//    return blurRecordReader;
-//  }
-//
-//  public static void findAllSegments(Configuration configuration, Path path, List<?> splits) throws IOException {
-//    FileSystem fileSystem = path.getFileSystem(configuration);
-//    if (fileSystem.isFile(path)) {
-//      return;
-//    } else {
-//      FileStatus[] listStatus = fileSystem.listStatus(path);
-//      for (FileStatus status : listStatus) {
-//        Path p = status.getPath();
-//        HdfsDirectory directory = new HdfsDirectory(p);
-//        if (IndexReader.indexExists(directory)) {
-//          addSplits(directory, splits);
-//        } else {
-//          findAllSegments(configuration, p, splits);
-//        }
-//      }
-//    }
-//  }
-//
-//  @SuppressWarnings("unchecked")
-//  public static void addSplits(HdfsDirectory directory, @SuppressWarnings("rawtypes") List splits) throws IOException {
-//    IndexCommit commit = Utils.findLatest(directory);
-//    List<String> segments = Utils.getSegments(directory, commit);
-//    for (String segment : segments) {
-//      BlurInputSplit split = new BlurInputSplit(directory.getHdfsDirPath(), segment, 0, Integer.MAX_VALUE);
-//      splits.add(split);
-//    }
-//  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java
index 7407ba4..4c76d9b 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputSplit.java
@@ -16,142 +16,57 @@ package org.apache.blur.mapreduce.lib;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+/*
+ * Creates a split for each shard in the table; currently not location-aware.
+ */
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 
-public class BlurInputSplit extends InputSplit implements Writable, org.apache.hadoop.mapred.InputSplit {
-
-  private int _endingDocId;
-  private int _startingDocId;
-  private String _segmentName;
-  private Path _path;
-
-  public BlurInputSplit() {
-
-  }
-
-  public BlurInputSplit(Path path, String segmentName, int startingDocId, int endingDocId) {
-    _endingDocId = endingDocId;
-    _startingDocId = startingDocId;
-    _segmentName = segmentName;
-    _path = path;
-  }
-
-  @Override
-  public long getLength() {
-    return _endingDocId - _startingDocId;
-  }
-
-  @Override
-  public String[] getLocations() {
-    return new String[] {};
-  }
-
-  public Path getIndexPath() {
-    return _path;
-  }
+public class BlurInputSplit extends InputSplit implements Writable {
 
-  public String getSegmentName() {
-    return _segmentName;
-  }
+  private int shard;
 
-  public int getStartingDocId() {
-    return _startingDocId;
+  public int getShard() {
+    return shard;
   }
 
-  public int getEndingDocId() {
-    return _endingDocId;
+  public void setShard(int shard) {
+    this.shard = shard;
   }
 
-  public void setEndingDocId(int endingDocId) {
-    _endingDocId = endingDocId;
-  }
-
-  public void setStartingDocId(int startingDocId) {
-    _startingDocId = startingDocId;
-  }
-
-  public void setSegmentName(String segmentName) {
-    _segmentName = segmentName;
-  }
-
-  public void setPath(Path path) {
-    _path = path;
-  }
+  public BlurInputSplit() {
 
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(_startingDocId);
-    out.writeInt(_endingDocId);
-    writeString(out, _segmentName);
-    writeString(out, _path.toUri().toString());
   }
 
-  private void writeString(DataOutput out, String s) throws IOException {
-    byte[] bs = s.getBytes();
-    out.writeInt(bs.length);
-    out.write(bs);
+  public BlurInputSplit(int shard) {
+    setShard(shard);
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    _startingDocId = in.readInt();
-    _endingDocId = in.readInt();
-    _segmentName = readString(in);
-    _path = new Path(readString(in));
-  }
-
-  private String readString(DataInput in) throws IOException {
-    int length = in.readInt();
-    byte[] buf = new byte[length];
-    in.readFully(buf);
-    return new String(buf);
+    shard = in.readInt();
   }
 
   @Override
-  public String toString() {
-    return "path=" + _path + ", segmentName=" + _segmentName + ", startingDocId=" + _startingDocId + ", endingDocId=" + _endingDocId;
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(shard);
   }
 
   @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + _endingDocId;
-    result = prime * result + ((_path == null) ? 0 : _path.hashCode());
-    result = prime * result + ((_segmentName == null) ? 0 : _segmentName.hashCode());
-    result = prime * result + _startingDocId;
-    return result;
+  public long getLength() throws IOException, InterruptedException {
+    // Get the result size of this shard.
+    return 0;
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    BlurInputSplit other = (BlurInputSplit) obj;
-    if (_endingDocId != other._endingDocId)
-      return false;
-    if (_path == null) {
-      if (other._path != null)
-        return false;
-    } else if (!_path.equals(other._path))
-      return false;
-    if (_segmentName == null) {
-      if (other._segmentName != null)
-        return false;
-    } else if (!_segmentName.equals(other._segmentName))
-      return false;
-    if (_startingDocId != other._startingDocId)
-      return false;
-    return true;
+  public String[] getLocations() throws IOException, InterruptedException {
+    // Get the location of this shard.
+    return new String[] {};
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurJobSetup.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurJobSetup.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurJobSetup.java
new file mode 100644
index 0000000..8faeda8
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurJobSetup.java
@@ -0,0 +1,31 @@
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.blur.thrift.generated.QueryArgs;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+public class BlurJobSetup {
+  
+  public static final String BLUR_QUERY = "blur.query";
+  public static final String BLUR_FIELDS = "blur.fields";
+  public static final String BLUR_TABLE_NAME = "blur.tableName";
+  public static final String BLUR_SESSION_ID = "blur.sessionId";
+  public static final String BLUR_CONNECTION_STR = "blur.connectionStr";
+  public static final String BLUR_WAIT_TO_BE_VISIBLE = "blur.waitToBeVisible";
+  public static final String BLUR_WRITE_AHEAD_LOG = "blur.writeAheadLog";
+  
+  public static void setupJob(Job job, String connectionStr, String tableName, Set<String> fields, QueryArgs queryArgs) throws IOException {
+    Configuration configuration = job.getConfiguration();
+    configuration.set(BLUR_CONNECTION_STR, connectionStr);
+    configuration.set(BLUR_SESSION_ID, UUID.randomUUID().toString());
+    configuration.set(BLUR_TABLE_NAME, tableName);
+    configuration.set(BLUR_QUERY, BlurUtil.toJson(queryArgs));
+    configuration.setStrings(BLUR_FIELDS, fields.toArray(new String[fields.size()]));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index b3fd106..374a7a6 100644
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -36,12 +36,12 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurRecord> {
 
   @Override
   public RecordWriter<Text, BlurRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-    return new BlurRecordWriter(context);
+    return null;
   }
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-    return new BlurOutputCommitter(context);
+    return null;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java
new file mode 100644
index 0000000..662daf9
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurReader.java
@@ -0,0 +1,173 @@
package org.apache.blur.mapreduce.lib;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_CONNECTION_STR;
import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_FIELDS;
import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_SESSION_ID;
import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_TABLE_NAME;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.blur.hadoop.io.DocumentWritable;
import org.apache.blur.hadoop.io.ScoreDocWritable;
import org.apache.blur.thrift.BlurClient;
import org.apache.blur.thrift.generated.Blur.Iface;
import org.apache.blur.thrift.generated.BlurException;
import org.apache.blur.thrift.generated.Document;
import org.apache.blur.thrift.generated.QueryArgs;
import org.apache.blur.thrift.generated.ScoreDoc;
import org.apache.blur.thrift.generated.Session;
import org.apache.blur.thrift.generated.TopFieldDocs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;

/**
 * RecordReader that streams search results for a single Blur shard over
 * Thrift. Results are pulled one page at a time by {@link #search()}; paging
 * is driven by handing the last {@link ScoreDoc} of the previous page back to
 * the server as the "after" marker of the next request.
 */
public class BlurReader extends RecordReader<ScoreDocWritable, DocumentWritable> {

  private Iface client;
  private Session session;
  // Subset of document fields to fetch; null means fetch all fields.
  private Set<String> fields;
  private QueryArgs queryArgs;

  // Total hit count reported by the server; -1 until the first search runs.
  private long totalHits = -1;
  // Number of key/value pairs consumed so far across all pages.
  private long totalPosition = -1;

  // Index into the current page of scoreDocs/documents.
  private int position = 0;
  private List<ScoreDoc> scoreDocs = new ArrayList<ScoreDoc>();
  private List<Document> documents = new ArrayList<Document>();

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    Configuration configuration = context.getConfiguration();

    String connectionStr = configuration.get(BLUR_CONNECTION_STR);
    String sessionId = configuration.get(BLUR_SESSION_ID);
    String tableName = configuration.get(BLUR_TABLE_NAME);
    // BUGFIX: previously read BLUR_TABLE_NAME a second time; the field list
    // is stored under BLUR_FIELDS by the job setup (see BlurJobSetup).
    String[] fieldArray = configuration.getStrings(BLUR_FIELDS);
    if (fieldArray != null) {
      fields = new HashSet<String>(Arrays.asList(fieldArray));
    }

    // TODO: deserialize the real query from the configuration (BLUR_QUERY)
    // into queryArgs. Previously queryArgs was left null, which guaranteed a
    // NullPointerException in configureForThisSplit/search below.
    queryArgs = new QueryArgs();

    configureForThisSplit((BlurInputSplit) split, queryArgs);

    session = new Session(sessionId, tableName);
    client = BlurClient.getClient(connectionStr);
    search();
  }

  @Override
  public void close() throws IOException {

  }

  /**
   * Advances to the next result, fetching the next page from the server when
   * the current page is exhausted.
   *
   * @return true while more results remain, false once totalHits is reached.
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    totalPosition++;
    position++;
    if (totalPosition >= totalHits) {
      return false;
    }
    if (position >= documents.size()) {
      // Page exhausted; fetch the next one. search() resets position to -1,
      // so the increment below lands on index 0 of the fresh page.
      search();
      position++;
      if (totalPosition < totalHits) {
        return true;
      } else {
        return false;
      }
    }
    return true;
  }

  /**
   * Runs the query against the shard, paging past the last ScoreDoc already
   * seen, and pre-fetches the documents for the new page.
   *
   * @throws IOException wrapping any BlurException/TException from Thrift.
   */
  private void search() throws IOException {
    try {
      if (!scoreDocs.isEmpty()) {
        queryArgs.setAfter(scoreDocs.get(scoreDocs.size() - 1));
      }
      List<TopFieldDocs> results = client.search(session, queryArgs);
      TopFieldDocs topFieldDocs = results.get(0);
      totalHits = topFieldDocs.getTotalHits();
      scoreDocs = topFieldDocs.getScoreDocs();
      documents = client.doc(session, getDocLocations(topFieldDocs), fields);
      position = -1;
    } catch (BlurException e) {
      throw new IOException(e);
    } catch (TException e) {
      throw new IOException(e);
    }
  }

  @Override
  public ScoreDocWritable getCurrentKey() throws IOException, InterruptedException {
    return new ScoreDocWritable(scoreDocs.get(position));
  }

  @Override
  public DocumentWritable getCurrentValue() throws IOException, InterruptedException {
    return new DocumentWritable(documents.get(position));
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    if (totalHits < 0) {
      return 0;
    }
    return totalPosition / (float) totalHits;
  }

  /** Restricts the query to the single shard covered by this input split. */
  private void configureForThisSplit(BlurInputSplit split, QueryArgs queryArgs) {
    int shard = split.getShard();
    queryArgs.setShardIndexes(Arrays.asList(shard));
  }

  /** Extracts the document locations of every ScoreDoc on the current page. */
  private List<Long> getDocLocations(TopFieldDocs topFieldDocs) {
    List<ScoreDoc> scoreDocs = topFieldDocs.scoreDocs;
    List<Long> docLocations = new ArrayList<Long>();
    for (ScoreDoc scoreDoc : scoreDocs) {
      docLocations.add(scoreDoc.getDocLocation());
    }
    return docLocations;
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
deleted file mode 100644
index e34a2b3..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordReader.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordReader;
-
-
-public abstract class BlurRecordReader extends RecordReader<Text, BlurRecord> {
-
-//  private IndexReader reader;
-//  private Directory directory;
-//  private int startingDocId;
-//  private int endingDocId;
-//  private int position;
-//  private Text rowid = new Text();
-//  private BlurRecord record = new BlurRecord();
-//
-//  @Override
-//  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-//    BlurInputSplit blurSplit = (BlurInputSplit) split;
-//    Path path = blurSplit.getIndexPath();
-//    String segmentName = blurSplit.getSegmentName();
-//    startingDocId = blurSplit.getStartingDocId();
-//    endingDocId = blurSplit.getEndingDocId();
-//    directory = new HdfsDirectory(context.getConfiguration(), path);
-//
-//    IndexCommit commit = Utils.findLatest(directory);
-//    reader = Utils.openSegmentReader(directory, commit, segmentName, Utils.getTermInfosIndexDivisor(context.getConfiguration()));
-//    int maxDoc = reader.maxDoc();
-//    if (endingDocId >= maxDoc) {
-//      endingDocId = maxDoc - 1;
-//    }
-//    position = startingDocId - 1;
-//  }
-//
-//  @Override
-//  public boolean nextKeyValue() throws IOException, InterruptedException {
-//    do {
-//      position++;
-//      if (position > endingDocId) {
-//        return false;
-//      }
-//    } while (reader.isDeleted(position));
-//    readDocument();
-//    return true;
-//  }
-//
-//  private void readDocument() throws CorruptIndexException, IOException {
-//    Document document = reader.document(position);
-//    record.reset();
-//    rowid.set(RowDocumentUtil.readRecord(document, record));
-//  }
-//
-//  @Override
-//  public Text getCurrentKey() throws IOException, InterruptedException {
-//    return rowid;
-//  }
-//
-//  @Override
-//  public BlurRecord getCurrentValue() throws IOException, InterruptedException {
-//    return record;
-//  }
-//
-//  @Override
-//  public float getProgress() throws IOException, InterruptedException {
-//    int total = endingDocId - startingDocId;
-//    return (float) position / (float) total;
-//  }
-//
-//  @Override
-//  public void close() throws IOException {
-//    reader.close();
-//    directory.close();
-//  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
deleted file mode 100644
index f754ab3..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecordWriter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.mapreduce.BlurColumn;
-import org.apache.blur.mapreduce.BlurRecord;
-import org.apache.blur.store.hdfs.HdfsDirectory;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.core.KeywordAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Index;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.util.Version;
-
-public class BlurRecordWriter extends RecordWriter<Text, BlurRecord> {
-
-  private static Log LOG = LogFactory.getLog(BlurRecordWriter.class);
-
-  private Text prevKey = new Text();
-  private List<Document> documents = new ArrayList<Document>();
-  private IndexWriter writer;
-
-  public BlurRecordWriter(TaskAttemptContext context) throws IOException {
-    Configuration configuration = context.getConfiguration();
-    String outputPath = configuration.get("mapred.output.dir");
-    int id = context.getTaskAttemptID().getTaskID().getId();
-    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, id);
-    Path basePath = new Path(outputPath);
-    Path indexPath = new Path(basePath, shardName);
-
-    // @TODO
-    Analyzer analyzer = new KeywordAnalyzer();
-
-    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, analyzer);
-
-    // @TODO setup compressed directory, read compression codec from config,
-    // setup progressable dir, setup lock factory
-    Directory dir = new HdfsDirectory(configuration, indexPath);
-    dir.setLockFactory(NoLockFactory.getNoLockFactory());
-    writer = new IndexWriter(dir, conf);
-  }
-
-  @Override
-  public void write(Text key, BlurRecord value) throws IOException, InterruptedException {
-    if (!prevKey.equals(key)) {
-      flush();
-      prevKey.set(key);
-    }
-    add(value);
-  }
-
-  private void add(BlurRecord value) {
-    List<BlurColumn> columns = value.getColumns();
-    String family = value.getFamily();
-    Document document = new Document();
-    document.add(new Field(BlurConstants.ROW_ID, value.getRowId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
-    document.add(new Field(BlurConstants.RECORD_ID, value.getRecordId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
-    for (BlurColumn column : columns) {
-      document.add(convert(family, column));
-    }
-    documents.add(document);
-    LOG.error("Needs to use blur analyzer and field converter");
-  }
-
-  private Field convert(String family, BlurColumn column) {
-    return new Field(family + "." + column.getName(), column.getValue(), Store.YES, Index.ANALYZED_NO_NORMS);
-  }
-
-  private void flush() throws CorruptIndexException, IOException {
-    if (documents.isEmpty()) {
-      return;
-    }
-    writer.addDocuments(documents);
-    documents.clear();
-  }
-
-  @Override
-  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-    flush();
-    writer.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java
new file mode 100644
index 0000000..b80caf5
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurWriter.java
@@ -0,0 +1,71 @@
package org.apache.blur.mapreduce.lib;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_CONNECTION_STR;
import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_TABLE_NAME;
import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_WAIT_TO_BE_VISIBLE;
import static org.apache.blur.mapreduce.lib.BlurJobSetup.BLUR_WRITE_AHEAD_LOG;

import java.io.IOException;
import java.util.List;

import org.apache.blur.hadoop.io.AddDocumentsWritable;
import org.apache.blur.hadoop.io.MutateWritable;
import org.apache.blur.thrift.BlurClient;
import org.apache.blur.thrift.generated.Blur.Iface;
import org.apache.blur.thrift.generated.BlurException;
import org.apache.blur.thrift.generated.Document;
import org.apache.blur.thrift.generated.MutateOptions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;

/**
 * RecordWriter that applies document mutations to a Blur table over Thrift.
 * Each writer instance is bound to a single shard of the table via
 * {@link MutateOptions}.
 */
public class BlurWriter extends RecordWriter<BytesWritable, MutateWritable> {

  private final Iface client;
  private final MutateOptions options;

  /**
   * @param configuration job configuration carrying the Blur connection
   *          string, table name, and write options (see BlurJobSetup).
   * @param shardIndex the shard of the table this writer mutates.
   */
  public BlurWriter(Configuration configuration, int shardIndex) {
    String connectionStr = configuration.get(BLUR_CONNECTION_STR);
    String table = configuration.get(BLUR_TABLE_NAME);
    // Both write options default to false when absent from the configuration.
    boolean writeAheadLog = configuration.getBoolean(BLUR_WRITE_AHEAD_LOG, false);
    boolean waitToBeVisible = configuration.getBoolean(BLUR_WAIT_TO_BE_VISIBLE, false);
    client = BlurClient.getClient(connectionStr);
    options = new MutateOptions(table, shardIndex, waitToBeVisible, writeAheadLog);
  }

  /**
   * Sends the mutation to the server. Only {@link AddDocumentsWritable} is
   * supported at this point; any other mutation type is rejected instead of
   * being silently dropped (the previous behavior).
   */
  @Override
  public void write(BytesWritable key, MutateWritable value) throws IOException, InterruptedException {
    if (value instanceof AddDocumentsWritable) {
      AddDocumentsWritable addDocumentsWritable = (AddDocumentsWritable) value;
      List<Document> docs = getDocs(addDocumentsWritable.getDocuments());
      try {
        client.addDocuments(options, docs);
      } catch (BlurException e) {
        throw new IOException(e);
      } catch (TException e) {
        throw new IOException(e);
      }
    } else {
      // Fail loudly rather than losing data on an unhandled mutation type.
      throw new IOException("Unsupported mutation type ["
          + (value == null ? "null" : value.getClass().getName()) + "]");
    }
  }

  /** Narrows the wildcard list from the writable to the thrift Document type. */
  @SuppressWarnings("unchecked")
  private List<Document> getDocs(List<?> documents) {
    return (List<Document>) documents;
  }

  @Override
  public void close(TaskAttemptContext context) throws IOException, InterruptedException {

  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java
deleted file mode 100644
index 36e0352..0000000
--- a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/Utils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.SegmentInfo;
-import org.apache.lucene.index.SegmentInfoPerCommit;
-import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.store.Directory;
-
-public class Utils {
-
-  public static int getTermInfosIndexDivisor(Configuration conf) {
-    return 128;
-  }
-
-  public static IndexCommit findLatest(Directory dir) throws IOException {
-    Collection<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
-    if (listCommits.size() == 1) {
-      return listCommits.iterator().next();
-    }
-    throw new RuntimeException("Multiple commit points not supported yet.");
-  }
-
-  public static List<String> getSegments(Directory dir, IndexCommit commit) throws CorruptIndexException, IOException {
-    SegmentInfos infos = new SegmentInfos();
-    infos.read(dir, commit.getSegmentsFileName());
-    List<String> result = new ArrayList<String>();
-    for (SegmentInfoPerCommit info : infos) {
-      result.add(info.info.name);
-    }
-    return result;
-  }
-
-//  public static IndexReader openSegmentReader(Directory directory, IndexCommit commit, String segmentName, int termInfosIndexDivisor) throws CorruptIndexException, IOException {
-//    SegmentInfos infos = new SegmentInfos();
-//    infos.read(directory, commit.getSegmentsFileName());
-//    SegmentInfo segmentInfo = null;
-//    for (SegmentInfoPerCommit info : infos) {
-//      if (segmentName.equals(info.info.name)) {
-//        segmentInfo = info.info;
-//        break;
-//      }
-//    }
-//    if (segmentInfo == null) {
-//      throw new RuntimeException("SegmentInfo for [" + segmentName + "] not found in directory [" + directory + "] for commit [" + commit + "]");
-//    }
-//    return SegmentReader.get(true, segmentInfo, termInfosIndexDivisor);
-//  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-store/src/main/java/org/apache/blur/lucene/serializer/MatchAllDocsQueryWritable.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/lucene/serializer/MatchAllDocsQueryWritable.java b/src/blur-store/src/main/java/org/apache/blur/lucene/serializer/MatchAllDocsQueryWritable.java
new file mode 100644
index 0000000..88e988b
--- /dev/null
+++ b/src/blur-store/src/main/java/org/apache/blur/lucene/serializer/MatchAllDocsQueryWritable.java
@@ -0,0 +1,36 @@
package org.apache.blur.lucene.serializer;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;

/**
 * Hadoop Writable wrapper for Lucene's {@link MatchAllDocsQuery}. A
 * MatchAllDocsQuery has no terms or clauses, so the only state serialized is
 * the query's boost (a single float).
 *
 * NOTE(review): {@link #write(DataOutput)} assumes {@link #setQuery} or
 * {@link #readFields} has been called first; writing an uninitialized
 * instance would throw a NullPointerException — confirm callers guarantee
 * this (see QUERY_TYPE.createQueryWritable usage).
 */
public class MatchAllDocsQueryWritable extends AbtractQueryWritable<MatchAllDocsQuery> {

  private MatchAllDocsQuery query;

  @Override
  public void write(DataOutput out) throws IOException {
    // The boost is the query's only serializable state.
    out.writeFloat(query.getBoost());
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    float boost = in.readFloat();
    query = new MatchAllDocsQuery();
    query.setBoost(boost);
  }

  @Override
  public Query getQuery() {
    return query;
  }

  @Override
  public void setQuery(MatchAllDocsQuery query) {
    this.query = query;
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-store/src/main/java/org/apache/blur/lucene/serializer/QUERY_TYPE.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/lucene/serializer/QUERY_TYPE.java b/src/blur-store/src/main/java/org/apache/blur/lucene/serializer/QUERY_TYPE.java
index 005e204..495261a 100644
--- a/src/blur-store/src/main/java/org/apache/blur/lucene/serializer/QUERY_TYPE.java
+++ b/src/blur-store/src/main/java/org/apache/blur/lucene/serializer/QUERY_TYPE.java
@@ -39,7 +39,7 @@ import org.apache.lucene.search.TermRangeQuery;
 import org.apache.lucene.search.WildcardQuery;
 
 public enum QUERY_TYPE {
-  BOOLEAN((byte) 0), TERM((byte) 1), FUZZY((byte) 2);
+  BOOLEAN((byte) 0), TERM((byte) 1), FUZZY((byte) 2), MATCH_ALL_DOCS((byte) 3);
 
   private final byte type;
 
@@ -59,6 +59,8 @@ public enum QUERY_TYPE {
       return TERM;
     case 2:
       return FUZZY;
+    case 3:
+      return MATCH_ALL_DOCS;
     default:
       throw new RuntimeException("Type [" + type + "] is not supported");
     }
@@ -84,7 +86,7 @@ public enum QUERY_TYPE {
     } else if (query instanceof FuzzyLikeThisQuery) {
       throw new RuntimeException("no impl");
     } else if (query instanceof MatchAllDocsQuery) {
-      throw new RuntimeException("no impl");
+      return MATCH_ALL_DOCS;
     } else if (query instanceof MoreLikeThisQuery) {
       throw new RuntimeException("no impl");
     } else if (query instanceof MultiPhraseQuery) {
@@ -122,6 +124,8 @@ public enum QUERY_TYPE {
       return new TermQueryWritable();
     case FUZZY:
       return new FuzzyQueryWritable();
+    case MATCH_ALL_DOCS:
+      return new MatchAllDocsQueryWritable();
     default:
       throw new RuntimeException("Type [" + type + "] is not supported");
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-testsuite/src/main/java/org/apache/blur/testsuite/AddDocumentsLuceneApiTable.java
----------------------------------------------------------------------
diff --git a/src/blur-testsuite/src/main/java/org/apache/blur/testsuite/AddDocumentsLuceneApiTable.java b/src/blur-testsuite/src/main/java/org/apache/blur/testsuite/AddDocumentsLuceneApiTable.java
index 2f6be97..ae9c71b 100644
--- a/src/blur-testsuite/src/main/java/org/apache/blur/testsuite/AddDocumentsLuceneApiTable.java
+++ b/src/blur-testsuite/src/main/java/org/apache/blur/testsuite/AddDocumentsLuceneApiTable.java
@@ -65,8 +65,10 @@ public class AddDocumentsLuceneApiTable {
       if (docs.size() >= batch) {
         client.addDocuments(options, docs);
         docs.clear();
+        break;
       }
       total++;
+      
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/beb05ce9/src/blur-testsuite/src/main/java/org/apache/blur/testsuite/SearchLuceneApiTable.java
----------------------------------------------------------------------
diff --git a/src/blur-testsuite/src/main/java/org/apache/blur/testsuite/SearchLuceneApiTable.java b/src/blur-testsuite/src/main/java/org/apache/blur/testsuite/SearchLuceneApiTable.java
index 5e03669..980394e 100644
--- a/src/blur-testsuite/src/main/java/org/apache/blur/testsuite/SearchLuceneApiTable.java
+++ b/src/blur-testsuite/src/main/java/org/apache/blur/testsuite/SearchLuceneApiTable.java
@@ -22,9 +22,9 @@ import java.net.Socket;
 import java.util.List;
 
 import org.apache.blur.lucene.serializer.QueryWritable;
+import org.apache.blur.thrift.generated.Blur.Client;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Document;
-import org.apache.blur.thrift.generated.ShardServer.Client;
 import org.apache.blur.thrift.generated.QueryArgs;
 import org.apache.blur.thrift.generated.Session;
 import org.apache.blur.thrift.generated.TopFieldDocs;


Mime
View raw message