Return-Path: X-Original-To: apmail-lucene-commits-archive@www.apache.org Delivered-To: apmail-lucene-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 96A9A1AF5 for ; Tue, 26 Apr 2011 11:41:38 +0000 (UTC) Received: (qmail 63707 invoked by uid 500); 26 Apr 2011 11:41:37 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 63700 invoked by uid 99); 26 Apr 2011 11:41:37 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Apr 2011 11:41:37 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Apr 2011 11:41:32 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5BC1E2388900; Tue, 26 Apr 2011 11:41:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1096731 - in /lucene/dev/branches/realtime_search/lucene/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/ Date: Tue, 26 Apr 2011 11:41:10 -0000 To: commits@lucene.apache.org From: simonw@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110426114110.5BC1E2388900@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: simonw Date: Tue Apr 26 11:41:09 2011 New Revision: 1096731 URL: http://svn.apache.org/viewvc?rev=1096731&view=rev Log: LUCENE-3023: some polishing & removed all nocommit Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1096731&r1=1096730&r2=1096731&view=diff ============================================================================== --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original) +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Apr 26 11:41:09 2011 @@ -148,38 +148,26 @@ final class DocumentsWriter { flushControl = new DocumentsWriterFlushControl(this, healthiness, maxRamPerDWPT); } - synchronized boolean deleteQueries(final Query... queries) throws IOException { + synchronized void deleteQueries(final Query... queries) throws IOException { deleteQueue.addDelete(queries); - // nocommit -- shouldn't we check for doApplyAllDeletes - // here too? - // nocommit shouldn't this consult flush policy? or - // should this return void now? - return false; - } - - boolean deleteQuery(final Query query) throws IOException { - return deleteQueries(query); - } - - synchronized boolean deleteTerms(final Term... terms) throws IOException { - final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; - deleteQueue.addDelete(terms); flushControl.doOnDelete(); if (flushControl.doApplyAllDeletes()) { applyAllDeletes(deleteQueue); } - // nocommit shouldn't this consult flush policy? or - // should this return void now? - return false; } // TODO: we could check w/ FreqProxTermsWriter: if the // term doesn't exist, don't bother buffering into the // per-DWPT map (but still must go into the global map) - boolean deleteTerm(final Term term) throws IOException { - return deleteTerms(term); + synchronized void deleteTerms(final Term... terms) throws IOException { + final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; + deleteQueue.addDelete(terms); + flushControl.doOnDelete(); + if (flushControl.doApplyAllDeletes()) { + applyAllDeletes(deleteQueue); + } } - + DocumentsWriterDeleteQueue currentDeleteSession() { return deleteQueue; } @@ -189,7 +177,7 @@ final class DocumentsWriter { synchronized (ticketQueue) { // Freeze and insert the delete flush ticket in the queue ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false)); - applyFlushTickets(null, null); + applyFlushTickets(); } } indexWriter.applyAllDeletes(); @@ -380,52 +368,48 @@ final class DocumentsWriter { * otherwise the deletes frozen by 'B' are not applied to 'A' and we * might miss to deletes documents in 'A'. */ - synchronized (ticketQueue) { - // Each flush is assigned a ticket in the order they accquire the ticketQueue lock - ticket = new FlushTicket(flushingDWPT.prepareFlush(), true); - ticketQueue.add(ticket); + try { + synchronized (ticketQueue) { + // Each flush is assigned a ticket in the order they accquire the ticketQueue lock + ticket = new FlushTicket(flushingDWPT.prepareFlush(), true); + ticketQueue.add(ticket); + } + + // flush concurrently without locking + final FlushedSegment newSegment = flushingDWPT.flush(); + synchronized (ticketQueue) { + ticket.segment = newSegment; + } + // flush was successful once we reached this point - new seg. has been assigned to the ticket! + success = true; + } finally { + if (!success && ticket != null) { + synchronized (ticketQueue) { + // In the case of a failure make sure we are making progress and + // apply all the deletes since the segment flush failed since the flush + // ticket could hold global deletes see FlushTicket#canPublish() + ticket.isSegmentFlush = false; + } + } } - - // flush concurrently without locking - final FlushedSegment newSegment = flushingDWPT.flush(); - - // nocommit -- should this success = true be moved - // under the applyFlushTickets? - success = true; - /* * Now we are done and try to flush the ticket queue if the head of the * queue has already finished the flush. */ - applyFlushTickets(ticket, newSegment); + applyFlushTickets(); } finally { flushControl.doAfterFlush(flushingDWPT); flushingDWPT.checkAndResetHasAborted(); indexWriter.flushCount.incrementAndGet(); - if (!success && ticket != null) { - synchronized (ticketQueue) { - // nocommit -- shouldn't we drop the ticket in - // this case? - // In the case of a failure make sure we are making progress and - // apply all the deletes since the segment flush failed - ticket.isSegmentFlush = false; - } - } } + flushingDWPT = flushControl.nextPendingFlush(); } return maybeMerge; } - private void applyFlushTickets(FlushTicket current, FlushedSegment segment) throws IOException { + private void applyFlushTickets() throws IOException { synchronized (ticketQueue) { - if (current != null) { - // nocommit -- can't caller set current.segment = segment? - // nocommit -- confused by this comment: - // This is a segment FlushTicket so assign the flushed segment so we can make progress. - assert segment != null; - current.segment = segment; - } while (true) { // Keep publishing eligible flushed segments: final FlushTicket head = ticketQueue.peek(); @@ -508,9 +492,7 @@ final class DocumentsWriter { /* Cutover to a new delete queue. This must be synced on the flush control * otherwise a new DWPT could sneak into the loop with an already flushing * delete queue */ - // nocommit -- shouldn't we do this?: - // assert Thread.holdsLock(flushControl); - flushControl.markForFullFlush(); + flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl assert setFlushingDeleteQueue(flushingDeleteQueue); } assert currentFullFlushDelQueue != null; @@ -531,7 +513,7 @@ final class DocumentsWriter { synchronized (ticketQueue) { ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false)); } - applyFlushTickets(null, null); + applyFlushTickets(); } } finally { assert flushingDeleteQueue == currentFullFlushDelQueue; @@ -549,11 +531,9 @@ final class DocumentsWriter { } } - // nocommit -- can we add comment justifying that these - // fields are safely changed across threads because they - // are always accessed in sync(ticketQueue)? static final class FlushTicket { final FrozenBufferedDeletes frozenDeletes; + /* access to non-final members must be synchronized on DW#ticketQueue */ FlushedSegment segment; boolean isSegmentFlush; Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java?rev=1096731&r1=1096730&r2=1096731&view=diff ============================================================================== --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (original) +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java Tue Apr 26 11:41:09 2011 @@ -375,4 +375,8 @@ final class DocumentsWriterDeleteQueue { globalBufferLock.unlock(); } } + + public long bytesUsed() { + return globalBufferedDeletes.bytesUsed.get(); + } } Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1096731&r1=1096730&r2=1096731&view=diff ============================================================================== --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original) +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Tue Apr 26 11:41:09 2011 @@ -279,10 +279,6 @@ public final class DocumentsWriterFlushC return perThreadPool.getActivePerThreadsIterator(); } - long maxNetBytes() { - return flushPolicy.getMaxNetBytes(); - } - synchronized void doOnDelete() { // pass null this is a global delete no update flushPolicy.onDelete(this, null); Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1096731&r1=1096730&r2=1096731&view=diff ============================================================================== --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original) +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Tue Apr 26 11:41:09 2011 @@ -23,7 +23,23 @@ import org.apache.lucene.document.Docume import org.apache.lucene.index.FieldInfos.FieldNumberBiMap; import org.apache.lucene.index.SegmentCodecs.SegmentCodecsBuilder; import org.apache.lucene.index.codecs.CodecProvider; +import org.apache.lucene.util.SetOnce; +/** + * {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances + * and their thread assignments during indexing. Each {@link ThreadState} holds + * a reference to a {@link DocumentsWriterPerThread} that is once a + * {@link ThreadState} is obtained from the pool exclusively used for indexing a + * single document by the obtaining thread. Each indexing thread must obtain + * such a {@link ThreadState} to make progress. Depending on the + * {@link DocumentsWriterPerThreadPool} implementation {@link ThreadState} + * assignments might differ from document to document. + *

+ * Once a {@link DocumentsWriterPerThread} is selected for flush the thread pool + * is reusing the flushing {@link DocumentsWriterPerThread}s ThreadState with a + * new {@link DocumentsWriterPerThread} instance. + *

+ */ public abstract class DocumentsWriterPerThreadPool { /** @@ -39,7 +55,7 @@ public abstract class DocumentsWriterPer */ @SuppressWarnings("serial") public final static class ThreadState extends ReentrantLock { - // public for FlushPolicy + // package private for FlushPolicy DocumentsWriterPerThread perThread; // write access guarded by DocumentsWriterFlushControl volatile boolean flushPending = false; @@ -111,6 +127,7 @@ public abstract class DocumentsWriterPer private volatile int numThreadStatesActive; private CodecProvider codecProvider; private FieldNumberBiMap globalFieldMap; + private final SetOnce documentsWriter = new SetOnce(); public DocumentsWriterPerThreadPool(int maxNumPerThreads) { maxNumPerThreads = (maxNumPerThreads < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumPerThreads; @@ -120,23 +137,40 @@ public abstract class DocumentsWriterPer } public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) { - codecProvider = config.getCodecProvider(); + this.documentsWriter.set(documentsWriter); // thread pool is bound to DW + final CodecProvider codecs = config.getCodecProvider(); + this.codecProvider = codecs; this.globalFieldMap = globalFieldMap; for (int i = 0; i < perThreads.length; i++) { - final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider)); + final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecs)); perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain)); } } + /** + * Returns the max number of {@link ThreadState} instances available in this + * {@link DocumentsWriterPerThreadPool} + */ public int getMaxThreadStates() { return perThreads.length; } - public synchronized ThreadState newThreadState() { + /** + * Returns a new {@link ThreadState} iff any new state is available otherwise + * null. + * + * @param lock + * true iff the new {@link ThreadState} should be locked + * before published otherwise false. + * @return a new {@link ThreadState} iff any new state is available otherwise + * null + */ + public synchronized ThreadState newThreadState(boolean lock) { if (numThreadStatesActive < perThreads.length) { final ThreadState threadState = perThreads[numThreadStatesActive]; + threadState.lock(); threadState.perThread.initialize(); - numThreadStatesActive++; + numThreadStatesActive++; // increment will publish the ThreadState return threadState; } return null; @@ -164,7 +198,7 @@ public abstract class DocumentsWriterPer //public abstract void clearThreadBindings(ThreadState perThread); - // public abstract void clearAllThreadBindings(); + //public abstract void clearAllThreadBindings(); /** * Returns an iterator providing access to all {@link ThreadState} Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java?rev=1096731&r1=1096730&r2=1096731&view=diff ============================================================================== --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java (original) +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java Tue Apr 26 11:41:09 2011 @@ -20,18 +20,32 @@ package org.apache.lucene.index; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; /** - * Default {@link FlushPolicy} implementation that flushes based on RAM - * used, document count and number of buffered deletes depending on the - * IndexWriter's {@link IndexWriterConfig}. This {@link FlushPolicy} will only - * respect settings which are not disabled during initialization ( - * {@link #init(DocumentsWriter)}) (nocommit what does that mean?). All enabled {@link IndexWriterConfig} - * settings are used to mark {@link DocumentsWriterPerThread} as flush pending - * during indexing with respect to their live updates. + * Default {@link FlushPolicy} implementation that flushes based on RAM used, + * document count and number of buffered deletes depending on the IndexWriter's + * {@link IndexWriterConfig}. + * + *
    + *
  • {@link #onDelete(DocumentsWriterFlushControl, ThreadState)} - flushes + * based on the global number of buffered delete terms iff + * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()} is enabled
  • + *
  • {@link #onInsert(DocumentsWriterFlushControl, ThreadState)} - flushes + * either on the number of documents per {@link DocumentsWriterPerThread} ( + * {@link DocumentsWriterPerThread#getNumDocsInRAM()}) or on the global active + * memory consumption in the current indexing session iff + * {@link IndexWriterConfig#getMaxBufferedDocs()} or + * {@link IndexWriterConfig#getRAMBufferSizeMB()} is enabled respectively
  • + *
  • {@link #onUpdate(DocumentsWriterFlushControl, ThreadState)} - calls + * {@link #onInsert(DocumentsWriterFlushControl, ThreadState)} and + * {@link #onDelete(DocumentsWriterFlushControl, ThreadState)} in order
  • + *
+ * All {@link IndexWriterConfig} settings are used to mark + * {@link DocumentsWriterPerThread} as flush pending during indexing with + * respect to their live updates. *

* If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled, the * largest ram consuming {@link DocumentsWriterPerThread} will be marked as - * pending iff the global active RAM consumption is >= the - * configured max RAM buffer. + * pending iff the global active RAM consumption is >= the configured max RAM + * buffer. */ public class FlushByRamOrCountsPolicy extends FlushPolicy { @@ -45,6 +59,18 @@ public class FlushByRamOrCountsPolicy ex control.setApplyAllDeletes(); } } + final DocumentsWriter writer = this.writer.get(); + // If deletes alone are consuming > 1/2 our RAM + // buffer, force them all to apply now. This is to + // prevent too-frequent flushing of a long tail of + // tiny segments: + if ((flushOnRAM() && + writer.deleteQueue.bytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()/2))) { + control.setApplyAllDeletes(); + if (writer.infoStream != null) { + writer.message("force apply deletes bytesUsed=" + writer.deleteQueue.bytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB())); + } + } } @Override @@ -54,14 +80,49 @@ public class FlushByRamOrCountsPolicy ex .getMaxBufferedDocs()) { // Flush this state by num docs control.setFlushPending(state); - } else {// flush by RAM - if (flushOnRAM()) { - final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d); - final long totalRam = control.activeBytes(); - if (totalRam >= limit) { - markLargestWriterPending(control, state, totalRam); - } + } else if (flushOnRAM()) {// flush by RAM + final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d); + final long totalRam = control.activeBytes(); + if (totalRam >= limit) { + markLargestWriterPending(control, state, totalRam); } } } + + /** + * Marks the most ram consuming active {@link DocumentsWriterPerThread} flush + * pending + */ + protected void markLargestWriterPending(DocumentsWriterFlushControl control, + ThreadState perThreadState, final long currentBytesPerThread) { + control + .setFlushPending(findLargestNonPendingWriter(control, perThreadState)); + } + + /** + * Returns true if this {@link FlushPolicy} flushes on + * {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise + * false. + */ + protected boolean flushOnDocCount() { + return indexWriterConfig.getMaxBufferedDocs() != IndexWriterConfig.DISABLE_AUTO_FLUSH; + } + + /** + * Returns true if this {@link FlushPolicy} flushes on + * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise + * false. + */ + protected boolean flushOnDeleteTerms() { + return indexWriterConfig.getMaxBufferedDeleteTerms() != IndexWriterConfig.DISABLE_AUTO_FLUSH; + } + + /** + * Returns true if this {@link FlushPolicy} flushes on + * {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise + * false. + */ + protected boolean flushOnRAM() { + return indexWriterConfig.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH; + } } Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java?rev=1096731&r1=1096730&r2=1096731&view=diff ============================================================================== --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java (original) +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java Tue Apr 26 11:41:09 2011 @@ -58,8 +58,7 @@ public abstract class FlushPolicy { * Called for each delete term. If this is a delete triggered due to an update * the given {@link ThreadState} is non-null. *

- * nocommit: what does this note mean...? - * Note: This method is synchronized by the given + * Note: This method is called synchronized on the given * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling * thread holds the lock on the given {@link ThreadState} */ @@ -70,8 +69,7 @@ public abstract class FlushPolicy { * Called for each document update on the given {@link ThreadState}'s * {@link DocumentsWriterPerThread}. *

- * nocommit: what does this note mean...? - * Note: This method is synchronized by the given + * Note: This method is called synchronized on the given * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling * thread holds the lock on the given {@link ThreadState} */ @@ -102,17 +100,6 @@ public abstract class FlushPolicy { } /** - * Marks the most ram consuming active {@link DocumentsWriterPerThread} flush - * pending - */ - // nocommit -- move to default policy? - protected void markLargestWriterPending(DocumentsWriterFlushControl control, - ThreadState perThreadState, final long currentBytesPerThread) { - control - .setFlushPending(findLargestNonPendingWriter(control, perThreadState)); - } - - /** * Returns the current most RAM consuming non-pending {@link ThreadState} with * at least one indexed document. *

@@ -141,63 +128,4 @@ public abstract class FlushPolicy { return maxRamUsingThreadState; } - // nocommit -- I thought we pause based on "too many flush - // states pending"? - /** - * Returns the max net memory which marks the upper watermark for the - * DocumentsWriter to be healthy. If all flushing and active - * {@link DocumentsWriterPerThread} consume more memory than the upper - * watermark all incoming threads should be stalled and blocked until the - * memory drops below this. - *

- * Note: the upper watermark is only taken into account if this - * {@link FlushPolicy} flushes by ram usage. - * - *

- * The default for the max net memory is set to 2 x - * {@link IndexWriterConfig#getRAMBufferSizeMB()} - * - */ - public long getMaxNetBytes() { - if (!flushOnRAM()) { - // nocommit explain that returning -1 is allowed? - return -1; - } - final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB(); - return (long) (ramBufferSizeMB * 1024.d * 1024.d * 2); - } - - /** - * Returns true if this {@link FlushPolicy} flushes on - * {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise - * false. - */ - // nocommit who needs this? policy shouldn't have to impl - // this? our default policy should? - protected boolean flushOnDocCount() { - return indexWriterConfig.getMaxBufferedDocs() != IndexWriterConfig.DISABLE_AUTO_FLUSH; - } - - /** - * Returns true if this {@link FlushPolicy} flushes on - * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise - * false. - */ - // nocommit who needs this? policy shouldn't have to impl - // this? our default policy should? - protected boolean flushOnDeleteTerms() { - return indexWriterConfig.getMaxBufferedDeleteTerms() != IndexWriterConfig.DISABLE_AUTO_FLUSH; - } - - /** - * Returns true if this {@link FlushPolicy} flushes on - * {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise - * false. - */ - // nocommit who needs this? policy shouldn't have to impl - // this? our default policy should? - protected boolean flushOnRAM() { - return indexWriterConfig.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH; - } - } Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1096731&r1=1096730&r2=1096731&view=diff ============================================================================== --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original) +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java Tue Apr 26 11:41:09 2011 @@ -1239,9 +1239,7 @@ public class IndexWriter implements Clos public void deleteDocuments(Term term) throws CorruptIndexException, IOException { ensureOpen(); try { - if (docWriter.deleteTerm(term)) { - flush(true, false); - } + docWriter.deleteTerms(term); } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Term)"); } @@ -1263,9 +1261,7 @@ public class IndexWriter implements Clos public void deleteDocuments(Term... terms) throws CorruptIndexException, IOException { ensureOpen(); try { - if (docWriter.deleteTerms(terms)) { - flush(true, false); - } + docWriter.deleteTerms(terms); } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Term..)"); } @@ -1285,9 +1281,7 @@ public class IndexWriter implements Clos public void deleteDocuments(Query query) throws CorruptIndexException, IOException { ensureOpen(); try { - if (docWriter.deleteQuery(query)) { - flush(true, false); - } + docWriter.deleteQueries(query); } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Query)"); } @@ -1309,9 +1303,7 @@ public class IndexWriter implements Clos public void deleteDocuments(Query... queries) throws CorruptIndexException, IOException { ensureOpen(); try { - if (docWriter.deleteQueries(queries)) { - flush(true, false); - } + docWriter.deleteQueries(queries); } catch (OutOfMemoryError oom) { handleOOM(oom, "deleteDocuments(Query..)"); } @@ -2646,22 +2638,6 @@ public class IndexWriter implements Clos } final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException { - if (!applyAllDeletes) { - // nocommit -- shouldn't this move into the default - // flush policy? - // If deletes alone are consuming > 1/2 our RAM - // buffer, force them all to apply now. This is to - // prevent too-frequent flushing of a long tail of - // tiny segments: - if ((config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH && - bufferedDeletesStream.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) { - applyAllDeletes = true; - if (infoStream != null) { - message("force apply deletes bytesUsed=" + bufferedDeletesStream.bytesUsed() + " vs ramBuffer=" + (1024*1024*config.getRAMBufferSizeMB())); - } - } - } - if (applyAllDeletes) { if (infoStream != null) { message("apply all deletes during flush"); Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java?rev=1096731&r1=1096730&r2=1096731&view=diff ============================================================================== --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java (original) +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Tue Apr 26 11:41:09 2011 @@ -20,8 +20,16 @@ import java.util.concurrent.ConcurrentHa import org.apache.lucene.document.Document; -// nocommit jdoc -// nocommit -- can/should apps set this via IWC +/** + * A {@link DocumentsWriterPerThreadPool} implementation that tries to assign an + * indexing thread to the same {@link ThreadState} each time the thread tries to + * obtain a {@link ThreadState}. Once a new {@link ThreadState} is created it is + * associated with the creating thread. Subsequently, if the threads associated + * {@link ThreadState} is not in use it will be associated with the requesting + * thread. Otherwise, if the {@link ThreadState} is used by another thread + * {@link ThreadAffinityDocumentsWriterThreadPool} tries to find the currently + * minimal contended {@link ThreadState}. + */ public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool { private Map threadBindings = new ConcurrentHashMap(); @@ -40,16 +48,17 @@ public class ThreadAffinityDocumentsWrit } ThreadState minThreadState = null; + + /* TODO -- another thread could lock the minThreadState we just got while + we should somehow prevent this. */ // Find the state that has minimum number of threads waiting - // noocommit -- can't another thread lock the - // minThreadState we just got? minThreadState = minContendedThreadState(); - if (minThreadState == null || minThreadState.hasQueuedThreads()) { - ThreadState newState = newThreadState(); + final ThreadState newState = newThreadState(true); if (newState != null) { - minThreadState = newState; + assert newState.isHeldByCurrentThread(); threadBindings.put(requestingThread, newState); + return newState; } else if (minThreadState == null) { /* * no new threadState available we just take the minContented one Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java?rev=1096731&r1=1096730&r2=1096731&view=diff ============================================================================== --- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java (original) +++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java Tue Apr 26 11:41:09 2011 @@ -167,28 +167,6 @@ public class TestFlushByRamOrCountsPolic } } - public void testFlushPolicySetup() throws IOException { - Directory dir = newDirectory(); - FlushByRamOrCountsPolicy flushPolicy = new FlushByRamOrCountsPolicy(); - IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT, - new MockAnalyzer(random)).setFlushPolicy(flushPolicy); - - final int numDWPT = 1 + random.nextInt(10); - DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool( - numDWPT); - iwc.setIndexerThreadPool(threadPool); - double maxMB = 1.0 + Math.ceil(random.nextDouble()); - iwc.setRAMBufferSizeMB(maxMB); - iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH); - - IndexWriter writer = new IndexWriter(dir, iwc); - assertEquals((long) (maxMB * 1024. * 1024. * 2.), - flushPolicy.getMaxNetBytes()); - - writer.close(); - dir.close(); - } - public void testRandom() throws IOException, InterruptedException { final int numThreads = 1 + random.nextInt(8); final int numDocumentsToIndex = 100 + random.nextInt(300);