activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r692183 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp: ProtocolConverter.java Stomp.java StompSubscription.java
Date Thu, 04 Sep 2008 18:21:47 GMT
Author: rajdavies
Date: Thu Sep  4 11:21:46 2008
New Revision: 692183

URL: http://svn.apache.org/viewvc?rev=692183&view=rev
Log:
Patch applied to https://issues.apache.org/activemq/browse/AMQ-1874

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=692183&r1=692182&r2=692183&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
Thu Sep  4 11:21:46 2008
@@ -389,6 +389,8 @@
         String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
         if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
             stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
+        } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
+            stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
         } else {
             stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=692183&r1=692182&r2=692183&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
Thu Sep  4 11:21:46 2008
@@ -87,6 +87,7 @@
             public interface AckModeValues {
                 String AUTO = "auto";
                 String CLIENT = "client";
+                String INDIVIDUAL = "client-individual";
             }
         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=692183&r1=692182&r2=692183&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
Thu Sep  4 11:21:46 2008
@@ -40,6 +40,7 @@
 
     public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
     public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
+    public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
 
     private final ProtocolConverter protocolConverter;
     private final String subscriptionId;
@@ -66,6 +67,10 @@
             synchronized (this) {
                 dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
             }
+        } else if (ackMode == INDIVIDUAL_ACK) {
+            synchronized (this) {
+                dispatchedMessage.put(message.getJMSMessageID(), message.getMessageId());
+            }
         } else if (ackMode == AUTO_ACK) {
             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
             protocolConverter.getTransportFilter().sendToActiveMQ(ack);
@@ -99,31 +104,38 @@
 
         MessageAck ack = new MessageAck();
         ack.setDestination(consumerInfo.getDestination());
-        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
         ack.setConsumerId(consumerInfo.getConsumerId());
 
-        int count = 0;
-        for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
-
-            Map.Entry entry = (Entry)iter.next();
-            String id = (String)entry.getKey();
-            MessageId msgid = (MessageId)entry.getValue();
-
-            if (ack.getFirstMessageId() == null) {
-                ack.setFirstMessageId(msgid);
-            }
-
-            iter.remove();
-            count++;
+        if (ackMode == CLIENT_ACK) {
+            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+            int count = 0;
+            for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();)
{
+
+                Map.Entry entry = (Entry)iter.next();
+                String id = (String)entry.getKey();
+                MessageId msgid = (MessageId)entry.getValue();
+
+                if (ack.getFirstMessageId() == null) {
+                    ack.setFirstMessageId(msgid);
+                }
+
+                iter.remove();
+                count++;
+
+                if (id.equals(messageId)) {
+                    ack.setLastMessageId(msgid);
+                    break;
+                }
 
-            if (id.equals(messageId)) {
-                ack.setLastMessageId(msgid);
-                break;
             }
-
+            ack.setMessageCount(count);
+        }
+        else if (ackMode == INDIVIDUAL_ACK) {
+            ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
+            MessageId msgid = (MessageId)dispatchedMessage.get(messageId);
+            ack.setMessageID(msgid);
+            dispatchedMessage.remove(messageId);
         }
-
-        ack.setMessageCount(count);
         return ack;
     }
 



Mime
View raw message