activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r692181 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java test/java/org/apache/activemq/transport/stomp/StompTest.java
Date Thu, 04 Sep 2008 18:10:47 GMT
Author: rajdavies
Date: Thu Sep  4 11:10:45 2008
New Revision: 692181

URL: http://svn.apache.org/viewvc?rev=692181&view=rev
Log:
Added patch for https://issues.apache.org/activemq/browse/AMQ-1890

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=692181&r1=692180&r2=692181&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
Thu Sep  4 11:10:45 2008
@@ -45,6 +45,7 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
@@ -412,6 +413,17 @@
         if (subscriptionId == null && destination == null) {
             throw new ProtocolException("Must specify the subscriptionId or the destination
you are unsubscribing from");
         }
+       
+        // check if it is a durable subscription
+        String durable = command.getHeaders().get("activemq.subscriptionName"); 
+        if (durable != null) {
+            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
+            info.setClientId(durable);
+            info.setSubscriptionName(durable);
+            info.setConnectionId(connectionId);
+            sendToActiveMQ(info, createResponseHandler(command));
+            return;
+        }
 
         // TODO: Unsubscribing using a destination is a bit wierd if multiple
         // subscriptions
@@ -426,7 +438,7 @@
                 return;
             }
         }
-
+       
         throw new ProtocolException("No subscription matched.");
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=692181&r1=692180&r2=692181&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Thu Sep  4 11:10:45 2008
@@ -35,11 +35,15 @@
 import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.commons.logging.Log;
@@ -857,6 +861,58 @@
         stompConnection.sendFrame(frame);    	
     }
     
+    public void testDurableUnsub() throws Exception {
+    	// get broker JMX view
+        MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
+        
+        String domain = "org.apache.activemq";
+        ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
+        
+    	BrokerViewMBean view = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);    	
+    	
+    	// connect
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n"
+ Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+        assertEquals(view.getDurableTopicSubscribers().length, 0);
+        
+        // subscribe
+        frame = "SUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n"
+ Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        // wait a bit for MBean to get refreshed
+        try {
+        	Thread.sleep(100);
+        } catch (InterruptedException e){}
+        
+        assertEquals(view.getDurableTopicSubscribers().length, 1);
+        // disconnect
+        frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        try {
+        	Thread.sleep(100);
+        } catch (InterruptedException e){}
+        
+        //reconnect
+        stompConnect();
+    	// connect
+        frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n"
+ Stomp.NULL;
+        stompConnection.sendFrame(frame);      
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+        
+        // unsubscribe
+        frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n"
+ Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);   
+        try {
+        	Thread.sleep(100);
+        } catch (InterruptedException e){}
+        assertEquals(view.getDurableTopicSubscribers().length, 0);
+    }
+    
     protected void assertClients(int expected) throws Exception {
         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
         int actual = clients.length;



Mime
View raw message