jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mreut...@apache.org
Subject svn commit: r596654 - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene: AbstractIndex.java DynamicPooledExecutor.java MultiIndex.java VolatileIndex.java
Date Tue, 20 Nov 2007 13:18:27 GMT
Author: mreutegg
Date: Tue Nov 20 05:18:27 2007
New Revision: 596654

URL: http://svn.apache.org/viewvc?rev=596654&view=rev
Log:
JCR-1222: Index nodes in parallel

Added:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java   (with props)
Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/VolatileIndex.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java?rev=596654&r1=596653&r2=596654&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java	(original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java	Tue Nov 20 05:18:27 2007
@@ -55,6 +55,9 @@
     /** PrintStream that pipes all calls to println(String) into log.info() */
     private static final LoggingPrintStream STREAM_LOGGER = new LoggingPrintStream();
 
+    /** Executor with a pool size equal to the number of available processors */
+    private static final DynamicPooledExecutor EXECUTOR = new DynamicPooledExecutor();
+
     /** The currently set IndexWriter or <code>null</code> if none is set */
     private IndexWriter indexWriter;
 
@@ -136,16 +139,53 @@
     }
 
     /**
-     * Adds a document to this index and invalidates the shared reader.
+     * Adds documents to this index and invalidates the shared reader.
      *
-     * @param doc the document to add.
+     * @param docs the documents to add.
      * @throws IOException if an error occurs while writing to the index.
      */
-    void addDocument(Document doc) throws IOException {
-        // check if text extractor completed its work
-        doc = getFinishedDocument(doc);
-        getIndexWriter().addDocument(doc);
+    void addDocuments(Document[] docs) throws IOException {
+        final IndexWriter writer = getIndexWriter();
+        DynamicPooledExecutor.Command commands[] =
+                new DynamicPooledExecutor.Command[docs.length];
+        for (int i = 0; i < docs.length; i++) {
+            // check if text extractor completed its work
+            final Document doc = getFinishedDocument(docs[i]);
+            // create a command for inverting the document
+            commands[i] = new DynamicPooledExecutor.Command() {
+                public Object call() throws Exception {
+                    long time = System.currentTimeMillis();
+                    writer.addDocument(doc);
+                    return new Long(System.currentTimeMillis() - time);
+                }
+            };
+        }
+        DynamicPooledExecutor.Result results[] = EXECUTOR.executeAndWait(commands);
         invalidateSharedReader();
+        IOException ex = null;
+        for (int i = 0; i < results.length; i++) {
+            if (results[i].getException() != null) {
+                Throwable cause = results[i].getException().getCause();
+                if (ex == null) {
+                    // only throw the first exception
+                    if (cause instanceof IOException) {
+                        ex = (IOException) cause;
+                    } else {
+                        IOException e = new IOException();
+                        e.initCause(cause);
+                        ex = e;
+                    }
+                } else {
+                    // all others are logged
+                    log.warn("Exception while inverting document", cause);
+                }
+            } else {
+                log.debug("Inverted document in {} ms", results[i].get());
+            }
+        }
+        if (ex != null) {
+            throw ex;
+        }
     }
 
     /**

Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java?rev=596654&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java	(added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java	Tue Nov 20 05:18:27 2007
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.query.lucene;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.FutureResult;
+import EDU.oswego.cs.dl.util.concurrent.Callable;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * <code>DynamicPooledExecutor</code> implements an executor, which dynamically
+ * adjusts its maximum number of threads according to the number of available
+ * processors returned by {@link Runtime#availableProcessors()}.
+ */
+public class DynamicPooledExecutor {
+
+    /**
+     * The underlying pooled executor.
+     */
+    private final PooledExecutor executor;
+
+    /**
+     * Timestamp when the pool size was last checked.
+     */
+    private volatile long lastCheck;
+
+    /**
+     * The number of processors.
+     */
+    private volatile int numProcessors;
+
+    /**
+     * Creates a new DynamicPooledExecutor.
+     */
+    public DynamicPooledExecutor() {
+        executor = new PooledExecutor();
+        executor.waitWhenBlocked();
+        adjustPoolSize();
+    }
+
+    /**
+     * Executes the given command. This method will block if all threads in the
+     * pool are busy and return only when the command has been accepted. Care
+     * must be taken, that no deadlock occurs when multiple commands are
+     * scheduled for execution. In general commands should not depend on the
+     * execution of other commands!
+     *
+     * @param command the command to execute.
+     */
+    public void execute(Runnable command) {
+        adjustPoolSize();
+        if (numProcessors == 1) {
+            // if there is only one processor execute with current thread
+            command.run();
+        } else {
+            try {
+                executor.execute(command);
+            } catch (InterruptedException e) {
+                // run with current thread instead
+                command.run();
+            }
+        }
+    }
+
+    /**
+     * Executes a set of commands and waits until all commands have been
+     * executed. The results of the commands are returned in the same order as
+     * the commands.
+     *
+     * @param commands the commands to execute.
+     * @return the results.
+     */
+    public Result[] executeAndWait(Command[] commands) {
+        Result[] results = new Result[commands.length];
+        if (numProcessors == 1) {
+            // optimize for one processor
+            for (int i = 0; i < commands.length; i++) {
+                Object obj = null;
+                InvocationTargetException ex = null;
+                try {
+                    obj = commands[i].call();
+                } catch (Exception e) {
+                    ex = new InvocationTargetException(e);
+                }
+                results[i] = new Result(obj, ex);
+            }
+        } else {
+            FutureResult[] futures = new FutureResult[commands.length];
+            for (int i = 0; i < commands.length; i++) {
+                final Command c = commands[i];
+                futures[i] = new FutureResult();
+                Runnable r = futures[i].setter(new Callable() {
+                    public Object call() throws Exception {
+                        return c.call();
+                    }
+                });
+                try {
+                    executor.execute(r);
+                } catch (InterruptedException e) {
+                    // run with current thread instead
+                    r.run();
+                }
+            }
+            // wait for all results
+            boolean interrupted = false;
+            for (int i = 0; i < futures.length; i++) {
+                Object obj = null;
+                InvocationTargetException ex = null;
+                for (;;) {
+                    try {
+                        obj = futures[i].get();
+                    } catch (InterruptedException e) {
+                        interrupted = true;
+                        // reset interrupted status and try again
+                        Thread.interrupted();
+                        continue;
+                    } catch (InvocationTargetException e) {
+                        ex = e;
+                    }
+                    results[i] = new Result(obj, ex);
+                    break;
+                }
+            }
+            if (interrupted) {
+                // restore interrupt status again
+                Thread.currentThread().interrupt();
+            }
+        }
+        return results;
+    }
+
+    /**
+     * Adjusts the pool size at most once every second.
+     */
+    private void adjustPoolSize() {
+        if (lastCheck + 1000 < System.currentTimeMillis()) {
+            int n = Runtime.getRuntime().availableProcessors();
+            if (numProcessors != n) {
+                executor.setMaximumPoolSize(n);
+                numProcessors = n;
+            }
+            lastCheck = System.currentTimeMillis();
+        }
+    }
+
+    public interface Command {
+
+        /**
+         * Perform some action that returns a result or throws an exception
+         */
+        Object call() throws Exception;
+    }
+
+    public static class Result {
+
+        /**
+         * The result object or <code>null</code> if an exception was thrown.
+         */
+        private final Object object;
+
+        /**
+         * The exception or <code>null</code> if no exception was thrown.
+         */
+        private final InvocationTargetException exception;
+
+        private Result(Object object, InvocationTargetException exception) {
+            this.object = object;
+            this.exception = exception;
+        }
+
+        /**
+         * @return the result object or <code>null</code> if an exception was
+         *         thrown.
+         */
+        public Object get() {
+            return object;
+        }
+
+        /**
+         * @return the exception or <code>null</code> if no exception was
+         *         thrown.
+         */
+        public InvocationTargetException getException() {
+            return exception;
+        }
+    }
+}

Propchange: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java?rev=596654&r1=596653&r2=596654&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java	(original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java	Tue Nov 20 05:18:27 2007
@@ -1523,7 +1523,7 @@
                 }
             }
             if (doc != null) {
-                index.volatileIndex.addDocument(doc);
+                index.volatileIndex.addDocuments(new Document[]{doc});
             }
         }
 

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/VolatileIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/VolatileIndex.java?rev=596654&r1=596653&r2=596654&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/VolatileIndex.java	(original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/VolatileIndex.java	Tue Nov 20 05:18:27 2007
@@ -64,22 +64,24 @@
     }
 
     /**
-     * Overwrites the default implementation by adding the document to a
+     * Overwrites the default implementation by adding the documents to a
      * pending list and commits the pending list if needed.
      *
-     * @param doc the document to add to the index.
+     * @param docs the documents to add to the index.
      * @throws IOException if an error occurs while writing to the index.
      */
-    void addDocument(Document doc) throws IOException {
-        Document old = (Document) pending.put(doc.get(FieldNames.UUID), doc);
-        if (old != null) {
-            Util.disposeDocument(old);
-        }
-        if (pending.size() >= bufferSize) {
-            commitPending();
+    void addDocuments(Document[] docs) throws IOException {
+        for (int i = 0; i < docs.length; i++) {
+            Document old = (Document) pending.put(docs[i].get(FieldNames.UUID), docs[i]);
+            if (old != null) {
+                Util.disposeDocument(old);
+            }
+            if (pending.size() >= bufferSize) {
+                commitPending();
+            }
+            numDocs++;
         }
         invalidateSharedReader();
-        numDocs++;
     }
 
     /**
@@ -152,10 +154,8 @@
      * Commits pending documents to the index.
      */
     private void commitPending() throws IOException {
-        for (Iterator it = pending.values().iterator(); it.hasNext();) {
-            Document doc = (Document) it.next();
-            super.addDocument(doc);
-            it.remove();
-        }
+        super.addDocuments((Document[]) pending.values().toArray(
+                new Document[pending.size()]));
+        pending.clear();
     }
 }



Mime
View raw message