activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1052259 - in /activemq/trunk: activemq-web-demo/ activemq-web-demo/src/test/java/org/apache/activemq/web/ activemq-web/src/main/java/org/apache/activemq/web/
Date Thu, 23 Dec 2010 13:17:12 GMT
Author: dejanb
Date: Thu Dec 23 13:17:12 2010
New Revision: 1052259

URL: http://svn.apache.org/viewvc?rev=1052259&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3094 - ajax does not receive all messages

Added:
    activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java
Modified:
    activemq/trunk/activemq-web-demo/pom.xml
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java

Modified: activemq/trunk/activemq-web-demo/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/pom.xml?rev=1052259&r1=1052258&r2=1052259&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/pom.xml (original)
+++ activemq/trunk/activemq-web-demo/pom.xml Thu Dec 23 13:17:12 2010
@@ -131,6 +131,14 @@
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
     </dependency>
+
+    <dependency>
+       <groupId>commons-httpclient</groupId>
+       <artifactId>commons-httpclient</artifactId>
+       <version>3.1</version>
+       <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <properties>

Added: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java?rev=1052259&view=auto
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java (added)
+++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java Thu
Dec 23 13:17:12 2010
@@ -0,0 +1,55 @@
+package org.apache.activemq.web;
+
+
+import java.util.Enumeration;
+import javax.jms.TextMessage;
+import javax.jms.MessageProducer;
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.httpclient.*;
+import org.apache.commons.httpclient.methods.*;
+import java.util.Set;
+
+public class AjaxTest extends JettyTestSupport {
+    private static final Log LOG = LogFactory.getLog(AjaxTest.class);
+
+    private String expectedResponse = "<ajax-response>\n" +
+            "<response id='handler' destination='queue://test' >test one</response>\n"
+
+            "<response id='handler' destination='queue://test' >test two</response>\n"
+
+            "<response id='handler' destination='queue://test' >test three</response>\n"
+
+            "</ajax-response>";
+
+    public void testReceiveMultipleMessagesFromQueue() throws Exception {
+
+        MessageProducer local_producer = session.createProducer(session.createQueue("test"));
+
+        HttpClient httpClient = new HttpClient();
+        PostMethod post = new PostMethod( "http://localhost:8080/amq" );
+        post.addParameter( "destination", "queue://test" );
+        post.addParameter( "type", "listen" );
+        post.addParameter( "message", "handler" );
+        httpClient.executeMethod( post );
+
+        // send message
+        TextMessage msg1 = session.createTextMessage("test one");
+        producer.send(msg1);
+        TextMessage msg2 = session.createTextMessage("test two");
+        producer.send(msg2);
+        TextMessage msg3 = session.createTextMessage("test three");
+        producer.send(msg3);
+
+        HttpMethod get = new GetMethod( "http://localhost:8080/amq?timeout=5000" );
+        httpClient.executeMethod( get );
+        byte[] responseBody = get.getResponseBody();
+        String response = new String( responseBody );
+
+        LOG.info("Poll response: " + response);
+        assertEquals("Poll response not right", expectedResponse.trim(), response.trim());
+    }
+
+}

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java?rev=1052259&r1=1052258&r2=1052259&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java Thu
Dec 23 13:17:12 2010
@@ -27,6 +27,8 @@ import org.apache.commons.logging.LogFac
 
 import org.apache.activemq.MessageAvailableListener;
 
+import java.util.LinkedList;
+
 /*
  * Listen for available messages and wakeup any continuations.
  */
@@ -37,10 +39,12 @@ public class AjaxListener implements Mes
     private AjaxWebClient client;
     private long lastAccess;
     private Continuation continuation;
+    private LinkedList<Message> unconsumedMessages = new LinkedList<Message>();
 
     AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
         this.client = client;
         this.maximumReadTimeout = maximumReadTimeout;
+        access();
     }
 
     public void access() {
@@ -51,9 +55,13 @@ public class AjaxListener implements Mes
         this.continuation = continuation;
     }
 
+    public LinkedList<Message> getUnconsumedMessages() {
+        return unconsumedMessages;
+    }
+
     public synchronized void onMessageAvailable(MessageConsumer consumer) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("message for " + consumer + "continuation=" + continuation);
+            LOG.debug("message for " + consumer + " continuation=" + continuation);
         }
         if (continuation != null) {
             try {
@@ -70,6 +78,15 @@ public class AjaxListener implements Mes
                     client.closeConsumers();
                 };
             }.start();
+        } else {
+            try {
+                Message message = consumer.receive(10);
+                if (message != null) {
+                    unconsumedMessages.addLast(message);
+                }
+            } catch (Exception e) {
+                LOG.error("Error receiving message " + e, e);
+            }
         }
     }
 }

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1052259&r1=1052258&r2=1052259&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
(original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
Thu Dec 23 13:17:12 2010
@@ -20,13 +20,7 @@ package org.apache.activemq.web;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -297,10 +291,10 @@ public class MessageListenerServlet exte
             response.setContentType("text/xml");
             response.setHeader("Cache-Control", "no-cache");
             
-            if (message == null) {
+            if (message == null && client.getListener().getUnconsumedMessages().size()
== 0) {
                 Continuation continuation = ContinuationSupport.getContinuation(request);
                 
-                if (continuation.isExpired()) {  
+                if (continuation.isExpired()) {
                     response.setStatus(HttpServletResponse.SC_OK);
                     StringWriter swriter = new StringWriter();
                     PrintWriter writer = new PrintWriter(swriter);
@@ -319,6 +313,7 @@ public class MessageListenerServlet exte
                 
                 // Fetch the listeners
                 AjaxListener listener = client.getListener();
+                listener.access();
 
                 // register this continuation with our listener.
                 listener.setContinuation(continuation);
@@ -350,6 +345,18 @@ public class MessageListenerServlet exte
                     continue;
                 }
 
+                LinkedList<Message> unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages();
+                LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages");
+                for (Message msg : unconsumedMessages) {
+                    messages++;
+                    String id = consumerIdMap.get(consumer);
+                    String destinationName = consumerDestinationNameMap.get(consumer);
+                    writeMessageResponse(writer, msg, id, destinationName);
+                    if (messages >= maximumMessages) {
+                        break;
+                    }
+                }
+
                 // Look for any available messages
                 while (messages < maximumMessages) {
                     message = consumer.receiveNoWait();



Mime
View raw message