Return-Path: X-Original-To: apmail-brooklyn-dev-archive@minotaur.apache.org Delivered-To: apmail-brooklyn-dev-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5137B181CB for ; Mon, 18 May 2015 21:16:27 +0000 (UTC) Received: (qmail 1099 invoked by uid 500); 18 May 2015 21:16:27 -0000 Delivered-To: apmail-brooklyn-dev-archive@brooklyn.apache.org Received: (qmail 1064 invoked by uid 500); 18 May 2015 21:16:27 -0000 Mailing-List: contact dev-help@brooklyn.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@brooklyn.incubator.apache.org Delivered-To: mailing list dev@brooklyn.incubator.apache.org Received: (qmail 1053 invoked by uid 99); 18 May 2015 21:16:27 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 May 2015 21:16:26 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 8A37D1823FE for ; Mon, 18 May 2015 21:16:26 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.971 X-Spam-Level: X-Spam-Status: No, score=0.971 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id VELTk2DfPk_2 for ; Mon, 18 May 2015 21:16:21 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id CD42D2496C for ; Mon, 18 May 2015 21:16:20 +0000 (UTC) Received: (qmail 967 invoked by uid 99); 18 May 2015 21:16:20 -0000 Received: from 
git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 May 2015 21:16:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E398AE0914; Mon, 18 May 2015 21:16:19 +0000 (UTC) From: aledsage To: dev@brooklyn.incubator.apache.org Reply-To: dev@brooklyn.incubator.apache.org References: In-Reply-To: Subject: [GitHub] incubator-brooklyn pull request: CLI commands for manipulating cat... Content-Type: text/plain Message-Id: <20150518211619.E398AE0914@git1-us-west.apache.org> Date: Mon, 18 May 2015 21:16:19 +0000 (UTC) Github user aledsage commented on a diff in the pull request: https://github.com/apache/incubator-brooklyn/pull/617#discussion_r30548244 --- Diff: core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java --- @@ -247,68 +221,73 @@ void stop() { stop(Duration.TEN_SECONDS, Duration.ONE_SECOND); } void stop(Duration timeout, Duration graceTimeoutForSubsequentOperations) { - stopped = true; - running = false; - - if (scheduledTask != null) { - CountdownTimer expiry = timeout.countdownTimer(); - scheduledTask.cancel(false); + synchronized (startStopMutex) { + running = false; try { - waitForPendingComplete(expiry.getDurationRemaining().lowerBound(Duration.ZERO).add(graceTimeoutForSubsequentOperations)); - } catch (Exception e) { - throw Exceptions.propagate(e); - } - scheduledTask.blockUntilEnded(expiry.getDurationRemaining().lowerBound(Duration.ZERO).add(graceTimeoutForSubsequentOperations)); - scheduledTask.cancel(true); - boolean reallyEnded = Tasks.blockUntilInternalTasksEnded(scheduledTask, expiry.getDurationRemaining().lowerBound(Duration.ZERO).add(graceTimeoutForSubsequentOperations)); - if (!reallyEnded) { - LOG.warn("Persistence tasks took too long to complete when stopping persistence (ignoring): "+scheduledTask); - } - scheduledTask = null; - } + stopping = true; + + if (scheduledTask != null) { + CountdownTimer 
expiry = timeout.countdownTimer(); + try { + scheduledTask.cancel(false); + waitForPendingComplete(expiry.getDurationRemaining().lowerBound(Duration.ZERO).add(graceTimeoutForSubsequentOperations)); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + scheduledTask.blockUntilEnded(expiry.getDurationRemaining().lowerBound(Duration.ZERO).add(graceTimeoutForSubsequentOperations)); + scheduledTask.cancel(true); + boolean reallyEnded = Tasks.blockUntilInternalTasksEnded(scheduledTask, expiry.getDurationRemaining().lowerBound(Duration.ZERO).add(graceTimeoutForSubsequentOperations)); + if (!reallyEnded) { + LOG.warn("Persistence tasks took too long to terminate, when stopping persistence, although pending changes were persisted (ignoring): "+scheduledTask); + } + scheduledTask = null; + } - // Discard all state that was waiting to be persisted - synchronized (this) { - deltaCollector = new DeltaCollector(); + // Discard all state that was waiting to be persisted + synchronized (this) { + deltaCollector = new DeltaCollector(); + } + } finally { + stopCompleted = true; + stopping = false; + } } } /** - * This method must only be used for testing. If required in production, then revisit implementation! * @deprecated since 0.7.0, use {@link #waitForPendingComplete(Duration)} */ @VisibleForTesting public void waitForPendingComplete(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { waitForPendingComplete(Duration.of(timeout, unit)); } + /** Waits for any in-progress writes to be completed then for or any unwritten data to be written. */ @VisibleForTesting public void waitForPendingComplete(Duration timeout) throws InterruptedException, TimeoutException { - // Every time we finish writing, we increment a counter. We note the current val, and then - // wait until we can guarantee that a complete additional write has been done. 
Not sufficient - // to wait for `writeCount > origWriteCount` because we might have read the value when almost - // finished a write. + if (!isActive() && !stopping) return; - long startTime = System.currentTimeMillis(); - long maxEndtime = timeout.isPositive() ? startTime + timeout.toMillisecondsRoundingUp() : Long.MAX_VALUE; - long origWriteCount = writeCount.get(); - while (true) { - if (!isActive()) { - return; // no pending activity; - } else if (writeCount.get() > (origWriteCount+1)) { - return; - } - - if (System.currentTimeMillis() > maxEndtime) { - throw new TimeoutException("Timeout waiting for pending complete of rebind-periodic-delta, after "+Time.makeTimeStringRounded(timeout)); + CountdownTimer timer = timeout.isPositive() ? CountdownTimer.newInstanceStarted(timeout) : CountdownTimer.newInstancePaused(Duration.PRACTICALLY_FOREVER); + // wait for mutex, so we aren't tricked by an in-progress who has already recycled the collector + if (persistingMutex.tryAcquire(timer.getDurationRemaining().toMilliseconds(), TimeUnit.MILLISECONDS)) { + try { + // now no one else is writing + if (!deltaCollector.isEmpty()) { + // but there is data that needs to be written + persistNowSafely(true); --- End diff -- I can see this is simpler than the previous `writeCount` approach. However, it means that test code (calling `waitForPendingComplete`) will change the behaviour of the persistence. It no longer just waits, but actually does the persist. Therefore it means a class of bug will no longer be detected where the persisting was not happening for whatever reason. That's probably not an issue (because such errors are unlikely, though they could arise from some deadlock scenarios for example). No particularly strong feelings. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---