Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E6500200B9B for ; Tue, 27 Sep 2016 15:25:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E4E56160AD3; Tue, 27 Sep 2016 13:25:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B64AA160AD2 for ; Tue, 27 Sep 2016 15:25:16 +0200 (CEST) Received: (qmail 85758 invoked by uid 500); 27 Sep 2016 13:25:15 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 85747 invoked by uid 99); 27 Sep 2016 13:25:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Sep 2016 13:25:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C465EE0381; Tue, 27 Sep 2016 13:25:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 27 Sep 2016 13:25:16 -0000 Message-Id: <5562602948a0476dbbe533c8bde06370@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/6] activemq-artemis git commit: Fix WaitNotInLoop issues flagged by new Error Prone archived-at: Tue, 27 Sep 2016 13:25:18 -0000 Fix WaitNotInLoop issues flagged by new Error Prone Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1a4fe928 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1a4fe928 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1a4fe928 Branch: refs/heads/master Commit: 1a4fe928022eac30ef03de94f24902d0d5c20f92 Parents: 75b5216 Author: Ville Skyttä Authored: Tue Sep 27 14:28:29 2016 +0300 Committer: Ville Skyttä Committed: Tue Sep 27 14:28:29 2016 +0300 ---------------------------------------------------------------------- .../activemq/artemis/utils/ConcurrentUtil.java | 42 ++++++++++++++++ .../core/protocol/core/impl/ChannelImpl.java | 50 +++++++++----------- .../server/group/impl/LocalGroupingHandler.java | 6 +-- .../impl/AnyLiveNodeLocatorForReplication.java | 9 ++-- .../impl/AnyLiveNodeLocatorForScaleDown.java | 9 ++-- .../NamedLiveNodeLocatorForReplication.java | 9 ++-- .../impl/NamedLiveNodeLocatorForScaleDown.java | 9 ++-- 7 files changed, 87 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentUtil.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentUtil.java new file mode 100644 index 0000000..80397c6 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentUtil.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; + +public class ConcurrentUtil { + + /** + * Await for condition, handling + * spurious wakeups. + * @param condition condition to await for + * @param timeout the maximum time to wait in milliseconds + * @return value from {@link Condition#await(long, TimeUnit)} + */ + public static boolean await(final Condition condition, final long timeout) throws InterruptedException { + boolean awaited = false; + long timeoutRemaining = timeout; + long awaitStarted = System.currentTimeMillis(); + while (!awaited && timeoutRemaining > 0) { + awaited = condition.await(timeoutRemaining, TimeUnit.MILLISECONDS); + timeoutRemaining -= System.currentTimeMillis() - awaitStarted; + } + return awaited; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 89fa4f1..751dee0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.ConcurrentUtil; import org.jboss.logging.Logger; public final class ChannelImpl implements Channel { @@ -235,6 +236,25 @@ public final class ChannelImpl implements Channel { this.transferring = transferring; } + /** + * @param timeoutMsg message to log on blocking call failover timeout + */ + private void waitForFailOver(String timeoutMsg) { + try { + if (connection.getBlockingCallFailoverTimeout() < 0) { + while (failingOver) { + failoverCondition.await(); + } + } + else if (!ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout())) { + logger.debug(timeoutMsg); + } + } + catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + } + // This must never called by more than one thread concurrently private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) { if (invokeInterceptors(packet, interceptors, connection) != null) { @@ -254,19 +274,7 @@ public final class ChannelImpl implements Channel { try { if (failingOver) { - try { - if (connection.getBlockingCallFailoverTimeout() < 0) { - failoverCondition.await(); - } - else { - if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) { - logger.debug("timed-out waiting for fail-over condition on non-blocking send"); - } - } - } - catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } + waitForFailOver("timed-out waiting for fail-over condition on non-blocking send"); } // Sanity check @@ -340,21 +348,7 @@ public final class ChannelImpl implements Channel { try { if (failingOver) { - try { - if (connection.getBlockingCallFailoverTimeout() < 0) { - while (failingOver) { - failoverCondition.await(); - } - } - else { - if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) { - logger.debug("timed-out waiting for fail-over condition on blocking send"); - } - } - } - catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } + waitForFailOver("timed-out waiting for fail-over condition on blocking send"); } response = null; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java index 4c62b46..383321e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; +import org.apache.activemq.artemis.utils.ConcurrentUtil; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.TypedProperties; import org.jboss.logging.Logger; @@ -267,11 +268,10 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { if (expectedBindings.size() > 0) { logger.debug("Waiting remote group bindings to arrive before starting the server. timeout=" + timeout + " milliseconds"); //now we wait here for the rest to be received in onNotification, it will signal once all have been received. - //if we arent signaled then bindingsAdded still has some groupids we need to remove. - if (!awaitCondition.await(timeout, TimeUnit.MILLISECONDS)) { + //if we aren't signaled then bindingsAdded still has some groupids we need to remove. + if (!ConcurrentUtil.await(awaitCondition, timeout)) { ActiveMQServerLogger.LOGGER.remoteGroupCoordinatorsNotStarted(); } - } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java index 859daaa..60506d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -30,6 +29,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.server.LiveNodeLocator; import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; +import org.apache.activemq.artemis.utils.ConcurrentUtil; /** * This implementation looks for any available live node, once tried with no success it is marked as @@ -63,10 +63,12 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator { if (untriedConnectors.isEmpty()) { try { if (timeout != -1L) { - condition.await(timeout, TimeUnit.MILLISECONDS); + ConcurrentUtil.await(condition, timeout); } else { - condition.await(); + while (untriedConnectors.isEmpty()) { + condition.await(); + } } } catch (InterruptedException e) { @@ -153,4 +155,3 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator { super.notifyRegistrationFailed(alreadyReplicating); } } - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForScaleDown.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForScaleDown.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForScaleDown.java index 77a3e02..189e8cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForScaleDown.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForScaleDown.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -29,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.server.LiveNodeLocator; +import org.apache.activemq.artemis.utils.ConcurrentUtil; import org.jboss.logging.Logger; /** @@ -65,12 +65,14 @@ public class AnyLiveNodeLocatorForScaleDown extends LiveNodeLocator { if (connectors.isEmpty()) { try { if (timeout != -1L) { - if (!condition.await(timeout, TimeUnit.MILLISECONDS)) { + if (!ConcurrentUtil.await(condition, timeout)) { throw new ActiveMQException("Timeout elapsed while waiting for cluster node"); } } else { - condition.await(); + while (connectors.isEmpty()) { + condition.await(); + } } } catch (InterruptedException e) { @@ -153,4 +155,3 @@ public class AnyLiveNodeLocatorForScaleDown extends LiveNodeLocator { } } } - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java index 862e3a6..e669ec7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.server.impl; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -27,6 +26,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.server.LiveNodeLocator; import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; +import org.apache.activemq.artemis.utils.ConcurrentUtil; /** * NamedLiveNodeLocatorForReplication looks for a live server in the cluster with a specific backupGroupName @@ -59,10 +59,12 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator { if (liveConfiguration == null) { try { if (timeout != -1L) { - condition.await(timeout, TimeUnit.MILLISECONDS); + ConcurrentUtil.await(condition, timeout); } else { - condition.await(); + while (liveConfiguration == null) { + condition.await(); + } } } catch (InterruptedException e) { @@ -117,4 +119,3 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator { } } } - http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForScaleDown.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForScaleDown.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForScaleDown.java index 13cc3aa..97d97b8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForScaleDown.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForScaleDown.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -29,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.server.LiveNodeLocator; +import org.apache.activemq.artemis.utils.ConcurrentUtil; import org.jboss.logging.Logger; /** @@ -66,12 +66,14 @@ public class NamedLiveNodeLocatorForScaleDown extends LiveNodeLocator { if (connectors.isEmpty()) { try { if (timeout != -1L) { - if (!condition.await(timeout, TimeUnit.MILLISECONDS)) { + if (!ConcurrentUtil.await(condition, timeout)) { throw new ActiveMQException("Timeout elapsed while waiting for cluster node"); } } else { - condition.await(); + while (connectors.isEmpty()) { + condition.await(); + } } } catch (InterruptedException e) { @@ -155,4 +157,3 @@ public class NamedLiveNodeLocatorForScaleDown extends LiveNodeLocator { } } } -