incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/46] git commit: New API, work in progress. Some of the search method has been implemented.
Date Tue, 30 Oct 2012 02:53:18 GMT
New API, work in progress.  Some of the search method has been implemented.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/f219d526
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/f219d526
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/f219d526

Branch: refs/heads/0.2-dev
Commit: f219d52638d1b604b9d9a856615b75f6baf56061
Parents: 6faa4de
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Oct 29 20:13:30 2012 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Oct 29 20:13:30 2012 -0400

----------------------------------------------------------------------
 .../org/apache/blur/thrift/BlurShardServer.java    |   71 ++++++---
 .../blur/thrift/IndexSearcherParallelCall.java     |   14 ++
 .../java/org/apache/blur/thrift/SessionInfo.java   |   61 ++++++++
 .../org/apache/blur/thrift/TopFieldDocsMerger.java |   73 +++++++++
 .../main/java/org/apache/blur/utils/BlurUtil.java  |  120 +++++++++++++++
 5 files changed, 316 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f219d526/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 2121a7c..b808651 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -28,8 +28,10 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongArray;
 
@@ -63,11 +65,15 @@ import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurExecutorCompletionService.Cancel;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.ForkJoin;
+import org.apache.blur.utils.ForkJoin.ParallelReturn;
 import org.apache.blur.utils.QueryCache;
 import org.apache.blur.utils.QueryCacheEntry;
 import org.apache.blur.utils.QueryCacheKey;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.Filter;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
 import org.apache.thrift.TException;
 
 public class BlurShardServer extends TableAdmin implements Iface {
@@ -81,12 +87,16 @@ public class BlurShardServer extends TableAdmin implements Iface {
   private QueryCache _queryCache;
   private BlurQueryChecker _queryChecker;
   private ExecutorService _dataFetch;
+  private ExecutorService _indexSearcherExecutor;
+  private ExecutorService _searchExecutor;
   private String _cluster = BlurConstants.BLUR_CLUSTER;
   private int _dataFetchThreadCount = 32;
 
   public void init() throws BlurException {
     _queryCache = new QueryCache("shard-cache", _maxQueryCacheElements, _maxTimeToLive);
     _dataFetch = Executors.newThreadPool("data-fetch-", _dataFetchThreadCount);
+    _indexSearcherExecutor = Executors.newThreadPool("index-searcher-", 16);
+    _searchExecutor = Executors.newThreadPool("search-", 16);
 
     if (_configuration == null) {
       throw new BException("Configuration must be set before initialization.");
@@ -378,7 +388,7 @@ public class BlurShardServer extends TableAdmin implements Iface {
       try {
         IndexReader indexReader = entry.getValue().getIndexReader();
         // @TODO use new thread pool here
-        IndexSearcher indexSearcher = new IndexSearcher(indexReader, this._dataFetch);
+        IndexSearcher indexSearcher = new IndexSearcher(indexReader, _searchExecutor);
         sessionInfo.add(index, indexReader);
         sessionInfo.add(index, indexSearcher);
       } catch (IOException e) {
@@ -390,40 +400,55 @@ public class BlurShardServer extends TableAdmin implements Iface {
   }
 
   @Override
-  public TopFieldDocs search(Session session, QueryArgs queryArgs) throws BlurException, TException {
+  public List<TopFieldDocs> search(Session session, QueryArgs queryArgs) throws BlurException, TException {
     SessionInfo info = getSessionInfo(session);
     try {
       Map<Integer, IndexSearcher> searchers = info.getSearchers();
       List<Integer> shardIndexes = queryArgs.getShardIndexes();
-      List<IndexSearcher> searchersToSearch = new ArrayList<IndexSearcher>();
-      if (shardIndexes == null) {
-        // all indexes
-        searchersToSearch.addAll(searchers.values());
-      } else {
-        for (Integer index : shardIndexes) {
-          IndexSearcher searcher = searchers.get(index);
-          if (searcher != null) {
-            searchersToSearch.add(searcher);
-          }
-        }
+      List<IndexSearcher> searchersToSearch = getSearchers(shardIndexes, searchers);
+
+      List<Future<TopFieldDocs>> futures = new ArrayList<Future<TopFieldDocs>>(searchersToSearch.size());
+      Query query = BlurUtil.getQuery(queryArgs);
+      Filter filter = BlurUtil.getFilter(queryArgs);
+      Sort sort = BlurUtil.getSort(queryArgs);
+      for (IndexSearcher searcher : searchersToSearch) {
+        Future<TopFieldDocs> future = _searchExecutor.submit(new SearchCallable(searcher,query));
+        futures.add(future);
       }
-      IndexSearcherParallelCall call = new IndexSearcherParallelCall();
-      TopFieldDocsMerger merger = new TopFieldDocsMerger();
-      return convert(ForkJoin.execute(_dataFetch, searchersToSearch, call, new Cancel() {
-        @Override
-        public void cancel() {
 
-        }
-      }).merge(merger));
+      return null;
     } catch (Throwable t) {
       LOG.error("Unknown error", t);
       throw new BException(t.getMessage(), t);
     }
   }
 
-  private TopFieldDocs convert(org.apache.lucene.search.TopFieldDocs merge) {
-    // TODO Auto-generated method stub
-    return null;
+  private static class SearchCallable implements Callable<TopFieldDocs> {
+    public SearchCallable(IndexSearcher searcher, Query query) {
+      // TODO Auto-generated constructor stub
+    }
+
+    @Override
+    public TopFieldDocs call() throws Exception {
+
+      return null;
+    }
+  }
+
+  private List<IndexSearcher> getSearchers(List<Integer> shardIndexes, Map<Integer, IndexSearcher> searchers) {
+    List<IndexSearcher> searchersToSearch = new ArrayList<IndexSearcher>();
+    if (shardIndexes == null) {
+      // all indexes
+      searchersToSearch.addAll(searchers.values());
+    } else {
+      for (Integer index : shardIndexes) {
+        IndexSearcher searcher = searchers.get(index);
+        if (searcher != null) {
+          searchersToSearch.add(searcher);
+        }
+      }
+    }
+    return searchersToSearch;
   }
 
   private SessionInfo getSessionInfo(Session session) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f219d526/src/blur-core/src/main/java/org/apache/blur/thrift/IndexSearcherParallelCall.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/IndexSearcherParallelCall.java b/src/blur-core/src/main/java/org/apache/blur/thrift/IndexSearcherParallelCall.java
new file mode 100644
index 0000000..bfa46f6
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/IndexSearcherParallelCall.java
@@ -0,0 +1,14 @@
+package org.apache.blur.thrift;
+
+import org.apache.blur.utils.ForkJoin.ParallelCall;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TopFieldDocs;
+
+public class IndexSearcherParallelCall implements ParallelCall<IndexSearcher, TopFieldDocs> {
+
+  @Override
+  public TopFieldDocs call(IndexSearcher input) throws Exception {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f219d526/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java b/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java
new file mode 100644
index 0000000..e147423
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/SessionInfo.java
@@ -0,0 +1,61 @@
+package org.apache.blur.thrift;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
+
+public class SessionInfo {
+
+  private String uuid;
+  private BlurAnalyzer analyzer;
+  private Map<Integer, IndexReader> readers = new HashMap<Integer, IndexReader>();
+  private Map<Integer, IndexSearcher> searchers = new HashMap<Integer, IndexSearcher>();
+
+  public BlurAnalyzer getAnalyzer() {
+    return analyzer;
+  }
+
+  public void setAnalyzer(BlurAnalyzer analyzer) {
+    this.analyzer = analyzer;
+  }
+
+  public String getUuid() {
+    return uuid;
+  }
+
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+  public void add(int index, IndexReader indexReader) {
+    readers.put(index, indexReader);
+  }
+  
+  public void add(int index, IndexSearcher indexSearcher) {
+    searchers.put(index, indexSearcher);
+  }
+
+  public Map<Integer, IndexReader> getReaders() {
+    return readers;
+  }
+
+  public void setReaders(Map<Integer, IndexReader> readers) {
+    this.readers = readers;
+  }
+
+  public Map<Integer, IndexSearcher> getSearchers() {
+    return searchers;
+  }
+
+  public void setSearchers(Map<Integer, IndexSearcher> searchers) {
+    this.searchers = searchers;
+  }
+  
+  
+  
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f219d526/src/blur-core/src/main/java/org/apache/blur/thrift/TopFieldDocsMerger.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/TopFieldDocsMerger.java b/src/blur-core/src/main/java/org/apache/blur/thrift/TopFieldDocsMerger.java
new file mode 100644
index 0000000..6adb02b
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/TopFieldDocsMerger.java
@@ -0,0 +1,73 @@
+package org.apache.blur.thrift;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.utils.BlurExecutorCompletionService;
+import org.apache.blur.utils.ForkJoin.Merger;
+import org.apache.lucene.search.Filter;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.TopFieldDocs;
+
+public class TopFieldDocsMerger implements Merger<TopFieldDocs> {
+
+  private static Log LOG = LogFactory.getLog(TopFieldDocsMerger.class);
+
+  private long _minimumNumberOfResults;
+  private long _maxQueryTime;
+  private Query _query;
+  private Filter _filter;
+  private Sort _sort;
+  private int _topN;
+
+  public TopFieldDocsMerger(long minimumNumberOfResults, long maxQueryTime, int topN) {
+    _minimumNumberOfResults = minimumNumberOfResults;
+    _maxQueryTime = maxQueryTime;
+    _topN = topN;
+  }
+
+  @Override
+  public TopFieldDocs merge(BlurExecutorCompletionService<TopFieldDocs> service) throws BlurException {
+    List<TopFieldDocs> results = new ArrayList<TopFieldDocs>();
+    long total = 0;
+    while (service.getRemainingCount() > 0) {
+      Future<TopFieldDocs> future = service.poll(_maxQueryTime, TimeUnit.MILLISECONDS, true, _query, _filter, _sort);
+      if (future != null) {
+        TopFieldDocs topFieldDocs = service.getResultThrowException(future, _query, _filter, _sort);
+        total += topFieldDocs.totalHits;
+        results.add(topFieldDocs);
+        if (total >= _minimumNumberOfResults) {
+          // Called to stop execution of any other running queries.
+          service.cancelAll();
+          return merge(results);
+        }
+      } else {
+        LOG.info("Query timeout with max query time of [{0}] for query [{1}] filter [{2}] sort [{3}].", _maxQueryTime, _query, _filter, _sort);
+        throw new BlurException("Query timeout with max query time of [" + _maxQueryTime + "] for query [" + _query + "] filter [" + _filter + "] sort [" + _sort + "].", null);
+      }
+    }
+    return merge(results);
+  }
+
+  private TopFieldDocs merge(List<TopFieldDocs> results) throws BlurException {
+    try {
+      TopDocs topDocs = TopDocs.merge(_sort, _topN, results.toArray(new TopDocs[]{}));
+      if (_sort == null) {
+        return new TopFieldDocs(topDocs.totalHits, topDocs.scoreDocs, null, topDocs.getMaxScore());
+      } else {
+        return (TopFieldDocs) topDocs;
+      }
+    } catch (IOException e) {
+      throw new BException("Unkown IOException", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f219d526/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index 34a177a..616c898 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -29,6 +29,7 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -53,15 +54,21 @@ import org.apache.blur.thrift.generated.BlurQuery;
 import org.apache.blur.thrift.generated.BlurResult;
 import org.apache.blur.thrift.generated.BlurResults;
 import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.DocLocation;
 import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.QueryArgs;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.RecordMutation;
 import org.apache.blur.thrift.generated.RecordMutationType;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.RowMutationType;
+import org.apache.blur.thrift.generated.ScoreDoc;
 import org.apache.blur.thrift.generated.Selector;
 import org.apache.blur.thrift.generated.SimpleQuery;
+import org.apache.blur.thrift.generated.SortField;
+import org.apache.blur.thrift.generated.SortType;
+import org.apache.blur.thrift.generated.TopFieldDocs;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -72,7 +79,11 @@ import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.SlowCompositeReaderWrapper;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Filter;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField.Type;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.util.RamUsageEstimator;
@@ -668,4 +679,113 @@ public class BlurUtil {
     int index = shard.indexOf('-');
     return Integer.parseInt(shard.substring(index + 1));
   }
+
+  public static TopFieldDocs convert(org.apache.lucene.search.TopFieldDocs topFieldDocs) {
+    TopFieldDocs result = new TopFieldDocs();
+    result.setFields(convert(topFieldDocs.fields));
+    result.setMaxScore(topFieldDocs.getMaxScore());
+    result.setScoreDocs(convert(topFieldDocs.scoreDocs));
+    result.setTotalHits(topFieldDocs.totalHits);
+    return result;
+  }
+
+  public static List<ScoreDoc> convert(org.apache.lucene.search.ScoreDoc[] scoreDocs) {
+    List<ScoreDoc> result = new ArrayList<ScoreDoc>(scoreDocs.length);
+    for (int i = 0; i < scoreDocs.length; i++) {
+      org.apache.lucene.search.ScoreDoc scoreDoc = scoreDocs[i];
+      if (scoreDoc instanceof org.apache.lucene.search.FieldDoc) {
+        result.add(convert((org.apache.lucene.search.FieldDoc) scoreDoc));
+      } else {
+        result.add(convert(scoreDoc));
+      }
+    }
+    return result;
+  }
+
+  public List<ScoreDoc> convert(org.apache.lucene.search.FieldDoc[] fieldDocs) {
+    List<ScoreDoc> result = new ArrayList<ScoreDoc>(fieldDocs.length);
+    for (int i = 0; i < fieldDocs.length; i++) {
+      result.add(convert(fieldDocs[i]));
+    }
+    return result;
+  }
+
+  public static List<SortField> convert(org.apache.lucene.search.SortField[] fields) {
+    List<SortField> result = new ArrayList<SortField>(fields.length);
+    for (int i = 0; i < fields.length; i++) {
+      result.add(convert(fields[i]));
+    }
+    return result;
+  }
+
+  public static SortField convert(org.apache.lucene.search.SortField sortField) {
+    SortField result = new SortField();
+    result.setField(sortField.getField());
+    result.setReverse(sortField.getReverse());
+    result.setType(convert(sortField.getType()));
+    return result;
+  }
+
+  public static ScoreDoc convert(org.apache.lucene.search.ScoreDoc scoreDoc) {
+    ScoreDoc result = new ScoreDoc();
+    result.setDocLocation(new DocLocation(scoreDoc.doc, scoreDoc.shardIndex));
+    result.setScore(scoreDoc.score);
+    return result;
+  }
+
+  public static ScoreDoc convert(org.apache.lucene.search.FieldDoc fieldDoc) {
+    ScoreDoc result = new ScoreDoc();
+    result.setDocLocation(new DocLocation(fieldDoc.doc, fieldDoc.shardIndex));
+    result.setFields(convert(fieldDoc.fields));
+    result.setScore(fieldDoc.score);
+    return result;
+  }
+
+  public static SortType convert(Type type) {
+    switch (type) {
+    case BYTE:
+      return SortType.BYTE;
+    case BYTES:
+      return SortType.BYTES;
+    case SCORE:
+      return SortType.SCORE;
+    case DOC:
+      return SortType.DOC;
+    case STRING:
+      return SortType.STRING;
+    case INT:
+      return SortType.INT;
+    case FLOAT:
+      return SortType.FLOAT;
+    case LONG:
+      return SortType.LONG;
+    case DOUBLE:
+      return SortType.DOUBLE;
+    case SHORT:
+      return SortType.SHORT;
+    case STRING_VAL:
+      return SortType.STRING_VAL;
+    default:
+      throw new RuntimeException("Not supported");
+    }
+  }
+
+  public static List<ByteBuffer> convert(Object[] fields) {
+    throw new RuntimeException("Not supported YET!");
+  }
+
+  public static Query getQuery(QueryArgs queryArgs) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public static Filter getFilter(QueryArgs queryArgs) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public static Sort getSort(QueryArgs queryArgs) {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }


Mime
View raw message