incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [28/46] git commit: Some stubbed out classes and the beginning of an implementation of the Lucene thrift service.
Date Tue, 30 Oct 2012 02:53:19 GMT
Some stubbed out classes and the beginning of an implementation of the Lucene thrift service.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/0c06f355
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/0c06f355
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/0c06f355

Branch: refs/heads/0.2-dev
Commit: 0c06f3559507f75ad47b0515a94dc72fdd86b5c6
Parents: 1e930bb
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Oct 27 20:19:54 2012 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Oct 27 20:19:54 2012 -0400

----------------------------------------------------------------------
 .../org/apache/blur/thrift/BlurShardServer.java    |  167 ++++++++++++++-
 .../main/java/org/apache/blur/utils/BlurUtil.java  |    5 +
 2 files changed, 162 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0c06f355/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 507359b..2121a7c 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -21,14 +21,20 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_CACHE_MAX_TIMETOLIV
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DATA_FETCH_THREAD_COUNT;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongArray;
 
 import org.apache.blur.BlurConfiguration;
+import org.apache.blur.analysis.BlurAnalyzer;
 import org.apache.blur.concurrent.Executors;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
@@ -37,25 +43,33 @@ import org.apache.blur.manager.IndexManager;
 import org.apache.blur.manager.IndexServer;
 import org.apache.blur.manager.results.BlurResultIterable;
 import org.apache.blur.manager.writer.BlurIndex;
-import org.apache.blur.thrift.BException;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurQuery;
 import org.apache.blur.thrift.generated.BlurQueryStatus;
 import org.apache.blur.thrift.generated.BlurResults;
+import org.apache.blur.thrift.generated.DocLocation;
+import org.apache.blur.thrift.generated.Document;
 import org.apache.blur.thrift.generated.FetchResult;
+import org.apache.blur.thrift.generated.Lucene.Iface;
+import org.apache.blur.thrift.generated.QueryArgs;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.Schema;
 import org.apache.blur.thrift.generated.Selector;
+import org.apache.blur.thrift.generated.Session;
 import org.apache.blur.thrift.generated.TableStats;
-import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.Term;
+import org.apache.blur.thrift.generated.TopFieldDocs;
 import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurExecutorCompletionService.Cancel;
 import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.ForkJoin;
 import org.apache.blur.utils.QueryCache;
 import org.apache.blur.utils.QueryCacheEntry;
 import org.apache.blur.utils.QueryCacheKey;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.thrift.TException;
 
-
 public class BlurShardServer extends TableAdmin implements Iface {
 
   private static final Log LOG = LogFactory.getLog(BlurShardServer.class);
@@ -73,14 +87,14 @@ public class BlurShardServer extends TableAdmin implements Iface {
   public void init() throws BlurException {
     _queryCache = new QueryCache("shard-cache", _maxQueryCacheElements, _maxTimeToLive);
     _dataFetch = Executors.newThreadPool("data-fetch-", _dataFetchThreadCount);
-    
-    if(_configuration == null) {
+
+    if (_configuration == null) {
       throw new BException("Configuration must be set before initialization.");
     }
-      _cluster = _configuration.get(BlurConstants.BLUR_CLUSTER_NAME, BlurConstants.BLUR_CLUSTER);
-      _dataFetchThreadCount = _configuration.getInt(BLUR_SHARD_DATA_FETCH_THREAD_COUNT, 8);
-      _maxQueryCacheElements = _configuration.getInt(BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS,
128);
-      _maxTimeToLive = _configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1));
     
+    _cluster = _configuration.get(BlurConstants.BLUR_CLUSTER_NAME, BlurConstants.BLUR_CLUSTER);
+    _dataFetchThreadCount = _configuration.getInt(BLUR_SHARD_DATA_FETCH_THREAD_COUNT, 8);
+    _maxQueryCacheElements = _configuration.getInt(BLUR_SHARD_CACHE_MAX_QUERYCACHE_ELEMENTS,
128);
+    _maxTimeToLive = _configuration.getLong(BLUR_SHARD_CACHE_MAX_TIMETOLIVE, TimeUnit.MINUTES.toMillis(1));
   }
 
   @Override
@@ -333,8 +347,141 @@ public class BlurShardServer extends TableAdmin implements Iface {
   public void setDataFetchThreadCount(int dataFetchThreadCount) {
     _dataFetchThreadCount = dataFetchThreadCount;
   }
-  
+
   public void setConfiguration(BlurConfiguration conf) {
     _configuration = conf;
   }
+
+  // New interface from this point
+
+  private Map<String, SessionInfo> sessions = new ConcurrentHashMap<String, SessionInfo>();
+
+  @Override
+  public Session openReadSession(String table) throws BlurException, TException {
+    checkTable(_cluster, table);
+    BlurAnalyzer analyzer = _indexServer.getAnalyzer(table);
+    Map<String, BlurIndex> blurIndexes;
+    try {
+      blurIndexes = _indexServer.getIndexes(table);
+    } catch (IOException e) {
+      LOG.error("Unknown error while trying to fetch index readers.", e);
+      throw new BException(e.getMessage(), e);
+    }
+
+    String uuid = UUID.randomUUID().toString();
+    SessionInfo sessionInfo = new SessionInfo();
+    sessionInfo.setUuid(uuid);
+    sessionInfo.setAnalyzer(analyzer);
+
+    for (Entry<String, BlurIndex> entry : blurIndexes.entrySet()) {
+      int index = BlurUtil.getShardIndex(entry.getKey());
+      try {
+        IndexReader indexReader = entry.getValue().getIndexReader();
+        // @TODO use new thread pool here
+        IndexSearcher indexSearcher = new IndexSearcher(indexReader, this._dataFetch);
+        sessionInfo.add(index, indexReader);
+        sessionInfo.add(index, indexSearcher);
+      } catch (IOException e) {
+        LOG.error("Unknown error while trying to fetch index readers.", e);
+      }
+    }
+    sessions.put(uuid, sessionInfo);
+    return new Session(uuid, null);
+  }
+
+  @Override
+  public TopFieldDocs search(Session session, QueryArgs queryArgs) throws BlurException,
TException {
+    SessionInfo info = getSessionInfo(session);
+    try {
+      Map<Integer, IndexSearcher> searchers = info.getSearchers();
+      List<Integer> shardIndexes = queryArgs.getShardIndexes();
+      List<IndexSearcher> searchersToSearch = new ArrayList<IndexSearcher>();
+      if (shardIndexes == null) {
+        // all indexes
+        searchersToSearch.addAll(searchers.values());
+      } else {
+        for (Integer index : shardIndexes) {
+          IndexSearcher searcher = searchers.get(index);
+          if (searcher != null) {
+            searchersToSearch.add(searcher);
+          }
+        }
+      }
+      IndexSearcherParallelCall call = new IndexSearcherParallelCall();
+      TopFieldDocsMerger merger = new TopFieldDocsMerger();
+      return convert(ForkJoin.execute(_dataFetch, searchersToSearch, call, new Cancel() {
+        @Override
+        public void cancel() {
+
+        }
+      }).merge(merger));
+    } catch (Throwable t) {
+      LOG.error("Unknown error", t);
+      throw new BException(t.getMessage(), t);
+    }
+  }
+
+  private TopFieldDocs convert(org.apache.lucene.search.TopFieldDocs merge) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private SessionInfo getSessionInfo(Session session) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public List<Document> doc(Session session, List<DocLocation> docLocations,
Set<String> fields) throws BlurException, TException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void closeReadSession(Session session) throws BlurException, TException {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public Session openWriteSession(String table) throws BlurException, TException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void addDocuments(Session session, int shardIndex, List<Document> document)
throws BlurException, TException {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void deleteDocumentsByQueries(Session session, int shardIndex, List<QueryArgs>
queries) throws BlurException, TException {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void deleteDocuments(Session session, int shardIndex, List<Term> terms) throws
BlurException, TException {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void updateDocuments(Session session, int shardIndex, List<Term> terms, List<Document>
document) throws BlurException, TException {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void commit(Session session) throws BlurException, TException {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void rollback(Session session) throws BlurException, TException {
+    // TODO Auto-generated method stub
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/0c06f355/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index f63d5bc..34a177a 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -663,4 +663,9 @@ public class BlurUtil {
   public static AtomicReader getAtomicReader(IndexReader reader) throws IOException {
     return SlowCompositeReaderWrapper.wrap(reader);
   }
+
+  public static int getShardIndex(String shard) {
+    int index = shard.indexOf('-');
+    return Integer.parseInt(shard.substring(index + 1));
+  }
 }


Mime
View raw message