From: Otis Gospodnetic
Date: Thu, 28 Apr 2005 20:22:57 -0700 (PDT)
Subject: Re: ParallelReader
To: java-dev@lucene.apache.org
In-Reply-To: <42715364.70804@apache.org>
Message-ID: <20050429032257.31607.qmail@web31101.mail.mud.yahoo.com>

I like this!  I also think this should be in the core.  This would allow
one to partition an index and spread it over multiple disks, for instance.

I like the name ParallelReader, but I wonder if people will confuse it
with "Aha, an IndexReader that reads multiple indices in parallel (using
threads)", since what makes ParallelMultiSearcher parallel is its
threading abilities.  Do I have a better name?  Hm... not right now.
(MatchingReader?  TwinReader?  StripedReader?  Don't like any of these too
much...)

Otis
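P.S. For anyone skimming: here is roughly how I'd expect to wire this up,
going by the API in the patch below.  Just a sketch -- the paths and the
field name are made up:

    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.ParallelReader;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.*;

    public class ParallelReaderSketch {
      public static void main(String[] args) throws Exception {
        // Large, rarely re-indexed fields live in one index; small,
        // frequently re-indexed fields live in another.  Both indexes
        // must contain the same documents, in the same order.
        ParallelReader pr = new ParallelReader();
        pr.add(IndexReader.open("/indexes/static"));    // stable fields
        pr.add(IndexReader.open("/indexes/volatile"));  // rebuilt often
        Searcher searcher = new IndexSearcher(pr);
        Hits hits = searcher.search(new TermQuery(new Term("status", "active")));
        System.out.println(hits.length() + " hits");
        searcher.close();  // closes pr, which closes both readers
      }
    }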
--- Doug Cutting wrote:

> Please find attached something I wrote today.  It has not yet been
> tested extensively, and the documentation could be improved, but I
> thought it would be good to get comments sooner rather than later.
>
> Would folks find this useful?
>
> Should it go into the core or in contrib?
>
> Doug
>
> Index: src/java/org/apache/lucene/index/ParallelReader.java
> ===================================================================
> --- src/java/org/apache/lucene/index/ParallelReader.java  (revision 0)
> +++ src/java/org/apache/lucene/index/ParallelReader.java  (revision 0)
> @@ -0,0 +1,329 @@
> +package org.apache.lucene.index;
> +
> +/**
> + * Copyright 2004 The Apache Software Foundation
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.document.Field;
> +import org.apache.lucene.store.Directory;
> +
> +import java.io.IOException;
> +import java.util.*;
> +
> +/** An IndexReader which reads multiple, parallel indexes.  Each index added
> + * must have the same number of documents, but typically each contains
> + * different fields.  Each document contains the union of the fields of all
> + * documents with the same document number.  When searching, matches for a
> + * query term are from the first index added that has the field.
> + *
> + * <p>This is useful, e.g., with collections that have large fields which
> + * change rarely and small fields that change more frequently.  The smaller
> + * fields may be re-indexed in a new index and both indexes may be searched
> + * together.
> + */
> +public class ParallelReader extends IndexReader {
> +  private ArrayList readers = new ArrayList();
> +  private SortedMap fieldToReader = new TreeMap();  // field name -> IndexReader
> +  private ArrayList storedFieldReaders = new ArrayList();
> +
> +  private int maxDoc;
> +  private int numDocs;
> +  private boolean hasDeletions;
> +
> +  /** Construct a ParallelReader. */
> +  public ParallelReader() throws IOException { super(null); }
> +
> +  /** Add an IndexReader. */
> +  public void add(IndexReader reader) throws IOException {
> +    add(reader, false);
> +  }
> +
> +  /** Add an IndexReader whose stored fields will not be returned.  This can
> +   * accelerate search when stored fields are only needed from a subset of
> +   * the IndexReaders. */
> +  public void add(IndexReader reader, boolean ignoreStoredFields)
> +    throws IOException {
> +
> +    if (readers.size() == 0) {
> +      this.maxDoc = reader.maxDoc();
> +      this.numDocs = reader.numDocs();
> +      this.hasDeletions = reader.hasDeletions();
> +    }
> +
> +    if (reader.maxDoc() != maxDoc)                // check compatibility
> +      throw new IllegalArgumentException
> +        ("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
> +    if (reader.numDocs() != numDocs)
> +      throw new IllegalArgumentException
> +        ("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
> +
> +    Iterator i = reader.getFieldNames().iterator();
> +    while (i.hasNext()) {                         // update fieldToReader map
> +      String field = (String)i.next();
> +      if (fieldToReader.get(field) == null)       // first reader with a field wins
> +        fieldToReader.put(field, reader);
> +    }
> +
> +    if (!ignoreStoredFields)
> +      storedFieldReaders.add(reader);             // add to storedFieldReaders
> +
> +  }
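(A note while reading the above: when two of the added indexes share an
indexed field, the first add() wins for that field's postings -- the put()
only happens when the field isn't mapped yet.  So, hypothetically:

    pr.add(readerA);   // readerA supplies the "title" postings
    pr.add(readerB);   // readerB's "title" postings are never consulted

assuming both indexes define "title".  Order of add() calls matters when
fields overlap; stored fields, by contrast, are unioned across all readers
added without ignoreStoredFields.)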
> +
> +
> +  public int numDocs() { return numDocs; }
> +
> +  public int maxDoc() { return maxDoc; }
> +
> +  public boolean hasDeletions() { return hasDeletions; }
> +
> +  // check first reader
> +  public boolean isDeleted(int n) {
> +    if (readers.size() > 0)
> +      return ((IndexReader)readers.get(0)).isDeleted(n);
> +    return false;
> +  }
> +
> +  // delete in all readers
> +  protected void doDelete(int n) throws IOException {
> +    for (int i = 0; i < readers.size(); i++) {
> +      ((IndexReader)readers.get(i)).doDelete(n);
> +    }
> +    hasDeletions = true;
> +  }
> +
> +  // undeleteAll in all readers
> +  protected void doUndeleteAll() throws IOException {
> +    for (int i = 0; i < readers.size(); i++) {
> +      ((IndexReader)readers.get(i)).doUndeleteAll();
> +    }
> +    hasDeletions = false;
> +  }
> +
> +  // append fields from storedFieldReaders
> +  public Document document(int n) throws IOException {
> +    Document result = new Document();
> +    for (int i = 0; i < storedFieldReaders.size(); i++) {
> +      IndexReader reader = (IndexReader)storedFieldReaders.get(i);
> +      Enumeration fields = reader.document(n).fields();
> +      while (fields.hasMoreElements()) {
> +        result.add((Field)fields.nextElement());
> +      }
> +    }
> +    return result;
> +  }
> +
> +  // get all vectors
> +  public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
> +    ArrayList results = new ArrayList();
> +    Iterator i = fieldToReader.entrySet().iterator();
> +    while (i.hasNext()) {
> +      Map.Entry e = (Map.Entry)i.next();
> +      String field = (String)e.getKey();          // fieldToReader maps field -> reader
> +      IndexReader reader = (IndexReader)e.getValue();
> +      TermFreqVector vector = reader.getTermFreqVector(n, field);
> +      if (vector != null)
> +        results.add(vector);
> +    }
> +    return (TermFreqVector[])
> +      results.toArray(new TermFreqVector[results.size()]);
> +  }
> +
> +  public TermFreqVector getTermFreqVector(int n, String field)
> +    throws IOException {
> +    return ((IndexReader)fieldToReader.get(field)).getTermFreqVector(n, field);
> +  }
> +
> +  public byte[] norms(String field) throws IOException {
> +    return ((IndexReader)fieldToReader.get(field)).norms(field);
> +  }
> +
> +  public void norms(String field, byte[] result, int offset)
> +    throws IOException {
> +    ((IndexReader)fieldToReader.get(field)).norms(field, result, offset);
> +  }
> +
> +  protected void doSetNorm(int n, String field, byte value)
> +    throws IOException {
> +    ((IndexReader)fieldToReader.get(field)).doSetNorm(n, field, value);
> +  }
> +
> +  public TermEnum terms() throws IOException {
> +    return new ParallelTermEnum();
> +  }
> +
> +  public TermEnum terms(Term term) throws IOException {
> +    return new ParallelTermEnum(term);
> +  }
> +
> +  public int docFreq(Term term) throws IOException {
> +    return ((IndexReader)fieldToReader.get(term.field())).docFreq(term);
> +  }
> +
> +  public TermDocs termDocs(Term term) throws IOException {
> +    return new ParallelTermDocs(term);
> +  }
> +
> +  public TermDocs termDocs() throws IOException {
> +    return new ParallelTermDocs();
> +  }
> +
> +  public TermPositions termPositions(Term term) throws IOException {
> +    return new ParallelTermPositions(term);
> +  }
> +
> +  public TermPositions termPositions() throws IOException {
> +    return new ParallelTermPositions();
> +  }
> +
> +  protected void doCommit() throws IOException {
> +    for (int i = 0; i < readers.size(); i++)
> +      ((IndexReader)readers.get(i)).commit();
> +  }
> +
> +  protected synchronized void doClose() throws IOException {
> +    for (int i = 0; i < readers.size(); i++)
> +      ((IndexReader)readers.get(i)).close();
> +  }
> +
> +  public Collection getFieldNames() throws IOException {
> +    return fieldToReader.keySet();
> +  }
> +
> +  public Collection getFieldNames(boolean indexed) throws IOException {
> +    Set fieldSet = new HashSet();
> +    for (int i = 0; i < readers.size(); i++) {
> +      IndexReader reader = ((IndexReader)readers.get(i));
> +      Collection names = reader.getFieldNames(indexed);
> +      fieldSet.addAll(names);
> +    }
> +    return fieldSet;
> +  }
> +
> +  public Collection getIndexedFieldNames(Field.TermVector tvSpec) {
> +    Set fieldSet = new HashSet();
> +    for (int i = 0; i < readers.size(); i++) {
> +      IndexReader reader = ((IndexReader)readers.get(i));
> +      Collection names = reader.getIndexedFieldNames(tvSpec);
> +      fieldSet.addAll(names);
> +    }
> +    return fieldSet;
> +  }
> +
> +  public Collection getFieldNames(IndexReader.FieldOption fieldNames) {
> +    Set fieldSet = new HashSet();
> +    for (int i = 0; i < readers.size(); i++) {
> +      IndexReader reader = ((IndexReader)readers.get(i));
> +      Collection names = reader.getFieldNames(fieldNames);
> +      fieldSet.addAll(names);
> +    }
> +    return fieldSet;
> +  }
> +
> +  private class ParallelTermEnum extends TermEnum {
> +    private String field;
> +    private TermEnum termEnum;                    // 'enum' is a keyword as of Java 5
> +
> +    public ParallelTermEnum() throws IOException {
> +      if (!fieldToReader.isEmpty()) {             // firstKey() throws on an empty map
> +        field = (String)fieldToReader.firstKey();
> +        termEnum = ((IndexReader)fieldToReader.get(field)).terms();
> +      }
> +    }
> +
> +    public ParallelTermEnum(Term term) throws IOException {
> +      field = term.field();
> +      termEnum = ((IndexReader)fieldToReader.get(field)).terms(term);
> +    }
> +
> +    public boolean next() throws IOException {
> +      if (field == null)
> +        return false;
> +
> +      boolean next = termEnum.next();
> +
> +      // still within field?
> +      if (next && termEnum.term().field().equals(field))
> +        return true;                              // yes, keep going
> +
> +      termEnum.close();                           // close old enum
> +
> +      // move to the next field, if any; tailMap(field) includes 'field' itself
> +      Iterator fields = fieldToReader.tailMap(field).keySet().iterator();
> +      fields.next();                              // skip the current field
> +      if (fields.hasNext()) {
> +        field = (String)fields.next();
> +        // seek directly to the first term of the new field
> +        termEnum = ((IndexReader)fieldToReader.get(field)).terms(new Term(field, ""));
> +        return termEnum.term() != null;
> +      }
> +
> +      field = null;
> +      return false;                               // no more fields
> +    }
> +
> +    public Term term() { return termEnum == null ? null : termEnum.term(); }
> +    public int docFreq() { return termEnum == null ? 0 : termEnum.docFreq(); }
> +    public void close() throws IOException {
> +      if (termEnum != null) termEnum.close();
> +    }
> +  }
> +
> +  // wrap a TermDocs in order to support seek(Term)
> +  private class ParallelTermDocs implements TermDocs {
> +    protected TermDocs termDocs;
> +
> +    public ParallelTermDocs() {}
> +    public ParallelTermDocs(Term term) throws IOException { seek(term); }
> +
> +    public int doc() { return termDocs.doc(); }
> +    public int freq() { return termDocs.freq(); }
> +
> +    public void seek(Term term) throws IOException {
> +      termDocs = ((IndexReader)fieldToReader.get(term.field())).termDocs(term);
> +    }
> +
> +    public void seek(TermEnum termEnum) throws IOException {
> +      seek(termEnum.term());
> +    }
> +
> +    public boolean next() throws IOException { return termDocs.next(); }
> +
> +    public int read(final int[] docs, final int[] freqs) throws IOException {
> +      return termDocs.read(docs, freqs);
> +    }
> +
> +    public boolean skipTo(int target) throws IOException {
> +      return termDocs.skipTo(target);
> +    }
> +
> +    public void close() throws IOException { termDocs.close(); }
> +  }
> +
> +  private class ParallelTermPositions
> +    extends ParallelTermDocs implements TermPositions {
> +
> +    public ParallelTermPositions() {}
> +    public ParallelTermPositions(Term term) throws IOException { seek(term); }
> +
> +    public void seek(Term term) throws IOException {
> +      termDocs = ((IndexReader)fieldToReader.get(term.field()))
> +        .termPositions(term);
> +    }
> +
> +    public int nextPosition() throws IOException {
> +      return ((TermPositions)termDocs).nextPosition();
> +    }
> +  }
> +}
> +
> Index: src/test/org/apache/lucene/index/TestParallelReader.java
> ===================================================================
> --- src/test/org/apache/lucene/index/TestParallelReader.java  (revision 0)
> +++ src/test/org/apache/lucene/index/TestParallelReader.java  (revision 0)
> @@ -0,0 +1,128 @@
> +package org.apache.lucene.index;
> +
> +/**
> + * Copyright 2005 The Apache Software Foundation
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +import java.io.IOException;
> +
> +import junit.framework.TestCase;
> +
> +import org.apache.lucene.analysis.standard.StandardAnalyzer;
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.document.Field;
> +import org.apache.lucene.index.IndexWriter;
> +import org.apache.lucene.index.Term;
> +import org.apache.lucene.store.Directory;
> +import org.apache.lucene.store.RAMDirectory;
> +import org.apache.lucene.search.BooleanClause.Occur;
> +import org.apache.lucene.search.*;
> +
> +public class TestParallelReader extends TestCase {
> +
> +  private Searcher parallel;
> +  private Searcher single;
> +
> +  protected void setUp() throws Exception {
> +    single = single();
> +    parallel = parallel();
> +  }
> +
> +  public void testQueries() throws Exception {
> +    queryTest(new TermQuery(new Term("f1", "v1")));
> +    queryTest(new TermQuery(new Term("f1", "v2")));
> +    queryTest(new TermQuery(new Term("f2", "v1")));
> +    queryTest(new TermQuery(new Term("f2", "v2")));
> +    queryTest(new TermQuery(new Term("f3", "v1")));
> +    queryTest(new TermQuery(new Term("f3", "v2")));
> +    queryTest(new TermQuery(new Term("f4", "v1")));
> +    queryTest(new TermQuery(new Term("f4", "v2")));
> +
> +    BooleanQuery bq1 = new BooleanQuery();
> +    bq1.add(new TermQuery(new Term("f1", "v1")), Occur.MUST);
> +    bq1.add(new TermQuery(new Term("f4", "v1")), Occur.MUST);
> +    queryTest(bq1);
> +  }
> +
> +  private void queryTest(Query query) throws IOException {
> +    Hits parallelHits = parallel.search(query);
> +    Hits singleHits = single.search(query);
> +    assertEquals(parallelHits.length(), singleHits.length());
> +    for (int i = 0; i < parallelHits.length(); i++) {
> +      assertEquals(parallelHits.score(i), singleHits.score(i), 0.001f);
> +      Document docParallel = parallelHits.doc(i);
> +      Document docSingle = singleHits.doc(i);
> +      assertEquals(docParallel.get("f1"), docSingle.get("f1"));
> +      assertEquals(docParallel.get("f2"), docSingle.get("f2"));
> +      assertEquals(docParallel.get("f3"), docSingle.get("f3"));
> +      assertEquals(docParallel.get("f4"), docSingle.get("f4"));
> +    }
> +  }
> +
> +  // Fields 1-4 indexed together:
> +  private Searcher single() throws IOException {
> +    Directory dir = new RAMDirectory();
> +    IndexWriter w = new IndexWriter(dir, new StandardAnalyzer(), true);
> +    Document d1 = new Document();
> +    d1.add(new Field("f1", "v1", Field.Store.YES, Field.Index.TOKENIZED));
> +    d1.add(new Field("f2", "v1", Field.Store.YES, Field.Index.TOKENIZED));
> +    d1.add(new Field("f3", "v1", Field.Store.YES, Field.Index.TOKENIZED));
> +    d1.add(new Field("f4", "v1", Field.Store.YES, Field.Index.TOKENIZED));
> +    w.addDocument(d1);
> +    Document d2 = new Document();
> +    d2.add(new Field("f1", "v2", Field.Store.YES, Field.Index.TOKENIZED));
> +    d2.add(new Field("f2", "v2", Field.Store.YES, Field.Index.TOKENIZED));
> +    d2.add(new Field("f3", "v2", Field.Store.YES, Field.Index.TOKENIZED));
> +    d2.add(new Field("f4", "v2", Field.Store.YES, Field.Index.TOKENIZED));
> +    w.addDocument(d2);
> +    w.close();
> +
> +    return new IndexSearcher(dir);
> +  }
> +
> +  // Fields 1 & 2 in one index, 3 & 4 in other, with ParallelReader:
> +  private Searcher parallel() throws IOException {
> +    Directory dir1 = new RAMDirectory();
> +    IndexWriter w1 = new IndexWriter(dir1, new StandardAnalyzer(), true);
> +    Document d1 = new Document();
> +    d1.add(new Field("f1", "v1", Field.Store.YES, Field.Index.TOKENIZED));
> +    d1.add(new Field("f2", "v1", Field.Store.YES, Field.Index.TOKENIZED));
> +    w1.addDocument(d1);
> +    Document d2 = new Document();
> +    d2.add(new Field("f1", "v2", Field.Store.YES, Field.Index.TOKENIZED));
> +    d2.add(new Field("f2", "v2", Field.Store.YES, Field.Index.TOKENIZED));
> +    w1.addDocument(d2);
> +    w1.close();
> +
> +    Directory dir2 = new RAMDirectory();
> +    IndexWriter w2 = new IndexWriter(dir2, new StandardAnalyzer(), true);
> +    Document d3 = new Document();
> +    d3.add(new Field("f3", "v1", Field.Store.YES, Field.Index.TOKENIZED));
> +    d3.add(new Field("f4", "v1", Field.Store.YES, Field.Index.TOKENIZED));
> +    w2.addDocument(d3);
> +    Document d4 = new Document();
> +    d4.add(new Field("f3", "v2", Field.Store.YES, Field.Index.TOKENIZED));
> +    d4.add(new Field("f4", "v2", Field.Store.YES, Field.Index.TOKENIZED));
> +    w2.addDocument(d4);
> +    w2.close();
> +
> +    ParallelReader pr = new ParallelReader();
> +    pr.add(IndexReader.open(dir1));
> +    pr.add(IndexReader.open(dir2));
> +
> +    return new IndexSearcher(pr);
> +  }
> +}

---------------------------------------------------------------------
To unsubscribe, e-mail: java-dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-dev-help@lucene.apache.org