activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r394710 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/Subscription.java test/java/org/apache/activemq/transport/stomp/StompTest.java
Date Mon, 17 Apr 2006 15:36:06 GMT
Author: chirino
Date: Mon Apr 17 08:36:04 2006
New Revision: 394710

URL: http://svn.apache.org/viewcvs?rev=394710&view=rev
Log:
Missing synchronization would cause acks to not be delivered to the broker.
After enough acks were missed, the consumer would stop receiving messages
because the broker believed the consumer's prefetch was full.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java?rev=394710&r1=394709&r2=394710&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Subscription.java Mon Apr 17 08:36:04 2006
@@ -93,7 +93,7 @@
         out.write(builder.toFrame());
     }
 
-    private void addMessageDispatch(MessageDispatch md) {
+    synchronized private void addMessageDispatch(MessageDispatch md) {
         dispatchedMessages.addLast(md);
     }
 
@@ -117,7 +117,7 @@
         return subscriptionId;
     }
 
-    public MessageAck createMessageAck(String message_id) {
+    synchronized public MessageAck createMessageAck(String message_id) {
         MessageAck ack = new MessageAck();
         ack.setDestination(consumerInfo.getDestination());
         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
@@ -136,6 +136,7 @@
             count++;
             if( id.equals(message_id)  ) {
                 ack.setLastMessageId(md.getMessage().getMessageId());
+                break;
             }
         }
         ack.setMessageCount(count);

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=394710&r1=394709&r2=394710&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Mon Apr 17 08:36:04 2006
@@ -96,6 +96,8 @@
             if( c < 0 ) {
                 throw new IOException("socket closed.");
             } else if( c == 0 ) {
+                c = is.read();
+                assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
                 byte[] ba = inputBuffer.toByteArray();
                 inputBuffer.reset();
                 return new String(ba, "UTF-8");



Mime
View raw message