jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mreut...@apache.org
Subject svn commit: r834039 - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene: IndexMerger.java MultiIndex.java SearchIndex.java
Date Mon, 09 Nov 2009 12:48:24 GMT
Author: mreutegg
Date: Mon Nov  9 12:48:23 2009
New Revision: 834039

URL: http://svn.apache.org/viewvc?rev=834039&view=rev
Log:
JCR-2387: Use executor service from repository for index merging

Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java?rev=834039&r1=834038&r2=834039&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
(original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
Mon Nov  9 12:48:23 2009
@@ -16,20 +16,23 @@
  */
 package org.apache.jackrabbit.core.query.lucene;
 
-import org.apache.lucene.index.Term;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.Term;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.io.IOException;
-
 /**
  * Merges indexes in a separate daemon thread.
  */
@@ -41,11 +44,6 @@
     private static final Logger log = LoggerFactory.getLogger(IndexMerger.class);
 
     /**
-     * Marker task to signal the background thread to quit.
-     */
-    private static final Merge QUIT = new Merge(new Index[0]);
-
-    /**
      * minMergeDocs config parameter.
      */
     private int minMergeDocs = SearchIndex.DEFAULT_MIN_MERGE_DOCS;
@@ -61,11 +59,6 @@
     private int mergeFactor = SearchIndex.DEFAULT_MERGE_FACTOR;
 
     /**
-     * Queue of merge Tasks
-     */
-    private final BlockingQueue<Merge> mergeTasks = new LinkedBlockingQueue<Merge>();
-
-    /**
      * List of <code>IndexBucket</code>s in ascending document limit.
      */
     private final List<IndexBucket> indexBuckets = new ArrayList<IndexBucket>();
@@ -76,14 +69,34 @@
     private final MultiIndex multiIndex;
 
     /**
+     * The executor of the repository.
+     */
+    private final Executor executor;
+
+    /**
+     * Flag that indicates that this index merger is shuting down and should
+     * quit. 
+     */
+    private final AtomicBoolean quit = new AtomicBoolean(false);
+
+    /**
+     * Flag that indicates if this index merger has already been started.
+     * @see #start()
+     */
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    /**
      * Monitor object to synchronize merge calculation.
      */
     private final Object lock = new Object();
 
     /**
-     * Mutex that is acquired when replacing indexes on MultiIndex.
+     * Read/write lock for index segment replacement. A shared read lock is
+     * aquired for an index replacement. An exclusive write lock is acquired
+     * when this index merger is shuting down, to prevent further index
+     * replacements.
      */
-    private final Semaphore indexReplacement;
+    private final ReadWriteLock indexReplacement = new ReentrantReadWriteLock();
 
     /**
      * List of merger threads that are currently busy.
@@ -91,32 +104,23 @@
     private final List<Worker> busyMergers = new ArrayList<Worker>();
 
     /**
-     * List of merger threads.
-     */
-    private final List<Worker> workers = new ArrayList<Worker>();
-
-    /**
      * Creates an <code>IndexMerger</code>.
      *
      * @param multiIndex the <code>MultiIndex</code>.
-     * @param numWorkers the number of worker threads to use.
+     * @param executor   the executor of the repository.
      */
-    IndexMerger(MultiIndex multiIndex, int numWorkers) {
+    IndexMerger(MultiIndex multiIndex, Executor executor) {
         this.multiIndex = multiIndex;
-        for (int i = 0; i < numWorkers; i++) {
-            Worker w = new Worker();
-            workers.add(w);
-            busyMergers.add(w);
-        }
-        this.indexReplacement = new Semaphore(workers.size());
+        this.executor = executor;
     }
 
     /**
      * Starts this index merger.
      */
     void start() {
-        for (Thread t : workers) {
-            t.start();
+        isStarted.set(true);
+        for (Worker worker : busyMergers) {
+            worker.unblock();
         }
     }
 
@@ -190,7 +194,6 @@
                     }
                     addMergeTask(new Merge(idxs));
                     if (log.isDebugEnabled()) {
-                        log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
                         int numBusy;
                         synchronized (busyMergers) {
                             numBusy = busyMergers.size();
@@ -236,21 +239,17 @@
      */
     void dispose() {
         log.debug("dispose IndexMerger");
-        // get all permits for index replacements
+        // get exclusive lock on index replacements
         try {
-            indexReplacement.acquire(workers.size());
+            indexReplacement.writeLock().lockInterruptibly();
         } catch (InterruptedException e) {
-            log.warn("Interrupted while acquiring index replacement permits: " + e);
+            log.warn("Interrupted while acquiring index replacement exclusive lock: " + e);
             // try to stop IndexMerger without the sync
         }
 
-        log.debug("merge queue size: " + mergeTasks.size());
-        // clear task queue
-        mergeTasks.clear();
-
-        // send quit
-        addMergeTask(QUIT);
-        log.debug("quit sent");
+        // set quit
+        quit.set(true);
+        log.debug("quit flag set");
 
         try {
             // give the merger threads some time to quit,
@@ -259,9 +258,13 @@
             // die without being able to finish the merge.
             // at this point it is not possible anymore to replace indexes
             // on the MultiIndex because we hold all indexReplacement permits.
-            for (Thread t : workers) {
-                t.join(500);
-                if (t.isAlive()) {
+            Worker[] workers;
+            synchronized (busyMergers) {
+                workers = busyMergers.toArray(new Worker[busyMergers.size()]);
+            }
+            for (Worker w : workers) {
+                w.join(500);
+                if (w.isAlive()) {
                     log.info("Unable to stop IndexMerger.Worker. Daemon is busy.");
                 } else {
                     log.debug("IndexMerger.Worker thread stopped");
@@ -305,14 +308,17 @@
     //------------------------------< internal >--------------------------------
 
     private void addMergeTask(Merge task) {
-        for (;;) {
-            try {
-                mergeTasks.put(task);
-                break;
-            } catch (InterruptedException e) {
-                // try again
-                Thread.interrupted();
+        // only enqueue if still running
+        if (!quit.get()) {
+            Worker worker = new Worker(task);
+            if (isStarted.get()) {
+                // immediately unblock if this index merger is already started
+                worker.unblock();
+            }
+            synchronized (busyMergers) {
+                busyMergers.add(worker);
             }
+            executor.execute(worker);
         }
     }
 
@@ -448,7 +454,7 @@
         }
     }
 
-    private class Worker extends Thread implements IndexListener {
+    private class Worker implements Runnable, IndexListener {
 
         /**
          * List of id <code>Term</code> that identify documents that were deleted
@@ -456,53 +462,50 @@
          */
         private final List<Term> deletedDocuments = Collections.synchronizedList(new
ArrayList<Term>());
 
-        public Worker() {
-            setName("IndexMerger.Worker");
-            setDaemon(true);
+        /**
+         * A latch that is set to zero when this worker is unblocked.
+         */
+        private final CountDownLatch start = new CountDownLatch(1);
+
+        /**
+         * Flat that indicates whether this woker has finished its work.
+         */
+        private final AtomicBoolean terminated = new AtomicBoolean(true);
+
+        /**
+         * The merge task.
+         */
+        private final Merge task;
+
+        /**
+         * Creates a new worker which is initially blocked. Call
+         * {@link #unblock()} to unblock it.
+         *
+         * @param task the merge task.
+         */
+        private Worker(Merge task) {
+            this.task = task;
         }
 
         /**
          * Implements the index merging.
          */
         public void run() {
-            for (;;) {
-                boolean isIdle = false;
-                if (mergeTasks.size() == 0) {
-                    synchronized (busyMergers) {
-                        busyMergers.remove(this);
-                        busyMergers.notifyAll();
-                    }
-                    isIdle = true;
-                }
-                Merge task;
-                for (;;) {
-                    try {
-                        task = mergeTasks.take();
-                        break;
-                    } catch (InterruptedException e) {
-                        // try again
-                        Thread.interrupted();
-                    }
-                }
-                if (task == QUIT) {
-                    synchronized (busyMergers) {
-                        busyMergers.remove(this);
-                    }
-                    // put back QUIT to signal other workers
-                    addMergeTask(task);
-                    break;
-                }
-                if (isIdle) {
-                    synchronized (busyMergers) {
-                        busyMergers.add(this);
+            // worker is initially suspended
+            try {
+                try {
+                    start.await();
+                } catch (InterruptedException e) {
+                    // check if we should quit
+                    if (!quit.get()) {
+                        // enqueue task again and retry with another thread
+                        addMergeTask(task);
                     }
+                    return;
                 }
 
                 log.debug("accepted merge request");
 
-                // reset deleted documents
-                deletedDocuments.clear();
-
                 // get readers
                 String[] names = new String[task.indexes.length];
                 for (int i = 0; i < task.indexes.length; i++) {
@@ -538,15 +541,16 @@
 
                         // inform multi index
                         // if we cannot get the sync immediately we have to quit
-                        if (!indexReplacement.tryAcquire()) {
+                        Lock shared = indexReplacement.readLock();
+                        if (!shared.tryLock()) {
                             log.debug("index merging canceled");
-                            break;
+                            return;
                         }
                         try {
                             log.debug("replace indexes");
                             multiIndex.replaceIndexes(names, index, deletedDocuments);
                         } finally {
-                            indexReplacement.release();
+                            shared.unlock();
                         }
 
                         success = true;
@@ -556,13 +560,24 @@
                             // delete index
                             log.debug("deleting index " + index.getName());
                             multiIndex.deleteIndex(index);
+                            // add task again and retry
+                            addMergeTask(task);
                         }
                     }
                 } catch (Throwable e) {
                     log.error("Error while merging indexes: ", e);
                 }
+            } finally {
+                synchronized (terminated) {
+                    terminated.set(true);
+                    terminated.notifyAll();
+                }
+                synchronized (busyMergers) {
+                    busyMergers.remove(this);
+                    busyMergers.notifyAll();
+                }
+                log.debug("Worker finished");
             }
-            log.info("IndexMerger.Worker terminated");
         }
 
         /**
@@ -572,5 +587,37 @@
             log.debug("document deleted: " + id.text());
             deletedDocuments.add(id);
         }
+
+        /**
+         * Unblocks this worker and allows it to start with the index merging.
+         */
+        void unblock() {
+            start.countDown();
+        }
+
+        /**
+         * Waits until this worker is finished or the specified amount of time
+         * has elapsed.
+         *
+         * @param timeout the timeout in milliseconds.
+         * @throws InterruptedException if the current thread is interrupted
+         *                              while waiting for this worker to
+         *                              terminate.
+         */
+        void join(long timeout) throws InterruptedException {
+            synchronized (terminated) {
+                while (!terminated.get()) {
+                    terminated.wait(timeout);
+                }
+            }
+        }
+
+        /**
+         * @return <code>true</code> if this worker is still alive and not yet
+         *         terminated.
+         */
+        boolean isAlive() {
+            return !terminated.get();
+        }
     }
 }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java?rev=834039&r1=834038&r2=834039&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
(original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
Mon Nov  9 12:48:23 2009
@@ -16,40 +16,41 @@
  */
 package org.apache.jackrabbit.core.query.lucene;
 
+import java.io.IOException;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jcr.RepositoryException;
+
 import org.apache.jackrabbit.core.id.NodeId;
 import org.apache.jackrabbit.core.query.lucene.directory.DirectoryManager;
+import org.apache.jackrabbit.core.state.ChildNodeEntry;
 import org.apache.jackrabbit.core.state.ItemStateException;
 import org.apache.jackrabbit.core.state.ItemStateManager;
 import org.apache.jackrabbit.core.state.NoSuchItemStateException;
 import org.apache.jackrabbit.core.state.NodeState;
-import org.apache.jackrabbit.core.state.ChildNodeEntry;
-import org.apache.jackrabbit.util.Timer;
 import org.apache.jackrabbit.spi.Path;
 import org.apache.jackrabbit.spi.PathFactory;
-import org.apache.jackrabbit.spi.commons.name.PathFactoryImpl;
-import org.apache.jackrabbit.spi.commons.conversion.PathResolver;
 import org.apache.jackrabbit.spi.commons.conversion.DefaultNamePathResolver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.jackrabbit.spi.commons.conversion.PathResolver;
+import org.apache.jackrabbit.spi.commons.name.PathFactoryImpl;
+import org.apache.jackrabbit.util.Timer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.store.Directory;
-
-import javax.jcr.RepositoryException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Calendar;
-import java.text.DateFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A <code>MultiIndex</code> consists of a {@link VolatileIndex} and multiple
@@ -255,7 +256,7 @@
         this.redoLog = redoLogFactory.createRedoLog(this);
 
         // initialize IndexMerger
-        merger = new IndexMerger(this, handler.getIndexMergerPoolSize());
+        merger = new IndexMerger(this, handler.getContext().getExecutor());
         merger.setMaxMergeDocs(handler.getMaxMergeDocs());
         merger.setMergeFactor(handler.getMergeFactor());
         merger.setMinMergeDocs(handler.getMinMergeDocs());

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java?rev=834039&r1=834038&r2=834039&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
(original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
Mon Nov  9 12:48:23 2009
@@ -170,11 +170,6 @@
     public static final int DEFAULT_TERM_INFOS_INDEX_DIVISOR = 1;
 
     /**
-     * The default value for {@link #indexMergerPoolSize}.
-     */
-    public static final int DEFAULT_INDEX_MERGER_POOL_SIZE = 2;
-
-    /**
      * The path factory.
      */
     protected static final PathFactory PATH_FACTORY = PathFactoryImpl.getInstance();
@@ -447,11 +442,6 @@
     private boolean initializeHierarchyCache = true;
 
     /**
-     * The number of worker threads for merging index segments.
-     */
-    private int indexMergerPoolSize = DEFAULT_INDEX_MERGER_POOL_SIZE;
-
-    /**
      * The name of the redo log factory class implementation.
      */
     private String redoLogFactoryClass = DefaultRedoLogFactory.class.getName();
@@ -2227,26 +2217,6 @@
     }
 
     /**
-     * @return the current size of the index merger pool.
-     */
-    public int getIndexMergerPoolSize() {
-        return indexMergerPoolSize;
-    }
-
-    /**
-     * Sets a new value for the index merger pool size.
-     *
-     * @param indexMergerPoolSize the number of worker threads.
-     * @throws IllegalArgumentException if the size is less than or equal 0.
-     */
-    public void setIndexMergerPoolSize(int indexMergerPoolSize) {
-        if (indexMergerPoolSize <= 0) {
-            throw new IllegalArgumentException("must be greater than 0");
-        }
-        this.indexMergerPoolSize = indexMergerPoolSize;
-    }
-
-    /**
      * @return the maximum age in seconds for outdated generations of
      * {@link IndexInfos}.
      */



Mime
View raw message