incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/4] git commit: More updates, added Bql parser.
Date Wed, 10 Oct 2012 02:01:29 GMT
More updates, added Bql parser.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/3fbd7f52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/3fbd7f52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/3fbd7f52

Branch: refs/heads/new-api-prototype
Commit: 3fbd7f52c76bbeefc8c1a5ce828ca13f881fd3f3
Parents: f2b81bd
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Oct 9 19:14:30 2012 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Oct 9 19:14:30 2012 -0400

----------------------------------------------------------------------
 src/blur-new-api-prototype/service.thrift          |   11 +-
 .../main/java/org/apache/blur/core/RSession.java   |  137 -----
 .../main/java/org/apache/blur/core/ServerFile.java |  235 +--------
 .../main/java/org/apache/blur/core/ServerHdfs.java |  153 +++---
 .../apache/blur/core/TupleStoredFieldVisitor.java  |  151 ++++++
 .../main/java/org/apache/blur/core/WSession.java   |   58 --
 .../org/apache/blur/core/WSessionThreaded.java     |   99 ----
 .../org/apache/blur/core/WSessionThreadedFile.java |   37 --
 .../org/apache/blur/core/WSessionThreadedPath.java |   36 --
 .../org/apache/blur/core/sessions/ReadSession.java |   54 ++
 .../blur/core/sessions/RunningQuerySession.java    |  165 ++++++
 .../apache/blur/core/sessions/WriteSession.java    |   60 +++
 .../blur/core/sessions/WriteSessionThreaded.java   |   99 ++++
 .../core/sessions/WriteSessionThreadedFile.java    |   50 ++
 .../core/sessions/WriteSessionThreadedPath.java    |   49 ++
 .../java/org/apache/blur/parser/BqlException.java  |   15 +
 .../java/org/apache/blur/parser/BqlParser.java     |  238 +++++++++
 .../example/clients/LoadThriftClientBatch.java     |   10 +-
 .../example/clients/ReadThriftClientBatch.java     |   20 +-
 .../apache/blur/thrift/generated/BlurTuple.java    |  170 +++++--
 .../apache/blur/thrift/generated/QuerySession.java |  409 +++++++++++++++
 .../java/org/apache/blur/parser/BqlParserTest.java |  177 +++++++
 22 files changed, 1715 insertions(+), 718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/service.thrift
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/service.thrift b/src/blur-new-api-prototype/service.thrift
index 759ffd8..81cc582 100644
--- a/src/blur-new-api-prototype/service.thrift
+++ b/src/blur-new-api-prototype/service.thrift
@@ -23,12 +23,17 @@ struct Session {
   1:string sessionId
 }
 
+struct QuerySession {
+  1:Session session,
+  2:string queryId
+}
+
 service BlurTuple {
 
   Session openReadSession() throws (1:BlurException e)
-  void executeQuery(1:Session session, 2:string query) throws (1:BlurException e)
-  list<Tuple> nextMetaDataResults(1:Session session, 2:i32 batchSize) throws (1:BlurException e)
-  list<Tuple> nextResults(1:Session session, 2:i32 batchSize) throws (1:BlurException e)
+  QuerySession executeQuery(1:Session session, 2:string query) throws (1:BlurException e)
+  list<Tuple> nextMetaDataResults(1:QuerySession session, 2:i32 batchSize) throws (1:BlurException e)
+  list<Tuple> nextResults(1:QuerySession session, 2:i32 batchSize) throws (1:BlurException e)
   void closeReadSession(1:Session session) throws (1:BlurException e)
 
   Session openWriteSession() throws (1:BlurException e)

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/RSession.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/RSession.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/RSession.java
deleted file mode 100644
index c62be96..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/RSession.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package org.apache.blur.core;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
-
-public class RSession {
-
-  private String sessionId;
-  private IndexReader reader;
-  private IndexSearcher searcher;
-  private TopDocs topDocs;
-  private int totalHits;
-  private Query query;
-  private int position = 0;
-  private int totalPosition = 0;
-  private ScoreDoc scoreDoc;
-  private int fetch = 10000;
-  private boolean pagedBefore;
-  private double countToPreFetchAt = fetch * 0.6;
-  private boolean preFetch = true;
-  private ExecutorService executor;
-  private Future<TopDocs> futureFetch;
-  private boolean runningPreFetch = false;
-  private boolean metaData;
-
-  public IndexReader getReader() {
-    return reader;
-  }
-
-  public RSession(String sessionId, IndexReader reader) {
-    this.sessionId = sessionId;
-    this.reader = reader;
-    searcher = new IndexSearcher(reader);
-    executor = Executors.newSingleThreadExecutor();
-  }
-
-  public String getSessionId() {
-    return sessionId;
-  }
-
-  public void execute(Query query) throws IOException {
-    this.query = query;
-    topDocs = searcher.search(query, fetch);
-    totalHits = topDocs.totalHits;
-  }
-
-  public int getTotalHits() {
-    metaData = true;
-    return totalHits;
-  }
-
-  public Document nextDocument() throws IOException {
-    if (totalPosition >= totalHits) {
-      return null;
-    }
-    try {
-      if (position >= topDocs.scoreDocs.length) {
-        page();
-      } else {
-        checkToSeeIfWeCanGetAhead();
-      }
-      this.scoreDoc = topDocs.scoreDocs[position];
-      return searcher.doc(topDocs.scoreDocs[position].doc);
-    } finally {
-      totalPosition++;
-      position++;
-    }
-  }
-
-  private void checkToSeeIfWeCanGetAhead() throws IOException {
-    if (preFetch && pagedBefore) {
-      if (position > countToPreFetchAt && !runningPreFetch) {
-        runningPreFetch = true;
-        final ScoreDoc after = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
-        futureFetch = executor.submit(new Callable<TopDocs>() {
-          @Override
-          public TopDocs call() throws Exception {
-            return searcher.searchAfter(after, query, fetch);
-          }
-        });
-      }
-    }
-  }
-
-  private void page() throws IOException {
-    if (preFetch && pagedBefore) {
-      try {
-        topDocs = futureFetch.get();
-        runningPreFetch = false;
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      } catch (ExecutionException e) {
-        e.printStackTrace();
-      }
-    } else {
-      pagedBefore = true;
-      topDocs = searcher.searchAfter(scoreDoc, query, fetch);
-    }
-    position = 0;
-  }
-
-  public void close() throws IOException {
-    reader.close();
-  }
-
-  public boolean isMetaDataBeenFetched() {
-    return metaData;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerFile.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerFile.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerFile.java
index 5f96bad..5d90f7c 100644
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerFile.java
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerFile.java
@@ -16,35 +16,13 @@ package org.apache.blur.core;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.blur.thrift.generated.Attribute;
-import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurTuple.Iface;
 import org.apache.blur.thrift.generated.BlurTuple.Processor;
-import org.apache.blur.thrift.generated.Session;
-import org.apache.blur.thrift.generated.Tuple;
-import org.apache.lucene.analysis.core.KeywordAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.queryparser.classic.QueryParser;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.util.Version;
-import org.apache.thrift.TException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.server.TThreadPoolServer.Args;
@@ -52,12 +30,12 @@ import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TTransportException;
 
-public class ServerFile implements Iface {
+public class ServerFile {
 
-  private static final List<Tuple> EMPTY_LIST = new ArrayList<Tuple>();
 
   public static void main(String[] argsStr) throws TTransportException, IOException {
-    ServerFile server = new ServerFile(new File(argsStr[0]));
+    Configuration conf = new Configuration();
+    ServerHdfs server = new ServerHdfs(conf, new Path(argsStr[0]), true);
     Processor<Iface> processor = new Processor<Iface>(server);
     Args args = new Args(new TServerSocket(new InetSocketAddress("127.0.0.1", 9000)));
     args.minWorkerThreads(50);
@@ -69,205 +47,4 @@ public class ServerFile implements Iface {
     TThreadPoolServer tserver = new TThreadPoolServer(args);
     tserver.serve();
   }
-
-  private Map<String, RSession> readSessions = new ConcurrentHashMap<String, RSession>();
-  private Map<String, WSession> writeSessions = new ConcurrentHashMap<String, WSession>();
-  private IndexWriter writer;
-  private File file;
-
-  public ServerFile(File file) throws IOException {
-    this.file = file;
-    Directory directory = FSDirectory.open(file);
-    this.writer = openWriter(directory);
-  }
-
-  private IndexWriter openWriter(Directory directory) throws IOException {
-    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40, new KeywordAnalyzer());
-    return new IndexWriter(directory, conf);
-  }
-
-  @Override
-  public Session openReadSession() throws BlurException, TException {
-    try {
-      IndexReader reader = DirectoryReader.open(writer, true);
-      RSession session = new RSession(UUID.randomUUID().toString(), reader);
-      readSessions.put(session.getSessionId(), session);
-      return new Session(session.getSessionId());
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public void executeQuery(Session readSession, String query) throws BlurException, TException {
-    try {
-      RSession session = getReadSession(readSession);
-      if (query.trim().equals("*")) {
-        session.execute(new MatchAllDocsQuery());
-        return;
-      }
-      QueryParser parser = new QueryParser(Version.LUCENE_40, "SUPER", new KeywordAnalyzer());
-      parser.setAllowLeadingWildcard(true);
-      session.execute(parser.parse(query));
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public List<Tuple> nextMetaDataResults(Session readSession, int batchSize) throws BlurException, TException {
-    try {
-      RSession session = getReadSession(readSession);
-      if (session.isMetaDataBeenFetched()) {
-        return EMPTY_LIST;
-      }
-      Tuple tuple = new Tuple();
-      tuple.addToAttributes(Util.newAttribute("totalResults", session.getTotalHits()));
-      return Arrays.asList(tuple);
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public List<Tuple> nextResults(Session readSession, int batchSize) throws BlurException, TException {
-    try {
-      RSession session = getReadSession(readSession);
-      List<Tuple> results = new ArrayList<Tuple>();
-      for (int i = 0; i < batchSize; i++) {
-        Tuple tuple = convert(session.nextDocument());
-        if (tuple == null) {
-          break;
-        }
-        results.add(tuple);
-      }
-      return results;
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public void closeReadSession(Session readSession) throws BlurException, TException {
-    try {
-      RSession session = readSessions.remove(readSession.getSessionId());
-      session.close();
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public Session openWriteSession() throws BlurException, TException {
-    try {
-      String id = UUID.randomUUID().toString();
-      File f = new File(file, id);
-      Directory directory = FSDirectory.open(f);
-      WSession session = new WSessionThreadedFile(id, openWriter(directory), directory, f);
-      writeSessions.put(session.getSessionId(), session);
-      return new Session(session.getSessionId());
-    } catch (Exception e) {
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public void writeTuples(Session writeSession, List<Tuple> tuples) throws BlurException, TException {
-    try {
-      WSession session = getWriteSession(writeSession);
-      session.addDocuments(convert(tuples));
-    } catch (Exception e) {
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public void commitWriteSession(Session writeSession) throws BlurException, TException {
-    try {
-      WSession session = writeSessions.remove(writeSession.getSessionId());
-      session.closeWriter();
-      writer.addIndexes(session.getDirectory());
-      writer.commit();
-      writer.maybeMerge();
-      rm(((WSessionThreadedFile) session).getFile());
-    } catch (Exception e) {
-      throw new BlurException();
-    }
-  }
-
-  @Override
-  public void rollbackWriteSession(Session writeSession) throws BlurException, TException {
-    try {
-      WSession session = writeSessions.remove(writeSession.getSessionId());
-      session.closeWriter();
-      rm(((WSessionThreadedFile) session).getFile());
-    } catch (Exception e) {
-      throw new BlurException();
-    }
-  }
-
-  private void rm(File file) throws IOException {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  private Document convert(Tuple tuple) throws BlurException {
-    if (tuple == null) {
-      return null;
-    }
-    Document newDoc = new Document();
-    for (Attribute attribute : tuple.getAttributes()) {
-      newDoc.add(Util.getField(attribute));
-    }
-    return newDoc;
-  }
-
-  private Tuple convert(Document document) {
-    if (document == null) {
-      return null;
-    }
-    List<IndexableField> fields = document.getFields();
-    Tuple tuple = new Tuple();
-    for (IndexableField fieldable : fields) {
-      tuple.addToAttributes(Util.toAttribute(fieldable));
-    }
-    return tuple;
-  }
-
-  private List<Document> convert(List<Tuple> tuples) throws BlurException {
-    List<Document> docs = new ArrayList<Document>();
-    for (Tuple tuple : tuples) {
-      docs.add(convert(tuple));
-    }
-    return docs;
-  }
-
-  private WSession getWriteSession(Session session) throws BlurException {
-    WSession wsession = writeSessions.get(session.getSessionId());
-    if (wsession == null) {
-      throw new BlurException("Write Session [" + session + "] not found", null);
-    }
-    return wsession;
-  }
-
-  private RSession getReadSession(Session session) throws BlurException {
-    RSession rsession = readSessions.get(session.getSessionId());
-    if (rsession == null) {
-      throw new BlurException("Read Session [" + session + "] not found", null);
-    }
-    return rsession;
-  }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerHdfs.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerHdfs.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerHdfs.java
index 9533c68..56db60e 100644
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerHdfs.java
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/ServerHdfs.java
@@ -16,10 +16,12 @@ package org.apache.blur.core;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -31,16 +33,24 @@ import org.apache.blur.cache.BlockCache;
 import org.apache.blur.cache.BlockDirectory;
 import org.apache.blur.cache.BlockDirectoryCache;
 import org.apache.blur.cache.BufferStore;
+import org.apache.blur.core.sessions.ReadSession;
+import org.apache.blur.core.sessions.RunningQuerySession;
+import org.apache.blur.core.sessions.WriteSession;
+import org.apache.blur.core.sessions.WriteSessionThreadedFile;
+import org.apache.blur.core.sessions.WriteSessionThreadedPath;
+import org.apache.blur.parser.BqlParser;
 import org.apache.blur.store.SimpleHDFSDirectory;
 import org.apache.blur.thrift.generated.Attribute;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurTuple.Iface;
 import org.apache.blur.thrift.generated.BlurTuple.Processor;
+import org.apache.blur.thrift.generated.QuerySession;
 import org.apache.blur.thrift.generated.Session;
 import org.apache.blur.thrift.generated.Tuple;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.codecs.appending.AppendingCodec;
 import org.apache.lucene.document.Document;
@@ -48,10 +58,8 @@ import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.queryparser.classic.QueryParser;
-import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.util.Version;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -60,6 +68,7 @@ import org.apache.thrift.server.TThreadPoolServer.Args;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TTransportException;
+import org.mortbay.log.Log;
 
 public class ServerHdfs implements Iface {
 
@@ -74,7 +83,7 @@ public class ServerHdfs implements Iface {
 
   public static void main(String[] argsStr) throws TTransportException, IOException {
     Configuration conf = new Configuration();
-    ServerHdfs server = new ServerHdfs(conf, new Path(argsStr[0]));
+    ServerHdfs server = new ServerHdfs(conf, new Path(argsStr[0]), false);
     Processor<Iface> processor = new Processor<Iface>(server);
     Args args = new Args(new TServerSocket(new InetSocketAddress("127.0.0.1", 9000)));
     args.minWorkerThreads(50);
@@ -87,28 +96,45 @@ public class ServerHdfs implements Iface {
     tserver.serve();
   }
 
-  private Map<String, RSession> readSessions = new ConcurrentHashMap<String, RSession>();
-  private Map<String, WSession> writeSessions = new ConcurrentHashMap<String, WSession>();
+  private Map<String, ReadSession> readSessions = new ConcurrentHashMap<String, ReadSession>();
+  private Map<String, WriteSession> writeSessions = new ConcurrentHashMap<String, WriteSession>();
   private IndexWriter writer;
   private Path path;
   private Configuration configuration;
+  private Analyzer analyzer = new KeywordAnalyzer();
+  private boolean nativeDirectory;
 
-  public ServerHdfs(Configuration configuration, Path path) throws IOException {
-    this.path = path;
+  public ServerHdfs(Configuration configuration, Path p, boolean nativeDirectory) throws IOException {
     this.configuration = configuration;
-    Directory directory = new SimpleHDFSDirectory(configuration, path);
-    BufferStore.init();
-    long totalMemory = getTotalMemory();
-    int slabSize = MAX_SLABSIZE;
-    BlockCache cache;
-    if (totalMemory < 0) {
-      cache = new BlockCache(false, MIN_SLABSIZE, MIN_SLABSIZE);
+    this.nativeDirectory = nativeDirectory;
+    FileSystem fileSystem = p.getFileSystem(configuration);
+    this.path = fileSystem.makeQualified(p);
+    fileSystem.close();
+    if (nativeDirectory) {
+      FSDirectory directory = FSDirectory.open(getFile(this.path));
+      this.writer = openWriter(directory);
     } else {
-      cache = new BlockCache(true, totalMemory, slabSize);
+      Directory directory = new SimpleHDFSDirectory(configuration, this.path);
+      BufferStore.init();
+      long totalMemory = getTotalMemory();
+      int slabSize = MAX_SLABSIZE;
+      BlockCache cache;
+      if (totalMemory < 0) {
+        Log.info("Block cache using [" + MIN_SLABSIZE + "] total memory in cache, with slab size of [" + MIN_SLABSIZE + "].");
+        cache = new BlockCache(false, MIN_SLABSIZE, MIN_SLABSIZE);
+      } else {
+        Log.info("Block cache using [" + totalMemory + "] total memory in off heap cache, with slab size of [" + slabSize + "].");
+        cache = new BlockCache(true, totalMemory, slabSize);
+      }
+      BlockDirectoryCache blockDirectoryCache = new BlockDirectoryCache(cache);
+      BlockDirectory blockDirectory = new BlockDirectory("embedded", directory, blockDirectoryCache);
+      this.writer = openWriter(blockDirectory);
     }
-    BlockDirectoryCache blockDirectoryCache = new BlockDirectoryCache(cache);
-    BlockDirectory blockDirectory = new BlockDirectory("embedded", directory, blockDirectoryCache);
-    this.writer = openWriter(blockDirectory);
+  }
+
+  private File getFile(Path p) {
+    URI uri = p.toUri();
+    return new File(uri);
   }
 
   private long getTotalMemory() {
@@ -149,7 +175,7 @@ public class ServerHdfs implements Iface {
   public Session openReadSession() throws BlurException, TException {
     try {
       IndexReader reader = DirectoryReader.open(writer, true);
-      RSession session = new RSession(UUID.randomUUID().toString(), reader);
+      ReadSession session = new ReadSession(UUID.randomUUID().toString(), reader);
       readSessions.put(session.getSessionId(), session);
       return new Session(session.getSessionId());
     } catch (Throwable t) {
@@ -158,30 +184,35 @@ public class ServerHdfs implements Iface {
   }
 
   @Override
-  public void executeQuery(Session readSession, String query) throws BlurException, TException {
+  public QuerySession executeQuery(Session readSession, String query) throws BlurException, TException {
     try {
-      RSession session = getReadSession(readSession);
-      if (query.trim().equals("*")) {
-        session.execute(new MatchAllDocsQuery());
-        return;
-      }
-      QueryParser parser = new QueryParser(Version.LUCENE_40, "SUPER", new KeywordAnalyzer());
-      parser.setAllowLeadingWildcard(true);
-      session.execute(parser.parse(query));
+      ReadSession session = getReadSession(readSession);
+      BqlParser bqlParser = new BqlParser(analyzer);
+      bqlParser.parse(query);
+
+      QuerySession querySession = new QuerySession();
+      querySession.setQueryId(UUID.randomUUID().toString());
+      querySession.setSession(readSession);
+
+      RunningQuerySession runningQuerySession = session.create(querySession.getQueryId());
+      runningQuerySession.executeBql(bqlParser);
+
+      return querySession;
     } catch (Throwable t) {
       throw Util.wrapThrowable(t);
     }
   }
 
   @Override
-  public List<Tuple> nextMetaDataResults(Session readSession, int batchSize) throws BlurException, TException {
+  public List<Tuple> nextMetaDataResults(QuerySession querySession, int batchSize) throws BlurException, TException {
     try {
-      RSession session = getReadSession(readSession);
-      if (session.isMetaDataBeenFetched()) {
+      ReadSession session = getReadSession(querySession.getSession());
+      RunningQuerySession runningQuerySession = session.getRunningQuerySession(querySession.getQueryId());
+      if (runningQuerySession.isMetaDataBeenFetched()) {
         return EMPTY_LIST;
       }
       Tuple tuple = new Tuple();
-      tuple.addToAttributes(Util.newAttribute("totalResults", session.getTotalHits()));
+      tuple.addToAttributes(Util.newAttribute("totalResults", runningQuerySession.getTotalHits()));
       return Arrays.asList(tuple);
     } catch (Throwable t) {
       throw Util.wrapThrowable(t);
@@ -189,12 +220,13 @@ public class ServerHdfs implements Iface {
   }
 
   @Override
-  public List<Tuple> nextResults(Session readSession, int batchSize) throws BlurException, TException {
+  public List<Tuple> nextResults(QuerySession querySession, int batchSize) throws BlurException, TException {
     try {
-      RSession session = getReadSession(readSession);
+      ReadSession session = getReadSession(querySession.getSession());
+      RunningQuerySession runningQuerySession = session.getRunningQuerySession(querySession.getQueryId());
       List<Tuple> results = new ArrayList<Tuple>();
       for (int i = 0; i < batchSize; i++) {
-        Tuple tuple = convert(session.nextDocument());
+        Tuple tuple = runningQuerySession.nextDocument();
         if (tuple == null) {
           break;
         }
@@ -209,7 +241,7 @@ public class ServerHdfs implements Iface {
   @Override
   public void closeReadSession(Session readSession) throws BlurException, TException {
     try {
-      RSession session = readSessions.remove(readSession.getSessionId());
+      ReadSession session = readSessions.remove(readSession.getSessionId());
       session.close();
     } catch (Throwable t) {
       throw Util.wrapThrowable(t);
@@ -221,8 +253,15 @@ public class ServerHdfs implements Iface {
     try {
       String id = UUID.randomUUID().toString();
       Path p = new Path(path, id);
-      Directory directory = new SimpleHDFSDirectory(configuration, p);
-      WSession session = new WSessionThreadedPath(id, openWriter(directory), directory, p);
+      WriteSession session;
+      if (nativeDirectory) {
+        File file = getFile(p);
+        Directory directory = FSDirectory.open(file);
+        session = new WriteSessionThreadedFile(id, openWriter(directory), directory, file);
+      } else {
+        Directory directory = new SimpleHDFSDirectory(configuration, p);
+        session = new WriteSessionThreadedPath(id, openWriter(directory), directory, configuration, p);
+      }
       writeSessions.put(session.getSessionId(), session);
       return new Session(session.getSessionId());
     } catch (Throwable t) {
@@ -233,7 +272,7 @@ public class ServerHdfs implements Iface {
   @Override
   public void writeTuples(Session writeSession, List<Tuple> tuples) throws BlurException, TException {
     try {
-      WSession session = getWriteSession(writeSession);
+      WriteSession session = getWriteSession(writeSession);
       session.addDocuments(convert(tuples));
     } catch (Throwable t) {
       throw Util.wrapThrowable(t);
@@ -243,12 +282,12 @@ public class ServerHdfs implements Iface {
   @Override
   public void commitWriteSession(Session writeSession) throws BlurException, TException {
     try {
-      WSession session = writeSessions.remove(writeSession.getSessionId());
+      WriteSession session = writeSessions.remove(writeSession.getSessionId());
       session.closeWriter();
       writer.addIndexes(session.getDirectory());
       writer.commit();
       writer.maybeMerge();
-      rm(((WSessionThreadedPath) session).getPath());
+      session.removeTempSpace();
     } catch (Throwable t) {
       throw Util.wrapThrowable(t);
     }
@@ -257,20 +296,14 @@ public class ServerHdfs implements Iface {
   @Override
   public void rollbackWriteSession(Session writeSession) throws BlurException, TException {
     try {
-      WSession session = writeSessions.remove(writeSession.getSessionId());
+      WriteSession session = writeSessions.remove(writeSession.getSessionId());
       session.closeWriter();
-      rm(((WSessionThreadedPath) session).getPath());
+      session.removeTempSpace();
     } catch (Throwable t) {
       throw Util.wrapThrowable(t);
     }
   }
 
-  private void rm(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(configuration);
-    fileSystem.delete(path, true);
-    fileSystem.close();
-  }
-
   private Document convert(Tuple tuple) throws BlurException {
     if (tuple == null) {
       return null;
@@ -282,18 +315,6 @@ public class ServerHdfs implements Iface {
     return newDoc;
   }
 
-  private Tuple convert(Document document) {
-    if (document == null) {
-      return null;
-    }
-    List<IndexableField> fields = document.getFields();
-    Tuple tuple = new Tuple();
-    for (IndexableField fieldable : fields) {
-      tuple.addToAttributes(Util.toAttribute(fieldable));
-    }
-    return tuple;
-  }
-
   private List<Document> convert(List<Tuple> tuples) throws BlurException {
     List<Document> docs = new ArrayList<Document>();
     for (Tuple tuple : tuples) {
@@ -302,16 +323,16 @@ public class ServerHdfs implements Iface {
     return docs;
   }
 
-  private WSession getWriteSession(Session session) throws BlurException {
-    WSession wsession = writeSessions.get(session.getSessionId());
+  private WriteSession getWriteSession(Session session) throws BlurException {
+    WriteSession wsession = writeSessions.get(session.getSessionId());
     if (wsession == null) {
       throw new BlurException("Write Session [" + session + "] not found", null);
     }
     return wsession;
   }
 
-  private RSession getReadSession(Session session) throws BlurException {
-    RSession rsession = readSessions.get(session.getSessionId());
+  private ReadSession getReadSession(Session session) throws BlurException {
+    ReadSession rsession = readSessions.get(session.getSessionId());
     if (rsession == null) {
       throw new BlurException("Read Session [" + session + "] not found", null);
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/TupleStoredFieldVisitor.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/TupleStoredFieldVisitor.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/TupleStoredFieldVisitor.java
new file mode 100644
index 0000000..4a3650c
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/TupleStoredFieldVisitor.java
@@ -0,0 +1,151 @@
+package org.apache.blur.core;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.blur.thrift.generated.Attribute;
+import org.apache.blur.thrift.generated.TYPE;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.StoredFieldVisitor;
+
+/**
+ * Stored-field visitor that gathers the string fields requested by a query
+ * into a fixed-order set of {@link Attribute}s, one slot per requested
+ * attribute token.
+ *
+ * The same field may be requested several times (for example under different
+ * aliases); every slot that refers to the field receives its value. Only
+ * string fields are supported: a requested field stored with any other type
+ * makes the corresponding callback throw.
+ *
+ * Instances keep per-document state between the visit and
+ * {@link #getTupleAndReset()}.
+ */
+public class TupleStoredFieldVisitor extends StoredFieldVisitor {
+
+  /** Field name to the slot positions in {@link #attributes} it fills. */
+  private final Map<String, int[]> fieldToAttributeLookup;
+  /** One slot per requested attribute; a slot is null until its field is visited. */
+  private final Attribute[] attributes;
+  /** Name reported for each slot: the alias when one was given, otherwise the token. */
+  private final List<String> attributeNames;
+
+  /**
+   * @param attributeTokens
+   *          the stored field names to collect, in result order.
+   * @param attributeTokenAliases
+   *          the name to report for each token, parallel to
+   *          <code>attributeTokens</code>; a null entry means "use the token".
+   * @throws IllegalArgumentException
+   *           if the two lists differ in size.
+   */
+  public TupleStoredFieldVisitor(List<String> attributeTokens, List<String> attributeTokenAliases) {
+    if (attributeTokens.size() != attributeTokenAliases.size()) {
+      throw new IllegalArgumentException("Attribute tokens [" + attributeTokens.size() + "] and aliases ["
+          + attributeTokenAliases.size() + "] must have the same size");
+    }
+    Map<String, List<Integer>> positionsByField = new HashMap<String, List<Integer>>();
+    attributeNames = new ArrayList<String>(attributeTokens.size());
+    for (int i = 0; i < attributeTokens.size(); i++) {
+      String attributeToken = attributeTokens.get(i);
+      String returnName = attributeTokenAliases.get(i);
+      attributeNames.add(returnName == null ? attributeToken : returnName);
+      List<Integer> positions = positionsByField.get(attributeToken);
+      if (positions == null) {
+        positions = new ArrayList<Integer>();
+        positionsByField.put(attributeToken, positions);
+      }
+      positions.add(i);
+    }
+    fieldToAttributeLookup = convert(positionsByField);
+    attributes = new Attribute[attributeTokens.size()];
+  }
+
+  /** Converts the boxed position lists into primitive arrays for the per-field lookups. */
+  private static Map<String, int[]> convert(Map<String, List<Integer>> map) {
+    Map<String, int[]> result = new HashMap<String, int[]>();
+    for (Entry<String, List<Integer>> e : map.entrySet()) {
+      List<Integer> list = e.getValue();
+      int[] array = new int[list.size()];
+      for (int i = 0; i < array.length; i++) {
+        array[i] = list.get(i);
+      }
+      result.put(e.getKey(), array);
+    }
+    return result;
+  }
+
+  /** Accepts exactly the fields that were requested in the constructor. */
+  @Override
+  public Status needsField(FieldInfo fieldInfo) throws IOException {
+    return fieldToAttributeLookup.containsKey(fieldInfo.name) ? Status.YES : Status.NO;
+  }
+
+  /**
+   * Stores the value in every slot that requested this field. A later value
+   * for the same field within one document replaces the earlier one.
+   */
+  @Override
+  public void stringField(FieldInfo fieldInfo, String value) throws IOException {
+    int[] indexes = fieldToAttributeLookup.get(fieldInfo.name);
+    if (indexes == null) {
+      return;
+    }
+    Attribute attribute = new Attribute();
+    attribute.setType(TYPE.STRING);
+    attribute.setName(fieldInfo.name);
+    attribute.setValue(value.getBytes("UTF-8"));
+    // The instance is shared between slots here; getTupleAndReset() copies it
+    // per slot before applying the slot specific name.
+    for (int i = 0; i < indexes.length; i++) {
+      attributes[indexes[i]] = attribute;
+    }
+  }
+
+  @Override
+  public void binaryField(FieldInfo fieldInfo, byte[] value, int offset, int length) throws IOException {
+    notSupportedYet();
+  }
+
+  @Override
+  public void intField(FieldInfo fieldInfo, int value) {
+    notSupportedYet();
+  }
+
+  @Override
+  public void longField(FieldInfo fieldInfo, long value) {
+    notSupportedYet();
+  }
+
+  @Override
+  public void floatField(FieldInfo fieldInfo, float value) {
+    notSupportedYet();
+  }
+
+  @Override
+  public void doubleField(FieldInfo fieldInfo, double value) {
+    notSupportedYet();
+  }
+
+  private void notSupportedYet() {
+    throw new UnsupportedOperationException("not supported yet");
+  }
+
+  /**
+   * Returns the attributes collected for the document just visited, in the
+   * order the attributes were requested, and clears the collected state so the
+   * visitor can be used for the next document.
+   *
+   * A requested field that the document did not contain yields an attribute
+   * that carries only its name. Previously such a slot was null and copying it
+   * failed, so any document missing a requested field could not be returned.
+   * NOTE(review): confirm that a name-only attribute is acceptable to the
+   * thrift Tuple consumers.
+   */
+  public List<Attribute> getTupleAndReset() {
+    List<Attribute> tuple = new ArrayList<Attribute>(attributes.length);
+    for (int i = 0; i < attributes.length; i++) {
+      Attribute collected = attributes[i];
+      Attribute attribute = collected == null ? new Attribute() : new Attribute(collected);
+      tuple.add(attribute.setName(attributeNames.get(i)));
+      attributes[i] = null;
+    }
+    return tuple;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSession.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSession.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSession.java
deleted file mode 100644
index f33b202..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSession.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.apache.blur.core;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.store.Directory;
-
-public class WSession {
-
-  protected IndexWriter writer;
-  protected String sessionId;
-  protected Directory directory;
-
-  public WSession(String id, IndexWriter writer, Directory directory) {
-    this.sessionId = id;
-    this.writer = writer;
-    this.directory = directory;
-  }
-
-  public String getSessionId() {
-    return sessionId;
-  }
-
-  public void addDocument(Document document) throws IOException {
-    writer.addDocument(document);
-  }
-
-  public void addDocuments(List<Document> documents) throws IOException {
-    writer.addDocuments(documents);
-  }
-
-  public void closeWriter() throws IOException {
-    writer.close();
-  }
-
-  public Directory getDirectory() {
-    return directory;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreaded.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreaded.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreaded.java
deleted file mode 100644
index 17e0db7..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreaded.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package org.apache.blur.core;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.store.Directory;
-
-public abstract class WSessionThreaded extends WSession {
-
-  private Thread thread;
-  private volatile boolean running = true;
-  private BlockingQueue<Document> queue = new ArrayBlockingQueue<Document>(100);
-
-  public WSessionThreaded(String id, IndexWriter writer, Directory directory) {
-    super(id, writer, directory);
-    startThread();
-  }
-
-  private void startThread() {
-    this.thread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        while (running) {
-          Document document;
-          try {
-            document = queue.poll(10, TimeUnit.MILLISECONDS);
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-            return;
-          }
-          if (document != null) {
-            try {
-              writer.addDocument(document);
-            } catch (CorruptIndexException e) {
-              e.printStackTrace();
-            } catch (IOException e) {
-              e.printStackTrace();
-            }
-          }
-        }
-        List<Document> docs = new ArrayList<Document>();
-        queue.drainTo(docs);
-        try {
-          writer.addDocuments(docs);
-        } catch (CorruptIndexException e) {
-          e.printStackTrace();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    });
-    thread.start();
-  }
-
-  public void addDocument(Document document) throws IOException {
-    if (running) {
-      try {
-        queue.put(document);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    } else {
-      throw new IOException("Writer closed");
-    }
-  }
-
-  public void closeWriter() throws IOException {
-    running = false;
-    try {
-      thread.join();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-    writer.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedFile.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedFile.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedFile.java
deleted file mode 100644
index a420aee..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedFile.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.blur.core;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.File;
-
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.store.Directory;
-
-public class WSessionThreadedFile extends WSessionThreaded {
-
-  private File file;
-
-  public WSessionThreadedFile(String id, IndexWriter writer, Directory directory, File file) {
-    super(id, writer, directory);
-    this.file = file;
-  }
-
-  public File getFile() {
-    return file;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedPath.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedPath.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedPath.java
deleted file mode 100644
index 2f37d84..0000000
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/WSessionThreadedPath.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.blur.core;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.store.Directory;
-
-public class WSessionThreadedPath extends WSessionThreaded {
-
-  private Path path;
-
-  public WSessionThreadedPath(String id, IndexWriter writer, Directory directory, Path p) {
-    super(id, writer, directory);
-    this.path = p;
-  }
-
-  public Path getPath() {
-    return path;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/ReadSession.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/ReadSession.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/ReadSession.java
new file mode 100644
index 0000000..468e909
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/ReadSession.java
@@ -0,0 +1,54 @@
+package org.apache.blur.core.sessions;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.lucene.index.IndexReader;
+
+/**
+ * State kept for one read session: the session id, the index reader the
+ * session reads from, and the query sessions created on top of that reader,
+ * keyed by query id.
+ */
+public class ReadSession {
+
+  private String sessionId;
+  private IndexReader reader;
+  private Map<String, RunningQuerySession> runningQueries = new ConcurrentHashMap<String, RunningQuerySession>();
+
+  public ReadSession(String sessionId, IndexReader reader) {
+    this.reader = reader;
+    this.sessionId = sessionId;
+  }
+
+  /** @return the id this session was created with. */
+  public String getSessionId() {
+    return this.sessionId;
+  }
+
+  /**
+   * Creates a query session over this session's reader and registers it under
+   * the given query id, replacing any query session already registered under
+   * that id.
+   */
+  public RunningQuerySession create(String queryId) {
+    RunningQuerySession created = new RunningQuerySession(this.reader, this.sessionId, queryId);
+    this.runningQueries.put(queryId, created);
+    return created;
+  }
+
+  /** @return the query session registered under the id, or null if there is none. */
+  public RunningQuerySession getRunningQuerySession(String queryId) {
+    return this.runningQueries.get(queryId);
+  }
+
+  /** Closes the underlying reader; registered query sessions are left as they are. */
+  public void close() throws IOException {
+    this.reader.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/RunningQuerySession.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/RunningQuerySession.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/RunningQuerySession.java
new file mode 100644
index 0000000..a195c1b
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/RunningQuerySession.java
@@ -0,0 +1,165 @@
+package org.apache.blur.core.sessions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.blur.core.TupleStoredFieldVisitor;
+import org.apache.blur.parser.BqlParser;
+import org.apache.blur.parser.BqlParser.OrderBy;
+import org.apache.blur.parser.BqlParser.QueryType;
+import org.apache.blur.thrift.generated.Tuple;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SortField.Type;
+import org.apache.lucene.search.TopDocs;
+
+/**
+ * Forward-only cursor over the results of one BQL SELECT statement.
+ *
+ * {@link #executeBql(BqlParser)} runs the first search and
+ * {@link #nextDocument()} then hands out one tuple per call, fetching further
+ * pages of {@link #fetch} hits with <code>searchAfter</code> as needed. An
+ * optional background prefetch of the next page exists but is switched off
+ * ({@link #preFetch} is false).
+ *
+ * Instances are not safe for use by several threads at once.
+ */
+public class RunningQuerySession {
+
+  private final IndexSearcher searcher;
+  private final ExecutorService executor;
+  private TopDocs topDocs;
+  private int totalHits;
+  private Query query;
+  /** Index of the next hit to return within the current page. */
+  private int position = 0;
+  /** Number of hits returned so far across all pages. */
+  private int totalPosition = 0;
+  /** Last hit handed out; the anchor for fetching the following page. */
+  private ScoreDoc scoreDoc;
+  private boolean pagedBefore;
+  private int fetch = 10000;
+  private double countToPreFetchAt = fetch * 0.6;
+  private boolean preFetch = false;
+  private Future<TopDocs> futureFetch;
+  private boolean runningPreFetch = false;
+  private boolean metaData;
+  private TupleStoredFieldVisitor fieldVisitor;
+  private Sort sort;
+  private long limit;
+
+  public RunningQuerySession(IndexReader reader, String sessionId, String queryId) {
+    searcher = new IndexSearcher(reader);
+    executor = Executors.newSingleThreadExecutor();
+  }
+
+  /**
+   * Runs the parsed statement and positions the cursor before its first hit.
+   * A statement without a query matches all documents. Any state left over
+   * from a previous statement on this session (cursor position, sort order,
+   * pending prefetch) is discarded first.
+   *
+   * @throws RuntimeException
+   *           if the statement is not a SELECT.
+   */
+  public void executeBql(BqlParser bqlParser) throws IOException {
+    if (bqlParser.getQueryType() != QueryType.SELECT) {
+      throw new RuntimeException("Only SELECT query type supported.");
+    }
+    resetCursor();
+    limit = bqlParser.getLimit();
+    Query luceneQuery = bqlParser.getLuceneQuery();
+    query = luceneQuery == null ? new MatchAllDocsQuery() : luceneQuery;
+    List<String> attributeTokens = bqlParser.getAttributeTokens();
+    List<String> attributeTokenAliases = bqlParser.getAttributeTokenAliases();
+    fieldVisitor = new TupleStoredFieldVisitor(attributeTokens, attributeTokenAliases);
+    sort = buildSort(bqlParser.getOrderByAttributes(), bqlParser.getOrderByAttributesDirection());
+    execute();
+  }
+
+  /** Clears everything that describes the progress of the previous statement. */
+  private void resetCursor() {
+    position = 0;
+    totalPosition = 0;
+    scoreDoc = null;
+    pagedBefore = false;
+    runningPreFetch = false;
+    futureFetch = null;
+    metaData = false;
+  }
+
+  /**
+   * Builds the sort for the ORDER BY attributes, treating every attribute as a
+   * string value; returns null when there is nothing to order by.
+   */
+  private static Sort buildSort(List<String> attributes, List<OrderBy> directions) {
+    if (attributes.isEmpty()) {
+      return null;
+    }
+    SortField[] fields = new SortField[attributes.size()];
+    for (int i = 0; i < fields.length; i++) {
+      boolean reverse = directions.get(i) != OrderBy.ASC;
+      fields[i] = new SortField(attributes.get(i), Type.STRING_VAL, reverse);
+    }
+    return new Sort(fields);
+  }
+
+  public boolean isMetaDataBeenFetched() {
+    return metaData;
+  }
+
+  /** Returns the total hit count and records that it has been asked for. */
+  public int getTotalHits() {
+    metaData = true;
+    return totalHits;
+  }
+
+  /**
+   * Returns the next hit as a tuple, or null once the limit or the end of the
+   * results has been reached. The cursor only advances when a tuple is
+   * actually returned, so a failed call does not skip a hit.
+   */
+  public Tuple nextDocument() throws IOException {
+    if (totalPosition >= limit || totalPosition >= totalHits) {
+      return null;
+    }
+    if (position >= topDocs.scoreDocs.length) {
+      page();
+      if (topDocs.scoreDocs.length == 0) {
+        // Nothing more to hand out even though the hit count promised more;
+        // mark the cursor exhausted instead of indexing into an empty page.
+        totalPosition = totalHits;
+        return null;
+      }
+    } else {
+      checkToSeeIfWeCanGetAhead();
+    }
+    scoreDoc = topDocs.scoreDocs[position];
+    searcher.doc(scoreDoc.doc, fieldVisitor);
+    Tuple tuple = new Tuple(fieldVisitor.getTupleAndReset());
+    position++;
+    totalPosition++;
+    return tuple;
+  }
+
+  /** Runs the first search of the statement. */
+  private void execute() throws IOException {
+    if (sort == null) {
+      topDocs = searcher.search(query, fetch);
+    } else {
+      topDocs = searcher.search(query, fetch, sort);
+    }
+    totalHits = topDocs.totalHits;
+  }
+
+  /** Fetches the page of hits that follows the given hit. */
+  private TopDocs searchAfter(ScoreDoc after) throws IOException {
+    if (sort == null) {
+      return searcher.searchAfter(after, query, fetch);
+    }
+    return searcher.searchAfter(after, query, fetch, sort);
+  }
+
+  /**
+   * Starts fetching the next page in the background once enough of the current
+   * page has been consumed. Only active when prefetching is enabled and at
+   * least one page has already been fetched synchronously.
+   */
+  private void checkToSeeIfWeCanGetAhead() throws IOException {
+    if (!preFetch || !pagedBefore || runningPreFetch || position <= countToPreFetchAt) {
+      return;
+    }
+    runningPreFetch = true;
+    final ScoreDoc after = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
+    futureFetch = executor.submit(new Callable<TopDocs>() {
+      @Override
+      public TopDocs call() throws Exception {
+        return searchAfter(after);
+      }
+    });
+  }
+
+  /**
+   * Moves to the next page of hits: takes the prefetched page when a prefetch
+   * was actually started, otherwise searches synchronously. A failed or
+   * interrupted prefetch is reported to the caller; the current page is left
+   * untouched so the same hits are never served twice.
+   */
+  private void page() throws IOException {
+    if (runningPreFetch) {
+      try {
+        topDocs = futureFetch.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for the next page of results", e);
+      } catch (ExecutionException e) {
+        throw new IOException("Prefetch of the next page of results failed", e.getCause());
+      } finally {
+        // Whatever happened, the prefetch is over; a retry pages synchronously.
+        runningPreFetch = false;
+        futureFetch = null;
+      }
+    } else {
+      pagedBefore = true;
+      topDocs = searchAfter(scoreDoc);
+    }
+    position = 0;
+  }
+
+  /** Stops the prefetch executor; the session must not be used afterwards. */
+  public void close() {
+    executor.shutdownNow();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSession.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSession.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSession.java
new file mode 100644
index 0000000..d74b383
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSession.java
@@ -0,0 +1,60 @@
+package org.apache.blur.core.sessions;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+
+/**
+ * Base class for a write session: a session id together with the index writer
+ * that receives the session's documents and the directory that writer works
+ * on. Subclasses decide how the temporary space behind the directory is
+ * removed.
+ */
+public abstract class WriteSession {
+
+  protected IndexWriter writer;
+  protected String sessionId;
+  protected Directory directory;
+
+  public WriteSession(String id, IndexWriter writer, Directory directory) {
+    this.directory = directory;
+    this.writer = writer;
+    this.sessionId = id;
+  }
+
+  /** @return the id this session was created with. */
+  public String getSessionId() {
+    return this.sessionId;
+  }
+
+  /** Hands a single document to the session's writer. */
+  public void addDocument(Document document) throws IOException {
+    this.writer.addDocument(document);
+  }
+
+  /** Hands a batch of documents to the session's writer in one call. */
+  public void addDocuments(List<Document> documents) throws IOException {
+    this.writer.addDocuments(documents);
+  }
+
+  /** Closes the session's writer. */
+  public void closeWriter() throws IOException {
+    this.writer.close();
+  }
+
+  /** @return the directory the session's writer was created on. */
+  public Directory getDirectory() {
+    return this.directory;
+  }
+
+  /** Removes whatever temporary storage backs this session. */
+  public abstract void removeTempSpace() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreaded.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreaded.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreaded.java
new file mode 100644
index 0000000..309488c
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreaded.java
@@ -0,0 +1,99 @@
+package org.apache.blur.core.sessions;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+
+/**
+ * Write session that indexes single documents on a background thread.
+ *
+ * {@link #addDocument(Document)} places the document on a bounded queue and
+ * returns; a worker thread started by the constructor takes documents off the
+ * queue and passes them to the writer. {@link #closeWriter()} stops the
+ * worker, lets it flush what is still queued and then closes the writer.
+ *
+ * Errors raised while indexing in the background are remembered and reported
+ * by the next {@link #addDocument(Document)} or by {@link #closeWriter()},
+ * so a session whose documents could not be written is not mistaken for a
+ * successful one. <code>addDocuments(List)</code> is inherited unchanged and
+ * does not go through the queue.
+ */
+public abstract class WriteSessionThreaded extends WriteSession {
+
+  private static final int QUEUE_CAPACITY = 100;
+  private static final long POLL_TIMEOUT_MS = 10;
+
+  private Thread thread;
+  private volatile boolean running = true;
+  /** First error seen by the worker thread; null while everything went well. */
+  private volatile Throwable failure;
+  private final BlockingQueue<Document> queue = new ArrayBlockingQueue<Document>(QUEUE_CAPACITY);
+
+  public WriteSessionThreaded(String id, IndexWriter writer, Directory directory) {
+    super(id, writer, directory);
+    startThread();
+  }
+
+  private void startThread() {
+    this.thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (running) {
+            Document document = queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            if (document != null) {
+              write(document);
+            }
+          }
+        } catch (InterruptedException e) {
+          // Stop accepting documents, but still flush the queue below so that
+          // producers blocked on a full queue are released.
+          recordFailure(e);
+          running = false;
+        }
+        flushQueue();
+      }
+    });
+    thread.start();
+  }
+
+  /** Indexes one document on the worker thread, remembering any error. */
+  private void write(Document document) {
+    try {
+      writer.addDocument(document);
+    } catch (IOException e) {
+      recordFailure(e);
+    } catch (RuntimeException e) {
+      recordFailure(e);
+    }
+  }
+
+  /** Indexes everything still queued when the worker stops, remembering any error. */
+  private void flushQueue() {
+    List<Document> docs = new ArrayList<Document>();
+    queue.drainTo(docs);
+    try {
+      writer.addDocuments(docs);
+    } catch (IOException e) {
+      recordFailure(e);
+    } catch (RuntimeException e) {
+      recordFailure(e);
+    }
+  }
+
+  /** Keeps the first error only; called from the worker thread. */
+  private void recordFailure(Throwable t) {
+    if (failure == null) {
+      failure = t;
+    }
+  }
+
+  private void throwIfFailed() throws IOException {
+    Throwable cause = failure;
+    if (cause != null) {
+      throw new IOException("Background indexing failed for write session [" + sessionId + "]", cause);
+    }
+  }
+
+  /**
+   * Queues the document for background indexing, blocking while the queue is
+   * full.
+   *
+   * @throws IOException
+   *           if the session is closed, if an earlier document could not be
+   *           indexed, or if the calling thread is interrupted while waiting
+   *           (the interrupt status is restored and the document is not
+   *           queued).
+   */
+  @Override
+  public void addDocument(Document document) throws IOException {
+    throwIfFailed();
+    if (!running) {
+      throw new IOException("Writer closed");
+    }
+    try {
+      queue.put(document);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while queueing document for write session [" + sessionId + "]", e);
+    }
+  }
+
+  /**
+   * Stops the worker, waits until it has flushed the queue and closes the
+   * writer. The wait is not abandoned on interrupt, because closing the writer
+   * while the worker is still adding documents would lose them; the interrupt
+   * status is restored once the writer has been closed.
+   *
+   * @throws IOException
+   *           if closing the writer fails or if any document could not be
+   *           indexed in the background.
+   */
+  @Override
+  public void closeWriter() throws IOException {
+    running = false;
+    boolean interrupted = false;
+    try {
+      while (true) {
+        try {
+          thread.join();
+          break;
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
+      }
+      writer.close();
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    throwIfFailed();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreadedFile.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreadedFile.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreadedFile.java
new file mode 100644
index 0000000..5798b33
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreadedFile.java
@@ -0,0 +1,50 @@
+package org.apache.blur.core.sessions;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+
+/**
+ * A {@link WriteSessionThreaded} whose temporary space is a file or directory
+ * on the local file system.
+ */
+public class WriteSessionThreadedFile extends WriteSessionThreaded {
+
+  /** File or directory tree removed by {@link #removeTempSpace()}. */
+  private File file;
+
+  /**
+   * @param id passed through to the parent session.
+   * @param writer passed through to the parent session.
+   * @param directory passed through to the parent session.
+   * @param file the local file or directory to delete when the temporary
+   *          space is removed.
+   */
+  public WriteSessionThreadedFile(String id, IndexWriter writer, Directory directory, File file) {
+    super(id, writer, directory);
+    this.file = file;
+  }
+
+  /**
+   * Recursively deletes the file or directory given to the constructor. Does
+   * nothing when it does not exist.
+   */
+  @Override
+  public void removeTempSpace() {
+    rm(file);
+  }
+
+  /**
+   * Deletes {@code file}, descending into directories first. Deletion is best
+   * effort: entries that cannot be removed are left behind without an error.
+   */
+  private static void rm(File file) {
+    if (file == null || !file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      // listFiles() returns null when the directory cannot be read (I/O error
+      // or it vanished in the meantime); iterating that would throw an NPE.
+      File[] children = file.listFiles();
+      if (children != null) {
+        for (File child : children) {
+          rm(child);
+        }
+      }
+    }
+    file.delete();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreadedPath.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreadedPath.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreadedPath.java
new file mode 100644
index 0000000..4555608
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/core/sessions/WriteSessionThreadedPath.java
@@ -0,0 +1,49 @@
+package org.apache.blur.core.sessions;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+
+/**
+ * A {@link WriteSessionThreaded} whose temporary space is a Hadoop
+ * {@link Path}.
+ */
+public class WriteSessionThreadedPath extends WriteSessionThreaded {
+
+  /** Location removed by {@link #removeTempSpace()}. */
+  private Path path;
+  /** Configuration used to resolve the file system that owns {@link #path}. */
+  private Configuration configuration;
+
+  /**
+   * @param id passed through to the parent session.
+   * @param writer passed through to the parent session.
+   * @param directory passed through to the parent session.
+   * @param configuration Hadoop configuration used to look up the file system.
+   * @param p the path to delete when the temporary space is removed.
+   */
+  public WriteSessionThreadedPath(String id, IndexWriter writer, Directory directory, Configuration configuration, Path p) {
+    super(id, writer, directory);
+    this.path = p;
+    this.configuration = configuration;
+  }
+
+  /** @return the path given to the constructor. */
+  public Path getPath() {
+    return path;
+  }
+
+  /**
+   * Recursively deletes {@link #getPath()}.
+   *
+   * @throws IOException if the file system cannot be resolved or the delete
+   *           fails.
+   */
+  @Override
+  public void removeTempSpace() throws IOException {
+    // Path.getFileSystem() hands out a cached FileSystem instance by default,
+    // shared with every other user of the same scheme/authority in this JVM.
+    // Closing it here would break those users ("Filesystem closed"), so the
+    // instance is deliberately left open.
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    fileSystem.delete(path, true);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/parser/BqlException.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/parser/BqlException.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/parser/BqlException.java
new file mode 100644
index 0000000..28b0200
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/parser/BqlException.java
@@ -0,0 +1,15 @@
+package org.apache.blur.parser;
+
+/**
+ * Checked exception used by the BQL parser to report a statement it could not
+ * parse.
+ */
+public class BqlException extends Exception {
+
+  private static final long serialVersionUID = 5871985403028318296L;
+
+  /**
+   * Wraps a lower-level failure, keeping it available through
+   * {@link #getCause()}.
+   *
+   * @param cause the exception that made parsing fail.
+   */
+  public BqlException(Exception cause) {
+    super(cause);
+  }
+
+  /**
+   * Creates an exception that only carries a description of the problem.
+   *
+   * @param description what was wrong with the statement.
+   */
+  public BqlException(String description) {
+    super(description);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/parser/BqlParser.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/parser/BqlParser.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/parser/BqlParser.java
new file mode 100644
index 0000000..832a734
--- /dev/null
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/parser/BqlParser.java
@@ -0,0 +1,238 @@
+package org.apache.blur.parser;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.util.Version;
+
+/**
+ * Parser for a small SQL-like query language on top of Lucene.
+ *
+ * Supported statement shape (keywords are matched case-insensitively, tokens
+ * are separated by spaces and/or commas):
+ *
+ * <pre>
+ * select attr [as alias], ... from table [as alias]
+ *   [where lucene-query] [order by [asc|desc] attr, ...] [limit n]
+ * </pre>
+ *
+ * The sort direction is written BEFORE the attribute it applies to (for
+ * example {@code order by desc id0}); an attribute without a direction sorts
+ * ascending.
+ *
+ * NOTE: the whole statement is lower-cased before parsing, so attribute names,
+ * the table name and the text of the where clause are lower-cased as well.
+ *
+ * The result of the most recent {@link #parse(String)} call is kept in
+ * instance fields and exposed through the getters.
+ */
+public class BqlParser {
+
+  private static final String LIMIT = "limit";
+  private static final String BY = "by";
+  private static final String ORDER = "order";
+  private static final String WHERE = "where";
+  private static final String FROM = "from";
+  private static final String SELECT = "select";
+  private static final String AS = "as";
+  private static final String ASC = "asc";
+  private static final String DESC = "desc";
+  /** Characters that separate tokens in a statement. */
+  private static final String DELIMITERS = " ,";
+  /** Keywords that start a clause and therefore end the clause being read. */
+  private static final Collection<String> STOP_WORDS = new HashSet<String>(Arrays.asList(ORDER, WHERE, FROM, SELECT, LIMIT));
+
+  public enum QueryType {
+    SELECT
+  }
+
+  public enum OrderBy {
+    ASC, DESC
+  }
+
+  private List<String> attributeTokens = new ArrayList<String>();
+  private List<String> attributeTokenAliases = new ArrayList<String>();
+  private QueryType type;
+  private String tableName;
+  private String tableNameAlias;
+  private Analyzer analyzer;
+  private Query luceneQuery;
+  private List<String> orderByAttributes = new ArrayList<String>();
+  private List<OrderBy> orderByAttributesDirection = new ArrayList<OrderBy>();
+  private long limit = Long.MAX_VALUE;
+
+  /**
+   * @return the value of the limit clause, or {@link Long#MAX_VALUE} when the
+   *         statement had none.
+   */
+  public long getLimit() {
+    return limit;
+  }
+
+  /**
+   * @param analyzer the analyzer handed to the Lucene query parser that parses
+   *          the where clause.
+   */
+  public BqlParser(Analyzer analyzer) {
+    this.analyzer = analyzer;
+  }
+
+  /**
+   * Parses a statement and stores the result in this instance, replacing the
+   * result of any earlier call.
+   *
+   * @param queryString the statement to parse.
+   * @throws BqlException if a clause is malformed or the where clause is
+   *           rejected by the Lucene query parser.
+   * @throws RuntimeException if the statement lacks a select or from clause.
+   */
+  public void parse(String queryString) throws BqlException {
+    // Without the reset a second parse on the same instance would append to
+    // the previous result and fail with "Table ... already set".
+    reset();
+    // Locale.ROOT keeps keyword matching independent of the default locale
+    // (e.g. "LIMIT" must not become "l\u0131m\u0131t" under a Turkish locale).
+    String lowerCase = queryString.toLowerCase(java.util.Locale.ROOT);
+    if (parseSelect(lowerCase) && parseFrom(lowerCase) && parseWhere(lowerCase) && parseOrderBy(lowerCase) && parserLimit(lowerCase)) {
+      type = QueryType.SELECT;
+      return;
+    }
+    throw new RuntimeException("Not supported [" + queryString + "]");
+  }
+
+  /** Restores every parse result to its initial, empty value. */
+  private void reset() {
+    attributeTokens = new ArrayList<String>();
+    attributeTokenAliases = new ArrayList<String>();
+    type = null;
+    tableName = null;
+    tableNameAlias = null;
+    luceneQuery = null;
+    orderByAttributes = new ArrayList<String>();
+    orderByAttributesDirection = new ArrayList<OrderBy>();
+    limit = Long.MAX_VALUE;
+  }
+
+  /**
+   * Reads the optional limit clause. A statement without one keeps the default
+   * limit and is still accepted.
+   */
+  private boolean parserLimit(String query) throws BqlException {
+    StringTokenizer tokenizer = new StringTokenizer(query, DELIMITERS);
+    while (tokenizer.hasMoreTokens()) {
+      if (!tokenizer.nextToken().equals(LIMIT)) {
+        continue;
+      }
+      String value = nextToken(tokenizer, "Limit value is missing.");
+      if (STOP_WORDS.contains(value)) {
+        throw new BqlException("Limit value is missing.");
+      }
+      try {
+        limit = Long.parseLong(value);
+      } catch (NumberFormatException e) {
+        BqlException failure = new BqlException("Limit [" + value + "] is not a valid number.");
+        failure.initCause(e);
+        throw failure;
+      }
+      return true;
+    }
+    // The limit clause is optional.
+    return true;
+  }
+
+  /**
+   * Reads the optional order by clause into the attribute and direction lists,
+   * which end up with one direction per attribute.
+   */
+  private boolean parseOrderBy(String query) throws BqlException {
+    StringTokenizer tokenizer = new StringTokenizer(query, DELIMITERS);
+    boolean orderByTokenFound = false;
+    while (tokenizer.hasMoreTokens()) {
+      String token = tokenizer.nextToken();
+      if (!orderByTokenFound) {
+        orderByTokenFound = token.equals(ORDER);
+        continue;
+      }
+      if (STOP_WORDS.contains(token)) {
+        break;
+      }
+      if (token.equals(BY)) {
+        continue;
+      } else if (token.equals(ASC)) {
+        orderByAttributesDirection.add(OrderBy.ASC);
+      } else if (token.equals(DESC)) {
+        orderByAttributesDirection.add(OrderBy.DESC);
+      } else {
+        orderByAttributes.add(token);
+        addDefaultAsc();
+      }
+    }
+    // A direction that is not followed by an attribute (e.g. the SQL-style
+    // "order by id0 desc") would leave the two lists out of step.
+    if (orderByAttributesDirection.size() != orderByAttributes.size()) {
+      throw new BqlException("Order by direction must be followed by an attribute, e.g. [order by desc id0].");
+    }
+    return true;
+  }
+
+  /** Gives the attribute just added an ascending direction if it has none. */
+  private void addDefaultAsc() {
+    if (orderByAttributesDirection.size() < orderByAttributes.size()) {
+      orderByAttributesDirection.add(OrderBy.ASC);
+    }
+  }
+
+  /**
+   * Reads the optional where clause: everything between the where keyword and
+   * the next clause keyword (or the end of the statement) is parsed as a
+   * Lucene query.
+   */
+  private boolean parseWhere(String query) throws BqlException {
+    int index = indexOfKeyword(query, WHERE, 0);
+    if (index < 0) {
+      return true;
+    }
+    int clauseStart = index + WHERE.length();
+    int clauseEnd = query.length();
+    // Only keywords AFTER the where keyword can end the clause; limit ends it
+    // just like the other clause keywords.
+    for (String stopWord : STOP_WORDS) {
+      int stopIndex = indexOfKeyword(query, stopWord, clauseStart);
+      if (stopIndex >= 0 && stopIndex < clauseEnd) {
+        clauseEnd = stopIndex;
+      }
+    }
+    luceneQuery = luceneParse(query.substring(clauseStart, clauseEnd));
+    return true;
+  }
+
+  /**
+   * Finds {@code keyword} as a whole token, i.e. bounded by a delimiter or by
+   * the start/end of the statement, so that words merely containing the
+   * keyword ("order_id", "fromage") do not match.
+   *
+   * @return the index of the first match at or after {@code start}, or -1.
+   */
+  private static int indexOfKeyword(String query, String keyword, int start) {
+    int index = query.indexOf(keyword, start);
+    while (index >= 0) {
+      int end = index + keyword.length();
+      boolean startsToken = index == 0 || isDelimiter(query.charAt(index - 1));
+      boolean endsToken = end == query.length() || isDelimiter(query.charAt(end));
+      if (startsToken && endsToken) {
+        return index;
+      }
+      index = query.indexOf(keyword, index + 1);
+    }
+    return -1;
+  }
+
+  private static boolean isDelimiter(char c) {
+    return DELIMITERS.indexOf(c) >= 0;
+  }
+
+  /** Parses the where clause text with the classic Lucene query parser. */
+  private Query luceneParse(String luceneQuery) throws BqlException {
+    QueryParser parser = new QueryParser(Version.LUCENE_40, "default", analyzer);
+    try {
+      return parser.parse(luceneQuery);
+    } catch (ParseException e) {
+      throw new BqlException(e);
+    }
+  }
+
+  /**
+   * Reads the from clause: exactly one table name with an optional alias.
+   *
+   * @return false when the statement has no from keyword.
+   */
+  private boolean parseFrom(String query) throws BqlException {
+    StringTokenizer tokenizer = new StringTokenizer(query, DELIMITERS);
+    boolean fromTokenFound = false;
+    while (tokenizer.hasMoreTokens()) {
+      String token = tokenizer.nextToken();
+      if (!fromTokenFound) {
+        fromTokenFound = token.equals(FROM);
+        continue;
+      }
+      if (STOP_WORDS.contains(token)) {
+        break;
+      }
+      if (tableName == null) {
+        tableName = token;
+      } else if (token.equals(AS)) {
+        tableNameAlias = nextToken(tokenizer, "Table alias is missing after [as].");
+      } else {
+        throw new BqlException("Table [" + tableName + "] already set, only a single table is allowed.");
+      }
+    }
+    if (fromTokenFound && tableName == null) {
+      throw new BqlException("Table name is missing after [from].");
+    }
+    return fromTokenFound;
+  }
+
+  /**
+   * Reads the select clause into the attribute and alias lists. The alias list
+   * gets one entry per attribute, {@code null} for attributes without alias.
+   *
+   * @return false when the statement has no select keyword.
+   */
+  private boolean parseSelect(String query) throws BqlException {
+    StringTokenizer tokenizer = new StringTokenizer(query, DELIMITERS);
+    boolean selectTokenFound = false;
+    while (tokenizer.hasMoreTokens()) {
+      String token = tokenizer.nextToken();
+      if (!selectTokenFound) {
+        selectTokenFound = token.equals(SELECT);
+        continue;
+      }
+      if (STOP_WORDS.contains(token)) {
+        break;
+      }
+      if (token.equals(AS)) {
+        // An alias is only valid for an attribute that does not have one yet.
+        if (attributeTokenAliases.size() >= attributeTokens.size()) {
+          throw new BqlException("Alias [as] must follow an attribute.");
+        }
+        attributeTokenAliases.add(nextToken(tokenizer, "Attribute alias is missing after [as]."));
+      } else {
+        addNullAlias();
+        attributeTokens.add(token);
+      }
+    }
+    addNullAlias();
+    return selectTokenFound;
+  }
+
+  /** Pads the alias list with null when the last attribute has no alias. */
+  private void addNullAlias() {
+    if (attributeTokenAliases.size() != attributeTokens.size()) {
+      attributeTokenAliases.add(null);
+    }
+  }
+
+  /**
+   * Returns the next token, or fails with {@code message} when the statement
+   * ends instead of letting the tokenizer throw an unchecked exception.
+   */
+  private static String nextToken(StringTokenizer tokenizer, String message) throws BqlException {
+    if (!tokenizer.hasMoreTokens()) {
+      throw new BqlException(message);
+    }
+    return tokenizer.nextToken();
+  }
+
+  /** @return the statement type, or null when nothing was parsed successfully. */
+  public QueryType getQueryType() {
+    return type;
+  }
+
+  /** @return the (lower-cased) table name of the from clause. */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /** @return the table alias, or null when none was given. */
+  public String getTableNameAlias() {
+    return tableNameAlias;
+  }
+
+  /** @return the selected attributes in statement order. */
+  public List<String> getAttributeTokens() {
+    return attributeTokens;
+  }
+
+  /**
+   * @return the aliases of the selected attributes, index-aligned with
+   *         {@link #getAttributeTokens()}; null entries mean "no alias".
+   */
+  public List<String> getAttributeTokenAliases() {
+    return attributeTokenAliases;
+  }
+
+  /** @return the parsed where clause, or null when the statement had none. */
+  public Query getLuceneQuery() {
+    return luceneQuery;
+  }
+
+  /** @return the analyzer given to the constructor. */
+  public Analyzer getAnalyzer() {
+    return analyzer;
+  }
+
+  /** @return the order by attributes in statement order. */
+  public List<String> getOrderByAttributes() {
+    return orderByAttributes;
+  }
+
+  /**
+   * @return the sort directions, index-aligned with
+   *         {@link #getOrderByAttributes()}.
+   */
+  public List<OrderBy> getOrderByAttributesDirection() {
+    return orderByAttributesDirection;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/example/clients/LoadThriftClientBatch.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/example/clients/LoadThriftClientBatch.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/example/clients/LoadThriftClientBatch.java
index 28ba40e..e49767e 100644
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/example/clients/LoadThriftClientBatch.java
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/example/clients/LoadThriftClientBatch.java
@@ -43,13 +43,13 @@ public class LoadThriftClientBatch {
         long s = System.currentTimeMillis();
         Random random = new Random();
         List<Tuple> tuples = new ArrayList<Tuple>();
-        
+
         long count = 0;
         long total = 0;
         long start = System.nanoTime();
         long attributeCount = 0;
         long attributeTotal = 0;
-        
+
         for (int i = 0; i < length; i++) {
           long now = System.nanoTime();
           if (start + 5000000000L < now) {
@@ -61,17 +61,17 @@ public class LoadThriftClientBatch {
             count = 0;
             attributeCount = 0;
           }
-          
+
           Tuple tuple = new Tuple();
           for (int f = 0; f < fields; f++) {
-            tuple.addToAttributes(Util.newAttribute("id" + f, Long.toString(random.nextLong())));
+            tuple.addToAttributes(Util.newAttribute("id" + f, Long.toString(Math.abs(random.nextLong()))));
           }
           tuples.add(tuple);
           if (tuples.size() >= batch) {
             client.writeTuples(session, tuples);
             tuples.clear();
           }
-          
+
           count++;
           total++;
           int atCount = tuple.getAttributes().size();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/3fbd7f52/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/example/clients/ReadThriftClientBatch.java
----------------------------------------------------------------------
diff --git a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/example/clients/ReadThriftClientBatch.java b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/example/clients/ReadThriftClientBatch.java
index 1bf8d07..d6ce88b 100644
--- a/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/example/clients/ReadThriftClientBatch.java
+++ b/src/blur-new-api-prototype/src/main/java/org/apache/blur/proto/example/clients/ReadThriftClientBatch.java
@@ -24,6 +24,7 @@ import org.apache.blur.thrift.ClientManager;
 import org.apache.blur.thrift.Command;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurTuple.Client;
+import org.apache.blur.thrift.generated.QuerySession;
 import org.apache.blur.thrift.generated.Session;
 import org.apache.blur.thrift.generated.Tuple;
 import org.apache.thrift.TException;
@@ -35,10 +36,13 @@ public class ReadThriftClientBatch {
       @Override
       public Void call(Client client) throws BlurException, TException {
         Session session = client.openReadSession();
-        // client.executeQuery(session, "id1:fb*");
-        client.executeQuery(session, "*");
+        // QuerySession querySession = client.executeQuery(session,
+        // "select id0,id1 from table1 order by desc id0");
+        QuerySession querySession = client.executeQuery(session, "select id0,id1 from table1 limit 100");
+        // QuerySession querySession = client.executeQuery(session,
+        // "select id0,id1,id2 as id_othername from table1 where id1:86184300004670* order by desc id0");
         while (true) {
-          List<Tuple> metaDataResults = client.nextMetaDataResults(session, 10);
+          List<Tuple> metaDataResults = client.nextMetaDataResults(querySession, 100);
           if (metaDataResults.isEmpty()) {
             break;
           }
@@ -63,13 +67,13 @@ public class ReadThriftClientBatch {
             count = 0;
             attributeCount = 0;
           }
-          List<Tuple> results = client.nextResults(session, 10);
+          List<Tuple> results = client.nextResults(querySession, 100);
           if (results.isEmpty()) {
             break;
           }
-//          for (Tuple tuple : results) {
-//            System.out.println(Util.toString(tuple));
-//          }
+          for (Tuple tuple : results) {
+            System.out.println(Util.toString(tuple));
+          }
           count += results.size();
           total += results.size();
           int atCount = 0;
@@ -89,7 +93,5 @@ public class ReadThriftClientBatch {
         return null;
       }
     });
-
   }
-
 }


Mime
View raw message