lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mikemcc...@apache.org
Subject svn commit: r1592620 - in /lucene/dev/trunk/lucene: ./ core/src/java/org/apache/lucene/index/ core/src/test/org/apache/lucene/index/
Date Mon, 05 May 2014 19:37:12 GMT
Author: mikemccand
Date: Mon May  5 19:37:11 2014
New Revision: 1592620

URL: http://svn.apache.org/r1592620
Log:
LUCENE-5644: clear IW's thread state bindings on flush

Modified:
    lucene/dev/trunk/lucene/CHANGES.txt
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java

Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1592620&r1=1592619&r2=1592620&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Mon May  5 19:37:11 2014
@@ -165,6 +165,12 @@ Bug fixes
 * LUCENE-5641: SimpleRateLimiter would silently rate limit at 8 MB/sec
   even if you asked for higher rates.  (Mike McCandless)
 
+* LUCENE-5644: IndexWriter clears which threads use which internal
+  thread states on flush, so that if an application reduces how many
+  threads it uses for indexing, that results in a reduction of how
+  many segments are flushed on a full-flush (e.g. to obtain a
+  near-real-time reader).  (Simon Willnauer, Mike McCandless)
+
 Test Framework
 
 * LUCENE-5622: Fail tests if they print over the given limit of bytes to 

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1592620&r1=1592619&r2=1592620&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Mon May  5 19:37:11 2014
@@ -18,9 +18,6 @@ package org.apache.lucene.index;
 
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.lucene.index.FieldInfos.FieldNumbers;
-import org.apache.lucene.util.SetOnce;
-
 /**
  * {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances
  * and their thread assignments during indexing. Each {@link ThreadState} holds
@@ -307,9 +304,9 @@ abstract class DocumentsWriterPerThreadP
       final ThreadState threadState = threadStates[i];
       threadState.lock();
       try {
-       if (!threadState.isActive) {
-         count++;
-       }
+        if (!threadState.isActive) {
+          count++;
+        }
       } finally {
         threadState.unlock();
       }

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java?rev=1592620&r1=1592619&r2=1592620&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Mon May  5 19:37:11 2014
@@ -16,6 +16,7 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 import java.util.Map;
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; //javadoc
@@ -48,7 +49,6 @@ class ThreadAffinityDocumentsWriterThrea
       return threadState;
     }
     ThreadState minThreadState = null;
-
     
     /* TODO -- another thread could lock the minThreadState we just got while 
      we should somehow prevent this. */
@@ -68,14 +68,36 @@ class ThreadAffinityDocumentsWriterThrea
          */
         minThreadState = minContendedThreadState();
       }
+    } else {
+      threadBindings.put(requestingThread, minThreadState);
     }
+
     assert minThreadState != null: "ThreadState is null";
     
     minThreadState.lock();
+
+    if (minThreadState.isInitialized() == false) {
+      // Another thread just flushed this thread state and cleared our binding; put it back:
+      threadBindings.put(requestingThread, minThreadState); // make sure we get the same state next time
+    }
+
     return minThreadState;
   }
 
   @Override
+  DocumentsWriterPerThread reset(ThreadState threadState, boolean closed) {
+    // Remove all previous bindings to this ThreadState on flush:
+    Iterator<Map.Entry<Thread,ThreadState>> it = threadBindings.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<Thread,ThreadState> ent = it.next();
+      if (ent.getValue() == threadState) {
+        it.remove();
+      }
+    }
+    return super.reset(threadState, closed);
+  }
+
+  @Override
   public ThreadAffinityDocumentsWriterThreadPool clone() {
     ThreadAffinityDocumentsWriterThreadPool clone = (ThreadAffinityDocumentsWriterThreadPool) super.clone();
     clone.threadBindings = new ConcurrentHashMap<>();

Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=1592620&r1=1592619&r2=1592620&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java Mon May  5 19:37:11 2014
@@ -2788,4 +2788,66 @@ public class TestIndexWriter extends Luc
     r.close();
     dir.close();
   }
+
+  // LUCENE-5644
+  public void testSegmentCountOnFlush() throws Exception {
+    Directory dir = newDirectory();
+    final IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())));
+    final CountDownLatch startingGun = new CountDownLatch(1);
+    final CountDownLatch startDone = new CountDownLatch(2);
+    final CountDownLatch middleGun = new CountDownLatch(1);
+    final CountDownLatch finalGun = new CountDownLatch(1);
+    Thread[] threads = new Thread[2];
+    for(int i=0;i<threads.length;i++) {
+      final int threadID = i;
+      threads[i] = new Thread() {
+          @Override
+          public void run() {
+            try {
+              startingGun.await();
+              Document doc = new Document();
+              doc.add(newTextField("field", "here is some text", Field.Store.NO));
+              w.addDocument(doc);
+              startDone.countDown();
+
+              middleGun.await();
+              if (threadID == 0) {
+                w.addDocument(doc);
+              } else {
+                finalGun.await();
+                w.addDocument(doc);
+              }
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+      threads[i].start();
+    }
+
+    startingGun.countDown();
+    startDone.await();
+
+    IndexReader r = DirectoryReader.open(w, true);
+    assertEquals(2, r.numDocs());
+    int numSegments = r.leaves().size();
+    // 1 segment if the threads ran sequentially, else 2:
+    assertTrue(numSegments <= 2);
+    r.close();
+
+    middleGun.countDown();
+    threads[0].join();
+
+    finalGun.countDown();
+    threads[1].join();
+
+    r = DirectoryReader.open(w, true);
+    assertEquals(4, r.numDocs());
+    // Both threads should have shared a single thread state since they did not try to index concurrently:
+    assertEquals(1+numSegments, r.leaves().size());
+    r.close();
+
+    w.close();
+    dir.close();
+  }
 }



Mime
View raw message