activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-1013 Queue deliver after AMQP msg release [Forced Update!]
Date Sat, 04 Mar 2017 16:47:13 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/artemis-1009 aea122292 -> 80f6ae6ba (forced update)


ARTEMIS-1013 Queue deliver after AMQP  msg release


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/15127aa5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/15127aa5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/15127aa5

Branch: refs/heads/artemis-1009
Commit: 15127aa5b119c3c4394951ba9ad9eb7f6add4aae
Parents: 8e9a83d
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Thu Mar 2 14:50:56 2017 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Sat Mar 4 11:45:57 2017 -0500

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        |  2 +-
 .../amqp/AmqpReceiverDispositionTest.java       |  9 ++++--
 .../tests/integration/amqp/ProtonTest.java      | 33 ++++++++++++++++++--
 3 files changed, 37 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/15127aa5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 5931afe..15816bb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -327,6 +327,7 @@ public class AMQPSessionCallback implements SessionCallback {
       recoverContext();
       try {
          ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
+         ((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
       } finally {
          resetContext();
       }
@@ -560,7 +561,6 @@ public class AMQPSessionCallback implements SessionCallback {
       Transaction tx = protonSPI.getTransaction(txid);
       tx.rollback();
       protonSPI.removeTransaction(txid);
-
    }
 
    public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/15127aa5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
index d92fa0f..f206654 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
@@ -44,6 +44,10 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
       receiver1.flow(1);
 
       AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
+
+      AmqpReceiver receiver2 = session.createReceiver(getTestName());
+
+
       assertNotNull("did not receive message first time", message);
       assertEquals("MessageID:0", message.getMessageId());
 
@@ -51,12 +55,11 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
       assertNotNull(protonMessage);
       assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
 
+      receiver2.flow(1);
       message.release();
 
-      // Read the message again and validate its state
 
-      AmqpReceiver receiver2 = session.createReceiver(getTestName());
-      receiver2.flow(1);
+      // Read the message again and validate its state
       message = receiver2.receive(10, TimeUnit.SECONDS);
       assertNotNull("did not receive message again", message);
       assertEquals("MessageID:0", message.getMessageId());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/15127aa5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 4640c33..16f2e70 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -110,7 +110,6 @@ public class ProtonTest extends ProtonTestBase {
    private static final String amqpConnectionUri = "amqp://localhost:5672";
 
    private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
-
    private static final String brokerName = "my-broker";
 
    private static final long maxSizeBytes = 1 * 1024 * 1024;
@@ -472,7 +471,7 @@ public class ProtonTest extends ProtonTestBase {
       session.close();
       Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
       //because tx commit is executed async on broker, we use a timed wait.
-      assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10));
+      assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10));
    }
 
    @Test
@@ -548,7 +547,7 @@ public class ProtonTest extends ProtonTestBase {
       session.rollback();
       Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
       //because tx rollback is executed async on broker, we use a timed wait.
-      assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10));
+      assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10));
 
    }
 
@@ -1855,4 +1854,32 @@ public class ProtonTest extends ProtonTestBase {
          return count;
       }
    }
+
+   @Test
+   public void testReleaseDisposition() throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection connection = client.connect();
+      try {
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender(address);
+         AmqpMessage message = new AmqpMessage();
+         message.setText("Test-Message");
+         sender.send(message);
+
+         AmqpReceiver receiver = session.createReceiver(address);
+         receiver.flow(10);
+
+         AmqpMessage m1 = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull(m1);
+         m1.release();
+
+         //receiver.flow(10);
+         AmqpMessage m2 = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull(m2);
+         m2.accept();
+      } finally {
+         connection.close();
+      }
+   }
 }


Mime
View raw message