activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/6] activemq-artemis git commit: Fix WaitNotInLoop issues flagged by new Error Prone
Date Tue, 27 Sep 2016 13:25:16 GMT
Fix WaitNotInLoop issues flagged by new Error Prone


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1a4fe928
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1a4fe928
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1a4fe928

Branch: refs/heads/master
Commit: 1a4fe928022eac30ef03de94f24902d0d5c20f92
Parents: 75b5216
Author: Ville Skyttä <ville.skytta@iki.fi>
Authored: Tue Sep 27 14:28:29 2016 +0300
Committer: Ville Skyttä <ville.skytta@iki.fi>
Committed: Tue Sep 27 14:28:29 2016 +0300

----------------------------------------------------------------------
 .../activemq/artemis/utils/ConcurrentUtil.java  | 42 ++++++++++++++++
 .../core/protocol/core/impl/ChannelImpl.java    | 50 +++++++++-----------
 .../server/group/impl/LocalGroupingHandler.java |  6 +--
 .../impl/AnyLiveNodeLocatorForReplication.java  |  9 ++--
 .../impl/AnyLiveNodeLocatorForScaleDown.java    |  9 ++--
 .../NamedLiveNodeLocatorForReplication.java     |  9 ++--
 .../impl/NamedLiveNodeLocatorForScaleDown.java  |  9 ++--
 7 files changed, 87 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentUtil.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentUtil.java
new file mode 100644
index 0000000..80397c6
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ConcurrentUtil.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+
+public class ConcurrentUtil {
+
+   /**
+    * Await for condition, handling
+    * <a href="http://errorprone.info/bugpattern/WaitNotInLoop">spurious wakeups</a>.
+    * @param condition condition to await for
+    * @param timeout the maximum time to wait in milliseconds
+    * @return value from {@link Condition#await(long, TimeUnit)}
+    */
+   public static boolean await(final Condition condition, final long timeout) throws InterruptedException
{
+      boolean awaited = false;
+      long timeoutRemaining = timeout;
+      long awaitStarted = System.currentTimeMillis();
+      while (!awaited && timeoutRemaining > 0) {
+         awaited = condition.await(timeoutRemaining, TimeUnit.MILLISECONDS);
+         timeoutRemaining -= System.currentTimeMillis() - awaitStarted;
+      }
+      return awaited;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 89fa4f1..751dee0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.ConcurrentUtil;
 import org.jboss.logging.Logger;
 
 public final class ChannelImpl implements Channel {
@@ -235,6 +236,25 @@ public final class ChannelImpl implements Channel {
       this.transferring = transferring;
    }
 
+   /**
+    * @param timeoutMsg message to log on blocking call failover timeout
+    */
+   private void waitForFailOver(String timeoutMsg) {
+      try {
+         if (connection.getBlockingCallFailoverTimeout() < 0) {
+            while (failingOver) {
+               failoverCondition.await();
+            }
+         }
+         else if (!ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout()))
{
+            logger.debug(timeoutMsg);
+         }
+      }
+      catch (InterruptedException e) {
+         throw new ActiveMQInterruptedException(e);
+      }
+   }
+
    // This must never called by more than one thread concurrently
    private boolean send(final Packet packet, final int reconnectID, final boolean flush,
final boolean batch) {
       if (invokeInterceptors(packet, interceptors, connection) != null) {
@@ -254,19 +274,7 @@ public final class ChannelImpl implements Channel {
 
          try {
             if (failingOver) {
-               try {
-                  if (connection.getBlockingCallFailoverTimeout() < 0) {
-                     failoverCondition.await();
-                  }
-                  else {
-                     if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(),
TimeUnit.MILLISECONDS)) {
-                        logger.debug("timed-out waiting for fail-over condition on non-blocking
send");
-                     }
-                  }
-               }
-               catch (InterruptedException e) {
-                  throw new ActiveMQInterruptedException(e);
-               }
+               waitForFailOver("timed-out waiting for fail-over condition on non-blocking
send");
             }
 
             // Sanity check
@@ -340,21 +348,7 @@ public final class ChannelImpl implements Channel {
 
          try {
             if (failingOver) {
-               try {
-                  if (connection.getBlockingCallFailoverTimeout() < 0) {
-                     while (failingOver) {
-                        failoverCondition.await();
-                     }
-                  }
-                  else {
-                     if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(),
TimeUnit.MILLISECONDS)) {
-                        logger.debug("timed-out waiting for fail-over condition on blocking
send");
-                     }
-                  }
-               }
-               catch (InterruptedException e) {
-                  throw new ActiveMQInterruptedException(e);
-               }
+               waitForFailOver("timed-out waiting for fail-over condition on blocking send");
             }
 
             response = null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java
index 4c62b46..383321e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/group/impl/LocalGroupingHandler.java
@@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
+import org.apache.activemq.artemis.utils.ConcurrentUtil;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.TypedProperties;
 import org.jboss.logging.Logger;
@@ -267,11 +268,10 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract
{
             if (expectedBindings.size() > 0) {
                logger.debug("Waiting remote group bindings to arrive before starting the
server. timeout=" + timeout + " milliseconds");
                //now we wait here for the rest to be received in onNotification, it will
signal once all have been received.
-               //if we arent signaled then bindingsAdded still has some groupids we need
to remove.
-               if (!awaitCondition.await(timeout, TimeUnit.MILLISECONDS)) {
+               //if we aren't signaled then bindingsAdded still has some groupids we need
to remove.
+               if (!ConcurrentUtil.await(awaitCondition, timeout)) {
                   ActiveMQServerLogger.LOGGER.remoteGroupCoordinatorsNotStarted();
                }
-
             }
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
index 859daaa..60506d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForReplication.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -30,6 +29,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.core.server.LiveNodeLocator;
 import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
+import org.apache.activemq.artemis.utils.ConcurrentUtil;
 
 /**
  * This implementation looks for any available live node, once tried with no success it is
marked as
@@ -63,10 +63,12 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator
{
          if (untriedConnectors.isEmpty()) {
             try {
                if (timeout != -1L) {
-                  condition.await(timeout, TimeUnit.MILLISECONDS);
+                  ConcurrentUtil.await(condition, timeout);
                }
                else {
-                  condition.await();
+                  while (untriedConnectors.isEmpty()) {
+                     condition.await();
+                  }
                }
             }
             catch (InterruptedException e) {
@@ -153,4 +155,3 @@ public class AnyLiveNodeLocatorForReplication extends LiveNodeLocator
{
       super.notifyRegistrationFailed(alreadyReplicating);
    }
 }
-

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForScaleDown.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForScaleDown.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForScaleDown.java
index 77a3e02..189e8cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForScaleDown.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AnyLiveNodeLocatorForScaleDown.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -29,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.core.server.LiveNodeLocator;
+import org.apache.activemq.artemis.utils.ConcurrentUtil;
 import org.jboss.logging.Logger;
 
 /**
@@ -65,12 +65,14 @@ public class AnyLiveNodeLocatorForScaleDown extends LiveNodeLocator {
          if (connectors.isEmpty()) {
             try {
                if (timeout != -1L) {
-                  if (!condition.await(timeout, TimeUnit.MILLISECONDS)) {
+                  if (!ConcurrentUtil.await(condition, timeout)) {
                      throw new ActiveMQException("Timeout elapsed while waiting for cluster
node");
                   }
                }
                else {
-                  condition.await();
+                  while (connectors.isEmpty()) {
+                     condition.await();
+                  }
                }
             }
             catch (InterruptedException e) {
@@ -153,4 +155,3 @@ public class AnyLiveNodeLocatorForScaleDown extends LiveNodeLocator {
       }
    }
 }
-

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
index 862e3a6..e669ec7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForReplication.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -27,6 +26,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.core.server.LiveNodeLocator;
 import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
+import org.apache.activemq.artemis.utils.ConcurrentUtil;
 
 /**
  * NamedLiveNodeLocatorForReplication looks for a live server in the cluster with a specific
backupGroupName
@@ -59,10 +59,12 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator
{
          if (liveConfiguration == null) {
             try {
                if (timeout != -1L) {
-                  condition.await(timeout, TimeUnit.MILLISECONDS);
+                  ConcurrentUtil.await(condition, timeout);
                }
                else {
-                  condition.await();
+                  while (liveConfiguration == null) {
+                     condition.await();
+                  }
                }
             }
             catch (InterruptedException e) {
@@ -117,4 +119,3 @@ public class NamedLiveNodeLocatorForReplication extends LiveNodeLocator
{
       }
    }
 }
-

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a4fe928/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForScaleDown.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForScaleDown.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForScaleDown.java
index 13cc3aa..97d97b8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForScaleDown.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/NamedLiveNodeLocatorForScaleDown.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.server.impl;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -29,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.core.server.LiveNodeLocator;
+import org.apache.activemq.artemis.utils.ConcurrentUtil;
 import org.jboss.logging.Logger;
 
 /**
@@ -66,12 +66,14 @@ public class NamedLiveNodeLocatorForScaleDown extends LiveNodeLocator
{
          if (connectors.isEmpty()) {
             try {
                if (timeout != -1L) {
-                  if (!condition.await(timeout, TimeUnit.MILLISECONDS)) {
+                  if (!ConcurrentUtil.await(condition, timeout)) {
                      throw new ActiveMQException("Timeout elapsed while waiting for cluster
node");
                   }
                }
                else {
-                  condition.await();
+                  while (connectors.isEmpty()) {
+                     condition.await();
+                  }
                }
             }
             catch (InterruptedException e) {
@@ -155,4 +157,3 @@ public class NamedLiveNodeLocatorForScaleDown extends LiveNodeLocator
{
       }
    }
 }
-


Mime
View raw message