activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5379 - amqp prefetch size and redelivery header problem
Date Thu, 27 Nov 2014 13:49:51 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 4e3499e41 -> 0ca376d54


https://issues.apache.org/jira/browse/AMQ-5379 - amqp prefetch size and redelivery header
problem


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0ca376d5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0ca376d5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0ca376d5

Branch: refs/heads/trunk
Commit: 0ca376d540e1f73da92494ed1ce34d4b64746480
Parents: 4e3499e
Author: Dejan Bosanac <dejan@nighttale.net>
Authored: Thu Nov 27 14:40:56 2014 +0100
Committer: Dejan Bosanac <dejan@nighttale.net>
Committed: Thu Nov 27 14:42:39 2014 +0100

----------------------------------------------------------------------
 .../amqp/AMQPProtocolDiscriminator.java         |  4 +-
 .../transport/amqp/AmqpProtocolConverter.java   | 66 +++++++++++---------
 .../activemq/transport/amqp/JMSClientTest.java  | 40 ++++++++++--
 3 files changed, 74 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0ca376d5/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
index a7607af..b484500 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
@@ -28,12 +28,12 @@ import org.apache.activemq.command.Command;
  */
 public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
 
-    private static final int DEFAULT_PREFETCH = 100;
+    public static final int DEFAULT_PREFETCH = 1000;
 
     private final AmqpTransport transport;
     private final BrokerService brokerService;
 
-    private int prefetch = DEFAULT_PREFETCH;
+    private int prefetch = 0;
     private int producerCredit = DEFAULT_PREFETCH;
 
     interface Discriminator {

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ca376d5/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 9a95725..444cdb5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -69,14 +69,7 @@ import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Outcome;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.*;
 import org.apache.qpid.proton.amqp.transaction.Coordinator;
 import org.apache.qpid.proton.amqp.transaction.Declare;
 import org.apache.qpid.proton.amqp.transaction.Declared;
@@ -322,10 +315,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     protected void processLinkFlow(Link link) throws Exception {
         Object context = link.getContext();
         int credit = link.getRemoteCredit();
-        if (context != null && context instanceof ConsumerContext) {
+        if (context instanceof ConsumerContext) {
             ConsumerContext consumerContext = (ConsumerContext)context;
-            // change ActiveMQ consumer prefetch if needed
-            if (consumerContext.credit == 0 && consumerContext.consumerPrefetch !=
credit && credit > 0) {
+            // change consumer prefetch if it's not been already set using
+            // transport connector property or consumer preference
+            if (consumerContext.consumerPrefetch == 0 && credit > 0) {
                 ConsumerControl control = new ConsumerControl();
                 control.setConsumerId(consumerContext.consumerId);
                 control.setDestination(consumerContext.destination);
@@ -612,6 +606,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         private final ActiveMQDestination destination;
         private boolean closed;
         private final boolean anonymous;
+        private MessageId lastDispatched;
 
         public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean
anonymous) {
             this.producerId = producerId;
@@ -688,9 +683,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                                 rejected.setError(condition);
                                 delivery.disposition(rejected);
                             } else {
-                                if (receiver.getCredit() <= (prefetch * .2)) {
-                                    LOG.trace("Sending more credit ({}) to producer: {}",
prefetch - receiver.getCredit(), producerId);
-                                    receiver.flow(prefetch - receiver.getCredit());
+                                if (receiver.getCredit() <= (producerCredit * .2)) {
+                                    LOG.trace("Sending more credit ({}) to producer: {}",
producerCredit - receiver.getCredit(), producerId);
+                                    receiver.flow(producerCredit - receiver.getCredit());
                                 }
 
                                 if (remoteState != null && remoteState instanceof
TransactionalState) {
@@ -710,9 +705,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                         }
                     });
                 } else {
-                    if (receiver.getCredit() <= (prefetch * .2)) {
-                        LOG.trace("Sending more credit ({}) to producer: {}", prefetch -
receiver.getCredit(), producerId);
-                        receiver.flow(prefetch - receiver.getCredit());
+                    if (receiver.getCredit() <= (producerCredit * .2)) {
+                        LOG.trace("Sending more credit ({}) to producer: {}", producerCredit
- receiver.getCredit(), producerId);
+                        receiver.flow(producerCredit - receiver.getCredit());
                         pumpProtonToSocket();
                     }
                     sendToActiveMQ(message, null);
@@ -838,10 +833,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         // Client is producing to this receiver object
         org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
         int flow = producerCredit;
-        // use client's preference if set
-        if (receiver.getRemoteCredit() != 0) {
-            flow = receiver.getRemoteCredit();
-        }
         try {
             if (remoteTarget instanceof Coordinator) {
                 pumpProtonToSocket();
@@ -934,7 +925,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         private boolean endOfBrowse = false;
         public ActiveMQDestination destination;
         public int credit;
-        public int consumerPrefetch;
+        public int consumerPrefetch = 0;
+        private long lastDeliveredSequenceId;
 
         protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
 
@@ -978,8 +970,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 if (session != null) {
                     session.consumers.remove(info.getConsumerId());
                 }
-
-                sendToActiveMQ(new RemoveInfo(consumerId), null);
+                RemoveInfo removeCommand = new RemoveInfo(consumerId);
+                removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+                sendToActiveMQ(removeCommand, null);
             }
         }
 
@@ -1003,7 +996,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
         public void pumpOutbound() throws Exception {
             while (!closed) {
-
                 while (currentBuffer != null) {
                     int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
                     if (sent > 0) {
@@ -1089,6 +1081,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 onMessageDispatch((MessageDispatch) delivery.getContext());
             } else {
                 MessageDispatch md = (MessageDispatch) delivery.getContext();
+                lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
                 MessageAck ack = new MessageAck();
                 ack.setConsumerId(consumerId);
                 ack.setFirstMessageId(md.getMessage().getMessageId());
@@ -1110,6 +1103,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                     dispatchedInTx.addFirst(md);
                 }
 
+
                 LOG.trace("Sending Ack to ActiveMQ: {}", ack);
 
                 sendToActiveMQ(ack, new ResponseHandler() {
@@ -1335,9 +1329,25 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             consumerInfo.setNoRangeAcks(true);
             consumerInfo.setDestination(dest);
             consumerContext.destination = dest;
-            consumerInfo.setPrefetchSize(sender.getRemoteCredit());
-            consumerContext.credit = sender.getRemoteCredit();
-            consumerContext.consumerPrefetch = consumerInfo.getPrefetchSize();
+            int senderCredit = sender.getRemoteCredit();
+            if (prefetch != 0) {
+                // use the value configured on the transport connector
+                // this value will not be changed to the consumer's preference
+                consumerInfo.setPrefetchSize(prefetch);
+                consumerContext.consumerPrefetch = prefetch;
+            } else {
+                if (senderCredit != 0) {
+                    // set the prefetch to the value of the remote credit
+                    // and ignore the later changes
+                    consumerInfo.setPrefetchSize(senderCredit);
+                    consumerContext.consumerPrefetch = senderCredit;
+                } else {
+                    // set default value for now and change to the consumer's preference
+                    // on the first flow packet
+                    consumerInfo.setPrefetchSize(AMQPProtocolDiscriminator.DEFAULT_PREFETCH);
+                }
+            }
+            consumerContext.credit = senderCredit;
             consumerInfo.setDispatchAsync(true);
             if (source.getDistributionMode() == COPY && dest.isQueue()) {
                 consumerInfo.setBrowser(true);

http://git-wip-us.apache.org/repos/asf/activemq/blob/0ca376d5/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 64a4f3c..0380a87 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.concurrent.CountDownLatch;
@@ -55,6 +49,8 @@ import org.objectweb.jtests.jms.framework.TestConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.*;
+
 public class JMSClientTest extends JMSClientTestSupport {
 
     protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
@@ -952,4 +948,36 @@ public class JMSClientTest extends JMSClientTestSupport {
         } catch (JMSException ex) {
         }
     }
+
+    @Test(timeout=30000)
+    public void testRedeliveredHeader() throws Exception {
+        connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        connection.start();
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        for (int i = 1; i < 100; i++) {
+            Message m = session.createTextMessage(i + ". Sample text");
+            producer.send(m);
+        }
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        receiveMessages(consumer);
+        consumer.close();
+
+        consumer = session.createConsumer(queue);
+        receiveMessages(consumer);
+        consumer.close();
+    }
+
+    protected void receiveMessages(MessageConsumer consumer) throws Exception {
+        for (int i = 0; i < 10; i++) {
+            Message message = consumer.receive(1000);
+            assertNotNull(message);
+            assertFalse(message.getJMSRedelivered());
+        }
+    }
 }


Mime
View raw message