Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0DD32200AF7 for ; Tue, 14 Jun 2016 10:09:51 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0C4E8160A47; Tue, 14 Jun 2016 08:09:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0291E1602C5 for ; Tue, 14 Jun 2016 10:09:49 +0200 (CEST) Received: (qmail 2941 invoked by uid 500); 14 Jun 2016 08:09:49 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 2932 invoked by uid 99); 14 Jun 2016 08:09:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Jun 2016 08:09:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EBB93E0200; Tue, 14 Jun 2016 08:09:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mikemccand@apache.org To: commits@lucene.apache.org Message-Id: <8128f979ad5447319071315df2610547@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: lucene-solr:master: LUCENE-7302: ensure IW.getMaxCompletedSequenceNumber only reflects a change after NRT reader refresh would also see it Date: Tue, 14 Jun 2016 08:09:48 +0000 (UTC) archived-at: Tue, 14 Jun 2016 08:09:51 -0000 Repository: lucene-solr Updated Branches: refs/heads/master 843adfb7b -> 5a0321680 LUCENE-7302: ensure IW.getMaxCompletedSequenceNumber only reflects a change after NRT reader refresh would also see it Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/5a032168 Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/5a032168 Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/5a032168 Branch: refs/heads/master Commit: 5a0321680fe5e57a17470b824024d5b56a4cbaa4 Parents: 843adfb Author: Mike McCandless Authored: Tue Jun 14 04:09:27 2016 -0400 Committer: Mike McCandless Committed: Tue Jun 14 04:09:27 2016 -0400 ---------------------------------------------------------------------- .../apache/lucene/index/DocumentsWriter.java | 40 +++++++++++++++----- .../index/DocumentsWriterPerThreadPool.java | 3 ++ .../org/apache/lucene/index/IndexWriter.java | 10 ++--- .../search/ControlledRealTimeReopenThread.java | 3 +- .../TestControlledRealTimeReopenThread.java | 20 +++++----- 5 files changed, 49 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5a032168/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java ---------------------------------------------------------------------- diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index 13800a8..a33d640 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -122,7 +122,7 @@ final class DocumentsWriter implements Closeable, Accountable { final DocumentsWriterFlushControl flushControl; private final IndexWriter writer; private final Queue events; - + private long lastSeqNo; DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory) { this.directoryOrig = directoryOrig; @@ -144,6 +144,7 @@ final class DocumentsWriter implements Closeable, Accountable { if (applyAllDeletes(deleteQueue)) { seqNo = -seqNo; } + lastSeqNo = Math.max(lastSeqNo, seqNo); return seqNo; } @@ -158,6 +159,7 @@ final class DocumentsWriter implements Closeable, Accountable { if (applyAllDeletes(deleteQueue)) { seqNo = -seqNo; } + lastSeqNo = Math.max(lastSeqNo, seqNo); return seqNo; } @@ -168,7 +170,7 @@ final class DocumentsWriter implements Closeable, Accountable { if (applyAllDeletes(deleteQueue)) { seqNo = -seqNo; } - + lastSeqNo = Math.max(lastSeqNo, seqNo); return seqNo; } @@ -317,6 +319,17 @@ final class DocumentsWriter implements Closeable, Accountable { } } + /** returns the maximum sequence number for all previously completed operations */ + public long getMaxCompletedSequenceNumber() { + long value = lastSeqNo; + int limit = perThreadPool.getMaxThreadStates(); + for(int i = 0; i < limit; i++) { + ThreadState perThread = perThreadPool.getThreadState(i); + value = Math.max(value, perThread.lastSeqNo); + } + return value; + } + boolean anyChanges() { /* * changes are either in a DWPT or in the deleteQueue. @@ -413,7 +426,7 @@ final class DocumentsWriter implements Closeable, Accountable { final ThreadState perThread = flushControl.obtainAndLock(); final DocumentsWriterPerThread flushingDWPT; - final long seqNo; + long seqNo; try { // This must happen after we've pulled the ThreadState because IW.close @@ -437,15 +450,18 @@ final class DocumentsWriter implements Closeable, Accountable { } final boolean isUpdate = delTerm != null; flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); + + assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo; + perThread.lastSeqNo = seqNo; + } finally { perThreadPool.release(perThread); } if (postUpdate(flushingDWPT, hasEvents)) { - return -seqNo; - } else { - return seqNo; + seqNo = -seqNo; } + return seqNo; } long updateDocument(final Iterable doc, final Analyzer analyzer, @@ -456,7 +472,7 @@ final class DocumentsWriter implements Closeable, Accountable { final ThreadState perThread = flushControl.obtainAndLock(); final DocumentsWriterPerThread flushingDWPT; - final long seqNo; + long seqNo; try { // This must happen after we've pulled the ThreadState because IW.close // waits for all ThreadStates to be released: @@ -479,15 +495,19 @@ final class DocumentsWriter implements Closeable, Accountable { } final boolean isUpdate = delTerm != null; flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate); + + assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo; + perThread.lastSeqNo = seqNo; + } finally { perThreadPool.release(perThread); } if (postUpdate(flushingDWPT, hasEvents)) { - return -seqNo; - } else { - return seqNo; + seqNo = -seqNo; } + + return seqNo; } private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException { http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5a032168/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java ---------------------------------------------------------------------- diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java index 3802805..cc72342 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java @@ -59,6 +59,9 @@ final class DocumentsWriterPerThreadPool { // write access guarded by DocumentsWriterFlushControl long bytesUsed = 0; + // set by DocumentsWriter after each indexing op finishes + volatile long lastSeqNo; + ThreadState(DocumentsWriterPerThread dpwt) { this.dwpt = dpwt; } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5a032168/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java ---------------------------------------------------------------------- diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index b5e0c22..5fe1648 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -1457,7 +1457,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { changed(); } //System.out.println(" yes " + info.info.name + " " + docID); - return docWriter.deleteQueue.getNextSequenceNumber(); } } else { @@ -5049,12 +5048,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { }; } - /** Returns the last sequence number, or 0 - * if no index-changing operations have completed yet. + /** Returns the highest sequence number across + * all completed operations, or 0 if no operations have finished yet. Still + * in-flight operations (in other threads) are not counted until they finish. * * @lucene.experimental */ - public long getLastSequenceNumber() { + public long getMaxCompletedSequenceNumber() { ensureOpen(); - return docWriter.deleteQueue.getLastSequenceNumber(); + return docWriter.getMaxCompletedSequenceNumber(); } } http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5a032168/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java ---------------------------------------------------------------------- diff --git a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java index 466d793..a98a30d 100644 --- a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java +++ b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java @@ -150,7 +150,6 @@ public class ControlledRealTimeReopenThread extends Thread implements Closeab * or false if maxMS wait time was exceeded */ public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException { - final long curGen = writer.getLastSequenceNumber(); if (targetGen > searchingGen) { // Notify the reopen thread that the waitingGen has // changed, so it may wake up and realize it should @@ -232,7 +231,7 @@ public class ControlledRealTimeReopenThread extends Thread implements Closeab // Save the gen as of when we started the reopen; the // listener (HandleRefresh above) copies this to // searchingGen once the reopen completes: - refreshStartGen = writer.getLastSequenceNumber(); + refreshStartGen = writer.getMaxCompletedSequenceNumber(); try { manager.maybeRefreshBlocking(); } catch (IOException ioe) { http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5a032168/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java ---------------------------------------------------------------------- diff --git a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java index 69822a6..779c1f2 100644 --- a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java +++ b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java @@ -98,13 +98,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc // Randomly verify the update "took": if (random().nextInt(20) == 2) { if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocuments " + id + " gen=" + gen); } nrtDeletesThread.waitForGeneration(gen); assertTrue(gen <= nrtDeletesThread.getSearchingGen()); final IndexSearcher s = nrtDeletes.acquire(); if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s); } try { assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); @@ -122,13 +122,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc // Randomly verify the add "took": if (random().nextInt(20) == 2) { if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocuments " + id + " gen=" + gen); } nrtNoDeletesThread.waitForGeneration(gen); assertTrue(gen <= nrtNoDeletesThread.getSearchingGen()); final IndexSearcher s = nrtNoDeletes.acquire(); if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s); } try { assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits); @@ -146,13 +146,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc // Randomly verify the add "took": if (random().nextInt(20) == 2) { if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocument " + id + " gen=" + gen); } nrtNoDeletesThread.waitForGeneration(gen); assertTrue(gen <= nrtNoDeletesThread.getSearchingGen()); final IndexSearcher s = nrtNoDeletes.acquire(); if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s); } try { assertEquals(1, s.search(new TermQuery(id), 10).totalHits); @@ -169,13 +169,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc // Randomly verify the udpate "took": if (random().nextInt(20) == 2) { if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id); + System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocument " + id + " gen=" + gen); } nrtDeletesThread.waitForGeneration(gen); assertTrue(gen <= nrtDeletesThread.getSearchingGen()); final IndexSearcher s = nrtDeletes.acquire(); if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s); } try { assertEquals(1, s.search(new TermQuery(id), 10).totalHits); @@ -192,13 +192,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc // randomly verify the delete "took": if (random().nextInt(20) == 7) { if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id); + System.out.println(Thread.currentThread().getName() + ": nrt: verify deleteDocuments " + id + " gen=" + gen); } nrtDeletesThread.waitForGeneration(gen); assertTrue(gen <= nrtDeletesThread.getSearchingGen()); final IndexSearcher s = nrtDeletes.acquire(); if (VERBOSE) { - System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s); + System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s); } try { assertEquals(0, s.search(new TermQuery(id), 10).totalHits);