jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mreut...@apache.org
Subject svn commit: r792109 - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene: IndexMerger.java MultiIndex.java SearchIndex.java
Date Wed, 08 Jul 2009 12:14:11 GMT
Author: mreutegg
Date: Wed Jul  8 12:14:11 2009
New Revision: 792109

URL: http://svn.apache.org/viewvc?rev=792109&view=rev
Log:
JCR-1818: Too many open files when merging large index segments

Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java?rev=792109&r1=792108&r2=792109&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java Wed Jul  8 12:14:11 2009
@@ -18,9 +18,6 @@
 
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.IndexReader;
-import org.apache.commons.collections.Buffer;
-import org.apache.commons.collections.BufferUtils;
-import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,15 +25,15 @@
 import java.util.Collections;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.io.IOException;
 
-import EDU.oswego.cs.dl.util.concurrent.Sync;
-import EDU.oswego.cs.dl.util.concurrent.Mutex;
-
 /**
  * Merges indexes in a separate daemon thread.
  */
-class IndexMerger extends Thread implements IndexListener {
+class IndexMerger implements IndexListener {
 
     /**
      * Logger instance for this class.
@@ -66,18 +63,12 @@
     /**
      * Queue of merge Tasks
      */
-    private final Buffer mergeTasks = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
-
-    /**
-     * List of id <code>Term</code> that identify documents that were deleted
-     * while a merge was running.
-     */
-    private final List deletedDocuments = Collections.synchronizedList(new ArrayList());
+    private final BlockingQueue<Merge> mergeTasks = new LinkedBlockingQueue<Merge>();
 
     /**
      * List of <code>IndexBucket</code>s in ascending document limit.
      */
-    private final List indexBuckets = new ArrayList();
+    private final List<IndexBucket> indexBuckets = new ArrayList<IndexBucket>();
 
     /**
      * The <code>MultiIndex</code> this index merger is working on.
@@ -92,27 +83,40 @@
     /**
      * Mutex that is acquired when replacing indexes on MultiIndex.
      */
-    private final Sync indexReplacement = new Mutex();
+    private final Semaphore indexReplacement;
 
     /**
-     * When released, indicates that this index merger is idle.
+     * List of merger threads that are currently busy.
      */
-    private final Sync mergerIdle = new Mutex();
+    private final List<Worker> busyMergers = new ArrayList<Worker>();
+
+    /**
+     * List of merger threads.
+     */
+    private final List<Worker> workers = new ArrayList<Worker>();
 
     /**
      * Creates an <code>IndexMerger</code>.
      *
      * @param multiIndex the <code>MultiIndex</code>.
+     * @param numWorkers the number of worker threads to use.
      */
-    IndexMerger(MultiIndex multiIndex) {
+    IndexMerger(MultiIndex multiIndex, int numWorkers) {
         this.multiIndex = multiIndex;
-        setName("IndexMerger");
-        setDaemon(true);
-        try {
-            mergerIdle.acquire();
-        } catch (InterruptedException e) {
-            // will never happen, lock is free upon construction
-            throw new InternalError("Unable to acquire mutex after construction");
+        for (int i = 0; i < numWorkers; i++) {
+            Worker w = new Worker();
+            workers.add(w);
+            busyMergers.add(w);
+        }
+        this.indexReplacement = new Semaphore(workers.size());
+    }
+
+    /**
+     * Starts this index merger.
+     */
+    void start() {
+        for (Thread t : workers) {
+            t.start();
         }
     }
 
@@ -149,9 +153,9 @@
             }
 
             // put index in bucket
-            IndexBucket bucket = (IndexBucket) indexBuckets.get(indexBuckets.size() - 1);
-            for (int i = 0; i < indexBuckets.size(); i++) {
-                bucket = (IndexBucket) indexBuckets.get(i);
+            IndexBucket bucket = indexBuckets.get(indexBuckets.size() - 1);
+            for (IndexBucket indexBucket : indexBuckets) {
+                bucket = indexBucket;
                 if (bucket.fits(numDocs)) {
                     break;
                 }
@@ -172,20 +176,27 @@
                 long targetMergeDocs = bucket.upper;
                 targetMergeDocs = Math.min(targetMergeDocs * mergeFactor, maxMergeDocs);
                 // sum up docs in bucket
-                List indexesToMerge = new ArrayList();
+                List<Index> indexesToMerge = new ArrayList<Index>();
                 int mergeDocs = 0;
-                for (Iterator it = bucket.iterator(); it.hasNext() && mergeDocs <= targetMergeDocs;) {
+                for (Iterator<Index> it = bucket.iterator(); it.hasNext() && mergeDocs <= targetMergeDocs;) {
                     indexesToMerge.add(it.next());
                 }
                 if (indexesToMerge.size() > 2) {
                     // found merge
-                    Index[] idxs = (Index[]) indexesToMerge.toArray(new Index[indexesToMerge.size()]);
+                    Index[] idxs = indexesToMerge.toArray(new Index[indexesToMerge.size()]);
                     bucket.removeAll(indexesToMerge);
                     if (log.isDebugEnabled()) {
                         log.debug("requesting merge for " + indexesToMerge);
                     }
-                    mergeTasks.add(new Merge(idxs));
-                    log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
+                    addMergeTask(new Merge(idxs));
+                    if (log.isDebugEnabled()) {
+                        log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
+                        int numBusy;
+                        synchronized (busyMergers) {
+                            numBusy = busyMergers.size();
+                        }
+                        log.debug("# of busy merge workers: " + numBusy);
+                    }
                 }
             }
         }
@@ -196,18 +207,27 @@
      */
     public void documentDeleted(Term id) {
         log.debug("document deleted: " + id.text());
-        deletedDocuments.add(id);
+        synchronized (busyMergers) {
+            for (Worker w : busyMergers) {
+                w.documentDeleted(id);
+            }
+        }
     }
 
     /**
      * When the calling thread returns this index merger will be idle, that is
-     * there will be no merge tasks pending anymore. The method returns immediately
-     * if there are currently no tasks pending at all.
+     * there will be no merge tasks pending anymore. The method returns
+     * immediately if there are currently no tasks pending at all.
+     *
+     * @throws InterruptedException if this thread is interrupted while waiting
+     *                              for the worker threads to become idle.
      */
     void waitUntilIdle() throws InterruptedException {
-        mergerIdle.acquire();
-        // and immediately release again
-        mergerIdle.release();
+        synchronized (busyMergers) {
+            while (!busyMergers.isEmpty()) {
+                busyMergers.wait();
+            }
+        }
     }
 
     /**
@@ -216,135 +236,48 @@
      */
     void dispose() {
         log.debug("dispose IndexMerger");
-        // get mutex for index replacements
+        // get all permits for index replacements
         try {
-            indexReplacement.acquire();
+            indexReplacement.acquire(workers.size());
         } catch (InterruptedException e) {
-            log.warn("Interrupted while acquiring index replacement sync: " + e);
+            log.warn("Interrupted while acquiring index replacement permits: " + e);
             // try to stop IndexMerger without the sync
         }
 
+        log.debug("merge queue size: " + mergeTasks.size());
         // clear task queue
         mergeTasks.clear();
 
         // send quit
-        mergeTasks.add(QUIT);
+        addMergeTask(QUIT);
         log.debug("quit sent");
 
         try {
-            // give the merger thread some time to quit,
-            // it is possible that the merger is busy working on a large index.
+            // give the merger threads some time to quit,
+            // it is possible that the mergers are busy working on a large index.
             // if that is the case we will just ignore it and the daemon will
             // die without being able to finish the merge.
             // at this point it is not possible anymore to replace indexes
-            // on the MultiIndex because we hold the indexReplacement Sync.
-            this.join(500);
-            if (isAlive()) {
-                log.info("Unable to stop IndexMerger. Daemon is busy.");
-            } else {
-                log.debug("IndexMerger thread stopped");
-            }
-            log.debug("merge queue size: " + mergeTasks.size());
-        } catch (InterruptedException e) {
-            log.warn("Interrupted while waiting for IndexMerger thread to terminate.");
-        }
-    }
-
-    /**
-     * Implements the index merging.
-     */
-    public void run() {
-        for (;;) {
-            boolean isIdle = false;
-            if (mergeTasks.size() == 0) {
-                mergerIdle.release();
-                isIdle = true;
-            }
-            Merge task = (Merge) mergeTasks.remove();
-            if (task == QUIT) {
-                mergerIdle.release();
-                break;
-            }
-            if (isIdle) {
-                try {
-                    mergerIdle.acquire();
-                } catch (InterruptedException e) {
-                    Thread.interrupted();
-                    log.warn("Unable to acquire mergerIdle sync");
+            // on the MultiIndex because we hold all indexReplacement permits.
+            for (Thread t : workers) {
+                t.join(500);
+                if (t.isAlive()) {
+                    log.info("Unable to stop IndexMerger.Worker. Daemon is busy.");
+                } else {
+                    log.debug("IndexMerger.Worker thread stopped");
                 }
             }
-
-            log.debug("accepted merge request");
-
-            // reset deleted documents
-            deletedDocuments.clear();
-
-            // get readers
-            String[] names = new String[task.indexes.length];
-            for (int i = 0; i < task.indexes.length; i++) {
-                names[i] = task.indexes[i].name;
-            }
-            try {
-                log.debug("create new index");
-                PersistentIndex index = multiIndex.getOrCreateIndex(null);
-                boolean success = false;
-                try {
-
-                    log.debug("get index readers from MultiIndex");
-                    IndexReader[] readers = multiIndex.getIndexReaders(names, this);
-                    try {
-                        // do the merge
-                        long time = System.currentTimeMillis();
-                        index.addIndexes(readers);
-                        time = System.currentTimeMillis() - time;
-                        int docCount = 0;
-                        for (int i = 0; i < readers.length; i++) {
-                            docCount += readers[i].numDocs();
-                        }
-                        log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
-                    } finally {
-                        for (int i = 0; i < readers.length; i++) {
-                            try {
-                                Util.closeOrRelease(readers[i]);
-                            } catch (IOException e) {
-                                log.warn("Unable to close IndexReader: " + e);
-                            }
-                        }
-                    }
-
-                    // inform multi index
-                    // if we cannot get the sync immediately we have to quit
-                    if (!indexReplacement.attempt(0)) {
-                        log.debug("index merging canceled");
-                        break;
-                    }
-                    try {
-                        log.debug("replace indexes");
-                        multiIndex.replaceIndexes(names, index, deletedDocuments);
-                    } finally {
-                        indexReplacement.release();
-                    }
-
-                    success = true;
-
-                } finally {
-                    if (!success) {
-                        // delete index
-                        log.debug("deleting index " + index.getName());
-                        multiIndex.deleteIndex(index);
-                    }
-                }
-            } catch (Throwable e) {
-                log.error("Error while merging indexes: ", e);
-            }
+        } catch (InterruptedException e) {
+            log.warn("Interrupted while waiting for IndexMerger threads to terminate.");
         }
-        log.info("IndexMerger terminated");
     }
 
     //-----------------------< merge properties >-------------------------------
 
     /**
      * The merge factor.
+     *
+     * @param mergeFactor the merge factor.
      */
     public void setMergeFactor(int mergeFactor) {
         this.mergeFactor = mergeFactor;
@@ -353,6 +286,8 @@
 
     /**
      * The initial threshold for number of documents to merge to a new index.
+     *
+     * @param minMergeDocs the min merge docs number.
      */
     public void setMinMergeDocs(int minMergeDocs) {
         this.minMergeDocs = minMergeDocs;
@@ -360,6 +295,8 @@
 
     /**
      * The maximum number of document to merge.
+     *
+     * @param maxMergeDocs the max merge docs number.
      */
     public void setMaxMergeDocs(int maxMergeDocs) {
         this.maxMergeDocs = maxMergeDocs;
@@ -367,6 +304,18 @@
 
     //------------------------------< internal >--------------------------------
 
+    private void addMergeTask(Merge task) {
+        for (;;) {
+            try {
+                mergeTasks.put(task);
+                break;
+            } catch (InterruptedException e) {
+                // try again
+                Thread.interrupted();
+            }
+        }
+    }
+
     /**
      * Implements a simple struct that holds the name of an index and how
      * many document it contains. <code>Index</code> is comparable using the
@@ -446,7 +395,9 @@
      * <code>IndexBucket</code> contains {@link Index}es with documents less
      * or equal the document limit of the bucket.
      */
-    private static final class IndexBucket extends ArrayList {
+    private static final class IndexBucket extends ArrayList<Index> {
+
+        private static final long serialVersionUID = 2985514550083374904L;
 
         /**
          * The lower document limit.
@@ -496,4 +447,130 @@
             return allowMerge;
         }
     }
+
+    private class Worker extends Thread implements IndexListener {
+
+        /**
+         * List of id <code>Term</code> that identify documents that were deleted
+         * while a merge was running.
+         */
+        private final List<Term> deletedDocuments = Collections.synchronizedList(new ArrayList<Term>());
+
+        public Worker() {
+            setName("IndexMerger.Worker");
+            setDaemon(true);
+        }
+
+        /**
+         * Implements the index merging.
+         */
+        public void run() {
+            for (;;) {
+                boolean isIdle = false;
+                if (mergeTasks.size() == 0) {
+                    synchronized (busyMergers) {
+                        busyMergers.remove(this);
+                        busyMergers.notifyAll();
+                    }
+                    isIdle = true;
+                }
+                Merge task;
+                for (;;) {
+                    try {
+                        task = mergeTasks.take();
+                        break;
+                    } catch (InterruptedException e) {
+                        // try again
+                        Thread.interrupted();
+                    }
+                }
+                if (task == QUIT) {
+                    synchronized (busyMergers) {
+                        busyMergers.remove(this);
+                    }
+                    // put back QUIT to signal other workers
+                    addMergeTask(task);
+                    break;
+                }
+                if (isIdle) {
+                    synchronized (busyMergers) {
+                        busyMergers.add(this);
+                    }
+                }
+
+                log.debug("accepted merge request");
+
+                // reset deleted documents
+                deletedDocuments.clear();
+
+                // get readers
+                String[] names = new String[task.indexes.length];
+                for (int i = 0; i < task.indexes.length; i++) {
+                    names[i] = task.indexes[i].name;
+                }
+                try {
+                    log.debug("create new index");
+                    PersistentIndex index = multiIndex.getOrCreateIndex(null);
+                    boolean success = false;
+                    try {
+
+                        log.debug("get index readers from MultiIndex");
+                        IndexReader[] readers = multiIndex.getIndexReaders(names, IndexMerger.this);
+                        try {
+                            // do the merge
+                            long time = System.currentTimeMillis();
+                            index.addIndexes(readers);
+                            time = System.currentTimeMillis() - time;
+                            int docCount = 0;
+                            for (IndexReader reader : readers) {
+                                docCount += reader.numDocs();
+                            }
+                            log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
+                        } finally {
+                            for (IndexReader reader : readers) {
+                                try {
+                                    Util.closeOrRelease(reader);
+                                } catch (IOException e) {
+                                    log.warn("Unable to close IndexReader: " + e);
+                                }
+                            }
+                        }
+
+                        // inform multi index
+                        // if we cannot get the sync immediately we have to quit
+                        if (!indexReplacement.tryAcquire()) {
+                            log.debug("index merging canceled");
+                            break;
+                        }
+                        try {
+                            log.debug("replace indexes");
+                            multiIndex.replaceIndexes(names, index, deletedDocuments);
+                        } finally {
+                            indexReplacement.release();
+                        }
+
+                        success = true;
+
+                    } finally {
+                        if (!success) {
+                            // delete index
+                            log.debug("deleting index " + index.getName());
+                            multiIndex.deleteIndex(index);
+                        }
+                    }
+                } catch (Throwable e) {
+                    log.error("Error while merging indexes: ", e);
+                }
+            }
+            log.info("IndexMerger.Worker terminated");
+        }
+
+        /**
+         * @inheritDoc
+         */
+        public void documentDeleted(Term id) {
+            log.debug("document deleted: " + id.text());
+            deletedDocuments.add(id);
+        }
+    }
 }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java?rev=792109&r1=792108&r2=792109&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java Wed Jul  8 12:14:11 2009
@@ -241,7 +241,7 @@
         removeDeletable();
 
         // initialize IndexMerger
-        merger = new IndexMerger(this);
+        merger = new IndexMerger(this, handler.getIndexMergerPoolSize());
         merger.setMaxMergeDocs(handler.getMaxMergeDocs());
         merger.setMergeFactor(handler.getMergeFactor());
         merger.setMinMergeDocs(handler.getMinMergeDocs());

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java?rev=792109&r1=792108&r2=792109&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java Wed Jul  8 12:14:11 2009
@@ -172,6 +172,11 @@
     public static final int DEFAULT_TERM_INFOS_INDEX_DIVISOR = 1;
 
     /**
+     * The default value for {@link #indexMergerPoolSize}.
+     */
+    public static final int DEFAULT_INDEX_MERGER_POOL_SIZE = 2;
+
+    /**
      * The path factory.
      */
     protected static final PathFactory PATH_FACTORY = PathFactoryImpl.getInstance();
@@ -445,6 +450,11 @@
     private boolean initializeHierarchyCache = true;
 
     /**
+     * The number of worker threads for merging index segments.
+     */
+    private int indexMergerPoolSize = DEFAULT_INDEX_MERGER_POOL_SIZE;
+
+    /**
      * Indicates if this <code>SearchIndex</code> is closed and cannot be used
      * anymore.
      */
@@ -731,6 +741,8 @@
     /**
      * This method returns the QueryNodeFactory used to parse Queries. This method
      * may be overridden to provide a customized QueryNodeFactory
+     *
+     * @return the query node factory.
      */
     protected DefaultQueryNodeFactory getQueryNodeFactory() {
         return DEFAULT_QUERY_NODE_FACTORY;
@@ -2169,6 +2181,26 @@
         this.initializeHierarchyCache = initializeHierarchyCache;
     }
 
+    /**
+     * @return the current size of the index merger pool.
+     */
+    public int getIndexMergerPoolSize() {
+        return indexMergerPoolSize;
+    }
+
+    /**
+     * Sets a new value for the index merger pool size.
+     *
+     * @param indexMergerPoolSize the number of worker threads.
+     * @throws IllegalArgumentException if the size is less than or equal 0.
+     */
+    public void setIndexMergerPoolSize(int indexMergerPoolSize) {
+        if (indexMergerPoolSize <= 0) {
+            throw new IllegalArgumentException("must be greater than 0");
+        }
+        this.indexMergerPoolSize = indexMergerPoolSize;
+    }
+
     //----------------------------< internal >----------------------------------
 
     /**



Mime
View raw message