lucene-java-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From uschind...@apache.org
Subject svn commit: r834550 - in /lucene/java/trunk: ./ src/java/org/apache/lucene/search/ src/java/org/apache/lucene/util/ src/test/org/apache/lucene/search/
Date Tue, 10 Nov 2009 17:16:16 GMT
Author: uschindler
Date: Tue Nov 10 17:16:16 2009
New Revision: 834550

URL: http://svn.apache.org/viewvc?rev=834550&view=rev
Log:
LUCENE-2041: Complete parallelizaton of ParallelMultiSearcher and refactoring of MultiSearcher,
Java 5 concurrent API support

Added:
    lucene/java/trunk/src/java/org/apache/lucene/util/DummyConcurrentLock.java   (with props)
    lucene/java/trunk/src/java/org/apache/lucene/util/NamedThreadFactory.java   (with props)
Modified:
    lucene/java/trunk/CHANGES.txt
    lucene/java/trunk/src/java/org/apache/lucene/search/FieldDocSortedHitQueue.java
    lucene/java/trunk/src/java/org/apache/lucene/search/MultiSearcher.java
    lucene/java/trunk/src/java/org/apache/lucene/search/ParallelMultiSearcher.java
    lucene/java/trunk/src/test/org/apache/lucene/search/TestMultiSearcher.java

Modified: lucene/java/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/java/trunk/CHANGES.txt?rev=834550&r1=834549&r2=834550&view=diff
==============================================================================
--- lucene/java/trunk/CHANGES.txt (original)
+++ lucene/java/trunk/CHANGES.txt Tue Nov 10 17:16:16 2009
@@ -135,6 +135,10 @@
 * LUCENE-1933: Provide a convenience AttributeFactory that creates a
   Token instance for all basic attributes.  (Uwe Schindler)
 
+* LUCENE-2041: Parallelize the rest of ParallelMultiSearcher. Lot's of
+  code refactoring and Java 5 concurrent support in MultiSearcher.
+  (Joey Surls, Simon Willnauer via Uwe Schindler)
+
 Optimizations
 
 * LUCENE-1183: Optimize Levenshtein Distance computation in

Modified: lucene/java/trunk/src/java/org/apache/lucene/search/FieldDocSortedHitQueue.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/search/FieldDocSortedHitQueue.java?rev=834550&r1=834549&r2=834550&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/search/FieldDocSortedHitQueue.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/search/FieldDocSortedHitQueue.java Tue Nov
10 17:16:16 2009
@@ -32,11 +32,11 @@
  */
 class FieldDocSortedHitQueue extends PriorityQueue<FieldDoc> {
 
-  volatile SortField[] fields;
+  volatile SortField[] fields = null;
 
   // used in the case where the fields are sorted by locale
   // based strings
-  volatile Collator[] collators;
+  volatile Collator[] collators = null;
 
 
   /**
@@ -44,9 +44,7 @@
    * @param fields Fieldable names, in priority order (highest priority first).
    * @param size  The number of hits to retain.  Must be greater than zero.
    */
-  FieldDocSortedHitQueue (SortField[] fields, int size) {
-    this.fields = fields;
-    this.collators = hasCollators (fields);
+  FieldDocSortedHitQueue (int size) {
     initialize (size);
   }
 
@@ -56,10 +54,10 @@
    * This is to handle the case using ParallelMultiSearcher where the
    * original list contains AUTO and we don't know the actual sort
    * type until the values come back.  The fields can only be set once.
-   * This method is thread safe.
+   * This method should be synchronized external like all other PQ methods.
    * @param fields
    */
-  synchronized void setFields (SortField[] fields) {
+  void setFields (SortField[] fields) {
     this.fields = fields;
     this.collators = hasCollators (fields);
   }

Modified: lucene/java/trunk/src/java/org/apache/lucene/search/MultiSearcher.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/search/MultiSearcher.java?rev=834550&r1=834549&r2=834550&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/search/MultiSearcher.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/search/MultiSearcher.java Tue Nov 10 17:16:16
2009
@@ -23,12 +23,15 @@
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.util.ReaderUtil;
+import org.apache.lucene.util.DummyConcurrentLock;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.locks.Lock;
 
 /** Implements search over a set of <code>Searchables</code>.
  *
@@ -43,8 +46,8 @@
    * initialize Weights.
    */
   private static class CachedDfSource extends Searcher {
-    private Map<Term,Integer> dfMap; // Map from Terms to corresponding doc freqs
-    private int maxDoc; // document count
+    private final Map<Term,Integer> dfMap; // Map from Terms to corresponding doc freqs
+    private final int maxDoc; // document count
 
     public CachedDfSource(Map<Term,Integer> dfMap, int maxDoc, Similarity similarity)
{
       this.dfMap = dfMap;
@@ -66,7 +69,7 @@
 
     @Override
     public int[] docFreqs(Term[] terms) {
-      int[] result = new int[terms.length];
+      final int[] result = new int[terms.length];
       for (int i = 0; i < terms.length; i++) {
         result[i] = docFreq(terms[i]);
       }
@@ -97,8 +100,9 @@
       throw new UnsupportedOperationException();
     }
     
+    @Override
     public Document doc(int i, FieldSelector fieldSelector) {
-        throw new UnsupportedOperationException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -197,22 +201,16 @@
   public TopDocs search(Weight weight, Filter filter, int nDocs)
       throws IOException {
 
-    HitQueue hq = new HitQueue(nDocs, false);
+    final HitQueue hq = new HitQueue(nDocs, false);
     int totalHits = 0;
 
     for (int i = 0; i < searchables.length; i++) { // search each searcher
-      TopDocs docs = searchables[i].search(weight, filter, nDocs);
-      totalHits += docs.totalHits;		  // update totalHits
-      ScoreDoc[] scoreDocs = docs.scoreDocs;
-      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
-        ScoreDoc scoreDoc = scoreDocs[j];
-        scoreDoc.doc += starts[i];                // convert doc
-        if(scoreDoc == hq.insertWithOverflow(scoreDoc))
-          break;                                // no more scores > minScore
-      }
+      final TopDocs docs = new MultiSearcherCallableNoSort(DummyConcurrentLock.INSTANCE,
+        searchables[i], weight, filter, nDocs, hq, i, starts).call();
+      totalHits += docs.totalHits; // update totalHits
     }
 
-    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
+    final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
     for (int i = hq.size()-1; i >= 0; i--)	  // put docs in array
       scoreDocs[i] = hq.pop();
     
@@ -222,41 +220,20 @@
   }
 
   @Override
-  public TopFieldDocs search (Weight weight, Filter filter, int n, Sort sort)
-  throws IOException {
-    FieldDocSortedHitQueue hq = null;
+  public TopFieldDocs search (Weight weight, Filter filter, int n, Sort sort) throws IOException
{
+    FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(n);
     int totalHits = 0;
 
     float maxScore=Float.NEGATIVE_INFINITY;
     
     for (int i = 0; i < searchables.length; i++) { // search each searcher
-      TopFieldDocs docs = searchables[i].search (weight, filter, n, sort);
-      // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
-      // it will break ties by doc Id properly. Otherwise, it will compare to
-      // 'relative' doc Ids, that belong to two different searchers.
-      for (int j = 0; j < docs.fields.length; j++) {
-        if (docs.fields[j].getType() == SortField.DOC) {
-          // iterate over the score docs and change their fields value
-          for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
-            FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
-            fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
-          }
-          break;
-        }
-      }
-      if (hq == null) hq = new FieldDocSortedHitQueue (docs.fields, n);
-      totalHits += docs.totalHits;		  // update totalHits
+      final TopFieldDocs docs = new MultiSearcherCallableWithSort(DummyConcurrentLock.INSTANCE,
+        searchables[i], weight, filter, n, hq, sort, i, starts).call();
+      totalHits += docs.totalHits; // update totalHits
       maxScore = Math.max(maxScore, docs.getMaxScore());
-      ScoreDoc[] scoreDocs = docs.scoreDocs;
-      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
-        ScoreDoc scoreDoc = scoreDocs[j];
-        scoreDoc.doc += starts[i];                // convert doc
-        if (scoreDoc == hq.insertWithOverflow((FieldDoc) scoreDoc))
-          break;                                  // no more scores > minScore
-      }
     }
 
-    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
+    final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
     for (int i = hq.size() - 1; i >= 0; i--)	  // put docs in array
       scoreDocs[i] =  hq.pop();
 
@@ -296,7 +273,7 @@
 
   @Override
   public Query rewrite(Query original) throws IOException {
-    Query[] queries = new Query[searchables.length];
+    final Query[] queries = new Query[searchables.length];
     for (int i = 0; i < searchables.length; i++) {
       queries[i] = searchables[i].rewrite(original);
     }
@@ -305,7 +282,7 @@
 
   @Override
   public Explanation explain(Weight weight, int doc) throws IOException {
-    int i = subSearcher(doc);			  // find searcher index
+    final int i = subSearcher(doc);			  // find searcher index
     return searchables[i].explain(weight, doc - starts[i]); // dispatch to searcher
   }
 
@@ -327,14 +304,14 @@
   @Override
   protected Weight createWeight(Query original) throws IOException {
     // step 1
-    Query rewrittenQuery = rewrite(original);
+    final Query rewrittenQuery = rewrite(original);
 
     // step 2
-    Set<Term> terms = new HashSet<Term>();
+    final Set<Term> terms = new HashSet<Term>();
     rewrittenQuery.extractTerms(terms);
 
     // step3
-    Term[] allTermsArray = new Term[terms.size()];
+    final Term[] allTermsArray = new Term[terms.size()];
     terms.toArray(allTermsArray);
     int[] aggregatedDfs = new int[terms.size()];
     for (int i = 0; i < searchables.length; i++) {
@@ -344,16 +321,129 @@
       }
     }
 
-    HashMap<Term,Integer> dfMap = new HashMap<Term,Integer>();
+    final HashMap<Term,Integer> dfMap = new HashMap<Term,Integer>();
     for(int i=0; i<allTermsArray.length; i++) {
       dfMap.put(allTermsArray[i], Integer.valueOf(aggregatedDfs[i]));
     }
 
     // step4
-    int numDocs = maxDoc();
-    CachedDfSource cacheSim = new CachedDfSource(dfMap, numDocs, getSimilarity());
+    final int numDocs = maxDoc();
+    final CachedDfSource cacheSim = new CachedDfSource(dfMap, numDocs, getSimilarity());
 
     return rewrittenQuery.weight(cacheSim);
   }
 
+  /**
+   * A thread subclass for searching a single searchable 
+   */
+  static class MultiSearcherCallableNoSort implements Callable<TopDocs> {
+
+    private final Lock lock;
+    private final Searchable searchable;
+    private final Weight weight;
+    private final Filter filter;
+    private final int nDocs;
+    private final int i;
+    private final HitQueue hq;
+    private final int[] starts;
+
+    public MultiSearcherCallableNoSort(Lock lock, Searchable searchable, Weight weight,
+        Filter filter, int nDocs, HitQueue hq, int i, int[] starts) {
+      this.lock = lock;
+      this.searchable = searchable;
+      this.weight = weight;
+      this.filter = filter;
+      this.nDocs = nDocs;
+      this.hq = hq;
+      this.i = i;
+      this.starts = starts;
+    }
+
+    public TopDocs call() throws IOException {
+      final TopDocs docs = searchable.search (weight, filter, nDocs);
+      final ScoreDoc[] scoreDocs = docs.scoreDocs;
+      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
+        final ScoreDoc scoreDoc = scoreDocs[j];
+        scoreDoc.doc += starts[i]; // convert doc 
+        //it would be so nice if we had a thread-safe insert 
+        lock.lock();
+        try {
+          if (scoreDoc == hq.insertWithOverflow(scoreDoc))
+            break;
+        } finally {
+          lock.unlock();
+        }
+      }
+      return docs;
+    }
+  }
+
+  /**
+   * A thread subclass for searching a single searchable 
+   */
+  static class MultiSearcherCallableWithSort implements Callable<TopFieldDocs> {
+
+    private final Lock lock;
+    private final Searchable searchable;
+    private final Weight weight;
+    private final Filter filter;
+    private final int nDocs;
+    private final int i;
+    private final FieldDocSortedHitQueue hq;
+    private final int[] starts;
+    private final Sort sort;
+
+    public MultiSearcherCallableWithSort(Lock lock, Searchable searchable, Weight weight,
+        Filter filter, int nDocs, FieldDocSortedHitQueue hq, Sort sort, int i, int[] starts)
{
+      this.lock = lock;
+      this.searchable = searchable;
+      this.weight = weight;
+      this.filter = filter;
+      this.nDocs = nDocs;
+      this.hq = hq;
+      this.i = i;
+      this.starts = starts;
+      this.sort = sort;
+    }
+
+    public TopFieldDocs call() throws IOException {
+      final TopFieldDocs docs = searchable.search (weight, filter, nDocs, sort);
+      // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
+      // it will break ties by doc Id properly. Otherwise, it will compare to
+      // 'relative' doc Ids, that belong to two different searchables.
+      for (int j = 0; j < docs.fields.length; j++) {
+        if (docs.fields[j].getType() == SortField.DOC) {
+          // iterate over the score docs and change their fields value
+          for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
+            FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
+            fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
+          }
+          break;
+        }
+      }
+
+      lock.lock();
+      try {
+        hq.setFields(docs.fields);
+      } finally {
+        lock.unlock();
+      }
+
+      final ScoreDoc[] scoreDocs = docs.scoreDocs;
+      for (int j = 0; j < scoreDocs.length; j++) { // merge scoreDocs into hq
+        final FieldDoc fieldDoc = (FieldDoc) scoreDocs[j];
+        fieldDoc.doc += starts[i]; // convert doc 
+        //it would be so nice if we had a thread-safe insert 
+        lock.lock();
+        try {
+          if (fieldDoc == hq.insertWithOverflow(fieldDoc))
+            break;
+        } finally {
+          lock.unlock();
+        }
+      }
+      return docs;
+    }
+  }
+
 }

Modified: lucene/java/trunk/src/java/org/apache/lucene/search/ParallelMultiSearcher.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/search/ParallelMultiSearcher.java?rev=834550&r1=834549&r2=834550&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/search/ParallelMultiSearcher.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/search/ParallelMultiSearcher.java Tue Nov
10 17:16:16 2009
@@ -18,9 +18,20 @@
  */
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.lucene.index.Term;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.util.NamedThreadFactory;
 import org.apache.lucene.util.PriorityQueue;
 
 /** Implements parallel search over a set of <code>Searchables</code>.
@@ -29,69 +40,62 @@
  * or {@link #search(Query,Filter)} methods.
  */
 public class ParallelMultiSearcher extends MultiSearcher {
+  
+  private final ExecutorService executor;
+  private final Searchable[] searchables;
+  private final int[] starts;
 
-  private Searchable[] searchables;
-  private int[] starts;
-
-  /** Creates a searchable which searches <i>searchables</i>. */
+  /** Creates a {@link Searchable} which searches <i>searchables</i>. */
   public ParallelMultiSearcher(Searchable... searchables) throws IOException {
     super(searchables);
     this.searchables = searchables;
     this.starts = getStarts();
+    executor = Executors.newCachedThreadPool(new NamedThreadFactory(this.getClass().getSimpleName()));

   }
 
   /**
-   * TODO: parallelize this one too
+   * Executes each {@link Searchable}'s docFreq() in its own thread and waits for each search
to complete and merge
+   * the results back together.
    */
   @Override
-  public int docFreq(Term term) throws IOException {
-    return super.docFreq(term);
+  public int docFreq(final Term term) throws IOException {
+    @SuppressWarnings("unchecked") final Future<Integer>[] searchThreads = new Future[searchables.length];
+    for (int i = 0; i < searchables.length; i++) { // search each searchable
+      final Searchable searchable = searchables[i];
+      searchThreads[i] = executor.submit(new Callable<Integer>() {
+        public Integer call() throws IOException {
+          return Integer.valueOf(searchable.docFreq(term));
+        }
+      });
+    }
+    final CountDocFreq func = new CountDocFreq();
+    foreach(func, Arrays.asList(searchThreads));
+    return func.docFreq;
   }
 
   /**
-   * A search implementation which spans a new thread for each
-   * Searchable, waits for each search to complete and merge
+   * A search implementation which executes each 
+   * {@link Searchable} in its own thread and waits for each search to complete and merge
    * the results back together.
    */
   @Override
-  public TopDocs search(Weight weight, Filter filter, int nDocs)
-    throws IOException {
-    HitQueue hq = new HitQueue(nDocs, false);
-    int totalHits = 0;
-    MultiSearcherThread[] msta =
-      new MultiSearcherThread[searchables.length];
+  public TopDocs search(Weight weight, Filter filter, int nDocs) throws IOException {
+    final HitQueue hq = new HitQueue(nDocs, false);
+    final Lock lock = new ReentrantLock();
+    @SuppressWarnings("unchecked") final Future<TopDocs>[] searchThreads = new Future[searchables.length];
     for (int i = 0; i < searchables.length; i++) { // search each searchable
-      // Assume not too many searchables and cost of creating a thread is by far inferior
to a search
-      msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs,
-          hq, i, starts, "MultiSearcher thread #" + (i + 1));
-      msta[i].start();
+      searchThreads[i] = executor.submit(
+          new MultiSearcherCallableNoSort(lock, searchables[i], weight, filter, nDocs, hq,
i, starts));
     }
 
-    for (int i = 0; i < searchables.length; i++) {
-      try {
-        msta[i].join();
-      } catch (InterruptedException ie) {
-        // In 3.0 we will change this to throw
-        // InterruptedException instead
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(ie);
-      }
-      IOException ioe = msta[i].getIOException();
-      if (ioe == null) {
-        totalHits += msta[i].hits();
-      } else {
-        // if one search produced an IOException, rethrow it
-        throw ioe;
-      }
-    }
+    final CountTotalHits<TopDocs> func = new CountTotalHits<TopDocs>();
+    foreach(func, Arrays.asList(searchThreads));
 
-    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
+    final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
     for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
       scoreDocs[i] = hq.pop();
 
-    float maxScore = (totalHits==0) ? Float.NEGATIVE_INFINITY : scoreDocs[0].score;
-    
-    return new TopDocs(totalHits, scoreDocs, maxScore);
+    return new TopDocs(func.totalHits, scoreDocs, func.maxScore);
   }
 
   /**
@@ -100,45 +104,25 @@
    * the results back together.
    */
   @Override
-  public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort)
-    throws IOException {
-    // don't specify the fields - we'll wait to do this until we get results
-    FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue (null, nDocs);
-    int totalHits = 0;
-    MultiSearcherThread[] msta = new MultiSearcherThread[searchables.length];
+  public TopFieldDocs search(Weight weight, Filter filter, int nDocs, Sort sort) throws IOException
{
+    if (sort == null) throw new NullPointerException();
+
+    final FieldDocSortedHitQueue hq = new FieldDocSortedHitQueue(nDocs);
+    final Lock lock = new ReentrantLock();
+    @SuppressWarnings("unchecked") final Future<TopFieldDocs>[] searchThreads = new
Future[searchables.length];
     for (int i = 0; i < searchables.length; i++) { // search each searchable
-      // Assume not too many searchables and cost of creating a thread is by far inferior
to a search
-      msta[i] = new MultiSearcherThread(searchables[i], weight, filter, nDocs,
-          hq, sort, i, starts, "MultiSearcher thread #" + (i + 1));
-      msta[i].start();
+      searchThreads[i] = executor.submit(
+          new MultiSearcherCallableWithSort(lock, searchables[i], weight, filter, nDocs,
hq, sort, i, starts));
     }
 
-    float maxScore=Float.NEGATIVE_INFINITY;
-    
-    for (int i = 0; i < searchables.length; i++) {
-      try {
-        msta[i].join();
-      } catch (InterruptedException ie) {
-        // In 3.0 we will change this to throw
-        // InterruptedException instead
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(ie);
-      }
-      IOException ioe = msta[i].getIOException();
-      if (ioe == null) {
-        totalHits += msta[i].hits();
-        maxScore=Math.max(maxScore, msta[i].getMaxScore());
-      } else {
-        // if one search produced an IOException, rethrow it
-        throw ioe;
-      }
-    }
+    final CountTotalHits<TopFieldDocs> func = new CountTotalHits<TopFieldDocs>();
+    foreach(func, Arrays.asList(searchThreads));
 
-    ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
+    final ScoreDoc[] scoreDocs = new ScoreDoc[hq.size()];
     for (int i = hq.size() - 1; i >= 0; i--) // put docs in array
       scoreDocs[i] = hq.pop();
 
-    return new TopFieldDocs(totalHits, scoreDocs, hq.getFields(), maxScore);
+    return new TopFieldDocs(func.totalHits, scoreDocs, hq.getFields(), func.maxScore);
   }
 
   /** Lower-level search API.
@@ -149,15 +133,16 @@
   * matching documents.  The high-level search API ({@link
   * Searcher#search(Query)}) is usually more efficient, as it skips
   * non-high-scoring hits.
+  * 
+  * <p>This method cannot be parallelized, because {@link Collector}
+  * supports no concurrent access.
   *
   * @param weight to match documents
   * @param filter if non-null, a bitset used to eliminate some documents
   * @param collector to receive hits
-  * 
-  * TODO: parallelize this one too
   */
   @Override
-  public void search(Weight weight, Filter filter, final Collector collector)
+  public void search(final Weight weight, final Filter filter, final Collector collector)
    throws IOException {
    for (int i = 0; i < searchables.length; i++) {
 
@@ -165,17 +150,17 @@
 
      final Collector hc = new Collector() {
        @Override
-       public void setScorer(Scorer scorer) throws IOException {
+       public void setScorer(final Scorer scorer) throws IOException {
          collector.setScorer(scorer);
        }
        
        @Override
-       public void collect(int doc) throws IOException {
+       public void collect(final int doc) throws IOException {
          collector.collect(doc);
        }
        
        @Override
-       public void setNextReader(IndexReader reader, int docBase) throws IOException {
+       public void setNextReader(final IndexReader reader, final int docBase) throws IOException
{
          collector.setNextReader(reader, start + docBase);
        }
        
@@ -188,117 +173,60 @@
      searchables[i].search(weight, filter, hc);
    }
   }
-
+  
   /*
-   * TODO: this one could be parallelized too
-   * @see org.apache.lucene.search.Searchable#rewrite(org.apache.lucene.search.Query)
+   * apply the function to each element of the list. This method encapsulates the logic how

+   * to wait for concurrently executed searchables.  
    */
-  @Override
-  public Query rewrite(Query original) throws IOException {
-    return super.rewrite(original);
-  }
-
-}
-
-/**
- * A thread subclass for searching a single searchable 
- */
-class MultiSearcherThread extends Thread {
-
-  private Searchable searchable;
-  private Weight weight;
-  private Filter filter;
-  private int nDocs;
-  private TopDocs docs;
-  private int i;
-  private PriorityQueue<? extends ScoreDoc> hq;
-  private int[] starts;
-  private IOException ioe;
-  private Sort sort;
-
-  public MultiSearcherThread(Searchable searchable, Weight weight, Filter filter,
-      int nDocs, HitQueue hq, int i, int[] starts, String name) {
-    super(name);
-    this.searchable = searchable;
-    this.weight = weight;
-    this.filter = filter;
-    this.nDocs = nDocs;
-    this.hq = hq;
-    this.i = i;
-    this.starts = starts;
-  }
-
-  public MultiSearcherThread(Searchable searchable, Weight weight,
-      Filter filter, int nDocs, FieldDocSortedHitQueue hq, Sort sort, int i,
-      int[] starts, String name) {
-    super(name);
-    this.searchable = searchable;
-    this.weight = weight;
-    this.filter = filter;
-    this.nDocs = nDocs;
-    this.hq = hq;
-    this.i = i;
-    this.starts = starts;
-    this.sort = sort;
-  }
-
-  @Override
-  @SuppressWarnings ("unchecked")
-  public void run() {
-    try {
-      docs = (sort == null) ? searchable.search (weight, filter, nDocs)
-        : searchable.search (weight, filter, nDocs, sort);
-    }
-    // Store the IOException for later use by the caller of this thread
-    catch (IOException ioe) {
-      this.ioe = ioe;
-    }
-    if (ioe == null) {
-      if (sort != null) {
-        TopFieldDocs docsFields = (TopFieldDocs) docs;
-        // If one of the Sort fields is FIELD_DOC, need to fix its values, so that
-        // it will break ties by doc Id properly. Otherwise, it will compare to
-        // 'relative' doc Ids, that belong to two different searchables.
-        for (int j = 0; j < docsFields.fields.length; j++) {
-          if (docsFields.fields[j].getType() == SortField.DOC) {
-            // iterate over the score docs and change their fields value
-            for (int j2 = 0; j2 < docs.scoreDocs.length; j2++) {
-              FieldDoc fd = (FieldDoc) docs.scoreDocs[j2];
-              fd.fields[j] = Integer.valueOf(((Integer) fd.fields[j]).intValue() + starts[i]);
-            }
-            break;
-          }
-        }
-
-        ((FieldDocSortedHitQueue) hq).setFields(docsFields.fields);
-      }
-      ScoreDoc[] scoreDocs = docs.scoreDocs;
-      for (int j = 0;
-           j < scoreDocs.length;
-           j++) { // merge scoreDocs into hq
-        ScoreDoc scoreDoc = scoreDocs[j];
-        scoreDoc.doc += starts[i]; // convert doc 
-        //it would be so nice if we had a thread-safe insert 
-        synchronized (hq) {
-          // this cast is bad, because we assume that the list has correct type.
-          // Because of that we have the @SuppressWarnings :-(
-          if (scoreDoc == ((PriorityQueue<ScoreDoc>) hq).insertWithOverflow(scoreDoc))
-            break;
-        } // no more scores > minScore
+  private <T> void foreach(Function<T> func, List<Future<T>> list)
throws IOException{
+    for (Future<T> future : list) {
+      try{
+        func.apply(future.get());
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof IOException)
+          throw (IOException) e.getCause();
+        throw new RuntimeException(e.getCause());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        // In 3.0 we will change this to throw
+        // InterruptedException instead
+        throw new RuntimeException(e);
       }
     }
   }
 
-  public int hits() {
-    return docs.totalHits;
+  // Both functions could be reduced to Int as other values of TopDocs
+  // are not needed. Using sep. functions is more self documenting.
+  /**
+   * A function with one argument
+   * @param <T> the argument type
+   */
+  private static interface Function<T> {
+    abstract void apply(T t);
   }
 
-  public float getMaxScore() {
-      return docs.getMaxScore();
+  /**
+   * Counts the total number of hits for all {@link TopDocs} instances
+   * provided. 
+   */
+  private static final class CountTotalHits<T extends TopDocs> implements Function<T>
{
+    int totalHits = 0;
+    float maxScore = Float.NEGATIVE_INFINITY;
+    
+    public void apply(T t) {
+      totalHits += t.totalHits;
+      maxScore = Math.max(maxScore, t.getMaxScore());
+    }
   }
-  
-  public IOException getIOException() {
-    return ioe;
+  /**
+   * Accumulates the document frequency for a term.
+   */
+  private static final class CountDocFreq implements Function<Integer>{
+    int docFreq = 0;
+    
+    public void apply(Integer t) {
+      docFreq += t.intValue();
+    }
   }
 
 }

Added: lucene/java/trunk/src/java/org/apache/lucene/util/DummyConcurrentLock.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/util/DummyConcurrentLock.java?rev=834550&view=auto
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/util/DummyConcurrentLock.java (added)
+++ lucene/java/trunk/src/java/org/apache/lucene/util/DummyConcurrentLock.java Tue Nov 10
17:16:16 2009
@@ -0,0 +1,51 @@
+package org.apache.lucene.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A dummy lock as a replacement for {@link ReentrantLock} to disable locking
+ */
+public final class DummyConcurrentLock implements Lock {
+
+  /** a default instance, can be always used, as this {@link Lock} is stateless. */
+  public static final DummyConcurrentLock INSTANCE = new DummyConcurrentLock();
+
+  public void lock() {}
+  
+  public void lockInterruptibly() {}
+  
+  public boolean tryLock() {
+    return true;
+  }
+  
+  public boolean tryLock(long time, TimeUnit unit) {
+    return true;
+  }
+  
+  public void unlock() {}
+  
+  public Condition newCondition() {
+    throw new UnsupportedOperationException();
+  }
+
+}

Propchange: lucene/java/trunk/src/java/org/apache/lucene/util/DummyConcurrentLock.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/java/trunk/src/java/org/apache/lucene/util/DummyConcurrentLock.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: lucene/java/trunk/src/java/org/apache/lucene/util/NamedThreadFactory.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/util/NamedThreadFactory.java?rev=834550&view=auto
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/util/NamedThreadFactory.java (added)
+++ lucene/java/trunk/src/java/org/apache/lucene/util/NamedThreadFactory.java Tue Nov 10 17:16:16
2009
@@ -0,0 +1,53 @@
+/**
+ * 
+ */
+package org.apache.lucene.util;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A default {@link ThreadFactory} implementation that accepts the name prefix
+ * of the created threads as a constructor argument. Otherwise, this factory
+ * yields the same semantics as the thread factory returned by
+ * {@link Executors#defaultThreadFactory()}.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+  private static final AtomicInteger threadPoolNumber = new AtomicInteger(1);
+  private final ThreadGroup group;
+  private final AtomicInteger threadNumber = new AtomicInteger(1);
+  private static final String NAME_PATTERN = "%s-%d-thread";
+  private final String threadNamePrefix;
+
+  /**
+   * Creates a new {@link NamedThreadFactory} instance
+   * 
+   * @param threadNamePrefix the name prefix assigned to each thread created.
+   */
+  public NamedThreadFactory(String threadNamePrefix) {
+    final SecurityManager s = System.getSecurityManager();
+    group = (s != null) ? s.getThreadGroup() : Thread.currentThread()
+        .getThreadGroup();
+    this.threadNamePrefix = String.format(NAME_PATTERN,
+        checkPrefix(threadNamePrefix), threadPoolNumber.getAndIncrement());
+  }
+
+  private static String checkPrefix(String prefix) {
+    return prefix == null || prefix.length() == 0 ? "Lucene" : prefix;
+  }
+
+  /**
+   * Creates a new {@link Thread}
+   * 
+   * @see java.util.concurrent.ThreadFactory#newThread(java.lang.Runnable)
+   */
+  public Thread newThread(Runnable r) {
+    final Thread t = new Thread(group, r, String.format("%s-%d",
+        this.threadNamePrefix, threadNumber.getAndIncrement()), 0);
+    t.setDaemon(false);
+    t.setPriority(Thread.NORM_PRIORITY);
+    return t;
+  }
+
+}

Propchange: lucene/java/trunk/src/java/org/apache/lucene/util/NamedThreadFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: lucene/java/trunk/src/java/org/apache/lucene/util/NamedThreadFactory.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Modified: lucene/java/trunk/src/test/org/apache/lucene/search/TestMultiSearcher.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/search/TestMultiSearcher.java?rev=834550&r1=834549&r2=834550&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/search/TestMultiSearcher.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/search/TestMultiSearcher.java Tue Nov 10
17:16:16 2009
@@ -396,6 +396,19 @@
         
         // The scores from the IndexSearcher and Multisearcher should be the same
         // if the same similarity is used.
-        assertEquals("MultiSearcher score must be equal to single esrcher score!", score1,
scoreN, 1e-6);
+        assertEquals("MultiSearcher score must be equal to single searcher score!", score1,
scoreN, 1e-6);
+    }
+    
+    public void testDocFreq() throws IOException{
+      RAMDirectory dir1 = new RAMDirectory();
+      RAMDirectory dir2 = new RAMDirectory();
+
+      initIndex(dir1, 10, true, "x"); // documents with two tokens "doc0" and "x", "doc1"
and x, etc...
+      initIndex(dir2, 5, true, "x"); // documents with two tokens "doc0" and "x", "doc1"
and x, etc...
+      IndexSearcher searcher1 = new IndexSearcher(dir1, true);
+      IndexSearcher searcher2 = new IndexSearcher(dir2, true);
+      
+      MultiSearcher multiSearcher = getMultiSearcherInstance(new Searcher[]{searcher1, searcher2});
+      assertEquals(15, multiSearcher.docFreq(new Term("contents","x")));
     }
 }



Mime
View raw message