lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mikemcc...@apache.org
Subject svn commit: r1593228 - in /lucene/dev/branches/branch_4x: ./ lucene/ lucene/core/ lucene/core/src/java/org/apache/lucene/index/ lucene/core/src/test/org/apache/lucene/index/ lucene/test-framework/ lucene/test-framework/src/java/org/apache/lucene/index/...
Date Thu, 08 May 2014 10:19:48 GMT
Author: mikemccand
Date: Thu May  8 10:19:48 2014
New Revision: 1593228

URL: http://svn.apache.org/r1593228
Log:
LUCENE-5644: switch to simpler LIFO thread to ThreadState allocator during indexing

Removed:
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
    lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/index/RandomDocumentsWriterPerThreadPool.java
Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/lucene/   (props changed)
    lucene/dev/branches/branch_4x/lucene/core/   (props changed)
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
    lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
    lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
    lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestStressIndexing2.java
    lucene/dev/branches/branch_4x/lucene/test-framework/   (props changed)
    lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1593228&r1=1593227&r2=1593228&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
(original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
Thu May  8 10:19:48 2014
@@ -432,7 +432,7 @@ final class DocumentsWriter implements C
       final boolean isUpdate = delTerm != null;
       flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
     } finally {
-      perThread.unlock();
+      perThreadPool.release(perThread);
     }
 
     return postUpdate(flushingDWPT, hasEvents);
@@ -470,7 +470,7 @@ final class DocumentsWriter implements C
       final boolean isUpdate = delTerm != null;
       flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
     } finally {
-      perThread.unlock();
+      perThreadPool.release(perThread);
     }
 
     return postUpdate(flushingDWPT, hasEvents);

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1593228&r1=1593227&r2=1593228&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
(original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
Thu May  8 10:19:48 2014
@@ -458,7 +458,7 @@ final class DocumentsWriterFlushControl 
       return perThread;
     } finally {
       if (!success) { // make sure we unlock if this fails
-        perThread.unlock();
+        perThreadPool.release(perThread);
       }
     }
   }

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1593228&r1=1593227&r2=1593228&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
(original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
Thu May  8 10:19:48 2014
@@ -18,6 +18,8 @@ package org.apache.lucene.index;
 
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.lucene.util.ThreadInterruptedException;
+
 /**
  * {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances
  * and their thread assignments during indexing. Each {@link ThreadState} holds
@@ -33,7 +35,7 @@ import java.util.concurrent.locks.Reentr
  * new {@link DocumentsWriterPerThread} instance.
  * </p>
  */
-abstract class DocumentsWriterPerThreadPool implements Cloneable {
+final class DocumentsWriterPerThreadPool implements Cloneable {
   
   /**
    * {@link ThreadState} references and guards a
@@ -125,9 +127,12 @@ abstract class DocumentsWriterPerThreadP
     }
   }
 
-  private ThreadState[] threadStates;
+  private final ThreadState[] threadStates;
   private volatile int numThreadStatesActive;
 
+  private final ThreadState[] freeList;
+  private int freeCount;
+
   /**
    * Creates a new {@link DocumentsWriterPerThreadPool} with a given maximum of {@link ThreadState}s.
    */
@@ -140,6 +145,7 @@ abstract class DocumentsWriterPerThreadP
     for (int i = 0; i < threadStates.length; i++) {
       threadStates[i] = new ThreadState(null);
     }
+    freeList = new ThreadState[maxNumThreadStates];
   }
 
   @Override
@@ -148,19 +154,8 @@ abstract class DocumentsWriterPerThreadP
     if (numThreadStatesActive != 0) {
       throw new IllegalStateException("clone this object before it is used!");
     }
-    
-    DocumentsWriterPerThreadPool clone;
-    try {
-      clone = (DocumentsWriterPerThreadPool) super.clone();
-    } catch (CloneNotSupportedException e) {
-      // should not happen
-      throw new RuntimeException(e);
-    }
-    clone.threadStates = new ThreadState[threadStates.length];
-    for (int i = 0; i < threadStates.length; i++) {
-      clone.threadStates[i] = new ThreadState(null);
-    }
-    return clone;
+
+    return new DocumentsWriterPerThreadPool(threadStates.length);
   }
   
   /**
@@ -189,30 +184,29 @@ abstract class DocumentsWriterPerThreadP
    * @return a new {@link ThreadState} iff any new state is available otherwise
    *         <code>null</code>
    */
-  synchronized ThreadState newThreadState() {
-    if (numThreadStatesActive < threadStates.length) {
-      final ThreadState threadState = threadStates[numThreadStatesActive];
-      threadState.lock(); // lock so nobody else will get this ThreadState
-      boolean unlock = true;
-      try {
-        if (threadState.isActive()) {
-          // unreleased thread states are deactivated during DW#close()
-          numThreadStatesActive++; // increment will publish the ThreadState
-          assert threadState.dwpt == null;
-          unlock = false;
-          return threadState;
-        }
-        // unlock since the threadstate is not active anymore - we are closed!
-        assert assertUnreleasedThreadStatesInactive();
-        return null;
-      } finally {
-        if (unlock) {
-          // in any case make sure we unlock if we fail 
-          threadState.unlock();
-        }
+  private ThreadState newThreadState() {
+    assert numThreadStatesActive < threadStates.length;
+    final ThreadState threadState = threadStates[numThreadStatesActive];
+    threadState.lock(); // lock so nobody else will get this ThreadState
+    boolean unlock = true;
+    try {
+      if (threadState.isActive()) {
+        // unreleased thread states are deactivated during DW#close()
+        numThreadStatesActive++; // increment will publish the ThreadState
+        //System.out.println("activeCount=" + numThreadStatesActive);
+        assert threadState.dwpt == null;
+        unlock = false;
+        return threadState;
+      }
+      // we are closed: unlock since the threadstate is not active anymore
+      assert assertUnreleasedThreadStatesInactive();
+      return null;
+    } finally {
+      if (unlock) {
+        // in any case make sure we unlock if we fail 
+        threadState.unlock();
       }
     }
-    return null;
   }
   
   private synchronized boolean assertUnreleasedThreadStatesInactive() {
@@ -240,6 +234,9 @@ abstract class DocumentsWriterPerThreadP
         threadState.unlock();
       }
     }
+    
+    // In case any threads are waiting for indexing:
+    notifyAll();
   }
   
   DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) {
@@ -256,11 +253,48 @@ abstract class DocumentsWriterPerThreadP
   void recycle(DocumentsWriterPerThread dwpt) {
     // don't recycle DWPT by default
   }
-  
-  // you cannot subclass this without being in o.a.l.index package anyway, so
-  // the class is already pkg-private... fix me: see LUCENE-4013
-  abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter);
 
+  /** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
+  ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
+    ThreadState threadState = null;
+    synchronized (this) {
+      while (true) {
+        if (freeCount > 0) {
+          // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a
+          // limited number of thread states:
+          threadState = freeList[freeCount-1];
+          freeCount--;
+          break;
+        } else if (numThreadStatesActive < threadStates.length) {
+          // ThreadState is already locked before return by this method:
+          return newThreadState();
+        } else {
+          // Wait until a thread state frees up:
+          try {
+            wait();
+          } catch (InterruptedException ie) {
+            throw new ThreadInterruptedException(ie);
+          }
+        }
+      }
+    }
+
+    // This could take time, e.g. if the threadState is [briefly] checked for flushing:
+    threadState.lock();
+
+    return threadState;
+  }
+
+  void release(ThreadState state) {
+    state.unlock();
+    synchronized (this) {
+      assert freeCount < freeList.length;
+      freeList[freeCount++] = state;
+      // In case any thread is waiting, wake one of them up since we just released a thread state; notify() should be sufficient but we do
+      // notifyAll defensively:
+      notifyAll();
+    }
+  }
   
   /**
    * Returns the <i>i</i>th active {@link ThreadState} where <i>i</i> is the

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java?rev=1593228&r1=1593227&r2=1593228&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
(original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java
Thu May  8 10:19:48 2014
@@ -352,11 +352,7 @@ public final class IndexWriterConfig ext
   }
 
   /** Expert: Sets the {@link DocumentsWriterPerThreadPool} instance used by the
-   * IndexWriter to assign thread-states to incoming indexing threads. If no
-   * {@link DocumentsWriterPerThreadPool} is set {@link IndexWriter} will use
-   * {@link ThreadAffinityDocumentsWriterThreadPool} with max number of
-   * thread-states set to {@link #DEFAULT_MAX_THREAD_STATES} (see
-   * {@link #DEFAULT_MAX_THREAD_STATES}).
+   * IndexWriter to assign thread-states to incoming indexing threads.
    * </p>
    * <p>
    * NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with
@@ -386,17 +382,13 @@ public final class IndexWriterConfig ext
    *
    * <p>Only takes effect when IndexWriter is first created. */
   public IndexWriterConfig setMaxThreadStates(int maxThreadStates) {
-    this.indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates);
+    this.indexerThreadPool = new DocumentsWriterPerThreadPool(maxThreadStates);
     return this;
   }
 
   @Override
   public int getMaxThreadStates() {
-    try {
-      return ((ThreadAffinityDocumentsWriterThreadPool) indexerThreadPool).getMaxThreadStates();
-    } catch (ClassCastException cce) {
-      throw new IllegalStateException(cce);
-    }
+    return indexerThreadPool.getMaxThreadStates();
   }
 
   /** By default, IndexWriter does not pool the

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java?rev=1593228&r1=1593227&r2=1593228&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
(original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java
Thu May  8 10:19:48 2014
@@ -130,7 +130,7 @@ public class LiveIndexWriterConfig {
     mergePolicy = new TieredMergePolicy();
     flushPolicy = new FlushByRamOrCountsPolicy();
     readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
-    indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
+    indexerThreadPool = new DocumentsWriterPerThreadPool(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES);
     perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
   }
   
@@ -505,11 +505,7 @@ public class LiveIndexWriterConfig {
    * documents at once in IndexWriter.
    */
   public int getMaxThreadStates() {
-    try {
-      return ((ThreadAffinityDocumentsWriterThreadPool) indexerThreadPool).getMaxThreadStates();
-    } catch (ClassCastException cce) {
-      throw new IllegalStateException(cce);
-    }
+    return indexerThreadPool.getMaxThreadStates();
   }
 
   /**

Modified: lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java?rev=1593228&r1=1593227&r2=1593228&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
(original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
Thu May  8 10:19:48 2014
@@ -71,7 +71,7 @@ public class TestFlushByRamOrCountsPolic
     IndexWriterConfig iwc = newIndexWriterConfig(TEST_VERSION_CURRENT,
         analyzer).setFlushPolicy(flushPolicy);
     final int numDWPT = 1 + atLeast(2);
-    DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+    DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
         numDWPT);
     iwc.setIndexerThreadPool(threadPool);
     iwc.setRAMBufferSizeMB(maxRamMB);
@@ -128,7 +128,7 @@ public class TestFlushByRamOrCountsPolic
           new MockAnalyzer(random())).setFlushPolicy(flushPolicy);
 
       final int numDWPT = 1 + atLeast(2);
-      DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+      DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
           numDWPT);
       iwc.setIndexerThreadPool(threadPool);
       iwc.setMaxBufferedDocs(2 + atLeast(10));
@@ -179,7 +179,7 @@ public class TestFlushByRamOrCountsPolic
     iwc.setFlushPolicy(flushPolicy);
 
     final int numDWPT = 1 + random().nextInt(8);
-    DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+    DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
         numDWPT);
     iwc.setIndexerThreadPool(threadPool);
 
@@ -245,7 +245,7 @@ public class TestFlushByRamOrCountsPolic
       FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy();
       iwc.setFlushPolicy(flushPolicy);
       
-      DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
+      DocumentsWriterPerThreadPool threadPool = new DocumentsWriterPerThreadPool(
           numThreads[i]== 1 ? 1 : 2);
       iwc.setIndexerThreadPool(threadPool);
       // with such a small ram buffer we should be stalled quiet quickly

Modified: lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java?rev=1593228&r1=1593227&r2=1593228&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
(original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
Thu May  8 10:19:48 2014
@@ -77,7 +77,7 @@ public class TestIndexWriterConfig exten
     assertNull(conf.getMergedSegmentWarmer());
     assertEquals(IndexWriterConfig.DEFAULT_READER_TERMS_INDEX_DIVISOR, conf.getReaderTermsIndexDivisor());
     assertEquals(TieredMergePolicy.class, conf.getMergePolicy().getClass());
-    assertEquals(ThreadAffinityDocumentsWriterThreadPool.class, conf.getIndexerThreadPool().getClass());
+    assertEquals(DocumentsWriterPerThreadPool.class, conf.getIndexerThreadPool().getClass());
     assertEquals(FlushByRamOrCountsPolicy.class, conf.getFlushPolicy().getClass());
     assertEquals(IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB, conf.getRAMPerThreadHardLimitMB());
     assertEquals(Codec.getDefault(), conf.getCodec());

Modified: lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestStressIndexing2.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestStressIndexing2.java?rev=1593228&r1=1593227&r2=1593228&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestStressIndexing2.java
(original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestStressIndexing2.java
Thu May  8 10:19:48 2014
@@ -198,7 +198,7 @@ public class TestStressIndexing2 extends
     Map<String,Document> docs = new HashMap<>();
     IndexWriter w = RandomIndexWriter.mockIndexWriter(dir, newIndexWriterConfig(
         TEST_VERSION_CURRENT, new MockAnalyzer(random())).setOpenMode(OpenMode.CREATE)
-             .setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(maxThreadStates))
+             .setRAMBufferSizeMB(0.1).setMaxBufferedDocs(maxBufferedDocs).setIndexerThreadPool(new DocumentsWriterPerThreadPool(maxThreadStates))
              .setReaderPooling(doReaderPooling).setMergePolicy(newLogMergePolicy()), new YieldTestPoint());
     LogMergePolicy lmp = (LogMergePolicy) w.getConfig().getMergePolicy();
     lmp.setNoCFSRatio(0.0);

Modified: lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java?rev=1593228&r1=1593227&r2=1593228&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
(original)
+++ lucene/dev/branches/branch_4x/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
Thu May  8 10:19:48 2014
@@ -930,25 +930,7 @@ public abstract class LuceneTestCase ext
       int maxNumThreadStates = rarely(r) ? TestUtil.nextInt(r, 5, 20) // crazy value
           : TestUtil.nextInt(r, 1, 4); // reasonable value
 
-      try {
-        if (rarely(r)) {
-          // Retrieve the package-private setIndexerThreadPool
-          // method:
-          Method setIndexerThreadPoolMethod = IndexWriterConfig.class.getDeclaredMethod("setIndexerThreadPool",
-            Class.forName("org.apache.lucene.index.DocumentsWriterPerThreadPool"));
-          setIndexerThreadPoolMethod.setAccessible(true);
-          Class<?> clazz = Class.forName("org.apache.lucene.index.RandomDocumentsWriterPerThreadPool");
-          Constructor<?> ctor = clazz.getConstructor(int.class, Random.class);
-          ctor.setAccessible(true);
-          // random thread pool
-          setIndexerThreadPoolMethod.invoke(c, ctor.newInstance(maxNumThreadStates, r));
-        } else {
-          // random thread pool
-          c.setMaxThreadStates(maxNumThreadStates);
-        }
-      } catch (Exception e) {
-        Rethrow.rethrow(e);
-      }
+      c.setMaxThreadStates(maxNumThreadStates);
     }
 
     c.setMergePolicy(newMergePolicy(r));



Mime
View raw message