qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1566531 - in /qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8: ConsumerTarget_0_8.java handler/BasicGetMethodHandler.java
Date Mon, 10 Feb 2014 08:58:38 GMT
Author: rgodfrey
Date: Mon Feb 10 08:58:38 2014
New Revision: 1566531

URL: http://svn.apache.org/r1566531
Log:
QPID-5504 : fixed implementation of 0-8 GET when using NoAck

Modified:
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1566531&r1=1566530&r2=1566531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Mon Feb 10 08:58:38 2014
@@ -31,6 +31,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
@@ -80,6 +81,16 @@ public abstract class ConsumerTarget_0_8
         return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
     }
 
+    public static ConsumerTarget_0_8 createGetNoAckTarget(final AMQChannel channel,
+                                                          final AMQShortString consumerTag,
+                                                          final FieldTable filters,
+                                                          final FlowCreditManager creditManager,
+                                                          final ClientDeliveryMethod deliveryMethod,
+                                                          final RecordDeliveryMethod recordMethod) throws AMQException
+    {
+        return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+    }
+
     static final class BrowserConsumer extends ConsumerTarget_0_8
     {
         public BrowserConsumer(AMQChannel channel,
@@ -132,10 +143,10 @@ public abstract class ConsumerTarget_0_8
     }
 
     public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
-                                                           AMQShortString consumerTag, FieldTable filters,
-                                                           FlowCreditManager creditManager,
-                                                           ClientDeliveryMethod deliveryMethod,
-                                                           RecordDeliveryMethod recordMethod) throws AMQException
+                                                       AMQShortString consumerTag, FieldTable filters,
+                                                       FlowCreditManager creditManager,
+                                                       ClientDeliveryMethod deliveryMethod,
+                                                       RecordDeliveryMethod recordMethod) throws AMQException
     {
         return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
     }
@@ -223,9 +234,9 @@ public abstract class ConsumerTarget_0_8
      */
     public static final class GetNoAckConsumer extends NoAckConsumer
     {
-        public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession,
+        public GetNoAckConsumer(AMQChannel channel,
                                 AMQShortString consumerTag, FieldTable filters,
-                                boolean noLocal, FlowCreditManager creditManager,
+                                FlowCreditManager creditManager,
                                 ClientDeliveryMethod deliveryMethod,
                                 RecordDeliveryMethod recordMethod)
             throws AMQException

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1566531&r1=1566530&r2=1566531&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Mon Feb 10 08:58:38 2014
@@ -128,24 +128,8 @@ public class BasicGetMethodHandler imple
 
         final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
 
-        final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
-        {
-
-            @Override
-            public void deliverToClient(final Consumer sub, final ServerMessage message, final
-                                        InstanceProperties props, final long deliveryTag)
-            throws AMQException
-            {
-                singleMessageCredit.useCreditForMessage(message.getSize());
-                session.getProtocolOutputConverter().writeGetOk(message,
-                                                                props,
-                                                                channel.getChannelId(),
-                                                                deliveryTag,
-                                                                queue.getMessageCount());
-
-
-            }
-        };
+        final GetDeliveryMethod getDeliveryMethod =
+                new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
         final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
         {
 
@@ -167,7 +151,7 @@ public class BasicGetMethodHandler imple
         }
         else
         {
-            target = ConsumerTarget_0_8.createNoAckTarget(channel,
+            target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
                                                           AMQShortString.EMPTY_STRING, null,
                                                           singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
@@ -175,10 +159,48 @@ public class BasicGetMethodHandler imple
         Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
         sub.flush();
         sub.close();
-        return(!singleMessageCredit.hasCredit());
+        return(getDeliveryMethod.hasDeliveredMessage());
 
 
     }
 
 
+    private static class GetDeliveryMethod implements ClientDeliveryMethod
+    {
+
+        private final FlowCreditManager _singleMessageCredit;
+        private final AMQProtocolSession _session;
+        private final AMQChannel _channel;
+        private final AMQQueue _queue;
+        private boolean _deliveredMessage;
+
+        public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+                                 final AMQProtocolSession session,
+                                 final AMQChannel channel, final AMQQueue queue)
+        {
+            _singleMessageCredit = singleMessageCredit;
+            _session = session;
+            _channel = channel;
+            _queue = queue;
+        }
+
+        @Override
+        public void deliverToClient(final Consumer sub, final ServerMessage message,
+                                    final InstanceProperties props, final long deliveryTag) throws AMQException
+        {
+            _singleMessageCredit.useCreditForMessage(message.getSize());
+            _session.getProtocolOutputConverter().writeGetOk(message,
+                                                            props,
+                                                            _channel.getChannelId(),
+                                                            deliveryTag,
+                                                            _queue.getMessageCount());
+
+            _deliveredMessage = true;
+        }
+
+        public boolean hasDeliveredMessage()
+        {
+            return _deliveredMessage;
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message