activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1182049 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/
Date Tue, 11 Oct 2011 19:39:45 GMT
Author: tabish
Date: Tue Oct 11 19:39:45 2011
New Revision: 1182049

URL: http://svn.apache.org/viewvc?rev=1182049&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3481

Further refine this fix to address some test failures. 

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1182049&r1=1182048&r2=1182049&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Tue Oct 11 19:39:45 2011
@@ -166,7 +166,16 @@ public class ProtocolConverter {
             command.setResponseRequired(true);
             resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
         }
-        stompTransport.sendToActiveMQ(command);
+        stompTransport.asyncSendToActiveMQ(command);
+    }
+
+    protected void asyncSendToActiveMQ(Command command, ResponseHandler handler) {
+        command.setCommandId(generateCommandId());
+        if (handler != null) {
+            command.setResponseRequired(true);
+            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
+        }
+        stompTransport.asyncSendToActiveMQ(command);
     }
 
     protected void sendToStomp(StompFrame command) throws IOException {
@@ -292,7 +301,7 @@ public class ProtocolConverter {
         }
 
         message.onSend();
-        sendToActiveMQ(message, createResponseHandler(command));
+        asyncSendToActiveMQ(message, createResponseHandler(command));
     }
 
     protected void onStompNack(StompFrame command) throws ProtocolException {
@@ -329,7 +338,7 @@ public class ProtocolConverter {
             if (sub != null) {
                 MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
                 if (ack != null) {
-                    sendToActiveMQ(ack, createResponseHandler(command));
+                    asyncSendToActiveMQ(ack, createResponseHandler(command));
                 } else {
                     throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
                 }
@@ -368,7 +377,7 @@ public class ProtocolConverter {
             if (sub != null) {
                 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
                 if (ack != null) {
-                    sendToActiveMQ(ack, createResponseHandler(command));
+                    asyncSendToActiveMQ(ack, createResponseHandler(command));
                     acked = true;
                 }
             }
@@ -382,7 +391,7 @@ public class ProtocolConverter {
             for (StompSubscription sub : subscriptionsByConsumerId.values()) {
                 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
                 if (ack != null) {
-                    sendToActiveMQ(ack, createResponseHandler(command));
+                    asyncSendToActiveMQ(ack, createResponseHandler(command));
                     acked = true;
                     break;
                 }
@@ -417,7 +426,7 @@ public class ProtocolConverter {
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.BEGIN);
 
-        sendToActiveMQ(tx, createResponseHandler(command));
+        asyncSendToActiveMQ(tx, createResponseHandler(command));
     }
 
     protected void onStompCommit(StompFrame command) throws ProtocolException {
@@ -444,7 +453,7 @@ public class ProtocolConverter {
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
 
-        sendToActiveMQ(tx, createResponseHandler(command));
+        asyncSendToActiveMQ(tx, createResponseHandler(command));
     }
 
     protected void onStompAbort(StompFrame command) throws ProtocolException {
@@ -473,7 +482,7 @@ public class ProtocolConverter {
         tx.setTransactionId(activemqTx);
         tx.setType(TransactionInfo.ROLLBACK);
 
-        sendToActiveMQ(tx, createResponseHandler(command));
+        asyncSendToActiveMQ(tx, createResponseHandler(command));
     }
 
     protected void onStompSubscribe(StompFrame command) throws ProtocolException {
@@ -541,7 +550,7 @@ public class ProtocolConverter {
 
         // dispatch can beat the receipt so send it early
         sendReceipt(command);
-        sendToActiveMQ(consumerInfo, null);
+        asyncSendToActiveMQ(consumerInfo, null);
     }
 
     protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@@ -570,7 +579,7 @@ public class ProtocolConverter {
             info.setClientId(durable);
             info.setSubscriptionName(durable);
             info.setConnectionId(connectionId);
-            sendToActiveMQ(info, createResponseHandler(command));
+            asyncSendToActiveMQ(info, createResponseHandler(command));
             return;
         }
 
@@ -578,7 +587,7 @@ public class ProtocolConverter {
 
             StompSubscription sub = this.subscriptions.remove(subscriptionId);
             if (sub != null) {
-                sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+                asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
                 return;
             }
 
@@ -589,7 +598,7 @@ public class ProtocolConverter {
             for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
                 StompSubscription sub = iter.next();
                 if (destination != null && destination.equals(sub.getDestination())) {
-                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+                    asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
                     iter.remove();
                     return;
                 }
@@ -712,8 +721,8 @@ public class ProtocolConverter {
 
     protected void onStompDisconnect(StompFrame command) throws ProtocolException {
         checkConnected();
-        sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
-        sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
+        asyncSendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
+        asyncSendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
         connected.set(false);
     }
 
@@ -778,7 +787,7 @@ public class ProtocolConverter {
         ActiveMQDestination rc = tempDestinations.get(name);
         if( rc == null ) {
             rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
-            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
+            asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
             tempDestinations.put(name, rc);
         }
         return rc;
@@ -788,7 +797,7 @@ public class ProtocolConverter {
         ActiveMQDestination rc = tempDestinations.get(name);
         if( rc == null ) {
             rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
-            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
+            asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
             tempDestinations.put(name, rc);
             tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1182049&r1=1182048&r2=1182049&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Tue Oct 11 19:39:45 2011
@@ -115,7 +115,7 @@ public class StompSubscription {
 
         if (!unconsumedMessage.isEmpty()) {
             MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
-            protocolConverter.getStompTransport().sendToActiveMQ(ack);
+            protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
             unconsumedMessage.clear();
         }
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1182049&r1=1182048&r2=1182049&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Tue Oct 11 19:39:45 2011
@@ -1588,7 +1588,6 @@ public class StompTest extends Combinati
         stompConnection.connect("system", "manager");
 
         HashMap<String, String> headers = new HashMap<String, String>();
-        long timestamp = System.currentTimeMillis();
         headers.put(Stomp.Headers.Send.REPLY_TO, "JustAString");
         headers.put(Stomp.Headers.Send.PERSISTENT, "true");
 



Mime
View raw message