activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [60/60] [abbrv] activemq-artemis git commit: fix AMQ1924Test
Date Wed, 16 Mar 2016 01:54:24 GMT
fix AMQ1924Test


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/22325fbb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/22325fbb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/22325fbb

Branch: refs/heads/refactor-openwire
Commit: 22325fbb1203843db7e5ddf551b910767dc7f969
Parents: bd46bbc
Author: Howard Gao <howard.gao@gmail.com>
Authored: Tue Mar 15 23:11:39 2016 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Mar 15 20:58:03 2016 -0400

----------------------------------------------------------------------
 .../openwire/OpenWireProtocolManager.java       |  5 ++
 .../core/server/impl/ServerConsumerImpl.java    | 64 +++++++++++++++-----
 .../transport/failover/AMQ1925Test.java         | 54 +++++++++--------
 3 files changed, 82 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/22325fbb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index abfcca5..3cb1215 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -488,6 +488,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>,
Cl
       if (txSession != null) {
          txSession.rollback(info);
       }
+      else if (info.getTransactionId().isLocalTransaction()) {
+         //during a broker restart, recovered local transaction may not be registered
+         //in that case we ignore and let the tx removed silently by connection.
+         //see AMQ1925Test.testAMQ1925_TXBegin
+      }
       else {
          throw newXAException("Transaction '" + info.getTransactionId() + "' has not been
started.", XAException.XAER_NOTA);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/22325fbb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index b2ca0df..cb2cd38 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -89,6 +89,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
 
    private Object protocolContext;
 
+   private final ActiveMQServer server;
+
    private SlowConsumerDetectionListener slowConsumerListener;
 
    /**
@@ -153,8 +155,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
                              final SessionCallback callback,
                              final boolean preAcknowledge,
                              final boolean strictUpdateDeliveryCount,
-                             final ManagementService managementService) throws Exception
{
-      this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge,
strictUpdateDeliveryCount, managementService, true, null);
+                             final ManagementService managementService,
+                             final ActiveMQServer server) throws Exception {
+      this(id, session, binding, filter, started, browseOnly, storageManager, callback, preAcknowledge,
strictUpdateDeliveryCount, managementService, true, null, server);
    }
 
    public ServerConsumerImpl(final long id,
@@ -169,7 +172,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
                              final boolean strictUpdateDeliveryCount,
                              final ManagementService managementService,
                              final boolean supportLargeMessage,
-                             final Integer credits) throws Exception {
+                             final Integer credits,
+                             final ActiveMQServer server) throws Exception {
       this.id = id;
 
       this.filter = filter;
@@ -214,6 +218,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
             availableCredits.set(credits);
          }
       }
+
+      this.server = server;
    }
 
    @Override
@@ -398,7 +404,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
       }
       finally {
          lockDelivery.readLock().unlock();
+         callback.afterDelivery();
       }
+
    }
 
    @Override
@@ -583,12 +591,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
    @Override
    public void setStarted(final boolean started) {
       synchronized (lock) {
-         lockDelivery.writeLock().lock();
+         boolean locked = lockDelivery();
+
+         // This is to make sure nothing would sneak to the client while started = false
+         // the client will stop the session and perform a rollback in certain cases.
+         // in case something sneaks to the client you could get to messaging delivering
forever until
+         // you restart the server
          try {
             this.started = browseOnly || started;
          }
          finally {
-            lockDelivery.writeLock().unlock();
+            if (locked) {
+               lockDelivery.writeLock().unlock();
+            }
          }
       }
 
@@ -598,21 +613,38 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
       }
    }
 
-   @Override
-   public void setTransferring(final boolean transferring) {
-      synchronized (lock) {
-         this.transferring = transferring;
-      }
-
-      // This is to make sure that the delivery process has finished any pending delivery
-      // otherwise a message may sneak in on the client while we are trying to stop the consumer
+   private boolean lockDelivery() {
       try {
-         lockDelivery.writeLock().lock();
+         if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
+            ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
+            if (server != null) {
+               server.threadDump();
+            }
+            return false;
+         }
+         return true;
       }
-      finally {
-         lockDelivery.writeLock().unlock();
+      catch (Exception e) {
+         ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+         return false;
       }
+   }
 
+   @Override
+   public void setTransferring(final boolean transferring) {
+      synchronized (lock) {
+         // This is to make sure that the delivery process has finished any pending delivery
+         // otherwise a message may sneak in on the client while we are trying to stop the
consumer
+         boolean locked = lockDelivery();
+         try {
+            this.transferring = transferring;
+         }
+         finally {
+            if (locked) {
+               lockDelivery.writeLock().unlock();
+            }
+         }
+      }
 
       // Outside the lock
       if (transferring) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/22325fbb/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
index 3d75905..564fd86 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
@@ -33,8 +33,6 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.TransactionRolledBackException;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerFactory;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -235,32 +233,39 @@ public class AMQ1925Test extends OpenwireArtemisBaseTest implements
ExceptionLis
       MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
 
       boolean restartDone = false;
-      for (int i = 0; i < MESSAGE_COUNT; i++) {
-         Message message = consumer.receive(5000);
-         Assert.assertNotNull(message);
+      try {
+         for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message message = consumer.receive(5000);
+            Assert.assertNotNull(message);
 
-         if (i == 222 && !restartDone) {
-            // Simulate broker failure & restart
-            bs.stop();
-            bs = createNewServer();
-            bs.start();
-            restartDone = true;
-         }
+            if (i == 222 && !restartDone) {
+               // Simulate broker failure & restart
+               bs.stop();
+               bs = createNewServer();
+               bs.start();
+               restartDone = true;
+            }
 
-         Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
-         try {
-            session.commit();
-         }
-         catch (TransactionRolledBackException expectedOnOccasion) {
-            log.info("got rollback: " + expectedOnOccasion);
-            i--;
+            Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+            try {
+               session.commit();
+            }
+            catch (TransactionRolledBackException expectedOnOccasion) {
+               log.info("got rollback: " + expectedOnOccasion);
+               i--;
+            }
          }
+         Assert.assertNull(consumer.receive(500));
+      }
+      catch (Exception eee) {
+         log.error("got exception", eee);
+         throw eee;
+      }
+      finally {
+         consumer.close();
+         session.close();
+         connection.close();
       }
-      Assert.assertNull(consumer.receive(500));
-
-      consumer.close();
-      session.close();
-      connection.close();
 
       assertQueueEmpty();
       Assert.assertNull("no exception on connection listener: " + exception, exception);
@@ -368,7 +373,6 @@ public class AMQ1925Test extends OpenwireArtemisBaseTest implements ExceptionLis
       } catch (Exception e) {
          log.error(e);
       }
-
    }
 
    public void onException(JMSException exception) {


Mime
View raw message