activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject svn commit: r1064725 - in /activemq/trunk: activemq-web-demo/ activemq-web-demo/src/test/java/org/apache/activemq/web/ activemq-web/src/main/java/org/apache/activemq/web/
Date Fri, 28 Jan 2011 15:21:04 GMT
Author: alexd
Date: Fri Jan 28 15:21:02 2011
New Revision: 1064725

URL: http://svn.apache.org/viewvc?rev=1064725&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3094 - ajax does not receive all messages 
 * ensure delivery of messages received by AjaxListener when a continuation is not available
for resumption.
 * add test coverage for several common uses of AjaxServlet
 * switch back to jetty httpclient for better processing of asynchronous HTTP

Modified:
    activemq/trunk/activemq-web-demo/pom.xml
    activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java
    activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java

Modified: activemq/trunk/activemq-web-demo/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/pom.xml?rev=1064725&r1=1064724&r2=1064725&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/pom.xml (original)
+++ activemq/trunk/activemq-web-demo/pom.xml Fri Jan 28 15:21:02 2011
@@ -132,13 +132,6 @@
       <artifactId>derby</artifactId>
     </dependency>
 
-    <dependency>
-       <groupId>commons-httpclient</groupId>
-       <artifactId>commons-httpclient</artifactId>
-       <version>3.1</version>
-       <scope>test</scope>
-    </dependency>
-
   </dependencies>
 
   <properties>

Modified: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java?rev=1064725&r1=1064724&r2=1064725&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java (original)
+++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java Fri
Jan 28 15:21:02 2011
@@ -16,52 +16,451 @@
  */
 package org.apache.activemq.web;
 
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.transport.stomp.Stomp;
+
+import java.lang.Thread;
+import java.net.SocketTimeoutException;
 
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpMethod;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.*;
+import org.eclipse.jetty.io.Buffer;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.io.ByteArrayBuffer;
 
 import javax.jms.MessageProducer;
+import javax.jms.Message;
 import javax.jms.TextMessage;
 
 public class AjaxTest extends JettyTestSupport {
     private static final Log LOG = LogFactory.getLog(AjaxTest.class);
-
-    private String expectedResponse = "<ajax-response>\n" +
-            "<response id='handler' destination='queue://test' >test one</response>\n"
+
+    
+    private class AjaxTestContentExchange extends ContentExchange  {
+        private HashMap<String,String> headers;
+        private String responseContent;
+        
+        AjaxTestContentExchange() {
+            super(true);
+            this.headers = new HashMap<String,String>();
+            this.responseContent = "";
+        }
+        protected void onResponseContent( Buffer content ) {
+            this.responseContent += content.toString();
+        }
+        protected void onResponseHeader( Buffer name, Buffer value ) {
+          headers.put( name.toString(), value.toString() );
+        }
+        public String getJsessionId() {
+            String cookie = headers.get( "Set-Cookie" );
+            String[] cookie_parts = cookie.split( ";" );
+            return cookie_parts[0];
+        }
+        public String getResponseContent() {
+            return responseContent;
+        }
+    }
+    
+    public void assertContains( String expected, String actual ) {
+        assertTrue( "'"+actual+"' does not contain expected fragment '"+expected+"'", actual.indexOf(
expected ) != -1 );
+    }
+    public void assertResponseCount( int expected, String actual ) {
+        int occurrences = StringUtils.countMatches( actual, "<response" );
+        assertEquals( "Expected number of <response> elements is not correct.", expected,
occurrences );
+    }
+    
+    public void testAjaxClientReceivesMessagesWhichAreSentToQueueWhileClientIsPolling() throws
Exception {
+        LOG.debug( "*** testAjaxClientReceivesMessagesWhichAreSentToQueueWhileClientIsPolling
***" );
+        
+        HttpClient httpClient = new HttpClient();
+        httpClient.start();
+        httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+        
+        // client 1 subscribes to a queue
+        LOG.debug( "SENDING LISTEN" );
+        AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler")
);
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8"
);
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        String jsessionid = contentExchange.getJsessionId();
+        
+        // client 1 polls for messages
+        LOG.debug( "SENDING POLL" );
+        AjaxTestContentExchange poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        
+        // while client 1 is polling, client 2 sends messages to the queue
+        LOG.debug( "SENDING MESSAGES" );
+        contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer(
+            "destination=queue://test&type=send&message=msg1&"+
+            "d1=queue://test&t1=send&m1=msg2&"+
+            "d2=queue://test&t2=send&m2=msg3"
+        ) );
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8"
);
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        LOG.debug( "DONE POSTING MESSAGES" );
+        
+        // wait for poll to finish
+        poll.waitForDone();
+        String response = poll.getResponseContent();
+        
+        // messages might not all be delivered during the 1st poll.  We need to check again.
+        poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        poll.waitForDone();
+        
+        String fullResponse = response + poll.getResponseContent();
+        LOG.debug( "full response : " + fullResponse );
+        
+        assertContains( "<response id='handler' destination='queue://test' >msg1</response>\n",
fullResponse );
+        assertContains( "<response id='handler' destination='queue://test' >msg2</response>\n",
fullResponse );
+        assertContains( "<response id='handler' destination='queue://test' >msg3</response>\n",
fullResponse );
+        assertResponseCount( 3, fullResponse );
+    }
+    
+    public void testAjaxClientReceivesMessagesWhichAreSentToTopicWhileClientIsPolling() throws
Exception {
+        LOG.debug( "*** testAjaxClientReceivesMessagesWhichAreSentToTopicWhileClientIsPolling
***" );
+        
+        HttpClient httpClient = new HttpClient();
+        httpClient.start();
+        httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+        
+        // client 1 subscribes to a topic
+        LOG.debug( "SENDING LISTEN" );
+        AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer("destination=topic://test&type=listen&message=handler")
);
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8"
);
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        String jsessionid = contentExchange.getJsessionId();
+        
+        // client 1 polls for messages
+        LOG.debug( "SENDING POLL" );
+        AjaxTestContentExchange poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        
+        // while client 1 is polling, client 2 sends messages to the topic
+        LOG.debug( "SENDING MESSAGES" );
+        contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer(
+            "destination=topic://test&type=send&message=msg1&"+
+            "d1=topic://test&t1=send&m1=msg2&"+
+            "d2=topic://test&t2=send&m2=msg3"
+        ) );
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8"
);
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        LOG.debug( "DONE POSTING MESSAGES" );
+        
+        // wait for poll to finish
+        poll.waitForDone();
+        String response = poll.getResponseContent();
+        
+        // not all messages might be delivered during the 1st poll.  We need to check again.
+        poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        poll.waitForDone();
+        
+        String fullResponse = response + poll.getResponseContent();
+        LOG.debug( "full response : " + fullResponse );
+        
+        assertContains( "<response id='handler' destination='topic://test' >msg1</response>\n",
fullResponse );
+        assertContains( "<response id='handler' destination='topic://test' >msg2</response>\n",
fullResponse );
+        assertContains( "<response id='handler' destination='topic://test' >msg3</response>\n",
fullResponse );
+        assertResponseCount( 3, fullResponse );
+    }
+    
+    public void testAjaxClientReceivesMessagesWhichAreQueuedBeforeClientSubscribes() throws
Exception {
+        LOG.debug( "*** testAjaxClientReceivesMessagesWhichAreQueuedBeforeClientSubscribes
***" );
+        // send messages to queue://test
+        producer.send( session.createTextMessage("test one") );
+        producer.send( session.createTextMessage("test two") );
+        producer.send( session.createTextMessage("test three") );
+        
+        HttpClient httpClient = new HttpClient();
+        httpClient.start();
+        httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+        
+        // client 1 subscribes to queue
+        LOG.debug( "SENDING LISTEN" );
+        AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler")
);
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8"
);
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        String jsessionid = contentExchange.getJsessionId();
+        
+        // client 1 polls for messages
+        LOG.debug( "SENDING POLL" );
+        AjaxTestContentExchange poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        
+        // wait for poll to finish
+        poll.waitForDone();
+        String response = poll.getResponseContent();
+        
+        assertContains( "<response id='handler' destination='queue://test' >test one</response>\n",
response );
+        assertContains( "<response id='handler' destination='queue://test' >test two</response>\n",
response );
+        assertContains( "<response id='handler' destination='queue://test' >test three</response>\n",
response );
+        assertResponseCount( 3, response );
+    }
+    
+    public void testStompMessagesAreReceivedByAjaxClient() throws Exception {
+        LOG.debug( "*** testStompMessagesAreRecievedByAjaxClient ***" );
+        
+        HttpClient httpClient = new HttpClient();
+        httpClient.start();
+        httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+        
+        // client 1 subscribes to a queue
+        LOG.debug( "SENDING LISTEN" );
+        AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler")
);
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8"
);
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        String jsessionid = contentExchange.getJsessionId();
+        
+        // client 1 polls for messages
+        LOG.debug( "SENDING POLL" );
+        AjaxTestContentExchange poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        
+        // stomp client queues some messages
+        StompConnection connection = new StompConnection();
+        connection.open("localhost", 61613);
+        connection.connect("user", "password");
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put( "amq-msg-type", "text" );
+        connection.send( "/queue/test", "message1", (String)null, headers );
+        connection.send( "/queue/test", "message2", (String)null, headers );
+        connection.send( "/queue/test", "message3", (String)null, headers );
+        connection.send( "/queue/test", "message4", (String)null, headers );
+        connection.send( "/queue/test", "message5", (String)null, headers );
+        connection.disconnect();
+        
+        // wait for poll to finish
+        poll.waitForDone();
+        String response = poll.getResponseContent();
+        
+        // not all messages might be delivered during the 1st poll.  We need to check again.
+        poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        poll.waitForDone();
+        
+        String fullResponse = response + poll.getResponseContent();
+        
+        assertContains( "<response id='handler' destination='queue://test' >message1</response>\n",
fullResponse );
+        assertContains( "<response id='handler' destination='queue://test' >message2</response>\n",
fullResponse );
+        assertContains( "<response id='handler' destination='queue://test' >message3</response>\n",
fullResponse );
+        assertContains( "<response id='handler' destination='queue://test' >message4</response>\n",
fullResponse );
+        assertContains( "<response id='handler' destination='queue://test' >message5</response>\n",
fullResponse );
+        assertResponseCount( 5, fullResponse );
+    }
+    
+    public void testAjaxMessagesAreReceivedByStompClient() throws Exception {
+        LOG.debug( "*** testAjaxMessagesAreReceivedByStompClient ***" );
+        
+        HttpClient httpClient = new HttpClient();
+        httpClient.start();
+        httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+        
+        AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer(
+            "destination=queue://test&type=send&message=msg1&"+
+            "d1=queue://test&t1=send&m1=msg2&"+
+            "d2=queue://test&t2=send&m2=msg3&"+
+            "d3=queue://test&t3=send&m3=msg4") );
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8"
);
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        
+        StompConnection connection = new StompConnection();
+        connection.open("localhost", 61613);
+        connection.connect("user", "password");
+        connection.subscribe( "/queue/test" );
+        
+        StompFrame message;
+        String allMessageBodies = "";
+        try {
+            while( true ) {
+                message = connection.receive(5000);
+                allMessageBodies = allMessageBodies +"\n"+ message.getBody();
+            }
+        } catch (SocketTimeoutException e) {}
+        
+        LOG.debug( "All message bodies : " + allMessageBodies );
+        
+        assertContains( "msg1", allMessageBodies );
+        assertContains( "msg2", allMessageBodies );
+        assertContains( "msg3", allMessageBodies );
+        assertContains( "msg4", allMessageBodies );
+    }
+    
+    public void testAjaxClientMayUseSelectors() throws Exception {
+        LOG.debug( "*** testAjaxClientMayUseSelectors ***" );
+        
+        // send 2 messages to the same queue w/ different 'filter' values.
+        Message msg = session.createTextMessage("test one");
+        msg.setStringProperty( "filter", "one" );
+        producer.send( msg );
+        msg = session.createTextMessage("test two");
+        msg.setStringProperty( "filter", "two" );
+        producer.send( msg );
+        
+        HttpClient httpClient = new HttpClient();
+        httpClient.start();
+        httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+        
+        // client subscribes to queue
+        LOG.debug( "SENDING LISTEN" );
+        AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer("destination=queue://test&type=listen&message=handler")
);
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8"
);
+        // SELECTOR
+        contentExchange.setRequestHeader( "selector", "filter='two'" );
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        String jsessionid = contentExchange.getJsessionId();
+        
+        // client 1 polls for messages
+        LOG.debug( "SENDING POLL" );
+        AjaxTestContentExchange poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        poll.waitForDone();
+        
+        LOG.debug( poll.getResponseContent() );
+        
+        String expected = "<ajax-response>\n" +
             "<response id='handler' destination='queue://test' >test two</response>\n"
+
-            "<response id='handler' destination='queue://test' >test three</response>\n"
+
-            "</ajax-response>";
-
-    public void testReceiveMultipleMessagesFromQueue() throws Exception {
-
-        MessageProducer local_producer = session.createProducer(session.createQueue("test"));
-
+            "</ajax-response>\n";
+        assertEquals( "Poll response is not correct.", expected, poll.getResponseContent()
);
+        
+    }
+    
+    public void testMultipleAjaxClientsMayExistInTheSameSession() throws Exception {
+        LOG.debug( "*** testMultipleAjaxClientsMayExistInTheSameSession ***" );
+        
+        // send messages to queues testA and testB.
+        MessageProducer producerA = session.createProducer(session.createQueue("testA"));
+        MessageProducer producerB = session.createProducer(session.createQueue("testB"));
+        producerA.send( session.createTextMessage("A1") );
+        producerA.send( session.createTextMessage("A2") );
+        producerB.send( session.createTextMessage("B1") );
+        producerB.send( session.createTextMessage("B2") );
+        
         HttpClient httpClient = new HttpClient();
-        PostMethod post = new PostMethod( "http://localhost:8080/amq" );
-        post.addParameter( "destination", "queue://test" );
-        post.addParameter( "type", "listen" );
-        post.addParameter( "message", "handler" );
-        httpClient.executeMethod( post );
-
-        // send message
-        TextMessage msg1 = session.createTextMessage("test one");
-        producer.send(msg1);
-        TextMessage msg2 = session.createTextMessage("test two");
-        producer.send(msg2);
-        TextMessage msg3 = session.createTextMessage("test three");
-        producer.send(msg3);
-
-        HttpMethod get = new GetMethod( "http://localhost:8080/amq?timeout=5000" );
-        httpClient.executeMethod( get );
-        byte[] responseBody = get.getResponseBody();
-        String response = new String( responseBody );
-
-        LOG.info("Poll response: " + response);
-        assertEquals("Poll response not right", expectedResponse.trim(), response.trim());
+        httpClient.start();
+        httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+        
+        // clientA subscribes to /queue/testA
+        LOG.debug( "SENDING LISTEN" );
+        AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer(
+            "destination=queue://testA&"+
+            "type=listen&"+
+            "message=handlerA&"+
+            "clientId=clientA"
+        ) );
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8"
);
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        String jsessionid = contentExchange.getJsessionId();
+        
+        // clientB subscribes to /queue/testB using the same JSESSIONID.
+        contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestHeader( "Cookie", jsessionid );
+        contentExchange.setRequestContent( new ByteArrayBuffer(
+            "destination=queue://testB&"+
+            "type=listen&"+
+            "message=handlerB&"+
+            "clientId=clientB"
+        ) );
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8"
);
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        
+        // clientA polls for messages
+        AjaxTestContentExchange poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000&clientId=clientA");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        poll.waitForDone();
+        
+        LOG.debug( "clientA response : " + poll.getResponseContent() );
+        String expected = "<ajax-response>\n" +
+            "<response id='handlerA' destination='queue://testA' >A1</response>\n"
+
+            "<response id='handlerA' destination='queue://testA' >A2</response>\n"
+
+            "</ajax-response>\n";
+        assertEquals( "Poll response is not correct.", expected, poll.getResponseContent()
);
+        
+        // clientB polls for messages
+        poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000&clientId=clientB");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        poll.waitForDone();
+        
+        LOG.debug( "clientB response : " + poll.getResponseContent() );
+        expected = "<ajax-response>\n" +
+            "<response id='handlerB' destination='queue://testB' >B1</response>\n"
+
+            "<response id='handlerB' destination='queue://testB' >B2</response>\n"
+
+            "</ajax-response>\n";
+        assertEquals( "Poll response is not correct.", expected, poll.getResponseContent()
);
     }
-
+    
 }

Modified: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java?rev=1064725&r1=1064724&r2=1064725&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
(original)
+++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
Fri Jan 28 15:21:02 2011
@@ -51,6 +51,7 @@ public class JettyTestSupport extends Te
         broker.setPersistent(false);
         broker.setUseJmx(true);
         broker.addConnector("tcp://localhost:61616");
+        broker.addConnector("stomp://localhost:61613");
         broker.start();
         broker.waitUntilStarted();
         

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java?rev=1064725&r1=1064724&r2=1064725&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java Fri
Jan 28 15:21:02 2011
@@ -66,12 +66,22 @@ public class AjaxListener implements Mes
         if (continuation != null) {
             try {
                 Message message = consumer.receive(10);
-                continuation.setAttribute("message", message);
-                continuation.setAttribute("consumer", consumer);
+                LOG.debug( "message is " + message );
+                if( message != null ) {
+                    if( continuation.isSuspended() ) {
+                        LOG.debug( "Resuming suspended continuation " + continuation );
+                        continuation.setAttribute("message", message);
+                        continuation.setAttribute("consumer", consumer);
+                        continuation.resume();
+                    } else {
+                        LOG.debug( "Message available, but continuation is already resumed.
 Buffer for next time." );
+                        bufferMessageForDelivery( message );
+                    }
+                }
             } catch (Exception e) {
                 LOG.error("Error receiving message " + e, e);
             }
-            continuation.resume();
+            
         } else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout)
{
             new Thread() {
                 public void run() {
@@ -81,12 +91,18 @@ public class AjaxListener implements Mes
         } else {
             try {
                 Message message = consumer.receive(10);
-                if (message != null) {
-                    unconsumedMessages.addLast(message);
-                }
+                bufferMessageForDelivery( message );
             } catch (Exception e) {
                 LOG.error("Error receiving message " + e, e);
             }
         }
     }
+    
+    public void bufferMessageForDelivery( Message message ) {
+        if( message != null ) {
+            synchronized( unconsumedMessages ) {
+                unconsumedMessages.addLast(message);
+            }
+        }
+    }
 }

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1064725&r1=1064724&r2=1064725&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
(original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
Fri Jan 28 15:21:02 2011
@@ -264,6 +264,8 @@ public class MessageListenerServlet exte
         }
 
         Message message = null;
+        // this is non-null if we're resuming the continuation.
+        // attributes set in AjaxListener
         message = (Message)request.getAttribute("message"); 
         
         synchronized (client) {
@@ -310,6 +312,7 @@ public class MessageListenerServlet exte
 
                 continuation.setTimeout(timeout);
                 continuation.suspend();
+                LOG.debug( "Suspending continuation " + continuation );
                 
                 // Fetch the listeners
                 AjaxListener listener = client.getListener();
@@ -347,13 +350,17 @@ public class MessageListenerServlet exte
 
                 LinkedList<Message> unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages();
                 LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages");
-                for (Message msg : unconsumedMessages) {
-                    messages++;
-                    String id = consumerIdMap.get(consumer);
-                    String destinationName = consumerDestinationNameMap.get(consumer);
-                    writeMessageResponse(writer, msg, id, destinationName);
-                    if (messages >= maximumMessages) {
-                        break;
+                synchronized( unconsumedMessages ) {
+                    for (Iterator<Message> it = unconsumedMessages.iterator(); it.hasNext();
) {
+                        messages++;
+                        Message msg = it.next();
+                        String id = consumerIdMap.get(consumer);
+                        String destinationName = consumerDestinationNameMap.get(consumer);
+                        writeMessageResponse(writer, msg, id, destinationName);
+                        it.remove();
+                        if (messages >= maximumMessages) {
+                            break;
+                        }
                     }
                 }
 



Mime
View raw message