lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sim...@apache.org
Subject svn commit: r1096731 - in /lucene/dev/branches/realtime_search/lucene/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/
Date Tue, 26 Apr 2011 11:41:10 GMT
Author: simonw
Date: Tue Apr 26 11:41:09 2011
New Revision: 1096731

URL: http://svn.apache.org/viewvc?rev=1096731&view=rev
Log:
LUCENE-3023: some polishing & removed all nocommit

Modified:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1096731&r1=1096730&r2=1096731&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Apr 26 11:41:09 2011
@@ -148,38 +148,26 @@ final class DocumentsWriter {
     flushControl = new DocumentsWriterFlushControl(this, healthiness, maxRamPerDWPT);
   }
 
-  synchronized boolean deleteQueries(final Query... queries) throws IOException {
+  synchronized void deleteQueries(final Query... queries) throws IOException {
     deleteQueue.addDelete(queries);
-    // nocommit -- shouldn't we check for doApplyAllDeletes
-    // here too?
-    // nocommit shouldn't this consult flush policy?  or
-    // should this return void now?
-    return false;
-  }
-
-  boolean deleteQuery(final Query query) throws IOException {
-    return deleteQueries(query);
-  }
-
-  synchronized boolean deleteTerms(final Term... terms) throws IOException {
-    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
-    deleteQueue.addDelete(terms);
     flushControl.doOnDelete();
     if (flushControl.doApplyAllDeletes()) {
       applyAllDeletes(deleteQueue);
     }
-    // nocommit shouldn't this consult flush policy?  or
-    // should this return void now?
-    return false;
   }
 
   // TODO: we could check w/ FreqProxTermsWriter: if the
   // term doesn't exist, don't bother buffering into the
   // per-DWPT map (but still must go into the global map)
-  boolean deleteTerm(final Term term) throws IOException {
-    return deleteTerms(term);
+  synchronized void deleteTerms(final Term... terms) throws IOException {
+    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
+    deleteQueue.addDelete(terms);
+    flushControl.doOnDelete();
+    if (flushControl.doApplyAllDeletes()) {
+      applyAllDeletes(deleteQueue);
+    }
   }
-  
+
   DocumentsWriterDeleteQueue currentDeleteSession() {
     return deleteQueue;
   }
@@ -189,7 +177,7 @@ final class DocumentsWriter {
       synchronized (ticketQueue) {
         // Freeze and insert the delete flush ticket in the queue
         ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
-        applyFlushTickets(null, null);
+        applyFlushTickets();
       }
     }
     indexWriter.applyAllDeletes();
@@ -380,52 +368,48 @@ final class DocumentsWriter {
          * otherwise the deletes frozen by 'B' are not applied to 'A' and we
          * might miss to deletes documents in 'A'.
          */
-        synchronized (ticketQueue) {
-          // Each flush is assigned a ticket in the order they accquire the ticketQueue lock
-          ticket =  new FlushTicket(flushingDWPT.prepareFlush(), true);
-          ticketQueue.add(ticket);
+        try {
+          synchronized (ticketQueue) {
+            // Each flush is assigned a ticket in the order they accquire the ticketQueue lock
+            ticket =  new FlushTicket(flushingDWPT.prepareFlush(), true);
+            ticketQueue.add(ticket);
+          }
+  
+          // flush concurrently without locking
+          final FlushedSegment newSegment = flushingDWPT.flush();
+          synchronized (ticketQueue) {
+            ticket.segment = newSegment;
+          }
+          // flush was successful once we reached this point - new seg. has been assigned to the ticket!
+          success = true;
+        } finally {
+          if (!success && ticket != null) {
+            synchronized (ticketQueue) {
+              // In the case of a failure make sure we are making progress and
+              // apply all the deletes since the segment flush failed since the flush
+              // ticket could hold global deletes see FlushTicket#canPublish()
+              ticket.isSegmentFlush = false;
+            }
+          }
         }
-
-        // flush concurrently without locking
-        final FlushedSegment newSegment = flushingDWPT.flush();
-
-        // nocommit -- should this success = true be moved
-        // under the applyFlushTickets?
-        success = true;
-
         /*
          * Now we are done and try to flush the ticket queue if the head of the
          * queue has already finished the flush.
          */
-        applyFlushTickets(ticket, newSegment);
+        applyFlushTickets();
       } finally {
         flushControl.doAfterFlush(flushingDWPT);
         flushingDWPT.checkAndResetHasAborted();
         indexWriter.flushCount.incrementAndGet();
-        if (!success && ticket != null) {
-          synchronized (ticketQueue) {
-            // nocommit -- shouldn't we drop the ticket in
-            // this case?
-            // In the case of a failure make sure we are making progress and
-            // apply all the deletes since the segment flush failed
-            ticket.isSegmentFlush = false;
-          }
-        }
       }
+     
       flushingDWPT = flushControl.nextPendingFlush();
     }
     return maybeMerge;
   }
 
-  private void applyFlushTickets(FlushTicket current, FlushedSegment segment) throws IOException {
+  private void applyFlushTickets() throws IOException {
     synchronized (ticketQueue) {
-      if (current != null) {
-        // nocommit -- can't caller set current.segment = segment?
-        // nocommit -- confused by this comment:
-        // This is a segment FlushTicket so assign the flushed segment so we can make progress.
-        assert segment != null;
-        current.segment = segment;
-      }
       while (true) {
         // Keep publishing eligible flushed segments:
         final FlushTicket head = ticketQueue.peek();
@@ -508,9 +492,7 @@ final class DocumentsWriter {
       /* Cutover to a new delete queue.  This must be synced on the flush control
        * otherwise a new DWPT could sneak into the loop with an already flushing
        * delete queue */
-      // nocommit -- shouldn't we do this?:
-      // assert Thread.holdsLock(flushControl);
-      flushControl.markForFullFlush();
+      flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
       assert setFlushingDeleteQueue(flushingDeleteQueue);
     }
     assert currentFullFlushDelQueue != null;
@@ -531,7 +513,7 @@ final class DocumentsWriter {
         synchronized (ticketQueue) {
           ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
         }
-        applyFlushTickets(null, null);
+        applyFlushTickets();
       }
     } finally {
       assert flushingDeleteQueue == currentFullFlushDelQueue;
@@ -549,11 +531,9 @@ final class DocumentsWriter {
     }
   }
 
-  // nocommit -- can we add comment justifying that these
-  // fields are safely changed across threads because they
-  // are always accessed in sync(ticketQueue)?
   static final class FlushTicket {
     final FrozenBufferedDeletes frozenDeletes;
+    /* access to non-final members must be synchronized on DW#ticketQueue */
     FlushedSegment segment;
     boolean isSegmentFlush;
     

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java?rev=1096731&r1=1096730&r2=1096731&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java Tue Apr 26 11:41:09 2011
@@ -375,4 +375,8 @@ final class DocumentsWriterDeleteQueue {
       globalBufferLock.unlock();
     }
   }
+  
+  public long bytesUsed() {
+    return globalBufferedDeletes.bytesUsed.get();
+  }
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1096731&r1=1096730&r2=1096731&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Tue Apr 26 11:41:09 2011
@@ -279,10 +279,6 @@ public final class DocumentsWriterFlushC
     return perThreadPool.getActivePerThreadsIterator();
   }
 
-  long maxNetBytes() {
-    return flushPolicy.getMaxNetBytes();
-  }
-
   synchronized void doOnDelete() {
     // pass null this is a global delete no update
     flushPolicy.onDelete(this, null);

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1096731&r1=1096730&r2=1096731&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Tue Apr 26 11:41:09 2011
@@ -23,7 +23,23 @@ import org.apache.lucene.document.Docume
 import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
 import org.apache.lucene.index.SegmentCodecs.SegmentCodecsBuilder;
 import org.apache.lucene.index.codecs.CodecProvider;
+import org.apache.lucene.util.SetOnce;
 
+/**
+ * {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances
+ * and their thread assignments during indexing. Each {@link ThreadState} holds
+ * a reference to a {@link DocumentsWriterPerThread} that is once a
+ * {@link ThreadState} is obtained from the pool exclusively used for indexing a
+ * single document by the obtaining thread. Each indexing thread must obtain
+ * such a {@link ThreadState} to make progress. Depending on the
+ * {@link DocumentsWriterPerThreadPool} implementation {@link ThreadState}
+ * assignments might differ from document to document.
+ * <p>
+ * Once a {@link DocumentsWriterPerThread} is selected for flush the thread pool
+ * is reusing the flushing {@link DocumentsWriterPerThread}s ThreadState with a
+ * new {@link DocumentsWriterPerThread} instance.
+ * </p>
+ */
 public abstract class DocumentsWriterPerThreadPool {
   
   /**
@@ -39,7 +55,7 @@ public abstract class DocumentsWriterPer
    */
   @SuppressWarnings("serial")
   public final static class ThreadState extends ReentrantLock {
-    // public for FlushPolicy
+    // package private for FlushPolicy
     DocumentsWriterPerThread perThread;
     // write access guarded by DocumentsWriterFlushControl
     volatile boolean flushPending = false;
@@ -111,6 +127,7 @@ public abstract class DocumentsWriterPer
   private volatile int numThreadStatesActive;
   private CodecProvider codecProvider;
   private FieldNumberBiMap globalFieldMap;
+  private final SetOnce<DocumentsWriter> documentsWriter = new SetOnce<DocumentsWriter>();
 
   public DocumentsWriterPerThreadPool(int maxNumPerThreads) {
     maxNumPerThreads = (maxNumPerThreads < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumPerThreads;
@@ -120,23 +137,40 @@ public abstract class DocumentsWriterPer
   }
 
   public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) {
-    codecProvider = config.getCodecProvider();
+    this.documentsWriter.set(documentsWriter); // thread pool is bound to DW
+    final CodecProvider codecs = config.getCodecProvider();
+    this.codecProvider = codecs;
     this.globalFieldMap = globalFieldMap;
     for (int i = 0; i < perThreads.length; i++) {
-      final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider));
+      final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecs));
       perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
     }
   }
 
+  /**
+   * Returns the max number of {@link ThreadState} instances available in this
+   * {@link DocumentsWriterPerThreadPool}
+   */
   public int getMaxThreadStates() {
     return perThreads.length;
   }
 
-  public synchronized ThreadState newThreadState() {
+  /**
+   * Returns a new {@link ThreadState} iff any new state is available otherwise
+   * <code>null</code>.
+   * 
+   * @param lock
+   *          <code>true</code> iff the new {@link ThreadState} should be locked
+   *          before published otherwise <code>false</code>.
+   * @return a new {@link ThreadState} iff any new state is available otherwise
+   *         <code>null</code>
+   */
+  public synchronized ThreadState newThreadState(boolean lock) {
     if (numThreadStatesActive < perThreads.length) {
       final ThreadState threadState = perThreads[numThreadStatesActive];
+      threadState.lock();
       threadState.perThread.initialize();
-      numThreadStatesActive++;
+      numThreadStatesActive++; // increment will publish the ThreadState
       return threadState;
     }
     return null;
@@ -164,7 +198,7 @@ public abstract class DocumentsWriterPer
 
   //public abstract void clearThreadBindings(ThreadState perThread);
 
-  // public abstract void clearAllThreadBindings();
+  //public abstract void clearAllThreadBindings();
 
   /**
    * Returns an iterator providing access to all {@link ThreadState}

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java?rev=1096731&r1=1096730&r2=1096731&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java Tue Apr 26 11:41:09 2011
@@ -20,18 +20,32 @@ package org.apache.lucene.index;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 
 /**
- * Default {@link FlushPolicy} implementation that flushes based on RAM
- * used, document count and number of buffered deletes depending on the
- * IndexWriter's {@link IndexWriterConfig}. This {@link FlushPolicy} will only
- * respect settings which are not disabled during initialization (
- * {@link #init(DocumentsWriter)}) (nocommit what does that mean?). All enabled {@link IndexWriterConfig}
- * settings are used to mark {@link DocumentsWriterPerThread} as flush pending
- * during indexing with respect to their live updates.
+ * Default {@link FlushPolicy} implementation that flushes based on RAM used,
+ * document count and number of buffered deletes depending on the IndexWriter's
+ * {@link IndexWriterConfig}.
+ * 
+ * <ul>
+ * <li>{@link #onDelete(DocumentsWriterFlushControl, ThreadState)} - flushes
+ * based on the global number of buffered delete terms iff
+ * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()} is enabled</li>
+ * <li>{@link #onInsert(DocumentsWriterFlushControl, ThreadState)} - flushes
+ * either on the number of documents per {@link DocumentsWriterPerThread} (
+ * {@link DocumentsWriterPerThread#getNumDocsInRAM()}) or on the global active
+ * memory consumption in the current indexing session iff
+ * {@link IndexWriterConfig#getMaxBufferedDocs()} or
+ * {@link IndexWriterConfig#getRAMBufferSizeMB()} is enabled respectively</li>
+ * <li>{@link #onUpdate(DocumentsWriterFlushControl, ThreadState)} - calls
+ * {@link #onInsert(DocumentsWriterFlushControl, ThreadState)} and
+ * {@link #onDelete(DocumentsWriterFlushControl, ThreadState)} in order</li>
+ * </ul>
+ * All {@link IndexWriterConfig} settings are used to mark
+ * {@link DocumentsWriterPerThread} as flush pending during indexing with
+ * respect to their live updates.
  * <p>
  * If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled, the
  * largest ram consuming {@link DocumentsWriterPerThread} will be marked as
- * pending iff the global active RAM consumption is >= the
- * configured max RAM buffer.
+ * pending iff the global active RAM consumption is >= the configured max RAM
+ * buffer.
  */
 public class FlushByRamOrCountsPolicy extends FlushPolicy {
 
@@ -45,6 +59,18 @@ public class FlushByRamOrCountsPolicy ex
         control.setApplyAllDeletes();
       }
     }
+    final DocumentsWriter writer = this.writer.get();
+    // If deletes alone are consuming > 1/2 our RAM
+    // buffer, force them all to apply now. This is to
+    // prevent too-frequent flushing of a long tail of
+    // tiny segments:
+    if ((flushOnRAM() &&
+        writer.deleteQueue.bytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()/2))) {
+      control.setApplyAllDeletes();
+     if (writer.infoStream != null) {
+       writer.message("force apply deletes bytesUsed=" +  writer.deleteQueue.bytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
+     }
+   }
   }
 
   @Override
@@ -54,14 +80,49 @@ public class FlushByRamOrCountsPolicy ex
             .getMaxBufferedDocs()) {
       // Flush this state by num docs
       control.setFlushPending(state);
-    } else {// flush by RAM
-      if (flushOnRAM()) {
-        final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
-        final long totalRam = control.activeBytes();
-        if (totalRam >= limit) {
-          markLargestWriterPending(control, state, totalRam);
-        }
+    } else if (flushOnRAM()) {// flush by RAM
+      final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
+      final long totalRam = control.activeBytes();
+      if (totalRam >= limit) {
+        markLargestWriterPending(control, state, totalRam);
       }
     }
   }
+  
+  /**
+   * Marks the most ram consuming active {@link DocumentsWriterPerThread} flush
+   * pending
+   */
+  protected void markLargestWriterPending(DocumentsWriterFlushControl control,
+      ThreadState perThreadState, final long currentBytesPerThread) {
+    control
+        .setFlushPending(findLargestNonPendingWriter(control, perThreadState));
+  }
+  
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnDocCount() {
+    return indexWriterConfig.getMaxBufferedDocs() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
+
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnDeleteTerms() {
+    return indexWriterConfig.getMaxBufferedDeleteTerms() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
+
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnRAM() {
+    return indexWriterConfig.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java?rev=1096731&r1=1096730&r2=1096731&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java Tue Apr 26 11:41:09 2011
@@ -58,8 +58,7 @@ public abstract class FlushPolicy {
    * Called for each delete term. If this is a delete triggered due to an update
    * the given {@link ThreadState} is non-null.
    * <p>
-   * nocommit: what does this note mean...?
-   * Note: This method is synchronized by the given
+   * Note: This method is called synchronized on the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
    * thread holds the lock on the given {@link ThreadState}
    */
@@ -70,8 +69,7 @@ public abstract class FlushPolicy {
    * Called for each document update on the given {@link ThreadState}'s
    * {@link DocumentsWriterPerThread}.
    * <p>
-   * nocommit: what does this note mean...?
-   * Note: This method is synchronized by the given
+   * Note: This method is called  synchronized on the given
    * {@link DocumentsWriterFlushControl} and it is guaranteed that the calling
    * thread holds the lock on the given {@link ThreadState}
    */
@@ -102,17 +100,6 @@ public abstract class FlushPolicy {
   }
 
   /**
-   * Marks the most ram consuming active {@link DocumentsWriterPerThread} flush
-   * pending
-   */
-  // nocommit -- move to default policy?
-  protected void markLargestWriterPending(DocumentsWriterFlushControl control,
-      ThreadState perThreadState, final long currentBytesPerThread) {
-    control
-        .setFlushPending(findLargestNonPendingWriter(control, perThreadState));
-  }
-
-  /**
    * Returns the current most RAM consuming non-pending {@link ThreadState} with
    * at least one indexed document.
    * <p>
@@ -141,63 +128,4 @@ public abstract class FlushPolicy {
     return maxRamUsingThreadState;
   }
 
-  // nocommit -- I thought we pause based on "too many flush
-  // states pending"?
-  /**
-   * Returns the max net memory which marks the upper watermark for the
-   * DocumentsWriter to be healthy. If all flushing and active
-   * {@link DocumentsWriterPerThread} consume more memory than the upper
-   * watermark all incoming threads should be stalled and blocked until the
-   * memory drops below this.
-   * <p>
-   * Note: the upper watermark is only taken into account if this
-   * {@link FlushPolicy} flushes by ram usage.
-   * 
-   * <p>
-   * The default for the max net memory is set to 2 x
-   * {@link IndexWriterConfig#getRAMBufferSizeMB()}
-   * 
-   */
-  public long getMaxNetBytes() {
-    if (!flushOnRAM()) {
-      // nocommit explain that returning -1 is allowed?
-      return -1;
-    }
-    final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
-    return (long) (ramBufferSizeMB * 1024.d * 1024.d * 2);
-  }
-
-  /**
-   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
-   * {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise
-   * <code>false</code>.
-   */
-  // nocommit who needs this?  policy shouldn't have to impl
-  // this?  our default policy should?
-  protected boolean flushOnDocCount() {
-    return indexWriterConfig.getMaxBufferedDocs() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
-  }
-
-  /**
-   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
-   * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise
-   * <code>false</code>.
-   */
-  // nocommit who needs this?  policy shouldn't have to impl
-  // this?  our default policy should?
-  protected boolean flushOnDeleteTerms() {
-    return indexWriterConfig.getMaxBufferedDeleteTerms() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
-  }
-
-  /**
-   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
-   * {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise
-   * <code>false</code>.
-   */
-  // nocommit who needs this?  policy shouldn't have to impl
-  // this?  our default policy should?
-  protected boolean flushOnRAM() {
-    return indexWriterConfig.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH;
-  }
-
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1096731&r1=1096730&r2=1096731&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java Tue Apr 26 11:41:09 2011
@@ -1239,9 +1239,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteTerm(term)) {
-        flush(true, false);
-      }
+      docWriter.deleteTerms(term);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Term)");
     }
@@ -1263,9 +1261,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Term... terms) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteTerms(terms)) {
-        flush(true, false);
-      }
+      docWriter.deleteTerms(terms);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Term..)");
     }
@@ -1285,9 +1281,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Query query) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteQuery(query)) {
-        flush(true, false);
-      }
+      docWriter.deleteQueries(query);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Query)");
     }
@@ -1309,9 +1303,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Query... queries) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteQueries(queries)) {
-        flush(true, false);
-      }
+      docWriter.deleteQueries(queries);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Query..)");
     }
@@ -2646,22 +2638,6 @@ public class IndexWriter implements Clos
   }
   
   final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
-    if (!applyAllDeletes) {
-      // nocommit -- shouldn't this move into the default
-      // flush policy?
-      // If deletes alone are consuming > 1/2 our RAM
-      // buffer, force them all to apply now. This is to
-      // prevent too-frequent flushing of a long tail of
-      // tiny segments:
-      if ((config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-           bufferedDeletesStream.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) {
-        applyAllDeletes = true;
-        if (infoStream != null) {
-          message("force apply deletes bytesUsed=" + bufferedDeletesStream.bytesUsed() + " vs ramBuffer=" + (1024*1024*config.getRAMBufferSizeMB()));
-        }
-      }
-    }
-
     if (applyAllDeletes) {
       if (infoStream != null) {
         message("apply all deletes during flush");

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java?rev=1096731&r1=1096730&r2=1096731&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Tue Apr 26 11:41:09 2011
@@ -20,8 +20,16 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.lucene.document.Document;
 
-// nocommit jdoc
-// nocommit -- can/should apps set this via IWC
+/**
+ * A {@link DocumentsWriterPerThreadPool} implementation that tries to assign an
+ * indexing thread to the same {@link ThreadState} each time the thread tries to
+ * obtain a {@link ThreadState}. Once a new {@link ThreadState} is created it is
+ * associated with the creating thread. Subsequently, if the threads associated
+ * {@link ThreadState} is not in use it will be associated with the requesting
+ * thread. Otherwise, if the {@link ThreadState} is used by another thread
+ * {@link ThreadAffinityDocumentsWriterThreadPool} tries to find the currently
+ * minimal contended {@link ThreadState}.
+ */
 public class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
   private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<Thread, ThreadState>();
 
@@ -40,16 +48,17 @@ public class ThreadAffinityDocumentsWrit
     }
     ThreadState minThreadState = null;
 
+    
+    /* TODO -- another thread could lock the minThreadState we just got while 
+     we should somehow prevent this. */
     // Find the state that has minimum number of threads waiting
-    // noocommit -- can't another thread lock the
-    // minThreadState we just got?
     minThreadState = minContendedThreadState();
-
     if (minThreadState == null || minThreadState.hasQueuedThreads()) {
-      ThreadState newState = newThreadState();
+      final ThreadState newState = newThreadState(true);
       if (newState != null) {
-        minThreadState = newState;
+        assert newState.isHeldByCurrentThread();
         threadBindings.put(requestingThread, newState);
+        return newState;
       } else if (minThreadState == null) {
         /*
          * no new threadState available we just take the minContented one

Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java?rev=1096731&r1=1096730&r2=1096731&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java Tue Apr 26 11:41:09 2011
@@ -167,28 +167,6 @@ public class TestFlushByRamOrCountsPolic
     }
   }
 
-  public void testFlushPolicySetup() throws IOException {
-    Directory dir = newDirectory();
-    FlushByRamOrCountsPolicy flushPolicy = new FlushByRamOrCountsPolicy();
-    IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
-        new MockAnalyzer(random)).setFlushPolicy(flushPolicy);
-
-    final int numDWPT = 1 + random.nextInt(10);
-    DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
-        numDWPT);
-    iwc.setIndexerThreadPool(threadPool);
-    double maxMB = 1.0 + Math.ceil(random.nextDouble());
-    iwc.setRAMBufferSizeMB(maxMB);
-    iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
-
-    IndexWriter writer = new IndexWriter(dir, iwc);
-    assertEquals((long) (maxMB * 1024. * 1024. * 2.),
-        flushPolicy.getMaxNetBytes());
-
-    writer.close();
-    dir.close();
-  }
-
   public void testRandom() throws IOException, InterruptedException {
     final int numThreads = 1 + random.nextInt(8);
     final int numDocumentsToIndex = 100 + random.nextInt(300);



Mime
View raw message