incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/3] git commit: More updates to the API and I have added an HDFS implementation of the server.
Date Sun, 07 Oct 2012 13:47:46 GMT
Updated Branches:
  refs/heads/new-api-prototype 320f9de80 -> 2c4dc79f1


More updates to the API and I have added an HDFS implementation of the server.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/2c4dc79f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/2c4dc79f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/2c4dc79f

Branch: refs/heads/new-api-prototype
Commit: 2c4dc79f123943aa6f1a559e627e98ec22abf7c7
Parents: 320f9de
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Oct 7 09:46:57 2012 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Oct 7 09:46:57 2012 -0400

----------------------------------------------------------------------
 src/blur-new-api-prototype/service.thrift          |   14 +
 .../main/java/org/apache/blur/core/HDFSServer.java |  269 +
 .../main/java/org/apache/blur/core/RSession.java   |  137 +
 .../main/java/org/apache/blur/core/ServerFile.java |  273 +
 .../java/org/apache/blur/core/ShardServer.java     |   54 +
 .../src/main/java/org/apache/blur/core/Util.java   |   96 +
 .../main/java/org/apache/blur/core/WSession.java   |   58 +
 .../org/apache/blur/core/WSessionThreaded.java     |   99 +
 .../org/apache/blur/core/WSessionThreadedFile.java |   37 +
 .../org/apache/blur/core/WSessionThreadedPath.java |   36 +
 .../org/apache/blur/proto/AbstractCommand.java     |   30 -
 .../java/org/apache/blur/proto/ClientManager.java  |  336 -
 .../main/java/org/apache/blur/proto/Command.java   |   24 -
 .../java/org/apache/blur/proto/Connection.java     |  130 -
 .../main/java/org/apache/blur/proto/RSession.java  |  137 -
 .../main/java/org/apache/blur/proto/Server.java    |  273 -
 .../src/main/java/org/apache/blur/proto/Util.java  |   91 -
 .../main/java/org/apache/blur/proto/WSession.java  |   65 -
 .../org/apache/blur/proto/WSessionThreaded.java    |  100 -
 .../proto/example/clients/LoadClientBatch.java     |   94 -
 .../example/clients/LoadThriftClientBatch.java     |   94 +
 .../proto/example/clients/ReadClientBatch.java     |   95 -
 .../example/clients/ReadThriftClientBatch.java     |   95 +
 .../org/apache/blur/store/SimpleHDFSDirectory.java |  144 +
 .../org/apache/blur/thrift/AbstractCommand.java    |   30 +
 .../java/org/apache/blur/thrift/ClientManager.java |  336 +
 .../main/java/org/apache/blur/thrift/Command.java  |   24 +
 .../java/org/apache/blur/thrift/Connection.java    |  130 +
 .../apache/blur/thrift/generated/BlurShard.java    | 6157 +++++++++++++++
 29 files changed, 8083 insertions(+), 1375 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/service.thrift
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/service.thrift b/src/blur-new-api-prototype/service.thrift
index 6949ff3..759ffd8 100644
--- a/src/blur-new-api-prototype/service.thrift
+++ b/src/blur-new-api-prototype/service.thrift
@@ -36,4 +36,18 @@ service BlurTuple {
   void commitWriteSession(1:Session session) throws (1:BlurException e)
   void rollbackWriteSession(1:Session session) throws (1:BlurException e)
   
+}
+
+// Service added next to BlurTuple. Every call takes the Session it operates on
+// and may raise BlurException.
+service BlurShard {
+
+  // Read-session calls: register a session, run a query against it, pull
+  // metadata and result tuples in batches of at most batchSize, then remove it.
+  void addReadSession(1:Session session) throws (1:BlurException e)
+  void executeQuery(1:Session session, 2:string query) throws (1:BlurException e)
+  list<Tuple> nextMetaDataResults(1:Session session, 2:i32 batchSize) throws (1:BlurException e)
+  list<Tuple> nextResults(1:Session session, 2:i32 batchSize) throws (1:BlurException e)
+  void removeReadSession(1:Session session) throws (1:BlurException e)
+
+  // Write-session calls: register a session, send tuples to it, then remove it.
+  void addWriteSession(1:Session session) throws (1:BlurException e)
+  void writeTuples(1:Session session, 2:list<Tuple> tuples) throws (1:BlurException e)
+  void removeWriteSession(1:Session session) throws (1:BlurException e)
+  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/HDFSServer.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/HDFSServer.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/HDFSServer.java
new file mode 100644
index 0000000..2144946
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/HDFSServer.java
@@ -0,0 +1,269 @@
+package org.apache.blur.core;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.store.SimpleHDFSDirectory;
+import org.apache.blur.thrift.generated.Attribute;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurTuple.Iface;
+import org.apache.blur.thrift.generated.BlurTuple.Processor;
+import org.apache.blur.thrift.generated.Session;
+import org.apache.blur.thrift.generated.Tuple;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.codecs.appending.AppendingCodec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Version;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TThreadPoolServer.Args;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * {@link Iface} implementation whose main Lucene index is stored in a
+ * {@link SimpleHDFSDirectory}.
+ * <p>
+ * A read session wraps a reader opened from the main {@link IndexWriter}. A
+ * write session gets its own index in a sub-directory of the main path named
+ * after the session id; committing merges that index into the main one with
+ * {@code IndexWriter.addIndexes}, rolling back only deletes it.
+ */
+public class HDFSServer implements Iface {
+
+  private static final List<Tuple> EMPTY_LIST = new ArrayList<Tuple>();
+
+  /**
+   * Starts a thread-pool Thrift server for this implementation on
+   * 127.0.0.1:9000 and blocks serving requests.
+   *
+   * @param argsStr {@code argsStr[0]} is the path of the main index directory
+   * @throws IllegalArgumentException if no path argument is given
+   */
+  public static void main(String[] argsStr) throws TTransportException, IOException {
+    if (argsStr.length < 1) {
+      throw new IllegalArgumentException("Usage: HDFSServer <index path>");
+    }
+    Configuration conf = new Configuration();
+    HDFSServer server = new HDFSServer(conf, new Path(argsStr[0]));
+    Processor<Iface> processor = new Processor<Iface>(server);
+    Args args = new Args(new TServerSocket(new InetSocketAddress("127.0.0.1", 9000)));
+    args.minWorkerThreads(50);
+    args.maxWorkerThreads(50);
+    args.processor(processor);
+    args.transportFactory(new TFramedTransport.Factory());
+    args.protocolFactory(new TBinaryProtocol.Factory(true, true));
+
+    TThreadPoolServer tserver = new TThreadPoolServer(args);
+    tserver.serve();
+  }
+
+  /** Open read sessions keyed by session id. */
+  private final Map<String, RSession> readSessions = new ConcurrentHashMap<String, RSession>();
+  /** Open write sessions keyed by session id. */
+  private final Map<String, WSession> writeSessions = new ConcurrentHashMap<String, WSession>();
+  /** Writer on the main index; committed write sessions are merged into it. */
+  private final IndexWriter writer;
+  /** Root path of the main index; write sessions live in sub-directories of it. */
+  private final Path path;
+  private final Configuration configuration;
+
+  /**
+   * Opens (or creates) the main index at {@code path}.
+   *
+   * @param configuration Hadoop configuration used to resolve file systems
+   * @param path          location of the main index
+   * @throws IOException if the index writer cannot be opened
+   */
+  public HDFSServer(Configuration configuration, Path path) throws IOException {
+    this.path = path;
+    this.configuration = configuration;
+    Directory directory = new SimpleHDFSDirectory(configuration, path);
+    this.writer = openWriter(directory);
+  }
+
+  /** Opens a writer on {@code directory} using a keyword analyzer and the appending codec. */
+  private IndexWriter openWriter(Directory directory) throws IOException {
+    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40, new KeywordAnalyzer());
+    conf.setCodec(new AppendingCodec());
+    return new IndexWriter(directory, conf);
+  }
+
+  /**
+   * Opens a reader from the main writer and registers it as a new read session
+   * under a random UUID.
+   *
+   * @return handle carrying the new session id
+   */
+  @Override
+  public Session openReadSession() throws BlurException, TException {
+    try {
+      IndexReader reader = DirectoryReader.open(writer, true);
+      RSession session = new RSession(UUID.randomUUID().toString(), reader);
+      readSessions.put(session.getSessionId(), session);
+      return new Session(session.getSessionId());
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Runs {@code query} in the given read session. The literal {@code "*"}
+   * (ignoring surrounding whitespace) matches every document; anything else is
+   * parsed with the classic query parser using "SUPER" as the default field.
+   */
+  @Override
+  public void executeQuery(Session readSession, String query) throws BlurException, TException {
+    try {
+      RSession session = getReadSession(readSession);
+      if (query.trim().equals("*")) {
+        session.execute(new MatchAllDocsQuery());
+        return;
+      }
+      QueryParser parser = new QueryParser(Version.LUCENE_40, "SUPER", new KeywordAnalyzer());
+      parser.setAllowLeadingWildcard(true);
+      session.execute(parser.parse(query));
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Returns a single tuple holding the "totalResults" attribute the first time
+   * it is called for a session, and an empty list once the session reports the
+   * metadata as fetched. {@code batchSize} is not used.
+   */
+  @Override
+  public List<Tuple> nextMetaDataResults(Session readSession, int batchSize) throws BlurException, TException {
+    try {
+      RSession session = getReadSession(readSession);
+      if (session.isMetaDataBeenFetched()) {
+        return EMPTY_LIST;
+      }
+      Tuple tuple = new Tuple();
+      tuple.addToAttributes(Util.newAttribute("totalResults", session.getTotalHits()));
+      return Arrays.asList(tuple);
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Returns up to {@code batchSize} further results of the session's query,
+   * fewer (possibly none) once the session runs out of documents.
+   */
+  @Override
+  public List<Tuple> nextResults(Session readSession, int batchSize) throws BlurException, TException {
+    try {
+      RSession session = getReadSession(readSession);
+      List<Tuple> results = new ArrayList<Tuple>();
+      for (int i = 0; i < batchSize; i++) {
+        Tuple tuple = convert(session.nextDocument());
+        if (tuple == null) {
+          break;
+        }
+        results.add(tuple);
+      }
+      return results;
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /** Unregisters and closes the read session; fails if the id is unknown. */
+  @Override
+  public void closeReadSession(Session readSession) throws BlurException, TException {
+    try {
+      takeReadSession(readSession).close();
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Creates a private index in a sub-directory named after a random UUID and
+   * registers a write session for it.
+   *
+   * @return handle carrying the new session id
+   */
+  @Override
+  public Session openWriteSession() throws BlurException, TException {
+    try {
+      String id = UUID.randomUUID().toString();
+      Path p = new Path(path, id);
+      Directory directory = new SimpleHDFSDirectory(configuration, p);
+      WSession session = new WSessionThreadedPath(id, openWriter(directory), directory, p);
+      writeSessions.put(session.getSessionId(), session);
+      return new Session(session.getSessionId());
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /** Converts the tuples to documents and adds them to the write session. */
+  @Override
+  public void writeTuples(Session writeSession, List<Tuple> tuples) throws BlurException, TException {
+    try {
+      WSession session = getWriteSession(writeSession);
+      session.addDocuments(convert(tuples));
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Unregisters the write session, closes its writer, merges its index into
+   * the main index, commits, and finally deletes the session directory. Fails
+   * if the id is unknown.
+   */
+  @Override
+  public void commitWriteSession(Session writeSession) throws BlurException, TException {
+    try {
+      WSession session = takeWriteSession(writeSession);
+      session.closeWriter();
+      writer.addIndexes(session.getDirectory());
+      writer.commit();
+      writer.maybeMerge();
+      rm(((WSessionThreadedPath) session).getPath());
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Unregisters the write session, closes its writer and deletes the session
+   * directory without touching the main index. Fails if the id is unknown.
+   */
+  @Override
+  public void rollbackWriteSession(Session writeSession) throws BlurException, TException {
+    try {
+      WSession session = takeWriteSession(writeSession);
+      session.closeWriter();
+      rm(((WSessionThreadedPath) session).getPath());
+    } catch (Throwable t) {
+      throw Util.wrapThrowable(t);
+    }
+  }
+
+  /**
+   * Recursively deletes {@code path}.
+   * <p>
+   * The file system is deliberately NOT closed: {@code Path.getFileSystem}
+   * hands out Hadoop's cached, JVM-wide instance for the URI, so closing it here
+   * would also close it for every other user of that instance in this process.
+   */
+  private void rm(Path path) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    fileSystem.delete(path, true);
+  }
+
+  /** Converts a tuple to a document, one field per attribute; {@code null} maps to {@code null}. */
+  private Document convert(Tuple tuple) throws BlurException {
+    if (tuple == null) {
+      return null;
+    }
+    Document newDoc = new Document();
+    for (Attribute attribute : tuple.getAttributes()) {
+      newDoc.add(Util.getField(attribute));
+    }
+    return newDoc;
+  }
+
+  /** Converts a document to a tuple, one attribute per field; {@code null} maps to {@code null}. */
+  private Tuple convert(Document document) {
+    if (document == null) {
+      return null;
+    }
+    List<IndexableField> fields = document.getFields();
+    Tuple tuple = new Tuple();
+    for (IndexableField fieldable : fields) {
+      tuple.addToAttributes(Util.toAttribute(fieldable));
+    }
+    return tuple;
+  }
+
+  /** Converts every tuple of the list, preserving order. */
+  private List<Document> convert(List<Tuple> tuples) throws BlurException {
+    List<Document> docs = new ArrayList<Document>();
+    for (Tuple tuple : tuples) {
+      docs.add(convert(tuple));
+    }
+    return docs;
+  }
+
+  /** Looks up an open write session, failing with a {@link BlurException} if the id is unknown. */
+  private WSession getWriteSession(Session session) throws BlurException {
+    WSession wsession = writeSessions.get(session.getSessionId());
+    if (wsession == null) {
+      throw new BlurException("Write Session [" + session + "] not found", null);
+    }
+    return wsession;
+  }
+
+  /** Looks up an open read session, failing with a {@link BlurException} if the id is unknown. */
+  private RSession getReadSession(Session session) throws BlurException {
+    RSession rsession = readSessions.get(session.getSessionId());
+    if (rsession == null) {
+      throw new BlurException("Read Session [" + session + "] not found", null);
+    }
+    return rsession;
+  }
+
+  /**
+   * Removes a write session from the registry and returns it. An unknown id is
+   * reported like in {@link #getWriteSession(Session)} instead of surfacing
+   * later as a NullPointerException.
+   */
+  private WSession takeWriteSession(Session session) throws BlurException {
+    WSession wsession = writeSessions.remove(session.getSessionId());
+    if (wsession == null) {
+      throw new BlurException("Write Session [" + session + "] not found", null);
+    }
+    return wsession;
+  }
+
+  /**
+   * Removes a read session from the registry and returns it. An unknown id is
+   * reported like in {@link #getReadSession(Session)} instead of surfacing
+   * later as a NullPointerException.
+   */
+  private RSession takeReadSession(Session session) throws BlurException {
+    RSession rsession = readSessions.remove(session.getSessionId());
+    if (rsession == null) {
+      throw new BlurException("Read Session [" + session + "] not found", null);
+    }
+    return rsession;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/RSession.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/RSession.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/RSession.java
new file mode 100644
index 0000000..c62be96
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/RSession.java
@@ -0,0 +1,137 @@
+package org.apache.blur.core;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+
+/**
+ * State of one read session: a reader, the query last executed on it, and a
+ * cursor that walks the hits page by page.
+ * <p>
+ * Hits are fetched {@code fetch} at a time. The second page is fetched
+ * synchronously; from then on, once the cursor has passed
+ * {@code countToPreFetchAt} within a page, the following page is requested on
+ * a background thread so it is ready when the current page is exhausted.
+ */
+public class RSession {
+
+  private final String sessionId;
+  private final IndexReader reader;
+  private final IndexSearcher searcher;
+  /** Page currently being iterated. */
+  private TopDocs topDocs;
+  /** Total hit count reported by the initial search. */
+  private int totalHits;
+  private Query query;
+  /** Index of the next hit within {@link #topDocs}. */
+  private int position = 0;
+  /** Number of documents handed out so far across all pages. */
+  private int totalPosition = 0;
+  /** Last hit handed out; used as the "after" marker for synchronous paging. */
+  private ScoreDoc scoreDoc;
+  /** Page size. */
+  private int fetch = 10000;
+  /** True once at least one additional page has been loaded. */
+  private boolean pagedBefore;
+  private double countToPreFetchAt = fetch * 0.6;
+  private boolean preFetch = true;
+  /** Runs background page fetches; shut down in {@link #close()}. */
+  private final ExecutorService executor;
+  private Future<TopDocs> futureFetch;
+  private boolean runningPreFetch = false;
+  /** Set once {@link #getTotalHits()} has been called for the current query. */
+  private boolean metaData;
+
+  public IndexReader getReader() {
+    return reader;
+  }
+
+  /**
+   * @param sessionId id this session is registered under
+   * @param reader    reader to search; closed by {@link #close()}
+   */
+  public RSession(String sessionId, IndexReader reader) {
+    this.sessionId = sessionId;
+    this.reader = reader;
+    searcher = new IndexSearcher(reader);
+    executor = Executors.newSingleThreadExecutor();
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  /**
+   * Runs {@code query} and positions the cursor before its first hit. Any
+   * cursor state and pending background fetch left over from a previous query
+   * on this session is discarded, so a session can execute more than one query.
+   *
+   * @throws IOException if the search fails; the previous state is then kept
+   */
+  public void execute(Query query) throws IOException {
+    TopDocs firstPage = searcher.search(query, fetch);
+    discardPreFetch();
+    this.query = query;
+    topDocs = firstPage;
+    totalHits = firstPage.totalHits;
+    position = 0;
+    totalPosition = 0;
+    scoreDoc = null;
+    pagedBefore = false;
+    metaData = false;
+  }
+
+  /**
+   * Returns the total hit count of the current query and marks the metadata as
+   * fetched (see {@link #isMetaDataBeenFetched()}).
+   */
+  public int getTotalHits() {
+    metaData = true;
+    return totalHits;
+  }
+
+  /**
+   * Returns the next hit of the current query, or {@code null} when all hits
+   * have been handed out (or no query has been executed yet).
+   * <p>
+   * The cursor only advances after the document was loaded successfully, so a
+   * failed call does not silently skip a hit.
+   *
+   * @throws IOException if paging or loading the document fails
+   */
+  public Document nextDocument() throws IOException {
+    if (totalPosition >= totalHits) {
+      return null;
+    }
+    if (position >= topDocs.scoreDocs.length) {
+      page();
+      if (position >= topDocs.scoreDocs.length) {
+        // Fewer hits are retrievable than were counted; end the iteration
+        // instead of indexing past the end of an empty page.
+        totalPosition = totalHits;
+        return null;
+      }
+    } else {
+      checkToSeeIfWeCanGetAhead();
+    }
+    ScoreDoc current = topDocs.scoreDocs[position];
+    Document document = searcher.doc(current.doc);
+    scoreDoc = current;
+    position++;
+    totalPosition++;
+    return document;
+  }
+
+  /**
+   * Starts a background fetch of the next page once paging has happened before
+   * and the cursor is past {@code countToPreFetchAt} in the current page.
+   */
+  private void checkToSeeIfWeCanGetAhead() throws IOException {
+    if (preFetch && pagedBefore) {
+      if (position > countToPreFetchAt && !runningPreFetch) {
+        runningPreFetch = true;
+        // Capture the inputs now so the task does not read mutable fields later.
+        final ScoreDoc after = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
+        final Query currentQuery = query;
+        final int pageSize = fetch;
+        futureFetch = executor.submit(new Callable<TopDocs>() {
+          @Override
+          public TopDocs call() throws Exception {
+            return searcher.searchAfter(after, currentQuery, pageSize);
+          }
+        });
+      }
+    }
+  }
+
+  /**
+   * Loads the page following the current one, from the pending background
+   * fetch if there is one and synchronously otherwise, and resets the in-page
+   * position.
+   *
+   * @throws IOException if the fetch fails or the wait for it is interrupted;
+   *                     the current page and position are left untouched so the
+   *                     old page is not replayed
+   */
+  private void page() throws IOException {
+    if (preFetch && pagedBefore && futureFetch != null) {
+      try {
+        topDocs = futureFetch.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for the next page of results", e);
+      } catch (ExecutionException e) {
+        throw new IOException("Pre-fetch of the next page of results failed", e.getCause());
+      } finally {
+        // Consumed or failed either way; a later call pages synchronously.
+        futureFetch = null;
+        runningPreFetch = false;
+      }
+    } else {
+      pagedBefore = true;
+      topDocs = searcher.searchAfter(scoreDoc, query, fetch);
+    }
+    position = 0;
+  }
+
+  /**
+   * Drops a pending background fetch. The task is not interrupted, because
+   * interrupting a thread in the middle of index I/O can close the underlying
+   * file channel.
+   */
+  private void discardPreFetch() {
+    if (futureFetch != null) {
+      futureFetch.cancel(false);
+      futureFetch = null;
+    }
+    runningPreFetch = false;
+  }
+
+  /**
+   * Stops the background fetch thread and closes the reader. Without the
+   * shutdown every session would leave a live executor thread behind.
+   */
+  public void close() throws IOException {
+    discardPreFetch();
+    // shutdown() rather than shutdownNow(): a task already running is allowed to
+    // finish instead of being interrupted in the middle of index I/O.
+    executor.shutdown();
+    reader.close();
+  }
+
+  /** Whether {@link #getTotalHits()} has been called for the current query. */
+  public boolean isMetaDataBeenFetched() {
+    return metaData;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerFile.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerFile.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerFile.java
new file mode 100644
index 0000000..5f96bad
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerFile.java
@@ -0,0 +1,273 @@
+package org.apache.blur.core;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.thrift.generated.Attribute;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurTuple.Iface;
+import org.apache.blur.thrift.generated.BlurTuple.Processor;
+import org.apache.blur.thrift.generated.Session;
+import org.apache.blur.thrift.generated.Tuple;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TThreadPoolServer.Args;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * {@link Iface} implementation whose main Lucene index is stored in a local
+ * directory opened with {@link FSDirectory}.
+ * <p>
+ * A read session wraps a reader opened from the main {@link IndexWriter}. A
+ * write session gets its own index in a sub-directory of the main directory
+ * named after the session id; committing merges that index into the main one
+ * with {@code IndexWriter.addIndexes}, rolling back only deletes it.
+ * <p>
+ * Failures are reported through {@code Util.wrapThrowable}, the same way
+ * {@code HDFSServer} reports them, so the cause reaches the caller instead of
+ * an empty {@link BlurException}.
+ */
+public class ServerFile implements Iface {
+
+  private static final List<Tuple> EMPTY_LIST = new ArrayList<Tuple>();
+
+  /**
+   * Starts a thread-pool Thrift server for this implementation on
+   * 127.0.0.1:9000 and blocks serving requests.
+   *
+   * @param argsStr {@code argsStr[0]} is the main index directory
+   * @throws IllegalArgumentException if no directory argument is given
+   */
+  public static void main(String[] argsStr) throws TTransportException, IOException {
+    if (argsStr.length < 1) {
+      throw new IllegalArgumentException("Usage: ServerFile <index directory>");
+    }
+    ServerFile server = new ServerFile(new File(argsStr[0]));
+    Processor<Iface> processor = new Processor<Iface>(server);
+    Args args = new Args(new TServerSocket(new InetSocketAddress("127.0.0.1", 9000)));
+    args.minWorkerThreads(50);
+    args.maxWorkerThreads(50);
+    args.processor(processor);
+    args.transportFactory(new TFramedTransport.Factory());
+    args.protocolFactory(new TBinaryProtocol.Factory(true, true));
+
+    TThreadPoolServer tserver = new TThreadPoolServer(args);
+    tserver.serve();
+  }
+
+  /** Open read sessions keyed by session id. */
+  private final Map<String, RSession> readSessions = new ConcurrentHashMap<String, RSession>();
+  /** Open write sessions keyed by session id. */
+  private final Map<String, WSession> writeSessions = new ConcurrentHashMap<String, WSession>();
+  /** Writer on the main index; committed write sessions are merged into it. */
+  private final IndexWriter writer;
+  /** Main index directory; write sessions live in sub-directories of it. */
+  private final File file;
+
+  /**
+   * Opens (or creates) the main index in {@code file}.
+   *
+   * @throws IOException if the index writer cannot be opened
+   */
+  public ServerFile(File file) throws IOException {
+    this.file = file;
+    Directory directory = FSDirectory.open(file);
+    this.writer = openWriter(directory);
+  }
+
+  /** Opens a writer on {@code directory} using a keyword analyzer. */
+  private IndexWriter openWriter(Directory directory) throws IOException {
+    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40, new KeywordAnalyzer());
+    return new IndexWriter(directory, conf);
+  }
+
+  /**
+   * Opens a reader from the main writer and registers it as a new read session
+   * under a random UUID.
+   *
+   * @return handle carrying the new session id
+   */
+  @Override
+  public Session openReadSession() throws BlurException, TException {
+    try {
+      IndexReader reader = DirectoryReader.open(writer, true);
+      RSession session = new RSession(UUID.randomUUID().toString(), reader);
+      readSessions.put(session.getSessionId(), session);
+      return new Session(session.getSessionId());
+    } catch (Exception e) {
+      throw Util.wrapThrowable(e);
+    }
+  }
+
+  /**
+   * Runs {@code query} in the given read session. The literal {@code "*"}
+   * (ignoring surrounding whitespace) matches every document; anything else is
+   * parsed with the classic query parser using "SUPER" as the default field.
+   */
+  @Override
+  public void executeQuery(Session readSession, String query) throws BlurException, TException {
+    try {
+      RSession session = getReadSession(readSession);
+      if (query.trim().equals("*")) {
+        session.execute(new MatchAllDocsQuery());
+        return;
+      }
+      QueryParser parser = new QueryParser(Version.LUCENE_40, "SUPER", new KeywordAnalyzer());
+      parser.setAllowLeadingWildcard(true);
+      session.execute(parser.parse(query));
+    } catch (Exception e) {
+      throw Util.wrapThrowable(e);
+    }
+  }
+
+  /**
+   * Returns a single tuple holding the "totalResults" attribute the first time
+   * it is called for a session, and an empty list once the session reports the
+   * metadata as fetched. {@code batchSize} is not used.
+   */
+  @Override
+  public List<Tuple> nextMetaDataResults(Session readSession, int batchSize) throws BlurException, TException {
+    try {
+      RSession session = getReadSession(readSession);
+      if (session.isMetaDataBeenFetched()) {
+        return EMPTY_LIST;
+      }
+      Tuple tuple = new Tuple();
+      tuple.addToAttributes(Util.newAttribute("totalResults", session.getTotalHits()));
+      return Arrays.asList(tuple);
+    } catch (Exception e) {
+      throw Util.wrapThrowable(e);
+    }
+  }
+
+  /**
+   * Returns up to {@code batchSize} further results of the session's query,
+   * fewer (possibly none) once the session runs out of documents.
+   */
+  @Override
+  public List<Tuple> nextResults(Session readSession, int batchSize) throws BlurException, TException {
+    try {
+      RSession session = getReadSession(readSession);
+      List<Tuple> results = new ArrayList<Tuple>();
+      for (int i = 0; i < batchSize; i++) {
+        Tuple tuple = convert(session.nextDocument());
+        if (tuple == null) {
+          break;
+        }
+        results.add(tuple);
+      }
+      return results;
+    } catch (Exception e) {
+      throw Util.wrapThrowable(e);
+    }
+  }
+
+  /** Unregisters and closes the read session; fails if the id is unknown. */
+  @Override
+  public void closeReadSession(Session readSession) throws BlurException, TException {
+    try {
+      takeReadSession(readSession).close();
+    } catch (Exception e) {
+      throw Util.wrapThrowable(e);
+    }
+  }
+
+  /**
+   * Creates a private index in a sub-directory named after a random UUID and
+   * registers a write session for it.
+   *
+   * @return handle carrying the new session id
+   */
+  @Override
+  public Session openWriteSession() throws BlurException, TException {
+    try {
+      String id = UUID.randomUUID().toString();
+      File f = new File(file, id);
+      Directory directory = FSDirectory.open(f);
+      WSession session = new WSessionThreadedFile(id, openWriter(directory), directory, f);
+      writeSessions.put(session.getSessionId(), session);
+      return new Session(session.getSessionId());
+    } catch (Exception e) {
+      throw Util.wrapThrowable(e);
+    }
+  }
+
+  /** Converts the tuples to documents and adds them to the write session. */
+  @Override
+  public void writeTuples(Session writeSession, List<Tuple> tuples) throws BlurException, TException {
+    try {
+      WSession session = getWriteSession(writeSession);
+      session.addDocuments(convert(tuples));
+    } catch (Exception e) {
+      throw Util.wrapThrowable(e);
+    }
+  }
+
+  /**
+   * Unregisters the write session, closes its writer, merges its index into
+   * the main index, commits, and finally deletes the session directory. Fails
+   * if the id is unknown.
+   */
+  @Override
+  public void commitWriteSession(Session writeSession) throws BlurException, TException {
+    try {
+      WSession session = takeWriteSession(writeSession);
+      session.closeWriter();
+      writer.addIndexes(session.getDirectory());
+      writer.commit();
+      writer.maybeMerge();
+      rm(((WSessionThreadedFile) session).getFile());
+    } catch (Exception e) {
+      throw Util.wrapThrowable(e);
+    }
+  }
+
+  /**
+   * Unregisters the write session, closes its writer and deletes the session
+   * directory without touching the main index. Fails if the id is unknown.
+   */
+  @Override
+  public void rollbackWriteSession(Session writeSession) throws BlurException, TException {
+    try {
+      WSession session = takeWriteSession(writeSession);
+      session.closeWriter();
+      rm(((WSessionThreadedFile) session).getFile());
+    } catch (Exception e) {
+      throw Util.wrapThrowable(e);
+    }
+  }
+
+  /**
+   * Deletes {@code file}, recursing into directories first. Deletion stays
+   * best-effort as before: a file that cannot be removed is left behind.
+   */
+  private void rm(File file) throws IOException {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      // listFiles() returns null when the directory cannot be read.
+      File[] children = file.listFiles();
+      if (children != null) {
+        for (File f : children) {
+          rm(f);
+        }
+      }
+    }
+    file.delete();
+  }
+
+  /** Converts a tuple to a document, one field per attribute; {@code null} maps to {@code null}. */
+  private Document convert(Tuple tuple) throws BlurException {
+    if (tuple == null) {
+      return null;
+    }
+    Document newDoc = new Document();
+    for (Attribute attribute : tuple.getAttributes()) {
+      newDoc.add(Util.getField(attribute));
+    }
+    return newDoc;
+  }
+
+  /** Converts a document to a tuple, one attribute per field; {@code null} maps to {@code null}. */
+  private Tuple convert(Document document) {
+    if (document == null) {
+      return null;
+    }
+    List<IndexableField> fields = document.getFields();
+    Tuple tuple = new Tuple();
+    for (IndexableField fieldable : fields) {
+      tuple.addToAttributes(Util.toAttribute(fieldable));
+    }
+    return tuple;
+  }
+
+  /** Converts every tuple of the list, preserving order. */
+  private List<Document> convert(List<Tuple> tuples) throws BlurException {
+    List<Document> docs = new ArrayList<Document>();
+    for (Tuple tuple : tuples) {
+      docs.add(convert(tuple));
+    }
+    return docs;
+  }
+
+  /** Looks up an open write session, failing with a {@link BlurException} if the id is unknown. */
+  private WSession getWriteSession(Session session) throws BlurException {
+    WSession wsession = writeSessions.get(session.getSessionId());
+    if (wsession == null) {
+      throw new BlurException("Write Session [" + session + "] not found", null);
+    }
+    return wsession;
+  }
+
+  /** Looks up an open read session, failing with a {@link BlurException} if the id is unknown. */
+  private RSession getReadSession(Session session) throws BlurException {
+    RSession rsession = readSessions.get(session.getSessionId());
+    if (rsession == null) {
+      throw new BlurException("Read Session [" + session + "] not found", null);
+    }
+    return rsession;
+  }
+
+  /**
+   * Removes a write session from the registry and returns it. An unknown id is
+   * reported like in {@link #getWriteSession(Session)} instead of surfacing
+   * later as a NullPointerException.
+   */
+  private WSession takeWriteSession(Session session) throws BlurException {
+    WSession wsession = writeSessions.remove(session.getSessionId());
+    if (wsession == null) {
+      throw new BlurException("Write Session [" + session + "] not found", null);
+    }
+    return wsession;
+  }
+
+  /**
+   * Removes a read session from the registry and returns it. An unknown id is
+   * reported like in {@link #getReadSession(Session)} instead of surfacing
+   * later as a NullPointerException.
+   */
+  private RSession takeReadSession(Session session) throws BlurException {
+    RSession rsession = readSessions.remove(session.getSessionId());
+    if (rsession == null) {
+      throw new BlurException("Read Session [" + session + "] not found", null);
+    }
+    return rsession;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ShardServer.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ShardServer.java
new file mode 100644
index 0000000..00af1f9
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ShardServer.java
@@ -0,0 +1,54 @@
+package org.apache.blur.core;
+
+import java.util.List;
+
+import org.apache.blur.thrift.generated.*;
+import org.apache.thrift.TException;
+
+/**
+ * Skeleton implementation of {@link BlurShard.Iface}. Every method is an empty
+ * stub: the void methods do nothing and the list-returning methods return
+ * {@code null}.
+ */
+public class ShardServer implements BlurShard.Iface {
+
+  /** Entry point placeholder; currently does nothing. */
+  public static void main(String[] args) {
+
+  }
+
+  /** Not implemented yet; the session is ignored. */
+  @Override
+  public void addReadSession(Session session) throws BlurException, TException {
+    
+  }
+
+  /** Not implemented yet; the query is not executed. */
+  @Override
+  public void executeQuery(Session session, String query) throws BlurException, TException {
+    
+  }
+
+  /**
+   * Not implemented yet; always returns {@code null}.
+   * NOTE(review): a null list result is presumably reported to Thrift clients
+   * as a missing result rather than as "no data" -- confirm before wiring up.
+   */
+  @Override
+  public List<Tuple> nextMetaDataResults(Session session, int batchSize) throws BlurException, TException {
+    return null;
+  }
+
+  /**
+   * Not implemented yet; always returns {@code null}.
+   * NOTE(review): a null list result is presumably reported to Thrift clients
+   * as a missing result rather than as "no data" -- confirm before wiring up.
+   */
+  @Override
+  public List<Tuple> nextResults(Session session, int batchSize) throws BlurException, TException {
+    return null;
+  }
+
+  /** Not implemented yet; the session is ignored. */
+  @Override
+  public void removeReadSession(Session session) throws BlurException, TException {
+    
+  }
+
+  /** Not implemented yet; the session is ignored. */
+  @Override
+  public void addWriteSession(Session session) throws BlurException, TException {
+    
+  }
+
+  /** Not implemented yet; the tuples are discarded. */
+  @Override
+  public void writeTuples(Session session, List<Tuple> tuples) throws BlurException, TException {
+    
+  }
+
+  /** Not implemented yet; the session is ignored. */
+  @Override
+  public void removeWriteSession(Session session) throws BlurException, TException {
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/Util.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/Util.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/Util.java
new file mode 100644
index 0000000..d62bcdd
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/Util.java
@@ -0,0 +1,137 @@
+package org.apache.blur.core;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.blur.thrift.generated.Attribute;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.TYPE;
+import org.apache.blur.thrift.generated.Tuple;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexableField;
+
+/**
+ * Static helpers that convert between Thrift {@link Attribute}/{@link Tuple}
+ * values, raw bytes and Lucene fields.
+ */
+public class Util {
+
+  /**
+   * Charset for every String/byte[] conversion in this class. It is fixed so
+   * that encoding and decoding agree regardless of the JVM's default charset.
+   */
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  /** Creates a {@link TYPE#STRING} attribute holding {@code value} encoded as UTF-8. */
+  public static Attribute newAttribute(String name, String value) {
+    return new Attribute(name, ByteBuffer.wrap(value.getBytes(UTF_8)), TYPE.STRING);
+  }
+
+  /** Creates a {@link TYPE#INT} attribute holding {@code value} as 4 big-endian bytes. */
+  public static Attribute newAttribute(String name, int value) {
+    Attribute attribute = new Attribute();
+    attribute.setName(name);
+    attribute.setValue(ByteBuffer.allocate(4).putInt(value).array());
+    attribute.setType(TYPE.INT);
+    return attribute;
+  }
+
+  /** Converts a Lucene field into a string attribute using the field's name and string value. */
+  public static Attribute toAttribute(IndexableField fieldable) {
+    return newAttribute(fieldable.name(), fieldable.stringValue());
+  }
+
+  /**
+   * Converts an attribute into a stored Lucene field.
+   *
+   * @throws BlurException if the attribute type is anything other than STRING
+   */
+  public static IndexableField getField(Attribute attribute) throws BlurException {
+    switch (attribute.getType()) {
+    case STRING:
+      return new StringField(attribute.getName(), toString(attribute.getValue()), Store.YES);
+    default:
+      throw new BlurException("Not supported [" + attribute.getType() + "]", null);
+    }
+  }
+
+  /** Decodes {@code bs} as UTF-8 text. */
+  public static String toString(byte[] bs) {
+    return new String(bs, UTF_8);
+  }
+
+  /**
+   * Renders a tuple as a JSON-like object such as <code>{"name":"value","count":1}</code>.
+   * Names and string values are not escaped. A tuple without an attribute list
+   * renders as <code>{}</code>.
+   *
+   * @throws RuntimeException if an attribute type is neither STRING nor INT
+   */
+  public static String toString(Tuple tuple) {
+    StringBuilder s = new StringBuilder();
+    List<Attribute> attributes = tuple.getAttributes();
+    if (attributes != null) {
+      for (Attribute attribute : attributes) {
+        if (s.length() > 0) {
+          s.append(',');
+        }
+        s.append('"').append(attribute.getName()).append("\":");
+        switch (attribute.getType()) {
+        case STRING:
+          s.append('"').append(toString(attribute.getValue())).append('"');
+          break;
+        case INT:
+          s.append(toInt(attribute.getValue()));
+          break;
+        default:
+          throw new RuntimeException("Type [" + attribute.getType() + "] not supported.");
+        }
+      }
+    }
+    return "{" + s + "}";
+  }
+
+  /** Reads the first four bytes of {@code bs} as a big-endian int. */
+  public static int toInt(byte[] bs) {
+    return ByteBuffer.wrap(bs).getInt();
+  }
+
+  /**
+   * Converts any throwable into a {@link BlurException}.
+   *
+   * @return {@code t} itself when it already is a BlurException, otherwise a new
+   *         BlurException whose message is {@code String.valueOf(t)} and whose
+   *         cause is {@code t}; never {@code null}
+   */
+  public static BlurException wrapThrowable(Throwable t) {
+    if (t instanceof BlurException) {
+      return (BlurException) t;
+    }
+    // Second argument left null, as in getField above. NOTE(review): fill it
+    // in once the meaning of that constructor argument is confirmed.
+    BlurException wrapped = new BlurException(String.valueOf(t), null);
+    if (t != null) {
+      wrapped.initCause(t);
+    }
+    return wrapped;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSession.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSession.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSession.java
new file mode 100644
index 0000000..f33b202
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSession.java
@@ -0,0 +1,72 @@
+package org.apache.blur.core;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+
+/**
+ * A write session: a session id paired with the {@link IndexWriter} that
+ * receives the session's documents and the {@link Directory} handed in with it.
+ */
+public class WSession {
+
+  protected String sessionId;
+  protected IndexWriter writer;
+  protected Directory directory;
+
+  /**
+   * @param id        identifier reported by {@link #getSessionId()}
+   * @param writer    writer that every add/close call is forwarded to
+   * @param directory directory reported by {@link #getDirectory()}
+   */
+  public WSession(String id, IndexWriter writer, Directory directory) {
+    this.writer = writer;
+    this.directory = directory;
+    this.sessionId = id;
+  }
+
+  /** Returns the id this session was created with. */
+  public String getSessionId() {
+    return this.sessionId;
+  }
+
+  /** Returns the directory this session was created with. */
+  public Directory getDirectory() {
+    return this.directory;
+  }
+
+  /** Forwards a single document to the underlying writer. */
+  public void addDocument(Document document) throws IOException {
+    this.writer.addDocument(document);
+  }
+
+  /** Forwards a list of documents to the underlying writer in one call. */
+  public void addDocuments(List<Document> documents) throws IOException {
+    this.writer.addDocuments(documents);
+  }
+
+  /** Closes the underlying writer; the directory is not closed here. */
+  public void closeWriter() throws IOException {
+    this.writer.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreaded.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreaded.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreaded.java
new file mode 100644
index 0000000..17e0db7
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreaded.java
@@ -0,0 +1,168 @@
+package org.apache.blur.core;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+
+/**
+ * A write session whose {@link #addDocument(Document)} hands documents to a
+ * bounded queue; a background thread takes them off the queue and writes them
+ * to the index writer.
+ * <p>
+ * Write failures on the background thread (any {@link IOException}, which
+ * includes {@link CorruptIndexException}) are remembered and reported by
+ * {@link #closeWriter()}. Callers should stop adding documents before calling
+ * {@link #closeWriter()}: a document queued while the session is closing may
+ * not reach the index.
+ */
+public abstract class WSessionThreaded extends WSession {
+
+  /** Maximum number of documents waiting to be written. */
+  private static final int QUEUE_CAPACITY = 100;
+  /** How long the background thread waits on an empty queue before re-checking {@code running}. */
+  private static final long POLL_MILLIS = 10;
+
+  private Thread thread;
+  private volatile boolean running = true;
+  /** First write failure seen by the background thread, or null. */
+  private volatile IOException failure;
+  private BlockingQueue<Document> queue = new ArrayBlockingQueue<Document>(QUEUE_CAPACITY);
+
+  public WSessionThreaded(String id, IndexWriter writer, Directory directory) {
+    super(id, writer, directory);
+    startThread();
+  }
+
+  /** Starts the background thread that moves queued documents into the index. */
+  private void startThread() {
+    this.thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        indexQueuedDocuments();
+      }
+    });
+    thread.start();
+  }
+
+  /** Body of the background thread: write until closed or interrupted, then flush the queue. */
+  private void indexQueuedDocuments() {
+    boolean interrupted = false;
+    try {
+      while (running) {
+        Document document = queue.poll(POLL_MILLIS, TimeUnit.MILLISECONDS);
+        if (document != null) {
+          try {
+            writer.addDocument(document);
+          } catch (IOException e) {
+            recordFailure(e);
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      // Stop accepting documents so producers fail fast instead of blocking
+      // on a queue that nobody reads any more.
+      interrupted = true;
+      running = false;
+    }
+    flushQueue();
+    if (interrupted) {
+      // Restored only after the flush so the flag does not disturb that I/O.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Writes whatever is still queued, as one addDocuments call. */
+  private void flushQueue() {
+    List<Document> docs = new ArrayList<Document>();
+    queue.drainTo(docs);
+    if (docs.isEmpty()) {
+      return;
+    }
+    try {
+      writer.addDocuments(docs);
+    } catch (IOException e) {
+      recordFailure(e);
+    }
+  }
+
+  /** Prints the failure and keeps the first one for {@link #closeWriter()}. */
+  private void recordFailure(IOException e) {
+    e.printStackTrace();
+    if (failure == null) {
+      failure = e;
+    }
+  }
+
+  /**
+   * Queues a document for the background thread, blocking while the queue is full.
+   *
+   * @throws IOException if the session is closed, or if the calling thread is
+   *                     interrupted while waiting (the interrupt flag stays set)
+   */
+  @Override
+  public void addDocument(Document document) throws IOException {
+    if (!running) {
+      throw new IOException("Writer closed");
+    }
+    try {
+      queue.put(document);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      InterruptedIOException failed = new InterruptedIOException("Interrupted while queueing document");
+      failed.initCause(e);
+      throw failed;
+    }
+  }
+
+  /**
+   * Stops the background thread, waits for it to flush the queue and closes the writer.
+   *
+   * @throws IOException if closing the writer fails, or if a queued document
+   *                     could not be written earlier
+   */
+  @Override
+  public void closeWriter() throws IOException {
+    running = false;
+    boolean interrupted = false;
+    try {
+      thread.join();
+    } catch (InterruptedException e) {
+      interrupted = true;
+    }
+    try {
+      writer.close();
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    if (failure != null) {
+      throw new IOException("Failed to write one or more queued documents", failure);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedFile.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedFile.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedFile.java
new file mode 100644
index 0000000..a420aee
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedFile.java
@@ -0,0 +1,43 @@
+package org.apache.blur.core;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+
+/** A {@link WSessionThreaded} that additionally remembers a {@link File}. */
+public class WSessionThreadedFile extends WSessionThreaded {
+
+  private File file;
+
+  /**
+   * @param file value reported by {@link #getFile()}; the remaining arguments
+   *             are passed to {@link WSessionThreaded} unchanged
+   */
+  public WSessionThreadedFile(String id, IndexWriter writer, Directory directory, File file) {
+    super(id, writer, directory);
+    this.file = file;
+  }
+
+  /** Returns the file this session was created with. */
+  public File getFile() {
+    return this.file;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedPath.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedPath.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedPath.java
new file mode 100644
index 0000000..2f37d84
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedPath.java
@@ -0,0 +1,42 @@
+package org.apache.blur.core;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+
+/** A {@link WSessionThreaded} that additionally remembers a Hadoop {@link Path}. */
+public class WSessionThreadedPath extends WSessionThreaded {
+
+  private Path path;
+
+  /**
+   * @param p value reported by {@link #getPath()}; the remaining arguments are
+   *          passed to {@link WSessionThreaded} unchanged
+   */
+  public WSessionThreadedPath(String id, IndexWriter writer, Directory directory, Path p) {
+    super(id, writer, directory);
+    this.path = p;
+  }
+
+  /** Returns the path this session was created with. */
+  public Path getPath() {
+    return this.path;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/AbstractCommand.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/AbstractCommand.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/AbstractCommand.java
deleted file mode 100644
index fe02002..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/AbstractCommand.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.blur.proto;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.thrift.TException;
-
-
-public abstract class AbstractCommand<CLIENT, T> implements Cloneable {
-  public abstract T call(CLIENT client) throws BlurException, TException;
-
-  @Override
-  public Object clone() throws CloneNotSupportedException {
-    return super.clone();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/ClientManager.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/ClientManager.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/ClientManager.java
deleted file mode 100644
index 5929012..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/ClientManager.java
+++ /dev/null
@@ -1,336 +0,0 @@
-package org.apache.blur.proto;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Proxy;
-import java.net.Proxy.Type;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.BlurTuple.Client;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransportException;
-
-public class ClientManager {
-
-  private static final Object NULL = new Object();
-
-  private static final Log LOG = LogFactory.getLog(ClientManager.class);
-  private static final int MAX_RETRIES = 5;
-  private static final long BACK_OFF_TIME = TimeUnit.MILLISECONDS.toMillis(250);
-  private static final long MAX_BACK_OFF_TIME = TimeUnit.SECONDS.toMillis(10);
-  private static final long ONE_SECOND = TimeUnit.SECONDS.toMillis(1);
-
-  private static Map<Connection, BlockingQueue<Client>> clientPool = new ConcurrentHashMap<Connection, BlockingQueue<Client>>();
-  private static Thread daemon;
-  private static AtomicBoolean running = new AtomicBoolean(true);
-  private static Map<Connection, Object> badConnections = new ConcurrentHashMap<Connection, Object>();
-
-  static {
-    startDaemon();
-  }
-
-  private static void startDaemon() {
-    daemon = new Thread(new Runnable() {
-      private Set<Connection> good = new HashSet<Connection>();
-
-      @Override
-      public void run() {
-        while (running.get()) {
-          good.clear();
-          Set<Connection> badConns = badConnections.keySet();
-          for (Connection connection : badConns) {
-            if (isConnectionGood(connection)) {
-              good.add(connection);
-            }
-          }
-          for (Connection connection : good) {
-            badConnections.remove(connection);
-          }
-          try {
-            Thread.sleep(ONE_SECOND);
-          } catch (InterruptedException e) {
-            return;
-          }
-        }
-      }
-    });
-    daemon.setDaemon(true);
-    daemon.setName("Blur-Client-Manager-Connection-Checker");
-    daemon.start();
-  }
-
-  protected static boolean isConnectionGood(Connection connection) {
-    try {
-      returnClient(connection, getClient(connection));
-      return true;
-    } catch (TTransportException e) {
-      LOG.debug("Connection [" + connection + "] is still bad.");
-    } catch (IOException e) {
-      LOG.debug("Connection [" + connection + "] is still bad.");
-    }
-    return false;
-  }
-
-  public static <CLIENT, T> T execute(Connection connection, AbstractCommand<CLIENT, T> command) throws BlurException, TException, IOException {
-    return execute(connection, command, MAX_RETRIES, BACK_OFF_TIME, MAX_BACK_OFF_TIME);
-  }
-
-  public static <CLIENT, T> T execute(Connection connection, AbstractCommand<CLIENT, T> command, int maxRetries, long backOffTime, long maxBackOffTime) throws BlurException,
-      TException, IOException {
-    return execute(Arrays.asList(connection), command, maxRetries, backOffTime, maxBackOffTime);
-  }
-
-  public static <CLIENT, T> T execute(List<Connection> connections, AbstractCommand<CLIENT, T> command) throws BlurException, TException, IOException {
-    return execute(connections, command, MAX_RETRIES, BACK_OFF_TIME, MAX_BACK_OFF_TIME);
-  }
-
-  private static class LocalResources {
-    AtomicInteger retries = new AtomicInteger();
-    AtomicReference<Client> client = new AtomicReference<Client>();
-    List<Connection> shuffledConnections = new ArrayList<Connection>();
-    Random random = new Random();
-  }
-
-  @SuppressWarnings("unchecked")
-  public static <CLIENT, T> T execute(List<Connection> connections, AbstractCommand<CLIENT, T> command, int maxRetries, long backOffTime, long maxBackOffTime)
-      throws BlurException, TException, IOException {
-    LocalResources localResources = new LocalResources();
-    AtomicReference<Client> client = localResources.client;
-    Random random = localResources.random;
-    AtomicInteger retries = localResources.retries;
-    List<Connection> shuffledConnections = localResources.shuffledConnections;
-
-    retries.set(0);
-    shuffledConnections.addAll(connections);
-
-    Collections.shuffle(shuffledConnections, random);
-    boolean allBad = true;
-    int connectionErrorCount = 0;
-    while (true) {
-      for (Connection connection : shuffledConnections) {
-        if (isBadConnection(connection)) {
-          continue;
-        }
-        client.set(null);
-        try {
-          client.set(getClient(connection));
-        } catch (IOException e) {
-          if (handleError(connection, client, retries, command, e, maxRetries, backOffTime, maxBackOffTime)) {
-            throw e;
-          } else {
-            markBadConnection(connection);
-            continue;
-          }
-        }
-        try {
-          T result = command.call((CLIENT) client.get());
-          allBad = false;
-          return result;
-        } catch (RuntimeException e) {
-          Throwable cause = e.getCause();
-          if (cause instanceof TTransportException) {
-            TTransportException t = (TTransportException) cause;
-            if (handleError(connection, client, retries, command, t, maxRetries, backOffTime, maxBackOffTime)) {
-              throw t;
-            }
-          } else {
-            throw e;
-          }
-        } catch (TTransportException e) {
-          if (handleError(connection, client, retries, command, e, maxRetries, backOffTime, maxBackOffTime)) {
-            throw e;
-          }
-        } finally {
-          if (client.get() != null) {
-            returnClient(connection, client);
-          }
-        }
-      }
-      if (allBad) {
-        connectionErrorCount++;
-        LOG.error("All connections are bad [" + connectionErrorCount + "].");
-        if (connectionErrorCount >= maxRetries) {
-          throw new IOException("All connections are bad.");
-        }
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-  }
-
-  private static void markBadConnection(Connection connection) {
-    LOG.info("Marking bad connection [" + connection + "]");
-    badConnections.put(connection, NULL);
-  }
-
-  private static boolean isBadConnection(Connection connection) {
-    return badConnections.containsKey(connection);
-  }
-
-  private static <CLIENT, T> boolean handleError(Connection connection, AtomicReference<Client> client, AtomicInteger retries, AbstractCommand<CLIENT, T> command, Exception e,
-      int maxRetries, long backOffTime, long maxBackOffTime) {
-    if (client.get() != null) {
-      trashConnections(connection, client);
-      markBadConnection(connection);
-      client.set(null);
-    }
-    if (retries.get() > maxRetries) {
-      LOG.error("No more retries [" + retries + "] out of [" + maxRetries + "]");
-      return true;
-    }
-    LOG.error("Retrying call [" + command + "] retry [" + retries.get() + "] out of [" + maxRetries + "] message [" + e.getMessage() + "]");
-    sleep(backOffTime, maxBackOffTime, retries.get(), maxRetries);
-    retries.incrementAndGet();
-    return false;
-  }
-
-  public static void sleep(long backOffTime, long maxBackOffTime, int retry, int maxRetries) {
-    long extra = (maxBackOffTime - backOffTime) / maxRetries;
-    long sleep = backOffTime + (extra * retry);
-    LOG.info("Backing off call for [" + sleep + " ms]");
-    try {
-      Thread.sleep(sleep);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static <CLIENT, T> T execute(String connectionStr, AbstractCommand<CLIENT, T> command, int maxRetries, long backOffTime, long maxBackOffTime) throws BlurException,
-      TException, IOException {
-    return execute(getConnections(connectionStr), command, maxRetries, backOffTime, maxBackOffTime);
-  }
-
-  public static List<Connection> getConnections(String connectionStr) {
-    int start = 0;
-    int index = connectionStr.indexOf(',');
-    if (index >= 0) {
-      List<Connection> connections = new ArrayList<Connection>();
-      while (index >= 0) {
-        connections.add(new Connection(connectionStr.substring(start, index)));
-        start = index + 1;
-        index = connectionStr.indexOf(',', start);
-      }
-      connections.add(new Connection(connectionStr.substring(start)));
-      return connections;
-    }
-    return Arrays.asList(new Connection(connectionStr));
-  }
-
-  public static <CLIENT, T> T execute(String connectionStr, AbstractCommand<CLIENT, T> command) throws BlurException, TException, IOException {
-    return execute(getConnections(connectionStr), command);
-  }
-
-  private static void returnClient(Connection connection, AtomicReference<Client> client) {
-    returnClient(connection, client.get());
-  }
-
-  private static void returnClient(Connection connection, Client client) {
-    try {
-      clientPool.get(connection).put(client);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private static void trashConnections(Connection connection, AtomicReference<Client> c) {
-    BlockingQueue<Client> blockingQueue;
-    synchronized (clientPool) {
-      blockingQueue = clientPool.put(connection, new LinkedBlockingQueue<Client>());
-      try {
-        blockingQueue.put(c.get());
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    LOG.info("Trashing client for connections [" + connection + "]");
-    for (Client client : blockingQueue) {
-      close(client);
-    }
-  }
-
-  public static void close(Client client) {
-    client.getInputProtocol().getTransport().close();
-    client.getOutputProtocol().getTransport().close();
-  }
-
-  private static Client getClient(Connection connection) throws TTransportException, IOException {
-    BlockingQueue<Client> blockingQueue;
-    synchronized (clientPool) {
-      blockingQueue = clientPool.get(connection);
-      if (blockingQueue == null) {
-        blockingQueue = new LinkedBlockingQueue<Client>();
-        clientPool.put(connection, blockingQueue);
-      }
-    }
-    if (blockingQueue.isEmpty()) {
-      return newClient(connection);
-    }
-    try {
-      return blockingQueue.take();
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static Client newClient(Connection connection) throws TTransportException, IOException {
-    String host = connection.getHost();
-    int port = connection.getPort();
-    TSocket trans;
-    Socket socket;
-    if (connection.isProxy()) {
-      Proxy proxy = new Proxy(Type.SOCKS, new InetSocketAddress(connection.getProxyHost(), connection.getProxyPort()));
-      socket = new Socket(proxy);
-    } else {
-      socket = new Socket();
-    }
-    socket.setTcpNoDelay(true);
-    socket.connect(new InetSocketAddress(host, port));
-    trans = new TSocket(socket);
-    TProtocol proto = new TBinaryProtocol(new TFramedTransport(trans));
-    Client client = new Client(proto);
-    return client;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Command.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Command.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Command.java
deleted file mode 100644
index b354f3c..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Command.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.blur.proto;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.blur.thrift.generated.BlurTuple.Client;
-
-
-public abstract class Command<T> extends AbstractCommand<Client, T> {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Connection.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Connection.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Connection.java
deleted file mode 100644
index fe3eee6..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Connection.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package org.apache.blur.proto;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-public class Connection {
-
-  private String _host = null;
-  private int _port = -1;
-  private String _proxyHost = null;
-  private int _proxyPort = -1;
-  private boolean _proxy = false;
-
-  public Connection(String connectionStr) {
-    int index = connectionStr.indexOf(':');
-    if (index >= 0) {
-      int slashIndex = connectionStr.indexOf('/');
-      if (slashIndex > 0) {
-        _host = connectionStr.substring(0, index);
-        _port = Integer.parseInt(connectionStr.substring(index + 1, slashIndex));
-        int indexOfProxyPort = connectionStr.indexOf(':', slashIndex);
-        _proxyHost = connectionStr.substring(slashIndex + 1, indexOfProxyPort);
-        _proxyPort = Integer.parseInt(connectionStr.substring(indexOfProxyPort + 1));
-      } else {
-        _host = connectionStr.substring(0, index);
-        _port = Integer.parseInt(connectionStr.substring(index + 1));
-      }
-    } else {
-      throw new RuntimeException("Connection string of [" + connectionStr + "] does not match 'host1:port' or 'host1:port/proxyhost1:proxyport'");
-    }
-  }
-
-  public Connection(String host, int port, String proxyHost, int proxyPort) {
-    _port = port;
-    _host = host;
-    _proxyHost = proxyHost;
-    _proxyPort = proxyPort;
-    _proxy = true;
-  }
-
-  public Connection(String host, int port) {
-    _port = port;
-    _host = host;
-  }
-
-  public String getHost() {
-    return _host;
-  }
-
-  public int getPort() {
-    return _port;
-  }
-
-  public boolean isProxy() {
-    return _proxy;
-  }
-
-  public int getProxyPort() {
-    return _proxyPort;
-  }
-
-  public String getProxyHost() {
-    return _proxyHost;
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((_host == null) ? 0 : _host.hashCode());
-    result = prime * result + _port;
-    result = prime * result + (_proxy ? 1231 : 1237);
-    result = prime * result + ((_proxyHost == null) ? 0 : _proxyHost.hashCode());
-    result = prime * result + _proxyPort;
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Connection other = (Connection) obj;
-    if (_host == null) {
-      if (other._host != null)
-        return false;
-    } else if (!_host.equals(other._host))
-      return false;
-    if (_port != other._port)
-      return false;
-    if (_proxy != other._proxy)
-      return false;
-    if (_proxyHost == null) {
-      if (other._proxyHost != null)
-        return false;
-    } else if (!_proxyHost.equals(other._proxyHost))
-      return false;
-    if (_proxyPort != other._proxyPort)
-      return false;
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    return "Connection [_host=" + _host + ", _port=" + _port + ", _proxy=" + _proxy + ", _proxyHost=" + _proxyHost + ", _proxyPort=" + _proxyPort + "]";
-  }
-
-  public Object getConnectionStr() {
-    if (_proxyHost != null) {
-      return _host + ":" + _port + "/" + _proxyHost + ":" + _proxyPort;
-    }
-    return _host + ":" + _port;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/RSession.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/RSession.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/RSession.java
deleted file mode 100644
index 0b13a53..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/RSession.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package org.apache.blur.proto;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
-
-public class RSession {
-
-  private String sessionId;
-  private IndexReader reader;
-  private IndexSearcher searcher;
-  private TopDocs topDocs;
-  private int totalHits;
-  private Query query;
-  private int position = 0;
-  private int totalPosition = 0;
-  private ScoreDoc scoreDoc;
-  private int fetch = 10000;
-  private boolean pagedBefore;
-  private double countToPreFetchAt = fetch * 0.6;
-  private boolean preFetch = true;
-  private ExecutorService executor;
-  private Future<TopDocs> futureFetch;
-  private boolean runningPreFetch = false;
-  private boolean metaData;
-
-  public IndexReader getReader() {
-    return reader;
-  }
-
-  public RSession(String sessionId, IndexReader reader) {
-    this.sessionId = sessionId;
-    this.reader = reader;
-    searcher = new IndexSearcher(reader);
-    executor = Executors.newSingleThreadExecutor();
-  }
-
-  public String getSessionId() {
-    return sessionId;
-  }
-
-  public void execute(Query query) throws IOException {
-    this.query = query;
-    topDocs = searcher.search(query, fetch);
-    totalHits = topDocs.totalHits;
-  }
-
-  public int getTotalHits() {
-    metaData = true;
-    return totalHits;
-  }
-
-  public Document nextDocument() throws IOException {
-    if (totalPosition >= totalHits) {
-      return null;
-    }
-    try {
-      if (position >= topDocs.scoreDocs.length) {
-        page();
-      } else {
-        checkToSeeIfWeCanGetAhead();
-      }
-      this.scoreDoc = topDocs.scoreDocs[position];
-      return searcher.doc(topDocs.scoreDocs[position].doc);
-    } finally {
-      totalPosition++;
-      position++;
-    }
-  }
-
-  private void checkToSeeIfWeCanGetAhead() throws IOException {
-    if (preFetch && pagedBefore) {
-      if (position > countToPreFetchAt && !runningPreFetch) {
-        runningPreFetch = true;
-        final ScoreDoc after = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
-        futureFetch = executor.submit(new Callable<TopDocs>() {
-          @Override
-          public TopDocs call() throws Exception {
-            return searcher.searchAfter(after, query, fetch);
-          }
-        });
-      }
-    }
-  }
-
-  private void page() throws IOException {
-    if (preFetch && pagedBefore) {
-      try {
-        topDocs = futureFetch.get();
-        runningPreFetch = false;
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      } catch (ExecutionException e) {
-        e.printStackTrace();
-      }
-    } else {
-      pagedBefore = true;
-      topDocs = searcher.searchAfter(scoreDoc, query, fetch);
-    }
-    position = 0;
-  }
-
-  public void close() throws IOException {
-    reader.close();
-  }
-
-  public boolean isMetaDataBeenFetched() {
-    return metaData;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Server.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Server.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Server.java
deleted file mode 100644
index f997917..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Server.java
+++ /dev/null
@@ -1,273 +0,0 @@
-package org.apache.blur.proto;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.blur.thrift.generated.Attribute;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.BlurTuple.Iface;
-import org.apache.blur.thrift.generated.BlurTuple.Processor;
-import org.apache.blur.thrift.generated.Session;
-import org.apache.blur.thrift.generated.Tuple;
-import org.apache.lucene.analysis.core.KeywordAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.queryparser.classic.QueryParser;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.util.Version;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.server.TThreadPoolServer.Args;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-
-public class Server implements Iface {
-
-  private static final List<Tuple> EMPTY_LIST = new ArrayList<Tuple>();
-
-  public static void main(String[] argsStr) throws TTransportException, IOException {
-    Server server = new Server(new File(argsStr[0]));
-    Processor<Iface> processor = new Processor<Iface>(server);
-    Args args = new Args(new TServerSocket(new InetSocketAddress("127.0.0.1", 9000)));
-    args.minWorkerThreads(50);
-    args.maxWorkerThreads(50);
-    args.processor(processor);
-    args.transportFactory(new TFramedTransport.Factory());
-    args.protocolFactory(new TBinaryProtocol.Factory(true, true));
-
-    TThreadPoolServer tserver = new TThreadPoolServer(args);
-    tserver.serve();
-  }
-
-  private Map<String, RSession> readSessions = new ConcurrentHashMap<String, RSession>();
-  private Map<String, WSession> writeSessions = new ConcurrentHashMap<String, WSession>();
-  private IndexWriter writer;
-  private File file;
-
-  public Server(File file) throws IOException {
-    this.file = file;
-    Directory directory = FSDirectory.open(file);
-    this.writer = openWriter(directory);
-  }
-
-  private IndexWriter openWriter(Directory directory) throws IOException {
-    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40, new KeywordAnalyzer());
-    return new IndexWriter(directory, conf);
-  }
-
-  @Override
-  public Session openReadSession() throws BlurException, TException {
-    try {
-      IndexReader reader = DirectoryReader.open(writer, true);
-      RSession session = new RSession(UUID.randomUUID().toString(), reader);
-      readSessions.put(session.getSessionId(), session);
-      return new Session(session.getSessionId());
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public void executeQuery(Session readSession, String query) throws BlurException, TException {
-    try {
-      RSession session = getReadSession(readSession);
-      if (query.trim().equals("*")) {
-        session.execute(new MatchAllDocsQuery());
-        return;
-      }
-      QueryParser parser = new QueryParser(Version.LUCENE_40, "SUPER", new KeywordAnalyzer());
-      parser.setAllowLeadingWildcard(true);
-      session.execute(parser.parse(query));
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public List<Tuple> nextMetaDataResults(Session readSession, int batchSize) throws BlurException, TException {
-    try {
-      RSession session = getReadSession(readSession);
-      if (session.isMetaDataBeenFetched()) {
-        return EMPTY_LIST;
-      }
-      Tuple tuple = new Tuple();
-      tuple.addToAttributes(Util.newAttribute("totalResults", session.getTotalHits()));
-      return Arrays.asList(tuple);
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public List<Tuple> nextResults(Session readSession, int batchSize) throws BlurException, TException {
-    try {
-      RSession session = getReadSession(readSession);
-      List<Tuple> results = new ArrayList<Tuple>();
-      for (int i = 0; i < batchSize; i++) {
-        Tuple tuple = convert(session.nextDocument());
-        if (tuple == null) {
-          break;
-        }
-        results.add(tuple);
-      }
-      return results;
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public void closeReadSession(Session readSession) throws BlurException, TException {
-    try {
-      RSession session = readSessions.remove(readSession.getSessionId());
-      session.close();
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public Session openWriteSession() throws BlurException, TException {
-    try {
-      String id = UUID.randomUUID().toString();
-      File f = new File(file, id);
-      Directory directory = FSDirectory.open(f);
-      WSession session = new WSessionThreaded(id, openWriter(directory), f, directory);
-      writeSessions.put(session.getSessionId(), session);
-      return new Session(session.getSessionId());
-    } catch (Exception e) {
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public void writeTuples(Session writeSession, List<Tuple> tuples) throws BlurException, TException {
-    try {
-      WSession session = getWriteSession(writeSession);
-      session.addDocuments(convert(tuples));
-    } catch (Exception e) {
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public void commitWriteSession(Session writeSession) throws BlurException, TException {
-    try {
-      WSession session = writeSessions.remove(writeSession.getSessionId());
-      session.closeWriter();
-      writer.addIndexes(session.getDirectory());
-      writer.commit();
-      writer.maybeMerge();
-      rm(session.getFile());
-    } catch (Exception e) {
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public void rollbackWriteSession(Session writeSession) throws BlurException, TException {
-    try {
-      WSession session = writeSessions.remove(writeSession.getSessionId());
-      session.closeWriter();
-      rm(session.getFile());
-    } catch (Exception e) {
-      throw new BlurException();
-    }
-  }
-
-  private void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  private Document convert(Tuple tuple) throws BlurException {
-    if (tuple == null) {
-      return null;
-    }
-    Document newDoc = new Document();
-    for (Attribute attribute : tuple.getAttributes()) {
-      newDoc.add(Util.getField(attribute));
-    }
-    return newDoc;
-  }
-
-  private Tuple convert(Document document) {
-    if (document == null) {
-      return null;
-    }
-    List<IndexableField> fields = document.getFields();
-    Tuple tuple = new Tuple();
-    for (IndexableField fieldable : fields) {
-      tuple.addToAttributes(Util.toAttribute(fieldable));
-    }
-    return tuple;
-  }
-
-  private List<Document> convert(List<Tuple> tuples) throws BlurException {
-    List<Document> docs = new ArrayList<Document>();
-    for (Tuple tuple : tuples) {
-      docs.add(convert(tuple));
-    }
-    return docs;
-  }
-  
-  private WSession getWriteSession(Session session) throws BlurException {
-    WSession wsession = writeSessions.get(session.getSessionId());
-    if (wsession == null) {
-      throw new BlurException("Write Session [" + session + "] not found", null);
-    }
-    return wsession;
-  }
-
-  private RSession getReadSession(Session session) throws BlurException {
-    RSession rsession = readSessions.get(session.getSessionId());
-    if (rsession == null) {
-      throw new BlurException("Read Session [" + session + "] not found", null);
-    }
-    return rsession;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Util.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Util.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Util.java
deleted file mode 100644
index 3a65fb6..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/Util.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.blur.proto;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.blur.thrift.generated.Attribute;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.TYPE;
-import org.apache.blur.thrift.generated.Tuple;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.IndexableField;
-
-public class Util {
-
-  public static Attribute newAttribute(String name, String value) {
-    return new Attribute(name, ByteBuffer.wrap(value.getBytes()), TYPE.STRING);
-  }
-
-  public static Attribute newAttribute(String name, int value) {
-    ByteBuffer buffer = ByteBuffer.allocate(4);
-    buffer.putInt(value);
-    buffer.flip();
-    Attribute attribute = new Attribute();
-    attribute.setName(name);
-    attribute.setValue(buffer.array());
-    attribute.setType(TYPE.INT);
-    return attribute;
-  }
-
-  public static Attribute toAttribute(IndexableField fieldable) {
-    return newAttribute(fieldable.name(), fieldable.stringValue());
-  }
-
-  public static IndexableField getField(Attribute attribute) throws BlurException {
-    switch (attribute.getType()) {
-    case STRING:
-      return new StringField(attribute.getName(), toString(attribute.getValue()), Store.YES);
-    default:
-      throw new BlurException("Not supported [" + attribute.getType() + "]", null);
-    }
-  }
-
-  public static String toString(byte[] bs) {
-    return new String(bs);
-  }
-
-  public static String toString(Tuple tuple) {
-    List<Attribute> attributes = tuple.getAttributes();
-    String s = "";
-    for (Attribute attribute : attributes) {
-      if (!s.isEmpty()) {
-        s += ",";
-      }
-      String name = attribute.getName();
-      s += "\"" + name + "\":";
-      switch (attribute.getType()) {
-      case STRING:
-        s += "\"" + toString(attribute.getValue()) + "\"";
-        break;
-      case INT:
-        s += toInt(attribute.getValue());
-        break;
-      default:
-        throw new RuntimeException("Type [" + attribute.getType() + "] not supported.");
-      }
-    }
-    return "{" + s + "}";
-  }
-
-  public static int toInt(byte[] bs) {
-    return ByteBuffer.wrap(bs).getInt();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2c4dc79f/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/WSession.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/WSession.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/WSession.java
deleted file mode 100644
index d674434..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/WSession.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package org.apache.blur.proto;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.store.Directory;
-
-public class WSession {
-
-  protected IndexWriter writer;
-  protected String sessionId;
-  protected File file;
-  protected Directory directory;
-
-  public WSession(String id, IndexWriter writer, File file, Directory directory) {
-    this.sessionId = id;
-    this.writer = writer;
-    this.file = file;
-    this.directory = directory;
-  }
-
-  public String getSessionId() {
-    return sessionId;
-  }
-
-  public void addDocument(Document document) throws IOException {
-    writer.addDocument(document);
-  }
-  
-  public void addDocuments(List<Document> documents) throws IOException {
-    writer.addDocuments(documents);
-  }
-
-  public void closeWriter() throws IOException {
-    writer.close();
-  }
-
-  public File getFile() {
-    return file;
-  }
-
-  public Directory getDirectory() {
-    return directory;
-  }
-
-}


Mime
View raw message