From commits-return-4848-apmail-jackrabbit-commits-archive=jackrabbit.apache.org@jackrabbit.apache.org Tue Nov 20 13:18:49 2007 Return-Path: Delivered-To: apmail-jackrabbit-commits-archive@www.apache.org Received: (qmail 72764 invoked from network); 20 Nov 2007 13:18:49 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Nov 2007 13:18:49 -0000 Received: (qmail 84836 invoked by uid 500); 20 Nov 2007 13:18:36 -0000 Delivered-To: apmail-jackrabbit-commits-archive@jackrabbit.apache.org Received: (qmail 84786 invoked by uid 500); 20 Nov 2007 13:18:36 -0000 Mailing-List: contact commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@jackrabbit.apache.org Delivered-To: mailing list commits@jackrabbit.apache.org Received: (qmail 84777 invoked by uid 99); 20 Nov 2007 13:18:36 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Nov 2007 05:18:36 -0800 X-ASF-Spam-Status: No, hits=-98.0 required=10.0 tests=ALL_TRUSTED,URIBL_BLACK X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Nov 2007 13:18:34 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 46C601A9838; Tue, 20 Nov 2007 05:18:28 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r596654 - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene: AbstractIndex.java DynamicPooledExecutor.java MultiIndex.java VolatileIndex.java Date: Tue, 20 Nov 2007 13:18:27 -0000 To: commits@jackrabbit.apache.org From: mreutegg@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071120131828.46C601A9838@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mreutegg Date: Tue Nov 20 05:18:27 2007 New Revision: 596654 URL: http://svn.apache.org/viewvc?rev=596654&view=rev Log: JCR-1222: Index nodes in parallel Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java (with props) Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/VolatileIndex.java Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java?rev=596654&r1=596653&r2=596654&view=diff ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java Tue Nov 20 05:18:27 2007 @@ -55,6 +55,9 @@ /** PrintStream that pipes all calls to println(String) into log.info() */ private static final LoggingPrintStream STREAM_LOGGER = new LoggingPrintStream(); + /** Executor with a pool size equal to the number of available processors */ + private static final DynamicPooledExecutor EXECUTOR = new DynamicPooledExecutor(); + /** The currently set IndexWriter or null if none is set */ private IndexWriter indexWriter; @@ -136,16 +139,53 @@ } /** - * Adds a document to this index and invalidates the shared reader. + * Adds documents to this index and invalidates the shared reader. * - * @param doc the document to add. + * @param docs the documents to add. * @throws IOException if an error occurs while writing to the index. */ - void addDocument(Document doc) throws IOException { - // check if text extractor completed its work - doc = getFinishedDocument(doc); - getIndexWriter().addDocument(doc); + void addDocuments(Document[] docs) throws IOException { + final IndexWriter writer = getIndexWriter(); + DynamicPooledExecutor.Command commands[] = + new DynamicPooledExecutor.Command[docs.length]; + for (int i = 0; i < docs.length; i++) { + // check if text extractor completed its work + final Document doc = getFinishedDocument(docs[i]); + // create a command for inverting the document + commands[i] = new DynamicPooledExecutor.Command() { + public Object call() throws Exception { + long time = System.currentTimeMillis(); + writer.addDocument(doc); + return new Long(System.currentTimeMillis() - time); + } + }; + } + DynamicPooledExecutor.Result results[] = EXECUTOR.executeAndWait(commands); invalidateSharedReader(); + IOException ex = null; + for (int i = 0; i < results.length; i++) { + if (results[i].getException() != null) { + Throwable cause = results[i].getException().getCause(); + if (ex == null) { + // only throw the first exception + if (cause instanceof IOException) { + ex = (IOException) cause; + } else { + IOException e = new IOException(); + e.initCause(cause); + ex = e; + } + } else { + // all others are logged + log.warn("Exception while inverting document", cause); + } + } else { + log.debug("Inverted document in {} ms", results[i].get()); + } + } + if (ex != null) { + throw ex; + } } /** Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java?rev=596654&view=auto ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java Tue Nov 20 05:18:27 2007 @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.query.lucene; + +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; +import EDU.oswego.cs.dl.util.concurrent.FutureResult; +import EDU.oswego.cs.dl.util.concurrent.Callable; + +import java.lang.reflect.InvocationTargetException; + +/** + * DynamicPooledExecutor implements an executor, which dynamically + * adjusts its maximum number of threads according to the number of available + * processors returned by {@link Runtime#availableProcessors()}. + */ +public class DynamicPooledExecutor { + + /** + * The underlying pooled executor. + */ + private final PooledExecutor executor; + + /** + * Timestamp when the pool size was last checked. + */ + private volatile long lastCheck; + + /** + * The number of processors. + */ + private volatile int numProcessors; + + /** + * Creates a new DynamicPooledExecutor. + */ + public DynamicPooledExecutor() { + executor = new PooledExecutor(); + executor.waitWhenBlocked(); + adjustPoolSize(); + } + + /** + * Executes the given command. This method will block if all threads in the + * pool are busy and return only when the command has been accepted. Care + * must be taken, that no deadlock occurs when multiple commands are + * scheduled for execution. In general commands should not depend on the + * execution of other commands! + * + * @param command the command to execute. + */ + public void execute(Runnable command) { + adjustPoolSize(); + if (numProcessors == 1) { + // if there is only one processor execute with current thread + command.run(); + } else { + try { + executor.execute(command); + } catch (InterruptedException e) { + // run with current thread instead + command.run(); + } + } + } + + /** + * Executes a set of commands and waits until all commands have been + * executed. The results of the commands are returned in the same order as + * the commands. + * + * @param commands the commands to execute. + * @return the results. + */ + public Result[] executeAndWait(Command[] commands) { + Result[] results = new Result[commands.length]; + if (numProcessors == 1) { + // optimize for one processor + for (int i = 0; i < commands.length; i++) { + Object obj = null; + InvocationTargetException ex = null; + try { + obj = commands[i].call(); + } catch (Exception e) { + ex = new InvocationTargetException(e); + } + results[i] = new Result(obj, ex); + } + } else { + FutureResult[] futures = new FutureResult[commands.length]; + for (int i = 0; i < commands.length; i++) { + final Command c = commands[i]; + futures[i] = new FutureResult(); + Runnable r = futures[i].setter(new Callable() { + public Object call() throws Exception { + return c.call(); + } + }); + try { + executor.execute(r); + } catch (InterruptedException e) { + // run with current thread instead + r.run(); + } + } + // wait for all results + boolean interrupted = false; + for (int i = 0; i < futures.length; i++) { + Object obj = null; + InvocationTargetException ex = null; + for (;;) { + try { + obj = futures[i].get(); + } catch (InterruptedException e) { + interrupted = true; + // reset interrupted status and try again + Thread.interrupted(); + continue; + } catch (InvocationTargetException e) { + ex = e; + } + results[i] = new Result(obj, ex); + break; + } + } + if (interrupted) { + // restore interrupt status again + Thread.currentThread().interrupt(); + } + } + return results; + } + + /** + * Adjusts the pool size at most once every second. + */ + private void adjustPoolSize() { + if (lastCheck + 1000 < System.currentTimeMillis()) { + int n = Runtime.getRuntime().availableProcessors(); + if (numProcessors != n) { + executor.setMaximumPoolSize(n); + numProcessors = n; + } + lastCheck = System.currentTimeMillis(); + } + } + + public interface Command { + + /** + * Perform some action that returns a result or throws an exception + */ + Object call() throws Exception; + } + + public static class Result { + + /** + * The result object or null if an exception was thrown. + */ + private final Object object; + + /** + * The exception or null if no exception was thrown. + */ + private final InvocationTargetException exception; + + private Result(Object object, InvocationTargetException exception) { + this.object = object; + this.exception = exception; + } + + /** + * @return the result object or null if an exception was + * thrown. + */ + public Object get() { + return object; + } + + /** + * @return the exception or null if no exception was + * thrown. + */ + public InvocationTargetException getException() { + return exception; + } + } +} Propchange: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/DynamicPooledExecutor.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java?rev=596654&r1=596653&r2=596654&view=diff ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java Tue Nov 20 05:18:27 2007 @@ -1523,7 +1523,7 @@ } } if (doc != null) { - index.volatileIndex.addDocument(doc); + index.volatileIndex.addDocuments(new Document[]{doc}); } } Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/VolatileIndex.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/VolatileIndex.java?rev=596654&r1=596653&r2=596654&view=diff ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/VolatileIndex.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/VolatileIndex.java Tue Nov 20 05:18:27 2007 @@ -64,22 +64,24 @@ } /** - * Overwrites the default implementation by adding the document to a + * Overwrites the default implementation by adding the documents to a * pending list and commits the pending list if needed. * - * @param doc the document to add to the index. + * @param docs the documents to add to the index. * @throws IOException if an error occurs while writing to the index. */ - void addDocument(Document doc) throws IOException { - Document old = (Document) pending.put(doc.get(FieldNames.UUID), doc); - if (old != null) { - Util.disposeDocument(old); - } - if (pending.size() >= bufferSize) { - commitPending(); + void addDocuments(Document[] docs) throws IOException { + for (int i = 0; i < docs.length; i++) { + Document old = (Document) pending.put(docs[i].get(FieldNames.UUID), docs[i]); + if (old != null) { + Util.disposeDocument(old); + } + if (pending.size() >= bufferSize) { + commitPending(); + } + numDocs++; } invalidateSharedReader(); - numDocs++; } /** @@ -152,10 +154,8 @@ * Commits pending documents to the index. */ private void commitPending() throws IOException { - for (Iterator it = pending.values().iterator(); it.hasNext();) { - Document doc = (Document) it.next(); - super.addDocument(doc); - it.remove(); - } + super.addDocuments((Document[]) pending.values().toArray( + new Document[pending.size()])); + pending.clear(); } }