Return-Path: X-Original-To: apmail-lucene-commits-archive@www.apache.org Delivered-To: apmail-lucene-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DB1EE101CE for ; Wed, 19 Nov 2014 00:04:42 +0000 (UTC) Received: (qmail 52711 invoked by uid 500); 19 Nov 2014 00:04:42 -0000 Mailing-List: contact commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@lucene.apache.org Delivered-To: mailing list commits@lucene.apache.org Received: (qmail 52702 invoked by uid 99); 19 Nov 2014 00:04:42 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Nov 2014 00:04:42 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Nov 2014 00:04:19 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9AC53238888F; Wed, 19 Nov 2014 00:03:17 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1640456 - in /lucene/dev/trunk/lucene: ./ core/src/java/org/apache/lucene/index/ core/src/test/org/apache/lucene/index/ test-framework/src/java/org/apache/lucene/util/ Date: Wed, 19 Nov 2014 00:03:17 -0000 To: commits@lucene.apache.org From: mikemccand@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20141119000317.9AC53238888F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mikemccand Date: Wed Nov 19 00:03:16 2014 New Revision: 1640456 URL: http://svn.apache.org/r1640456 Log: LUCENE-6063: allow overriding whether/how ConcurrentMergeScheduler stalls incoming threads when merges are falling behind Modified: lucene/dev/trunk/lucene/CHANGES.txt lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java Modified: lucene/dev/trunk/lucene/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1640456&r1=1640455&r2=1640456&view=diff ============================================================================== --- lucene/dev/trunk/lucene/CHANGES.txt (original) +++ lucene/dev/trunk/lucene/CHANGES.txt Wed Nov 19 00:03:16 2014 @@ -98,6 +98,10 @@ New Features * LUCENE-5929: Also extract terms to highlight from block join queries. (Julie Tibshirani via Mike McCandless) +* LUCENE-6063: Allow overriding whether/how ConcurrentMergeScheduler + stalls incoming threads when merges are falling behind (Mike + McCandless) + API Changes * LUCENE-5900: Deprecated more constructors taking Version in *InfixSuggester and Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=1640456&r1=1640455&r2=1640456&view=diff ============================================================================== --- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original) +++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Wed Nov 19 00:03:16 2014 @@ -334,33 +334,7 @@ public class ConcurrentMergeScheduler ex // pending merges, until it's empty: while (true) { - long startStallTime = 0; - while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { - // This means merging has fallen too far behind: we - // have already created maxMergeCount threads, and - // now there's at least one more merge pending. - // Note that only maxThreadCount of - // those created merge threads will actually be - // running; the rest will be paused (see - // updateMergeThreads). We stall this producer - // thread to prevent creation of new segments, - // until merging has caught up: - startStallTime = System.currentTimeMillis(); - if (verbose()) { - message(" too many merges; stalling..."); - } - try { - wait(); - } catch (InterruptedException ie) { - throw new ThreadInterruptedException(ie); - } - } - - if (verbose()) { - if (startStallTime != 0) { - message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); - } - } + maybeStall(); MergePolicy.OneMerge merge = writer.getNextMerge(); if (merge == null) { @@ -400,6 +374,44 @@ public class ConcurrentMergeScheduler ex } } + /** This is invoked by {@link #merge} to possibly stall the incoming + * thread when there are too many merges running or pending. The + * default behavior is to force this thread, which is producing too + * many segments for merging to keep up, to wait until merges catch + * up. Applications that can take other less drastic measures, such + * as limiting how many threads are allowed to index, can do nothing + * here and throttle elsewhere. */ + + protected synchronized void maybeStall() { + long startStallTime = 0; + while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { + // This means merging has fallen too far behind: we + // have already created maxMergeCount threads, and + // now there's at least one more merge pending. + // Note that only maxThreadCount of + // those created merge threads will actually be + // running; the rest will be paused (see + // updateMergeThreads). We stall this producer + // thread to prevent creation of new segments, + // until merging has caught up: + startStallTime = System.currentTimeMillis(); + if (verbose()) { + message(" too many merges; stalling..."); + } + try { + wait(); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + } + + if (verbose()) { + if (startStallTime != 0) { + message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); + } + } + } + /** Does the actual merge, by calling {@link IndexWriter#merge} */ protected void doMerge(MergePolicy.OneMerge merge) throws IOException { writer.merge(merge); Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java?rev=1640456&r1=1640455&r2=1640456&view=diff ============================================================================== --- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (original) +++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java Wed Nov 19 00:03:16 2014 @@ -339,7 +339,6 @@ public class TestConcurrentMergeSchedule dir.close(); } - private static class TrackingCMS extends ConcurrentMergeScheduler { long totMergedBytes; CountDownLatch atLeastOneMerge; @@ -454,4 +453,24 @@ public class TestConcurrentMergeSchedule w.close(); d.close(); } + + // LUCENE-6063 + public void testMaybeStallCalled() throws Exception { + final AtomicBoolean wasCalled = new AtomicBoolean(); + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())); + iwc.setMergeScheduler(new ConcurrentMergeScheduler() { + @Override + protected void maybeStall() { + wasCalled.set(true); + } + }); + IndexWriter w = new IndexWriter(dir, iwc); + w.addDocument(new Document()); + w.forceMerge(1); + assertTrue(wasCalled.get()); + + w.close(); + dir.close(); + } } Modified: lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java?rev=1640456&r1=1640455&r2=1640456&view=diff ============================================================================== --- lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (original) +++ lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java Wed Nov 19 00:03:16 2014 @@ -886,7 +886,16 @@ public abstract class LuceneTestCase ext } else if (rarely(r)) { int maxThreadCount = TestUtil.nextInt(r, 1, 4); int maxMergeCount = TestUtil.nextInt(r, maxThreadCount, maxThreadCount + 4); - ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler(); + ConcurrentMergeScheduler cms; + if (r.nextBoolean()) { + cms = new ConcurrentMergeScheduler(); + } else { + cms = new ConcurrentMergeScheduler() { + @Override + protected synchronized void maybeStall() { + } + }; + } cms.setMaxMergesAndThreads(maxMergeCount, maxThreadCount); c.setMergeScheduler(cms); }