Return-Path: Delivered-To: apmail-jackrabbit-commits-archive@www.apache.org Received: (qmail 86493 invoked from network); 7 Aug 2009 08:48:06 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 7 Aug 2009 08:48:06 -0000 Received: (qmail 536 invoked by uid 500); 7 Aug 2009 08:48:13 -0000 Delivered-To: apmail-jackrabbit-commits-archive@jackrabbit.apache.org Received: (qmail 486 invoked by uid 500); 7 Aug 2009 08:48:13 -0000 Mailing-List: contact commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@jackrabbit.apache.org Delivered-To: mailing list commits@jackrabbit.apache.org Received: (qmail 477 invoked by uid 99); 7 Aug 2009 08:48:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Aug 2009 08:48:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Aug 2009 08:48:02 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0496B2388854; Fri, 7 Aug 2009 08:47:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r801913 - in /jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene: IndexMerger.java MultiIndex.java SearchIndex.java Date: Fri, 07 Aug 2009 08:47:40 -0000 To: commits@jackrabbit.apache.org From: jukka@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090807084741.0496B2388854@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jukka Date: Fri Aug 7 08:47:40 2009 New Revision: 801913 URL: http://svn.apache.org/viewvc?rev=801913&view=rev Log: 1.x: Reverted revision 801245 as it used java.util.concurrent classes not available in Java 1.4 (JCR-1818) Modified: jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java Modified: jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java URL: http://svn.apache.org/viewvc/jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java?rev=801913&r1=801912&r2=801913&view=diff ============================================================================== --- jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java (original) +++ jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java Fri Aug 7 08:47:40 2009 @@ -18,6 +18,9 @@ import org.apache.lucene.index.Term; import org.apache.lucene.index.IndexReader; +import org.apache.commons.collections.Buffer; +import org.apache.commons.collections.BufferUtils; +import org.apache.commons.collections.buffer.UnboundedFifoBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,15 +28,15 @@ import java.util.Collections; import java.util.ArrayList; import java.util.Iterator; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Semaphore; import java.io.IOException; +import EDU.oswego.cs.dl.util.concurrent.Sync; +import EDU.oswego.cs.dl.util.concurrent.Mutex; + /** * Merges indexes in a separate daemon thread. */ -class IndexMerger implements IndexListener { +class IndexMerger extends Thread implements IndexListener { /** * Logger instance for this class. @@ -63,7 +66,13 @@ /** * Queue of merge Tasks */ - private final BlockingQueue mergeTasks = new LinkedBlockingQueue(); + private final Buffer mergeTasks = BufferUtils.blockingBuffer(new UnboundedFifoBuffer()); + + /** + * List of id Term that identify documents that were deleted + * while a merge was running. + */ + private final List deletedDocuments = Collections.synchronizedList(new ArrayList()); /** * List of IndexBuckets in ascending document limit. @@ -83,41 +92,27 @@ /** * Mutex that is acquired when replacing indexes on MultiIndex. */ - private final Semaphore indexReplacement; + private final Sync indexReplacement = new Mutex(); /** - * List of merger threads that are currently busy. + * When released, indicates that this index merger is idle. */ - private final List busyMergers = new ArrayList(); - - /** - * List of merger threads. - */ - private final List workers = new ArrayList(); + private final Sync mergerIdle = new Mutex(); /** * Creates an IndexMerger. * * @param multiIndex the MultiIndex. - * @param numWorkers the number of worker threads to use. */ - IndexMerger(MultiIndex multiIndex, int numWorkers) { + IndexMerger(MultiIndex multiIndex) { this.multiIndex = multiIndex; - for (int i = 0; i < numWorkers; i++) { - Worker w = new Worker(); - workers.add(w); - busyMergers.add(w); - } - this.indexReplacement = new Semaphore(workers.size()); - } - - /** - * Starts this index merger. - */ - void start() { - Iterator iterator = workers.iterator(); - while (iterator.hasNext()) { - ((Worker) iterator.next()).start(); + setName("IndexMerger"); + setDaemon(true); + try { + mergerIdle.acquire(); + } catch (InterruptedException e) { + // will never happen, lock is free upon construction + throw new InternalError("Unable to acquire mutex after construction"); } } @@ -155,9 +150,8 @@ // put index in bucket IndexBucket bucket = (IndexBucket) indexBuckets.get(indexBuckets.size() - 1); - Iterator iterator = indexBuckets.iterator(); - while (iterator.hasNext()) { - bucket = (IndexBucket) iterator.next(); + for (int i = 0; i < indexBuckets.size(); i++) { + bucket = (IndexBucket) indexBuckets.get(i); if (bucket.fits(numDocs)) { break; } @@ -190,15 +184,8 @@ if (log.isDebugEnabled()) { log.debug("requesting merge for " + indexesToMerge); } - addMergeTask(new Merge(idxs)); - if (log.isDebugEnabled()) { - log.debug("merge queue now contains " + mergeTasks.size() + " tasks."); - int numBusy; - synchronized (busyMergers) { - numBusy = busyMergers.size(); - } - log.debug("# of busy merge workers: " + numBusy); - } + mergeTasks.add(new Merge(idxs)); + log.debug("merge queue now contains " + mergeTasks.size() + " tasks."); } } } @@ -209,28 +196,18 @@ */ public void documentDeleted(Term id) { log.debug("document deleted: " + id.text()); - synchronized (busyMergers) { - Iterator iterator = busyMergers.iterator(); - while (iterator.hasNext()) { - ((Worker) iterator.next()).documentDeleted(id); - } - } + deletedDocuments.add(id); } /** * When the calling thread returns this index merger will be idle, that is - * there will be no merge tasks pending anymore. The method returns - * immediately if there are currently no tasks pending at all. - * - * @throws InterruptedException if this thread is interrupted while waiting - * for the worker threads to become idle. + * there will be no merge tasks pending anymore. The method returns immediately + * if there are currently no tasks pending at all. */ void waitUntilIdle() throws InterruptedException { - synchronized (busyMergers) { - while (!busyMergers.isEmpty()) { - busyMergers.wait(); - } - } + mergerIdle.acquire(); + // and immediately release again + mergerIdle.release(); } /** @@ -239,50 +216,135 @@ */ void dispose() { log.debug("dispose IndexMerger"); - // get all permits for index replacements + // get mutex for index replacements try { - indexReplacement.acquire(workers.size()); + indexReplacement.acquire(); } catch (InterruptedException e) { - log.warn("Interrupted while acquiring index replacement permits: " + e); + log.warn("Interrupted while acquiring index replacement sync: " + e); // try to stop IndexMerger without the sync } - log.debug("merge queue size: " + mergeTasks.size()); // clear task queue mergeTasks.clear(); // send quit - addMergeTask(QUIT); + mergeTasks.add(QUIT); log.debug("quit sent"); try { - // give the merger threads some time to quit, - // it is possible that the mergers are busy working on a large index. + // give the merger thread some time to quit, + // it is possible that the merger is busy working on a large index. // if that is the case we will just ignore it and the daemon will // die without being able to finish the merge. // at this point it is not possible anymore to replace indexes - // on the MultiIndex because we hold all indexReplacement permits. - Iterator iterator = workers.iterator(); - while (iterator.hasNext()) { - Thread t = (Thread) iterator.next(); - t.join(500); - if (t.isAlive()) { - log.info("Unable to stop IndexMerger.Worker. Daemon is busy."); - } else { - log.debug("IndexMerger.Worker thread stopped"); - } + // on the MultiIndex because we hold the indexReplacement Sync. + this.join(500); + if (isAlive()) { + log.info("Unable to stop IndexMerger. Daemon is busy."); + } else { + log.debug("IndexMerger thread stopped"); } + log.debug("merge queue size: " + mergeTasks.size()); } catch (InterruptedException e) { - log.warn("Interrupted while waiting for IndexMerger threads to terminate."); + log.warn("Interrupted while waiting for IndexMerger thread to terminate."); + } + } + + /** + * Implements the index merging. + */ + public void run() { + for (;;) { + boolean isIdle = false; + if (mergeTasks.size() == 0) { + mergerIdle.release(); + isIdle = true; + } + Merge task = (Merge) mergeTasks.remove(); + if (task == QUIT) { + mergerIdle.release(); + break; + } + if (isIdle) { + try { + mergerIdle.acquire(); + } catch (InterruptedException e) { + Thread.interrupted(); + log.warn("Unable to acquire mergerIdle sync"); + } + } + + log.debug("accepted merge request"); + + // reset deleted documents + deletedDocuments.clear(); + + // get readers + String[] names = new String[task.indexes.length]; + for (int i = 0; i < task.indexes.length; i++) { + names[i] = task.indexes[i].name; + } + try { + log.debug("create new index"); + PersistentIndex index = multiIndex.getOrCreateIndex(null); + boolean success = false; + try { + + log.debug("get index readers from MultiIndex"); + IndexReader[] readers = multiIndex.getIndexReaders(names, this); + try { + // do the merge + long time = System.currentTimeMillis(); + index.addIndexes(readers); + time = System.currentTimeMillis() - time; + int docCount = 0; + for (int i = 0; i < readers.length; i++) { + docCount += readers[i].numDocs(); + } + log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + "."); + } finally { + for (int i = 0; i < readers.length; i++) { + try { + Util.closeOrRelease(readers[i]); + } catch (IOException e) { + log.warn("Unable to close IndexReader: " + e); + } + } + } + + // inform multi index + // if we cannot get the sync immediately we have to quit + if (!indexReplacement.attempt(0)) { + log.debug("index merging canceled"); + break; + } + try { + log.debug("replace indexes"); + multiIndex.replaceIndexes(names, index, deletedDocuments); + } finally { + indexReplacement.release(); + } + + success = true; + + } finally { + if (!success) { + // delete index + log.debug("deleting index " + index.getName()); + multiIndex.deleteIndex(index); + } + } + } catch (Throwable e) { + log.error("Error while merging indexes: ", e); + } } + log.info("IndexMerger terminated"); } //-----------------------< merge properties >------------------------------- /** * The merge factor. - * - * @param mergeFactor the merge factor. */ public void setMergeFactor(int mergeFactor) { this.mergeFactor = mergeFactor; @@ -291,8 +353,6 @@ /** * The initial threshold for number of documents to merge to a new index. - * - * @param minMergeDocs the min merge docs number. */ public void setMinMergeDocs(int minMergeDocs) { this.minMergeDocs = minMergeDocs; @@ -300,8 +360,6 @@ /** * The maximum number of document to merge. - * - * @param maxMergeDocs the max merge docs number. */ public void setMaxMergeDocs(int maxMergeDocs) { this.maxMergeDocs = maxMergeDocs; @@ -309,18 +367,6 @@ //------------------------------< internal >-------------------------------- - private void addMergeTask(Merge task) { - for (;;) { - try { - mergeTasks.put(task); - break; - } catch (InterruptedException e) { - // try again - Thread.interrupted(); - } - } - } - /** * Implements a simple struct that holds the name of an index and how * many document it contains. Index is comparable using the @@ -402,8 +448,6 @@ */ private static final class IndexBucket extends ArrayList { - private static final long serialVersionUID = 2985514550083374904L; - /** * The lower document limit. */ @@ -452,130 +496,4 @@ return allowMerge; } } - - private class Worker extends Thread implements IndexListener { - - /** - * List of id Term that identify documents that were deleted - * while a merge was running. - */ - private final List deletedDocuments = Collections.synchronizedList(new ArrayList()); - - public Worker() { - setName("IndexMerger.Worker"); - setDaemon(true); - } - - /** - * Implements the index merging. - */ - public void run() { - for (;;) { - boolean isIdle = false; - if (mergeTasks.size() == 0) { - synchronized (busyMergers) { - busyMergers.remove(this); - busyMergers.notifyAll(); - } - isIdle = true; - } - Merge task; - for (;;) { - try { - task = (Merge) mergeTasks.take(); - break; - } catch (InterruptedException e) { - // try again - Thread.interrupted(); - } - } - if (task == QUIT) { - synchronized (busyMergers) { - busyMergers.remove(this); - } - // put back QUIT to signal other workers - addMergeTask(task); - break; - } - if (isIdle) { - synchronized (busyMergers) { - busyMergers.add(this); - } - } - - log.debug("accepted merge request"); - - // reset deleted documents - deletedDocuments.clear(); - - // get readers - String[] names = new String[task.indexes.length]; - for (int i = 0; i < task.indexes.length; i++) { - names[i] = task.indexes[i].name; - } - try { - log.debug("create new index"); - PersistentIndex index = multiIndex.getOrCreateIndex(null); - boolean success = false; - try { - - log.debug("get index readers from MultiIndex"); - IndexReader[] readers = multiIndex.getIndexReaders(names, IndexMerger.this); - try { - // do the merge - long time = System.currentTimeMillis(); - index.addIndexes(readers); - time = System.currentTimeMillis() - time; - int docCount = 0; - for (int i = 0; i < readers.length; i++) { - docCount += readers[i].numDocs(); - } - log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + "."); - } finally { - for (int i = 0; i < readers.length; i++) { - try { - Util.closeOrRelease(readers[i]); - } catch (IOException e) { - log.warn("Unable to close IndexReader: " + e); - } - } - } - - // inform multi index - // if we cannot get the sync immediately we have to quit - if (!indexReplacement.tryAcquire()) { - log.debug("index merging canceled"); - break; - } - try { - log.debug("replace indexes"); - multiIndex.replaceIndexes(names, index, deletedDocuments); - } finally { - indexReplacement.release(); - } - - success = true; - - } finally { - if (!success) { - // delete index - log.debug("deleting index " + index.getName()); - multiIndex.deleteIndex(index); - } - } - } catch (Throwable e) { - log.error("Error while merging indexes: ", e); - } - } - log.info("IndexMerger.Worker terminated"); - } - - /** - * @inheritDoc - */ - public void documentDeleted(Term id) { - log.debug("document deleted: " + id.text()); - deletedDocuments.add(id); - } - } } Modified: jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java URL: http://svn.apache.org/viewvc/jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java?rev=801913&r1=801912&r2=801913&view=diff ============================================================================== --- jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (original) +++ jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java Fri Aug 7 08:47:40 2009 @@ -241,7 +241,7 @@ removeDeletable(); // initialize IndexMerger - merger = new IndexMerger(this, handler.getIndexMergerPoolSize()); + merger = new IndexMerger(this); merger.setMaxMergeDocs(handler.getMaxMergeDocs()); merger.setMergeFactor(handler.getMergeFactor()); merger.setMinMergeDocs(handler.getMinMergeDocs()); Modified: jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java URL: http://svn.apache.org/viewvc/jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java?rev=801913&r1=801912&r2=801913&view=diff ============================================================================== --- jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (original) +++ jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java Fri Aug 7 08:47:40 2009 @@ -167,11 +167,6 @@ public static final int DEFAULT_TERM_INFOS_INDEX_DIVISOR = 1; /** - * The default value for {@link #indexMergerPoolSize}. - */ - public static final int DEFAULT_INDEX_MERGER_POOL_SIZE = 2; - - /** * The path factory. */ protected static final PathFactory PATH_FACTORY = PathFactoryImpl.getInstance(); @@ -445,11 +440,6 @@ private boolean initializeHierarchyCache = true; /** - * The number of worker threads for merging index segments. - */ - private int indexMergerPoolSize = DEFAULT_INDEX_MERGER_POOL_SIZE; - - /** * Indicates if this SearchIndex is closed and cannot be used * anymore. */ @@ -708,8 +698,6 @@ /** * This method returns the QueryNodeFactory used to parse Queries. This method * may be overridden to provide a customized QueryNodeFactory - * - * @return the query node factory. */ protected DefaultQueryNodeFactory getQueryNodeFactory() { return DEFAULT_QUERY_NODE_FACTORY; @@ -2155,26 +2143,6 @@ this.initializeHierarchyCache = initializeHierarchyCache; } - /** - * @return the current size of the index merger pool. - */ - public int getIndexMergerPoolSize() { - return indexMergerPoolSize; - } - - /** - * Sets a new value for the index merger pool size. - * - * @param indexMergerPoolSize the number of worker threads. - * @throws IllegalArgumentException if the size is less than or equal 0. - */ - public void setIndexMergerPoolSize(int indexMergerPoolSize) { - if (indexMergerPoolSize <= 0) { - throw new IllegalArgumentException("must be greater than 0"); - } - this.indexMergerPoolSize = indexMergerPoolSize; - } - //----------------------------< internal >---------------------------------- /**