activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/3] activemq-artemis git commit: NO-JIRA Test fixes
Date Wed, 14 Feb 2018 16:07:55 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master f91432eec -> 646e55514


NO-JIRA Test fixes

- LargeServerMessageImpl.finalize is eventually causing deadlocks
- CoreMessage needs to check properties before decoding
- PagingTest tweaks
- ServerLocatorImpl can deadlock eventually, avoiding a lock and using actors
- ActiveMQServerImpl.finalize is also evil and can cause deadlocks on the testsuite
- MqttClusterRemoteSubscribeTest needs to setup the Address now on the setup


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b5bf5afd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b5bf5afd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b5bf5afd

Branch: refs/heads/master
Commit: b5bf5afde7b767683e71956b49cc6ec8f94c10b8
Parents: f91432e
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Feb 14 10:15:01 2018 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Feb 14 10:56:44 2018 -0500

----------------------------------------------------------------------
 .../core/client/impl/ServerLocatorImpl.java     | 34 +++++++++++++++++---
 .../artemis/core/message/impl/CoreMessage.java  |  3 +-
 .../impl/journal/LargeServerMessageImpl.java    |  6 ----
 .../core/server/impl/ActiveMQServerImpl.java    | 11 -------
 .../MqttClusterRemoteSubscribeTest.java         |  1 +
 .../tests/integration/paging/PagingTest.java    | 14 ++------
 6 files changed, 35 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 02c17c6..978cc39 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -67,6 +67,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
 import org.apache.activemq.artemis.utils.ClassloadingUtil;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.activemq.artemis.utils.actors.Actor;
 import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores;
 import org.jboss.logging.Logger;
 
@@ -199,6 +200,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
 
    private Executor startExecutor;
 
+   private Actor<Long> updateArrayActor;
+
    private AfterConnectInternalListener afterConnectListener;
 
    private String groupID;
@@ -251,6 +254,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
 
          scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize,
factory);
       }
+
+      this.updateArrayActor = new Actor<>(threadPool, this::internalUpdateArray);
    }
 
    @Override
@@ -534,6 +539,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
    private TransportConfiguration selectConnector() {
       Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
 
+      flushTopology();
+
       synchronized (topologyArrayGuard) {
          usedTopology = topologyArray;
       }
@@ -743,6 +750,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
 
       initialise();
 
+      flushTopology();
+
       if (this.getNumInitialConnectors() == 0 && discoveryGroup != null) {
          // Wait for an initial broadcast to give us at least one node in the cluster
          long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
@@ -812,6 +821,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
       return factory;
    }
 
+   public void flushTopology() {
+      if (updateArrayActor != null) {
+         updateArrayActor.flush(10, TimeUnit.SECONDS);
+      }
+   }
+
    @Override
    public boolean isHA() {
       return ha;
@@ -1426,14 +1441,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
       topology.removeMember(eventTime, nodeID);
 
       if (clusterConnection) {
-         updateArraysAndPairs();
+         updateArraysAndPairs(eventTime);
       } else {
          if (topology.isEmpty()) {
             // Resetting the topology to its original condition as it was brand new
             receivedTopology = false;
             topologyArray = null;
          } else {
-            updateArraysAndPairs();
+            updateArraysAndPairs(eventTime);
 
             if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
{
                // Resetting the topology to its original condition as it was brand new
@@ -1472,7 +1487,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
          }
       }
 
-      updateArraysAndPairs();
+      updateArraysAndPairs(uniqueEventID);
 
       if (last) {
          receivedTopology = true;
@@ -1496,7 +1511,16 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
    }
 
    @SuppressWarnings("unchecked")
-   private void updateArraysAndPairs() {
+   private void updateArraysAndPairs(long time) {
+      if (updateArrayActor == null) {
+         // if for some reason we don't have an actor, just go straight
+         internalUpdateArray(time);
+      } else {
+         updateArrayActor.act(time);
+      }
+   }
+
+   private void internalUpdateArray(long time) {
       synchronized (topologyArrayGuard) {
          Collection<TopologyMemberImpl> membersCopy = topology.getMembers();
 
@@ -1506,7 +1530,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal,
Discovery
          for (TopologyMemberImpl pair : membersCopy) {
             Pair<TransportConfiguration, TransportConfiguration> transportConfigs =
pair.getConnector();
             topologyArrayLocal[count++] = new Pair<>(protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getA()),
-                    protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB()));
+                                                     protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB()));
          }
 
          this.topologyArray = topologyArrayLocal;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 9119a0d..2c570b9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -369,6 +369,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage
{
 
    @Override
    public Message copy() {
+      checkProperties();
       checkEncode();
       return new CoreMessage(this);
    }
@@ -936,8 +937,8 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage
{
    @Override
    public CoreMessage putObjectProperty(final SimpleString key,
                                         final Object value) throws ActiveMQPropertyConversionException
{
-      messageChanged();
       checkProperties();
+      messageChanged();
       TypedProperties.setObjectProperty(key, value, properties);
       return this;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 287b261..9a2e285 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -362,12 +362,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements
LargeSe
       }
    }
 
-   @Override
-   protected void finalize() throws Throwable {
-      releaseResources();
-      super.finalize();
-   }
-
    // Private -------------------------------------------------------
 
    public synchronized void validateFile() throws ActiveMQException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index dad9300..2f6cb7f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -680,17 +680,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
-   protected final void finalize() throws Throwable {
-      if (state != SERVER_STATE.STOPPED) {
-         ActiveMQServerLogger.LOGGER.serverFinalisedWIthoutBeingSTopped();
-
-         stop();
-      }
-
-      super.finalize();
-   }
-
-   @Override
    public void setState(SERVER_STATE state) {
       this.state = state;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
index 630cdf5..8caba17 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
@@ -435,6 +435,7 @@ public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
       coreAddressConfiguration.setName(TOPIC);
       CoreQueueConfiguration coreQueueConfiguration = new CoreQueueConfiguration();
       coreQueueConfiguration.setName(TOPIC);
+      coreQueueConfiguration.setAddress(TOPIC);
       coreQueueConfiguration.setRoutingType(RoutingType.ANYCAST);
       coreAddressConfiguration.addQueueConfiguration(coreQueueConfiguration);
       return coreAddressConfiguration;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 3de9203..bc09fa1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -835,7 +835,7 @@ public class PagingTest extends ActiveMQTestBase {
          ClientMessage message = session.createMessage(true);
 
          if (i < 1000) {
-            message.setExpiration(System.currentTimeMillis() + 1000);
+            message.setExpiration(System.currentTimeMillis() + 100);
          }
 
          message.putIntProperty("tst-count", i);
@@ -852,12 +852,7 @@ public class PagingTest extends ActiveMQTestBase {
       session.commit();
       producer.close();
 
-      for (long timeout = System.currentTimeMillis() + 60000; timeout > System.currentTimeMillis()
&& getMessageCount(qEXP) < 1000; ) {
-         System.out.println("count = " + getMessageCount(qEXP));
-         Thread.sleep(100);
-      }
-
-      assertEquals(1000, getMessageCount(qEXP));
+      Wait.assertEquals(1000, qEXP::getMessageCount);
 
       session.start();
 
@@ -874,10 +869,7 @@ public class PagingTest extends ActiveMQTestBase {
 
       assertNull(consumer.receiveImmediate());
 
-      for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis()
&& getMessageCount(queue1) != 0; ) {
-         Thread.sleep(100);
-      }
-      assertEquals(0, getMessageCount(queue1));
+      Wait.assertEquals(0, queue1::getMessageCount);
 
       consumer.close();
 


Mime
View raw message