incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Adding first cut of stream server and client along with a start to a spark rdd for blur. Disabled by default, considered experiemental at this point.
Date Mon, 28 Sep 2015 14:45:21 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 0eaefea45 -> 9ae5bf35b


Adding first cut of stream server and client along with a start to a spark rdd for blur.  Disabled by default, considered experiemental at this point.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9ae5bf35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9ae5bf35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9ae5bf35

Branch: refs/heads/master
Commit: 9ae5bf35b5eea0015e456873e8288961da70a9aa
Parents: 0eaefea
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Sep 28 10:45:06 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Sep 28 10:45:06 2015 -0400

----------------------------------------------------------------------
 .../blur/command/stream/StreamClient.java       | 180 ++++++++++++
 .../blur/command/stream/StreamCommand.java      |  30 ++
 .../blur/command/stream/StreamComplete.java     |  25 ++
 .../apache/blur/command/stream/StreamError.java |  53 ++++
 .../blur/command/stream/StreamFunction.java     |  27 ++
 .../blur/command/stream/StreamIndexContext.java |  86 ++++++
 .../blur/command/stream/StreamProcessor.java    | 225 +++++++++++++++
 .../blur/command/stream/StreamServer.java       | 241 ++++++++++++++++
 .../apache/blur/command/stream/StreamSplit.java |  64 +++++
 .../apache/blur/command/stream/StreamUtil.java  |  56 ++++
 .../blur/command/stream/StreamWriter.java       |  27 ++
 .../blur/thrift/ThriftBlurShardServer.java      |  24 +-
 .../blur/command/stream/StreamServerTest.java   | 285 +++++++++++++++++++
 blur-spark/pom.xml                              |  10 +-
 .../java/org/apache/blur/spark/BlurRDD.java     | 175 ++++++++++++
 .../org/apache/blur/spark/BlurSparkSplit.java   |  51 ++++
 .../org/apache/blur/spark/BlurSparkUtil.java    | 168 +++++++++++
 .../org/apache/blur/spark/UsingBlurRDD.java     |  83 ++++++
 blur-stream-client/pom.xml                      | 116 --------
 blur-stream-server/pom.xml                      | 116 --------
 .../org/apache/blur/utils/BlurConstants.java    |   2 +
 .../src/main/resources/blur-default.properties  |   3 +
 22 files changed, 1806 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java
new file mode 100644
index 0000000..20d1bcd
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+
+import com.google.common.io.Closer;
+
+public class StreamClient implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(StreamClient.class);
+
+  private final String _host;
+  private final int _port;
+  private final Socket _socket;
+  private final DataInputStream _dataInputStream;
+  private final DataOutputStream _dataOutputStream;
+  private final Closer _closer = Closer.create();
+
+  public StreamClient(String host, int port, int timeout) throws IOException {
+    _host = host;
+    _port = port;
+    _socket = _closer.register(new Socket(_host, _port));
+    _socket.setTcpNoDelay(true);
+    _socket.setSoTimeout(timeout);
+    _dataInputStream = new DataInputStream(_closer.register(_socket.getInputStream()));
+    _dataOutputStream = new DataOutputStream(_closer.register(_socket.getOutputStream()));
+  }
+
+  public String getHost() {
+    return _host;
+  }
+
+  public int getPort() {
+    return _port;
+  }
+
+  public DataInputStream getDataInputStream() {
+    return _dataInputStream;
+  }
+
+  public DataOutputStream getDataOutStream() {
+    return _dataOutputStream;
+  }
+
+  @Override
+  public void close() throws IOException {
+    _closer.close();
+  }
+
+  public boolean isClassLoaderAvailable(String classLoaderId) throws IOException {
+    _dataOutputStream.write(StreamCommand.CLASS_LOAD_CHECK.getCommand());
+    StreamUtil.writeString(_dataOutputStream, classLoaderId);
+    _dataOutputStream.flush();
+    String message = StreamUtil.readString(_dataInputStream);
+    if (message.equals("OK")) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public void loadJars(String classLoaderId, Iterable<String> testJars) throws IOException {
+    _dataOutputStream.write(StreamCommand.CLASS_LOAD.getCommand());
+    StreamUtil.writeString(_dataOutputStream, classLoaderId);
+    sendZip(testJars, _dataOutputStream);
+    _dataOutputStream.flush();
+    String message = StreamUtil.readString(_dataInputStream);
+    if (!message.equals("LOADED")) {
+      throw new IOException("Unknown error during load, check logs on server.");
+    }
+  }
+
+  private void sendZip(Iterable<String> testJars, DataOutputStream outputStream) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ZipOutputStream zipOutputStream = new ZipOutputStream(out);
+    for (String jar : testJars) {
+      LOG.info("Sending jar [{0}]", jar);
+      File file = new File(jar);
+      FileInputStream in = new FileInputStream(file);
+      zipOutputStream.putNextEntry(new ZipEntry(file.getName()));
+      IOUtils.copy(in, zipOutputStream);
+      in.close();
+      zipOutputStream.closeEntry();
+    }
+    zipOutputStream.close();
+    byte[] bs = out.toByteArray();
+    outputStream.writeInt(bs.length);
+    outputStream.write(bs);
+  }
+
+  public void loadJars(String classLoaderId, String... testJars) throws IOException {
+    loadJars(classLoaderId, Arrays.asList(testJars));
+  }
+
+  public <T> Iterable<T> executeStream(StreamSplit streamSplit, StreamFunction<T> streamFunction) throws IOException {
+    return new Iterable<T>() {
+      @Override
+      public Iterator<T> iterator() {
+        try {
+          _dataOutputStream.write(StreamCommand.STREAM.getCommand());
+          byte[] streamSplitBytes = StreamUtil.toBytes(streamSplit.copy());
+          byte[] streamFunctionBytes = StreamUtil.toBytes(streamFunction);
+          _dataOutputStream.writeInt(streamSplitBytes.length);
+          _dataOutputStream.write(streamSplitBytes);
+          _dataOutputStream.writeInt(streamFunctionBytes.length);
+          _dataOutputStream.write(streamFunctionBytes);
+          _dataOutputStream.flush();
+          ObjectInputStream objectInputStream = new ObjectInputStream(_dataInputStream);
+          return new Iterator<T>() {
+
+            private boolean _more = true;
+            private Object _obj;
+
+            @Override
+            public boolean hasNext() {
+              if (!_more) {
+                return false;
+              }
+              if (_obj != null) {
+                return true;
+              }
+              Object o;
+              try {
+                o = objectInputStream.readObject();
+              } catch (ClassNotFoundException | IOException e) {
+                throw new RuntimeException(e);
+              }
+              if (o instanceof StreamComplete) {
+                return _more = false;
+              }
+              _obj = o;
+              return true;
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public T next() {
+              T o = (T) _obj;
+              _obj = null;
+              return o;
+            }
+          };
+        } catch (IOException e) {
+          throw new RuntimeException("Unknown error.", e);
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamCommand.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamCommand.java
new file mode 100644
index 0000000..95563dc
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamCommand.java
@@ -0,0 +1,30 @@
+package org.apache.blur.command.stream;
+
+public enum StreamCommand {
+  STREAM(1), CLASS_LOAD_CHECK(2), CLASS_LOAD(3), CLOSE(-1);
+
+  private final int _command;
+
+  private StreamCommand(int command) {
+    _command = command;
+  }
+
+  public int getCommand() {
+    return _command;
+  }
+
+  public static StreamCommand find(int command) {
+    switch (command) {
+    case -1:
+      return CLOSE;
+    case 1:
+      return STREAM;
+    case 2:
+      return CLASS_LOAD_CHECK;
+    case 3:
+      return CLASS_LOAD;
+    default:
+      throw new RuntimeException("Command [" + command + "] not found.");
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamComplete.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamComplete.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamComplete.java
new file mode 100644
index 0000000..95f2706
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamComplete.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+import java.io.Serializable;
+
+public class StreamComplete implements Serializable {
+
+  private static final long serialVersionUID = 4117959719555234581L;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java
new file mode 100644
index 0000000..468cd56
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+
+public class StreamError implements Serializable {
+
+  private static final long serialVersionUID = 5624998869726795714L;
+  
+  private final String _message;
+  private final String _stack;
+
+  public StreamError(Exception e) {
+    _message = e.getMessage();
+    Throwable cause = e.getCause();
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    cause.printStackTrace(pw);
+    pw.close();
+    _stack = sw.toString();
+  }
+
+  public String getMessage() {
+    return _message;
+  }
+
+  public String getStack() {
+    return _stack;
+  }
+
+  @Override
+  public String toString() {
+    return "StreamError [_message=" + _message + ", _stack=" + _stack + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamFunction.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamFunction.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamFunction.java
new file mode 100644
index 0000000..7434261
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamFunction.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+import java.io.Serializable;
+
+import org.apache.blur.command.IndexContext;
+
+public interface StreamFunction<T> extends Serializable {
+
+  void call(IndexContext indexContext, StreamWriter<T> writer) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamIndexContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamIndexContext.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamIndexContext.java
new file mode 100644
index 0000000..6bcc20c
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamIndexContext.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.Shard;
+import org.apache.blur.lucene.search.IndexSearcherCloseable;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.lucene.index.IndexReader;
+
+public class StreamIndexContext extends IndexContext implements Closeable {
+
+  private final ShardContext _shardContext;
+  private final TableContext _tableContext;
+  private final IndexSearcherCloseable _indexSearcher;
+  private final IndexReader _indexReader;
+  private final Shard _shard;
+
+  public StreamIndexContext(BlurIndex blurIndex) throws IOException {
+    _shardContext = blurIndex.getShardContext();
+    _tableContext = _shardContext.getTableContext();
+    _indexSearcher = blurIndex.getIndexSearcher();
+    _indexReader = _indexSearcher.getIndexReader();
+    _shard = new Shard(_tableContext.getTable(), _shardContext.getShard());
+  }
+
+  @Override
+  public TableContext getTableContext(String table) throws IOException {
+    throw new RuntimeException("Not supported.");
+  }
+
+  @Override
+  public BlurConfiguration getBlurConfiguration(String table) throws IOException {
+    throw new RuntimeException("Not supported.");
+  }
+
+  @Override
+  public TableContext getTableContext() throws IOException {
+    return _tableContext;
+  }
+
+  @Override
+  public Shard getShard() {
+    return _shard;
+  }
+
+  @Override
+  public IndexSearcherCloseable getIndexSearcher() {
+    return _indexSearcher;
+  }
+
+  @Override
+  public IndexReader getIndexReader() {
+    return _indexReader;
+  }
+
+  @Override
+  public BlurConfiguration getBlurConfiguration() throws IOException {
+    return _tableContext.getBlurConfiguration();
+  }
+
+  @Override
+  public void close() throws IOException {
+    _indexSearcher.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java
new file mode 100644
index 0000000..754ef40
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.OutputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.IndexServer;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.commons.io.IOUtils;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public class StreamProcessor {
+
+  private static final Log LOG = LogFactory.getLog(StreamProcessor.class);
+
+  private final IndexServer _indexServer;
+  private final Map<String, ClassLoader> _classLoaderMap;
+  private final File _tmpFile;
+
+  public StreamProcessor(IndexServer indexServer, File tmpFile) {
+    _indexServer = indexServer;
+    _classLoaderMap = CacheBuilder.newBuilder().concurrencyLevel(4).maximumSize(128)
+        .expireAfterAccess(45, TimeUnit.SECONDS).removalListener(new RemovalListener<String, ClassLoader>() {
+          @Override
+          public void onRemoval(RemovalNotification<String, ClassLoader> notification) {
+            String key = notification.getKey();
+            LOG.info("Unloading classLoaderId [{0}]", key);
+            File file = new File(_tmpFile, key);
+            if (!rmr(file)) {
+              LOG.error("Could not remove file [{0}]", file);
+            }
+          }
+        }).build().asMap();
+
+    _tmpFile = tmpFile;
+  }
+
+  protected boolean rmr(File file) {
+    boolean success = true;
+    if (file.exists()) {
+      if (file.isDirectory()) {
+        for (File f : file.listFiles()) {
+          if (!rmr(f)) {
+            success = false;
+          }
+        }
+      }
+      if (!file.delete()) {
+        success = false;
+      }
+    }
+    return success;
+  }
+
+  public StreamIndexContext getIndexContext(final String table, final String shard) throws IOException {
+    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
+    if (indexes == null) {
+      throw new IOException("Table [" + table + "] is not being served by this server.");
+    }
+    BlurIndex blurIndex = indexes.get(shard);
+    if (blurIndex == null) {
+      throw new IOException("Shard [" + shard + "] for table [" + table + "] is not being served by this server.");
+    }
+    return new StreamIndexContext(blurIndex);
+  }
+
+  public <T> void execute(StreamFunction<T> function, OutputStream outputStream, IndexContext indexContext)
+      throws IOException {
+    final ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
+    StreamWriter<T> writer = getWriter(objectOutputStream);
+    try {
+      function.call(indexContext, writer);
+    } catch (Exception e) {
+      LOG.error("Unknown error.", e);
+      objectOutputStream.writeObject(new StreamError(e));
+    } finally {
+      objectOutputStream.writeObject(new StreamComplete());
+      objectOutputStream.close();
+    }
+  }
+
+  private <T> StreamWriter<T> getWriter(final ObjectOutputStream objectOutputStream) {
+    final WriteLock writeLock = new ReentrantReadWriteLock(true).writeLock();
+    return new StreamWriter<T>() {
+      @Override
+      public void write(T obj) throws IOException {
+        writeLock.lock();
+        try {
+          objectOutputStream.writeObject(obj);
+        } finally {
+          writeLock.unlock();
+        }
+      }
+
+      @Override
+      public void write(Iterable<T> it) throws IOException {
+        writeLock.lock();
+        try {
+          for (T t : it) {
+            objectOutputStream.writeObject(t);
+          }
+        } finally {
+          writeLock.unlock();
+        }
+      }
+    };
+  }
+
+  public StreamFunction<?> getStreamFunction(String classLoaderId, InputStream inputStream) throws IOException {
+    final ClassLoader classLoader = getClassLoader(classLoaderId);
+    ObjectInputStream objectInputStream = new ObjectInputStream(inputStream) {
+      @Override
+      protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+        return classLoader.loadClass(desc.getName());
+      }
+    };
+    try {
+      return (StreamFunction<?>) objectInputStream.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    } finally {
+      objectInputStream.close();
+    }
+  }
+
+  private ClassLoader getClassLoader(String classLoaderId) throws IOException {
+    ClassLoader classLoader = _classLoaderMap.get(classLoaderId);
+    if (classLoader == null) {
+      throw new IOException("ClassLoaderId [" + classLoaderId + "] not found.");
+    }
+    return classLoader;
+  }
+
+  public synchronized void loadClassLoader(String classLoaderId, DataInputStream inputStream) throws IOException {
+    if (_classLoaderMap.containsKey(classLoaderId)) {
+      // read input and discard
+      int length = inputStream.readInt();
+      byte[] buf = new byte[length];
+      inputStream.readFully(buf);
+      return;
+    }
+
+    LOG.info("Class Loader [{0}] Starting", classLoaderId);
+    File copyJarsLocally = copyJarsLocally(classLoaderId, inputStream);
+    List<URL> urls = new ArrayList<URL>();
+    for (File f : copyJarsLocally.listFiles()) {
+      URL url = f.toURI().toURL();
+      LOG.info("Class Loader [{0}] Loading [{1}]", classLoaderId, url);
+      urls.add(url);
+    }
+    URLClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[] {}));
+    _classLoaderMap.put(classLoaderId, classLoader);
+    LOG.info("Class Loader [{0}] Complete", classLoaderId);
+  }
+
+  private File copyJarsLocally(String classLoaderId, DataInputStream inputStream) throws IOException,
+      FileNotFoundException {
+    int length = inputStream.readInt();
+    byte[] buf = new byte[length];
+    inputStream.readFully(buf);
+    ZipInputStream zipInputStream = new ZipInputStream(new ByteArrayInputStream(buf));
+    try {
+      ZipEntry zipEntry;
+      File dir = new File(_tmpFile, classLoaderId);
+      dir.mkdirs();
+      while ((zipEntry = zipInputStream.getNextEntry()) != null) {
+        if (zipEntry.isDirectory()) {
+          throw new IOException("Dirs in delivery zip are not supported.");
+        }
+        String name = zipEntry.getName();
+        File file = new File(dir, name);
+        FileOutputStream output = new FileOutputStream(file);
+        IOUtils.copy(zipInputStream, output);
+        output.close();
+      }
+      return dir;
+    } finally {
+      zipInputStream.close();
+    }
+  }
+
+  public boolean isClassLoaderLoaded(String id) {
+    return _classLoaderMap.containsKey(id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java
new file mode 100644
index 0000000..82bf6a2
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.concurrent.Executors;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.user.User;
+import org.apache.blur.user.UserContext;
+import org.apache.commons.io.IOUtils;
+
+import com.google.common.io.Closer;
+
+public class StreamServer implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(StreamServer.class);
+
+  private final int _port;
+  private final int _threadCount;
+  private final StreamProcessor _streamProcessor;
+  private final Closer _closer = Closer.create();
+  private final AtomicBoolean _running = new AtomicBoolean(true);
+
+  private ServerSocket _serverSocket;
+  private ExecutorService _service;
+  private Thread _thread;
+  private int _runningPort;
+
+  public StreamServer(int port, int threadCount, StreamProcessor streamProcessor) {
+    _port = port;
+    _threadCount = threadCount;
+    _streamProcessor = streamProcessor;
+  }
+
+  @Override
+  public void close() throws IOException {
+    _closer.close();
+  }
+
+  public void start() throws IOException {
+    _service = Executors.newThreadPool("stream-server", _threadCount);
+    _serverSocket = new ServerSocket(_port, 1000);
+    _runningPort = _serverSocket.getLocalPort();
+    _thread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        while (_running.get()) {
+          try {
+            handleSocket(_serverSocket.accept());
+          } catch (IOException e) {
+            LOG.error("Unknown error.", e);
+          }
+        }
+      }
+    });
+    _closer.register(new Closeable() {
+      @Override
+      public void close() throws IOException {
+        _running.set(false);
+        _thread.interrupt();
+      }
+    });
+    _thread.setName("stream-server-main");
+    _thread.setDaemon(true);
+    _thread.start();
+  }
+
+  protected void handleSocket(Socket socket) {
+    _service.submit(new SocketHandler(socket, _streamProcessor));
+  }
+
+  private static class SocketHandler implements Runnable {
+
+    private final Socket _socket;
+    private final Closer _closer = Closer.create();
+    private final StreamProcessor _streamProcessor;
+
+    public SocketHandler(Socket socket, StreamProcessor streamProcessor) {
+      _socket = _closer.register(socket);
+      _streamProcessor = streamProcessor;
+    }
+
+    @Override
+    public void run() {
+      InputStream inputStream;
+      OutputStream outputStream;
+      try {
+        inputStream = _closer.register(_socket.getInputStream());
+        outputStream = _closer.register(_socket.getOutputStream());
+        while (true) {
+          int read = inputStream.read();
+          StreamCommand command = StreamCommand.find(read);
+          switch (command) {
+          case STREAM: {
+            executeStream(_streamProcessor, inputStream, outputStream);
+            break;
+          }
+          case CLASS_LOAD: {
+            executeClassLoad(_streamProcessor, inputStream, outputStream);
+            break;
+          }
+          case CLASS_LOAD_CHECK: {
+            checkClassLoad(_streamProcessor, inputStream, outputStream);
+            break;
+          }
+          case CLOSE: {
+            return;
+          }
+          default:
+            throw new RuntimeException("Command [" + command + "] not supported.");
+          }
+        }
+      } catch (Throwable t) {
+        if (t instanceof SocketException) {
+          if (t.getMessage().trim().toLowerCase().equals("socket closed")) {
+            return;
+          }
+        }
+        LOG.error("Unknown error.", t);
+      } finally {
+        try {
+          _closer.close();
+        } catch (IOException e) {
+          LOG.error("Unknown Error");
+        }
+      }
+    }
+  }
+
+  public static void executeStream(StreamProcessor streamProcessor, InputStream in, OutputStream outputStream)
+      throws IOException {
+    DataInputStream inputStream = new DataInputStream(in);
+    byte[] streamSplitBytes = getObjectBytes(inputStream);
+    byte[] functionBytes = getObjectBytes(inputStream);
+    StreamSplit streamSplit = getStreamSplit(toInputStream(streamSplitBytes));
+    String table = streamSplit.getTable();
+    String shard = streamSplit.getShard();
+    String classLoaderId = streamSplit.getClassLoaderId();
+    User user = new User(streamSplit.getUser(), streamSplit.getUserAttributes());
+    UserContext.setUser(user);
+    StreamIndexContext indexContext = null;
+    try {
+      indexContext = streamProcessor.getIndexContext(table, shard);
+      StreamFunction<?> function = streamProcessor.getStreamFunction(classLoaderId, toInputStream(functionBytes));
+      streamProcessor.execute(function, outputStream, indexContext);
+    } finally {
+      IOUtils.closeQuietly(indexContext);
+      UserContext.reset();
+    }
+  }
+
+  public static void executeClassLoad(StreamProcessor streamProcessor, InputStream inputStream,
+      OutputStream outputStream) throws IOException {
+    DataInputStream in = new DataInputStream(inputStream);
+    DataOutputStream out = new DataOutputStream(outputStream);
+
+    int length = in.readInt();
+    byte[] buf = new byte[length];
+    in.readFully(buf);
+    String id = new String(buf);
+    streamProcessor.loadClassLoader(id, in);
+    byte[] bs = "LOADED".getBytes();
+    out.writeInt(bs.length);
+    out.write(bs);
+  }
+
+  public static void checkClassLoad(StreamProcessor streamProcessor, InputStream inputStream, OutputStream outputStream)
+      throws IOException {
+    DataInputStream in = new DataInputStream(inputStream);
+    DataOutputStream out = new DataOutputStream(outputStream);
+
+    int length = in.readInt();
+    byte[] buf = new byte[length];
+    in.readFully(buf);
+    String id = new String(buf);
+    byte[] bs;
+    if (streamProcessor.isClassLoaderLoaded(id)) {
+      bs = "OK".getBytes();
+    } else {
+      bs = "NOT FOUND".getBytes();
+    }
+    out.writeInt(bs.length);
+    out.write(bs);
+  }
+
+  private static InputStream toInputStream(byte[] bs) {
+    return new ByteArrayInputStream(bs);
+  }
+
+  private static byte[] getObjectBytes(DataInputStream inputStream) throws IOException {
+    int length = inputStream.readInt();
+    byte[] buf = new byte[length];
+    inputStream.readFully(buf);
+    return buf;
+  }
+
+  private static StreamSplit getStreamSplit(InputStream inputStream) throws IOException {
+    ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
+    try {
+      return (StreamSplit) objectInputStream.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    } finally {
+      objectInputStream.close();
+    }
+  }
+
+  public int getPort() {
+    return _runningPort;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamSplit.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamSplit.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamSplit.java
new file mode 100644
index 0000000..d45f473
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamSplit.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class StreamSplit implements Serializable {
+
+  private static final long serialVersionUID = -1760098859541747672L;
+
+  private final String table;
+  private final String shard;
+  private final String classLoaderId;
+  private final String user;
+  private final Map<String, String> userAttributes;
+
+  public StreamSplit(String table, String shard, String classLoaderId, String user, Map<String, String> userAttributes) {
+    this.table = table;
+    this.shard = shard;
+    this.classLoaderId = classLoaderId;
+    this.user = user;
+    this.userAttributes = userAttributes;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public String getShard() {
+    return shard;
+  }
+
+  public String getClassLoaderId() {
+    return classLoaderId;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public Map<String, String> getUserAttributes() {
+    return userAttributes;
+  }
+
+  public StreamSplit copy() {
+    return new StreamSplit(table, shard, classLoaderId, user, userAttributes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java
new file mode 100644
index 0000000..e4811b2
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+
+public class StreamUtil {
+
+  private static final String UTF_8 = "UTF-8";
+
+  public static void writeString(DataOutput output, String s) throws IOException {
+    try {
+      byte[] bs = s.getBytes(UTF_8);
+      output.writeInt(bs.length);
+      output.write(bs);
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("Does not supported UTF-8?", e);
+    }
+  }
+
+  public static String readString(DataInput input) throws IOException {
+    int length = input.readInt();
+    byte[] buf = new byte[length];
+    input.readFully(buf);
+    return new String(buf, UTF_8);
+  }
+
+  public static byte[] toBytes(Serializable s) throws IOException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(outputStream);
+    out.writeObject(s);
+    out.close();
+    return outputStream.toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamWriter.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamWriter.java
new file mode 100644
index 0000000..8a0d728
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamWriter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+import java.io.IOException;
+
+public interface StreamWriter<T> {
+
+  void write(T obj) throws IOException;
+
+  void write(Iterable<T> it) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 15d861c..00e2b22 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -51,6 +51,8 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SMALL_MERGE_THRESHO
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_THRIFT_ACCEPT_QUEUE_SIZE_PER_THREAD;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_THRIFT_MAX_READ_BUFFER_BYTES;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_THRIFT_SELECTOR_THREADS;
+import static org.apache.blur.utils.BlurConstants.BLUR_STREAM_SERVER_RUNNING_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_STREAM_SERVER_THREADS;
 import static org.apache.blur.utils.BlurConstants.BLUR_THRIFT_DEFAULT_MAX_FRAME_SIZE;
 import static org.apache.blur.utils.BlurConstants.BLUR_THRIFT_MAX_FRAME_SIZE;
 import static org.apache.blur.utils.BlurUtil.quietClose;
@@ -66,6 +68,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.command.ShardCommandManager;
+import org.apache.blur.command.stream.StreamProcessor;
+import org.apache.blur.command.stream.StreamServer;
 import org.apache.blur.concurrent.SimpleUncaughtExceptionHandler;
 import org.apache.blur.concurrent.ThreadWatcher;
 import org.apache.blur.gui.HttpJettyServer;
@@ -193,7 +197,7 @@ public class ThriftBlurShardServer extends ThriftServer {
     BlurQueryChecker queryChecker = new BlurQueryChecker(configuration);
 
     String cluster = configuration.get(BLUR_CLUSTER_NAME, BLUR_CLUSTER);
-    
+
     final ZooKeeper zooKeeper = setupZookeeper(configuration, cluster);
     final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(zooKeeper, configuration, config);
 
@@ -301,6 +305,18 @@ public class ThriftBlurShardServer extends ThriftServer {
       instanceGuiPort = 0;
     }
 
+    StreamServer streamServer;
+    int streamThreadCount = configuration.getInt(BLUR_STREAM_SERVER_THREADS, 100);
+    if (streamThreadCount > 0) {
+      StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpPath);
+      streamServer = new StreamServer(0, streamThreadCount, streamProcessor);
+      streamServer.start();
+      configuration.setInt(BLUR_STREAM_SERVER_RUNNING_PORT, streamServer.getPort());
+      LOG.info("Stream server started on port [{0}]", streamServer.getPort());
+    } else {
+      streamServer = null;
+    }
+
     final HttpJettyServer httpServer;
     if (configGuiPort >= 0) {
       httpServer = new HttpJettyServer(HttpJettyServer.class, instanceGuiPort);
@@ -339,9 +355,9 @@ public class ThriftBlurShardServer extends ThriftServer {
       @Override
       public void shutdown() {
         ThreadWatcher threadWatcher = ThreadWatcher.instance();
-        quietClose(makeCloseable(hdfsKeyValueTimer), makeCloseable(indexImporterTimer), makeCloseable(indexBulkTimer),
-            blockCacheDirectoryFactory, commandManager, traceStorage, server, shardServer, indexManager, indexServer,
-            threadWatcher, clusterStatus, zooKeeper, httpServer);
+        quietClose(streamServer, makeCloseable(hdfsKeyValueTimer), makeCloseable(indexImporterTimer),
+            makeCloseable(indexBulkTimer), blockCacheDirectoryFactory, commandManager, traceStorage, server,
+            shardServer, indexManager, indexServer, threadWatcher, clusterStatus, zooKeeper, httpServer);
       }
     };
     server.setShutdown(shutdown);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java b/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java
new file mode 100644
index 0000000..da582a3
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.command.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.lucene.search.IndexSearcherCloseable;
+import org.apache.blur.manager.IndexServer;
+import org.apache.blur.manager.writer.BlurIndex;
+import org.apache.blur.manager.writer.IndexAction;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.ShardState;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.similarities.Similarity;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.google.common.io.Closer;
+
+@SuppressWarnings("serial")
+public class StreamServerTest implements Serializable {
+
+  @Test
+  public void testServer() throws IOException {
+    Closer closer = Closer.create();
+    try {
+      File tmpFile = new File("./target/tmp/StreamServerTest");
+      tmpFile.mkdirs();
+      IndexServer indexServer = new TestIndexServer();
+      StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpFile);
+      int timeout = 3000000;
+      String classLoaderId = UUID.randomUUID().toString();
+
+      StreamServer server = closer.register(new StreamServer(0, 100, streamProcessor));
+      server.start();
+      int port = server.getPort();
+      StreamClient client = closer.register(new StreamClient("localhost", port, timeout));
+      assertFalse(client.isClassLoaderAvailable(classLoaderId));
+      client.loadJars(classLoaderId, getTestJar());
+
+      String table = "test";
+      String shard = "shard";
+      String user = "test";
+      Map<String, String> userAttributes = new HashMap<String, String>();
+      StreamSplit split = new StreamSplit(table, shard, classLoaderId, user, userAttributes);
+      Iterable<String> it = client.executeStream(split, new StreamFunction<String>() {
+        @Override
+        public void call(IndexContext indexContext, StreamWriter<String> writer) throws Exception {
+          writer.write("test");
+        }
+      });
+      Iterator<String> iterator = it.iterator();
+      assertTrue(iterator.hasNext());
+      assertEquals("test", iterator.next());
+      assertFalse(iterator.hasNext());
+
+    } finally {
+      closer.close();
+    }
+  }
+
+  private String getTestJar() {
+    String property = System.getProperty("java.class.path");
+    Splitter splitter = Splitter.on(':');
+    for (String s : splitter.split(property)) {
+      if (s.endsWith(".jar")) {
+        return s;
+      }
+    }
+    throw new RuntimeException("No jars found?");
+  }
+
+  public static class TestIndexServer implements IndexServer {
+
+    @Override
+    public SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException {
+      throw new RuntimeException("Not implemented.");
+    }
+
+    @Override
+    public Map<String, BlurIndex> getIndexes(String table) throws IOException {
+      Map<String, BlurIndex> map = new HashMap<String, BlurIndex>();
+      BlurIndex value = getBlurIndex();
+      map.put("shard", value);
+      return map;
+    }
+
+    @Override
+    public String getNodeName() {
+      throw new RuntimeException("Not implemented.");
+    }
+
+    @Override
+    public long getRecordCount(String table) throws IOException {
+      throw new RuntimeException("Not implemented.");
+    }
+
+    @Override
+    public long getRowCount(String table) throws IOException {
+      throw new RuntimeException("Not implemented.");
+    }
+
+    @Override
+    public long getTableSize(String table) throws IOException {
+      throw new RuntimeException("Not implemented.");
+    }
+
+    @Override
+    public void close() throws IOException {
+      throw new RuntimeException("Not implemented.");
+    }
+
+    @Override
+    public Map<String, ShardState> getShardState(String table) {
+      throw new RuntimeException("Not implemented.");
+    }
+
+    @Override
+    public long getSegmentImportInProgressCount(String table) throws IOException {
+      throw new RuntimeException("Not implemented.");
+    }
+
+    @Override
+    public long getSegmentImportPendingCount(String table) throws IOException {
+      throw new RuntimeException("Not implemented.");
+    }
+
+  }
+
+  public static BlurIndex getBlurIndex() throws IOException {
+    String shard = "shard";
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setName("test");
+    tableDescriptor.setTableUri("file:///tmp");
+    TableContext tableContext = TableContext.create(tableDescriptor);
+    ShardContext shardContext = ShardContext.create(tableContext, shard);
+    return new BlurIndex(shardContext, null, null, null, null, null, null, null) {
+
+      @Override
+      public void removeSnapshot(String name) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void refresh() throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void process(IndexAction indexAction) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void optimize(int numberOfSegmentsPerShard) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public AtomicBoolean isClosed() {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public List<String> getSnapshots() throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public long getSegmentImportPendingCount() throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public long getSegmentImportInProgressCount() throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public long getOnDiskSize() throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public IndexSearcherCloseable getIndexSearcher() throws IOException {
+        return getIndexSearcherCloseable();
+      }
+
+      @Override
+      public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void enqueue(List<RowMutation> mutations) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void createSnapshot(String name) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void close() throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void addBulkMutate(String bulkId, RowMutation mutation) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+    };
+  }
+
+  protected static IndexSearcherCloseable getIndexSearcherCloseable() {
+    return new IndexSearcherCloseable() {
+      
+      @Override
+      public void close() throws IOException {
+        
+      }
+      
+      @Override
+      public void setSimilarity(Similarity similarity) {
+        throw new RuntimeException("Not implemented.");
+      }
+      
+      @Override
+      public void search(Query query, Collector collector) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+      
+      @Override
+      public TopDocs search(Query query, int i) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+      
+      @Override
+      public Query rewrite(Query query) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+      
+      @Override
+      public IndexReader getIndexReader() {
+        return null;
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-spark/pom.xml
----------------------------------------------------------------------
diff --git a/blur-spark/pom.xml b/blur-spark/pom.xml
index 377db61..ac5a41a 100644
--- a/blur-spark/pom.xml
+++ b/blur-spark/pom.xml
@@ -27,15 +27,15 @@
 
 	<dependencies>
 		<dependency>
-			<groupId>org.apache.blur</groupId>
-			<artifactId>blur-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
 			<groupId>org.apache.spark</groupId>
 			<artifactId>spark-core_2.10</artifactId>
 			<version>${spark.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 	</dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-spark/src/main/java/org/apache/blur/spark/BlurRDD.java
----------------------------------------------------------------------
diff --git a/blur-spark/src/main/java/org/apache/blur/spark/BlurRDD.java b/blur-spark/src/main/java/org/apache/blur/spark/BlurRDD.java
new file mode 100644
index 0000000..ad743d6
--- /dev/null
+++ b/blur-spark/src/main/java/org/apache/blur/spark/BlurRDD.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.spark;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.blur.command.stream.StreamClient;
+import org.apache.blur.command.stream.StreamFunction;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.user.User;
+import org.apache.blur.user.UserContext;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+
+import com.google.common.base.Splitter;
+import com.google.common.io.Closer;
+
+@SuppressWarnings("serial")
+public class BlurRDD implements Serializable {
+
+  private final static String CLASS_LOADER_ID = UUID.randomUUID().toString();
+
+  private final transient Iface _client;
+  private final transient SparkConf _sparkConf;
+  private final List<String> _jars = new ArrayList<String>();
+  private final int _timeout = 60000;
+
+  public BlurRDD(String zooKeeperConnectionStr, SparkConf sparkConf) throws IOException {
+    this(BlurClient.getClientFromZooKeeperConnectionStr(zooKeeperConnectionStr), sparkConf);
+  }
+
+  public BlurRDD(Iface client, SparkConf sparkConf) throws IOException {
+    _client = client;
+    _sparkConf = sparkConf;
+    if (_sparkConf.contains("spark.jars")) {
+      String jars = _sparkConf.get("spark.jars");
+      for (String jar : Splitter.on(',').split(jars)) {
+        _jars.add(jar);
+      }
+    }
+  }
+
+  public <T> JavaRDD<T> executeStream(JavaSparkContext context, String table, StreamFunction<T> streamFunction) {
+    User user = UserContext.getUser();
+    List<BlurSparkSplit> splits = getSplits(table, user, CLASS_LOADER_ID);
+    return context.parallelize(splits).flatMap(new FlatMapFunction<BlurSparkSplit, T>() {
+      @Override
+      public Iterable<T> call(BlurSparkSplit t) throws Exception {
+        return new Iterable<T>() {
+          @Override
+          public Iterator<T> iterator() {
+            Closer closer = Closer.create();
+            try {
+              String host = t.getHost();
+              int port = t.getPort();
+              int timeout = t.getTimeout();
+              StreamClient streamClient = closer.register(new StreamClient(host, port, timeout));
+              String classLoaderId = t.getClassLoaderId();
+              if (!streamClient.isClassLoaderAvailable(classLoaderId)) {
+                streamClient.loadJars(classLoaderId, _jars);
+              }
+              return wrapClose(closer, streamClient.executeStream(t, streamFunction).iterator());
+            } catch (IOException e) {
+              IOUtils.closeQuietly(closer);
+              throw new RuntimeException(e);
+            }
+          }
+        };
+      }
+    });
+  }
+
+  private static <T> Iterator<T> wrapClose(Closeable c, Iterator<T> t) {
+    return new Iterator<T>() {
+
+      @Override
+      public boolean hasNext() {
+        try {
+          boolean hasNext = t.hasNext();
+          if (!hasNext) {
+            IOUtils.closeQuietly(c);
+          }
+          return hasNext;
+        } catch (Throwable t) {
+          IOUtils.closeQuietly(c);
+          if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+          } else {
+            throw new RuntimeException(t);
+          }
+        }
+      }
+
+      @Override
+      public T next() {
+        return t.next();
+      }
+    };
+  }
+
+  private List<BlurSparkSplit> getSplits(String table, User user, String classLoaderId) {
+    try {
+      Map<String, String> shardServerLayout = _client.shardServerLayout(table);
+      List<BlurSparkSplit> splits = new ArrayList<BlurSparkSplit>();
+      for (Entry<String, String> e : shardServerLayout.entrySet()) {
+        String shard = e.getKey();
+        String shardServerWithThriftPort = e.getValue();
+        String host = getHost(shardServerWithThriftPort);
+        int port = getStreamPort(shardServerWithThriftPort);
+        String username = user.getUsername();
+        Map<String, String> attributes = user.getAttributes();
+        splits.add(new BlurSparkSplit(host, port, _timeout, table, shard, classLoaderId, username, attributes));
+      }
+      return splits;
+    } catch (BlurException e) {
+      throw new RuntimeException(e);
+    } catch (TException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String getHost(String shardServerWithThriftPort) {
+    return shardServerWithThriftPort.substring(0, shardServerWithThriftPort.indexOf(':'));
+  }
+
+  private int getStreamPort(String shardServerWithThriftPort) throws BlurException, TException {
+    String port = _client.configurationPerServer(shardServerWithThriftPort,
+        BlurConstants.BLUR_STREAM_SERVER_RUNNING_PORT);
+    return Integer.parseInt(port);
+  }
+
+  public static byte[] toBytes(Serializable s) {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try {
+      ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
+      objectOutputStream.writeObject(s);
+      objectOutputStream.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return outputStream.toByteArray();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkSplit.java
----------------------------------------------------------------------
diff --git a/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkSplit.java b/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkSplit.java
new file mode 100644
index 0000000..59fe968
--- /dev/null
+++ b/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkSplit.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.spark;
+
+import java.util.Map;
+
+import org.apache.blur.command.stream.StreamSplit;
+
+public class BlurSparkSplit extends StreamSplit {
+
+  private static final long serialVersionUID = -6636359986869398948L;
+
+  private final String _host;
+  private final int _port;
+  private final int _timeout;
+
+  public BlurSparkSplit(String host, int port, int timeout, String table, String shard, String classLoaderId,
+      String user, Map<String, String> userAttributes) {
+    super(table, shard, classLoaderId, user, userAttributes);
+    _host = host;
+    _port = port;
+    _timeout = timeout;
+  }
+
+  public String getHost() {
+    return _host;
+  }
+
+  public int getPort() {
+    return _port;
+  }
+
+  public int getTimeout() {
+    return _timeout;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java
----------------------------------------------------------------------
diff --git a/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java b/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java
new file mode 100644
index 0000000..a1f54fc
--- /dev/null
+++ b/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.spark;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+
+import com.google.common.base.Splitter;
+
+public class BlurSparkUtil {
+
+  private static final char JAR_END = '!';
+  private static final String JAR_START = "jar:";
+  private static final String TMP_SPARK_JOB = "tmp-spark-job_";
+  private static final String JAR = ".jar";
+  private static final String SEP = "/";
+  private static final String PATH_SEPARATOR = "path.separator";
+  private static final String JAVA_CLASS_PATH = "java.class.path";
+
+  public static void packJars(SparkConf conf, Class<?>... clazzes) throws IOException {
+    Set<String> classPathThatNeedsToBeIncluded = findJarFiles(clazzes);
+    Set<String> jars = new HashSet<String>();
+    for (String s : classPathThatNeedsToBeIncluded) {
+      if (isJarFile(s)) {
+        jars.add(s);
+      } else {
+        jars.add(createJar(s));
+      }
+    }
+    conf.setJars(jars.toArray(new String[jars.size()]));
+  }
+
+  private static Set<String> findJarFiles(Class<?>[] clazzes) {
+    Set<String> result = new HashSet<String>();
+    for (Class<?> c : clazzes) {
+      result.add(findJarFile(c));
+    }
+    return result;
+  }
+
+  private static String findJarFile(Class<?> c) {
+    String resourceName = "/" + c.getName().replace('.', '/') + ".class";
+    URL url = BlurSparkUtil.class.getResource(resourceName);
+    try {
+      URI uri = url.toURI();
+      if (uri.getScheme().equals("file")) {
+        return findFileInClassFileUri(uri);
+      } else if (uri.getScheme().equals("jar")) {
+        return findFileInJarFileUri(uri);
+      } else {
+        throw new RuntimeException("Unsupported schema [" + uri.getScheme() + "] for uri [" + uri + "]");
+      }
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static String findFileInClassFileUri(URI uri) {
+    String classPath = System.getProperty(JAVA_CLASS_PATH);
+    String pathSeparator = System.getProperty(PATH_SEPARATOR);
+    Splitter splitter = Splitter.on(pathSeparator);
+    Iterable<String> split = splitter.split(classPath);
+    String path = uri.getPath();
+    for (String s : split) {
+      if (path.startsWith(s)) {
+        return new File(s).getAbsolutePath();
+      }
+    }
+    throw new RuntimeException("Uri [" + uri + "] was not found on the classpath.");
+  }
+
+  private static String findFileInJarFileUri(URI uri) throws URISyntaxException {
+    String s = uri.toString();
+    int indexOf1 = s.indexOf(JAR_START);
+    int indexOf2 = s.indexOf(JAR_END);
+    return new File(new URI(s.substring(indexOf1 + JAR_START.length(), indexOf2))).getAbsolutePath();
+  }
+
+  private static String createJar(String s) throws IOException {
+    File sourceFile = new File(s);
+    if (sourceFile.isDirectory()) {
+      File file = File.createTempFile(TMP_SPARK_JOB, JAR);
+      OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file));
+      JarOutputStream jarOut = new JarOutputStream(outputStream);
+      for (File f : sourceFile.listFiles()) {
+        pack(sourceFile, f, jarOut);
+      }
+      jarOut.close();
+      file.deleteOnExit();
+      return file.getAbsolutePath();
+    }
+    throw new RuntimeException("File [" + s + "] is not a directory.");
+  }
+
+  private static void pack(File rootPath, File source, JarOutputStream target) throws IOException {
+    String name = getName(rootPath, source);
+    if (source.isDirectory()) {
+      if (!SEP.equals(name)) {
+        JarEntry entry = new JarEntry(name);
+        entry.setTime(source.lastModified());
+        target.putNextEntry(entry);
+        target.closeEntry();
+      }
+      for (File f : source.listFiles()) {
+        pack(rootPath, f, target);
+      }
+    } else {
+      JarEntry entry = new JarEntry(name);
+      entry.setTime(source.lastModified());
+      target.putNextEntry(entry);
+      BufferedInputStream in = new BufferedInputStream(new FileInputStream(source));
+      IOUtils.copy(in, target);
+      in.close();
+      target.closeEntry();
+    }
+  }
+
+  private static String getName(File rootPath, File source) {
+    String rootStr = rootPath.toURI().toString();
+    String sourceStr = source.toURI().toString();
+    if (sourceStr.startsWith(rootStr)) {
+      String result = sourceStr.substring(rootStr.length());
+      if (source.isDirectory() && !result.endsWith(SEP)) {
+        result += SEP;
+      }
+      return result;
+    } else {
+      throw new RuntimeException("Not sure what happened.");
+    }
+  }
+
+  private static boolean isJarFile(String s) {
+    if (s.endsWith(JAR) || s.endsWith(".zip")) {
+      return true;
+    }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-spark/src/main/java/org/apache/blur/spark/UsingBlurRDD.java
----------------------------------------------------------------------
diff --git a/blur-spark/src/main/java/org/apache/blur/spark/UsingBlurRDD.java b/blur-spark/src/main/java/org/apache/blur/spark/UsingBlurRDD.java
new file mode 100644
index 0000000..a09db32
--- /dev/null
+++ b/blur-spark/src/main/java/org/apache/blur/spark/UsingBlurRDD.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.spark;
+
+import java.io.IOException;
+
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.stream.StreamFunction;
+import org.apache.blur.command.stream.StreamWriter;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.util.BytesRef;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class UsingBlurRDD {
+
+  @SuppressWarnings("serial")
+  public static void main(String[] args) throws IOException {
+    SparkConf sparkConf = new SparkConf();
+    sparkConf.setAppName("test");
+    sparkConf.setMaster("local[2]");
+    BlurSparkUtil.packJars(sparkConf, UsingBlurRDD.class);
+    JavaSparkContext context = new JavaSparkContext(sparkConf);
+
+    Iface client = BlurClient.getClient("127.0.0.1:40020");
+    BlurRDD blurRDD = new BlurRDD(client, sparkConf);
+    String table = "test1234";
+    final String field = "fam0.col0";
+
+    for (int i = 0; i < 1; i++) {
+      long s = System.nanoTime();
+      JavaRDD<String> rdd = blurRDD.executeStream(context, table, new StreamFunction<String>() {
+        @Override
+        public void call(IndexContext indexContext, StreamWriter<String> writer) throws Exception {
+          IndexReader indexReader = indexContext.getIndexReader();
+          for (AtomicReaderContext atomicReaderContext : indexReader.leaves()) {
+            AtomicReader reader = atomicReaderContext.reader();
+            Terms terms = reader.fields().terms(field);
+            if (terms != null) {
+              TermsEnum termsEnum = terms.iterator(null);
+              BytesRef ref;
+              while ((ref = termsEnum.next()) != null) {
+                writer.write(ref.utf8ToString());
+              }
+            }
+          }
+        }
+      });
+      long count = rdd.distinct().count();
+      long e = System.nanoTime();
+
+      System.out.println(count + " " + (e - s) / 1000000.0 + " ms");
+
+    }
+    // Iterator<String> iterator = rdd.distinct().toLocalIterator();
+    // while (iterator.hasNext()) {
+    // System.out.println(iterator.next());
+    // }
+    context.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-stream-client/pom.xml
----------------------------------------------------------------------
diff --git a/blur-stream-client/pom.xml b/blur-stream-client/pom.xml
deleted file mode 100644
index 9571f26..0000000
--- a/blur-stream-client/pom.xml
+++ /dev/null
@@ -1,116 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
-	license agreements. See the NOTICE file distributed with this work for additional 
-	information regarding copyright ownership. The ASF licenses this file to 
-	you under the Apache License, Version 2.0 (the "License"); you may not use 
-	this file except in compliance with the License. You may obtain a copy of 
-	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-	by applicable law or agreed to in writing, software distributed under the 
-	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
-	OF ANY KIND, either express or implied. See the License for the specific 
-	language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.blur</groupId>
-		<artifactId>blur</artifactId>
-		<version>0.3.0.incubating</version>
-		<relativePath>../pom.xml</relativePath>
-	</parent>
-	<groupId>org.apache.blur</groupId>
-	<artifactId>blur-stream-client</artifactId>
-	<version>${projectVersion}</version>
-	<packaging>jar</packaging>
-	<name>Blur Stream Client</name>
-
-	<dependencies>
-		<dependency>
-			<groupId>io.netty</groupId>
-			<artifactId>netty-all</artifactId>
-			<version>4.0.31.Final</version>
-		</dependency>
-	</dependencies>
-
-	<repositories>
-		<repository>
-			<id>libdir</id>
-			<url>file://${basedir}/../lib</url>
-		</repository>
-	</repositories>
-
-	<build>
-		<pluginManagement>
-			<plugins>
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-jar-plugin</artifactId>
-					<executions>
-						<execution>
-							<goals>
-								<goal>test-jar</goal>
-							</goals>
-						</execution>
-					</executions>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-source-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>attach-sources</id>
-						<goals>
-							<goal>jar</goal>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-help-plugin</artifactId>
-				<version>2.2</version>
-				<executions>
-					<execution>
-						<phase>generate-resources</phase>
-						<goals>
-							<goal>effective-pom</goal>
-						</goals>
-						<configuration>
-							<output>${project.build.directory}/effective-pom.xml</output>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-install-plugin</artifactId>
-				<version>2.3.1</version>
-				<executions>
-					<execution>
-						<phase>install</phase>
-						<goals>
-							<goal>install-file</goal>
-						</goals>
-						<configuration>
-							<file>${project.build.directory}/${artifactId}-${project.version}.jar</file>
-							<pomFile>${project.build.directory}/effective-pom.xml</pomFile>
-							<!-- sources></sources -->
-							<!-- javadoc></javadoc -->
-							<groupId>${project.groupId}</groupId>
-							<artifactId>${project.artifactId}</artifactId>
-							<version>${project.version}</version>
-							<packaging>jar</packaging>
-							<!--classifier></classifier -->
-							<generatePom>true</generatePom>
-							<createChecksum>true</createChecksum>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-stream-server/pom.xml
----------------------------------------------------------------------
diff --git a/blur-stream-server/pom.xml b/blur-stream-server/pom.xml
deleted file mode 100644
index 02e6d7a..0000000
--- a/blur-stream-server/pom.xml
+++ /dev/null
@@ -1,116 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
-	license agreements. See the NOTICE file distributed with this work for additional 
-	information regarding copyright ownership. The ASF licenses this file to 
-	you under the Apache License, Version 2.0 (the "License"); you may not use 
-	this file except in compliance with the License. You may obtain a copy of 
-	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-	by applicable law or agreed to in writing, software distributed under the 
-	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
-	OF ANY KIND, either express or implied. See the License for the specific 
-	language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.blur</groupId>
-		<artifactId>blur</artifactId>
-		<version>0.3.0.incubating</version>
-		<relativePath>../pom.xml</relativePath>
-	</parent>
-	<groupId>org.apache.blur</groupId>
-	<artifactId>blur-stream-server</artifactId>
-	<version>${projectVersion}</version>
-	<packaging>jar</packaging>
-	<name>Blur Stream Server</name>
-
-	<dependencies>
-		<dependency>
-			<groupId>io.netty</groupId>
-			<artifactId>netty-all</artifactId>
-			<version>4.0.31.Final</version>
-		</dependency>
-	</dependencies>
-
-	<repositories>
-		<repository>
-			<id>libdir</id>
-			<url>file://${basedir}/../lib</url>
-		</repository>
-	</repositories>
-
-	<build>
-		<pluginManagement>
-			<plugins>
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-jar-plugin</artifactId>
-					<executions>
-						<execution>
-							<goals>
-								<goal>test-jar</goal>
-							</goals>
-						</execution>
-					</executions>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-source-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>attach-sources</id>
-						<goals>
-							<goal>jar</goal>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-help-plugin</artifactId>
-				<version>2.2</version>
-				<executions>
-					<execution>
-						<phase>generate-resources</phase>
-						<goals>
-							<goal>effective-pom</goal>
-						</goals>
-						<configuration>
-							<output>${project.build.directory}/effective-pom.xml</output>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-install-plugin</artifactId>
-				<version>2.3.1</version>
-				<executions>
-					<execution>
-						<phase>install</phase>
-						<goals>
-							<goal>install-file</goal>
-						</goals>
-						<configuration>
-							<file>${project.build.directory}/${artifactId}-${project.version}.jar</file>
-							<pomFile>${project.build.directory}/effective-pom.xml</pomFile>
-							<!-- sources></sources -->
-							<!-- javadoc></javadoc -->
-							<groupId>${project.groupId}</groupId>
-							<artifactId>${project.artifactId}</artifactId>
-							<version>${project.version}</version>
-							<packaging>jar</packaging>
-							<!--classifier></classifier -->
-							<generatePom>true</generatePom>
-							<createChecksum>true</createChecksum>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index fa6b185..6616946 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -170,6 +170,8 @@ public class BlurConstants {
   public static final String BLUR_CLUSTER_NAME = "blur.cluster.name";
   public static final String BLUR_CLUSTER;
   public static final String BLUR_HTTP_STATUS_RUNNING_PORT = "blur.http.status.running.port";
+  public static final String BLUR_STREAM_SERVER_RUNNING_PORT = "blur.stream.server.running.port";
+  public static final String BLUR_STREAM_SERVER_THREADS = "blur.stream.server.threads";
 
   public static final String BLUR_SHARD_COMMAND_DRIVER_THREADS = "blur.shard.command.driver.threads";
   public static final String BLUR_SHARD_COMMAND_WORKER_THREADS = "blur.shard.command.worker.threads";

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 19bdac8..a45d8e1 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -92,6 +92,9 @@ blur.shard.bind.address=0.0.0.0
 # The default binding port of the shard server, 0 for random
 blur.shard.bind.port=40020
 
+# Experimental stream server.  Set threads to positive number to enable.
+blur.stream.server.threads=0
+
 # The number of command driver threads.
 blur.shard.command.driver.threads=16
 


Mime
View raw message