Author: dejanb
Date: Thu Dec 23 13:17:12 2010
New Revision: 1052259
URL: http://svn.apache.org/viewvc?rev=1052259&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3094 - ajax does not receive all messages
Added:
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java
Modified:
activemq/trunk/activemq-web-demo/pom.xml
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
Modified: activemq/trunk/activemq-web-demo/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/pom.xml?rev=1052259&r1=1052258&r2=1052259&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/pom.xml (original)
+++ activemq/trunk/activemq-web-demo/pom.xml Thu Dec 23 13:17:12 2010
@@ -131,6 +131,14 @@
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ <version>3.1</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<properties>
Added: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java?rev=1052259&view=auto
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java (added)
+++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java Thu Dec 23 13:17:12 2010
@@ -0,0 +1,55 @@
+package org.apache.activemq.web;
+
+
+import java.util.Enumeration;
+import javax.jms.TextMessage;
+import javax.jms.MessageProducer;
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.httpclient.*;
+import org.apache.commons.httpclient.methods.*;
+import java.util.Set;
+
+public class AjaxTest extends JettyTestSupport {
+ private static final Log LOG = LogFactory.getLog(AjaxTest.class);
+
+ private String expectedResponse = "<ajax-response>\n" +
+ "<response id='handler' destination='queue://test' >test one</response>\n" +
+ "<response id='handler' destination='queue://test' >test two</response>\n" +
+ "<response id='handler' destination='queue://test' >test three</response>\n" +
+ "</ajax-response>";
+
+ public void testReceiveMultipleMessagesFromQueue() throws Exception {
+
+ MessageProducer local_producer = session.createProducer(session.createQueue("test"));
+
+ HttpClient httpClient = new HttpClient();
+ PostMethod post = new PostMethod( "http://localhost:8080/amq" );
+ post.addParameter( "destination", "queue://test" );
+ post.addParameter( "type", "listen" );
+ post.addParameter( "message", "handler" );
+ httpClient.executeMethod( post );
+
+ // send message
+ TextMessage msg1 = session.createTextMessage("test one");
+ producer.send(msg1);
+ TextMessage msg2 = session.createTextMessage("test two");
+ producer.send(msg2);
+ TextMessage msg3 = session.createTextMessage("test three");
+ producer.send(msg3);
+
+ HttpMethod get = new GetMethod( "http://localhost:8080/amq?timeout=5000" );
+ httpClient.executeMethod( get );
+ byte[] responseBody = get.getResponseBody();
+ String response = new String( responseBody );
+
+ LOG.info("Poll response: " + response);
+ assertEquals("Poll response not right", expectedResponse.trim(), response.trim());
+ }
+
+}
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java?rev=1052259&r1=1052258&r2=1052259&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java Thu Dec 23 13:17:12 2010
@@ -27,6 +27,8 @@ import org.apache.commons.logging.LogFac
import org.apache.activemq.MessageAvailableListener;
+import java.util.LinkedList;
+
/*
* Listen for available messages and wakeup any continuations.
*/
@@ -37,10 +39,12 @@ public class AjaxListener implements Mes
private AjaxWebClient client;
private long lastAccess;
private Continuation continuation;
+ private LinkedList<Message> unconsumedMessages = new LinkedList<Message>();
AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
this.client = client;
this.maximumReadTimeout = maximumReadTimeout;
+ access();
}
public void access() {
@@ -51,9 +55,13 @@ public class AjaxListener implements Mes
this.continuation = continuation;
}
+ public LinkedList<Message> getUnconsumedMessages() {
+ return unconsumedMessages;
+ }
+
public synchronized void onMessageAvailable(MessageConsumer consumer) {
if (LOG.isDebugEnabled()) {
- LOG.debug("message for " + consumer + "continuation=" + continuation);
+ LOG.debug("message for " + consumer + " continuation=" + continuation);
}
if (continuation != null) {
try {
@@ -70,6 +78,15 @@ public class AjaxListener implements Mes
client.closeConsumers();
};
}.start();
+ } else {
+ try {
+ Message message = consumer.receive(10);
+ if (message != null) {
+ unconsumedMessages.addLast(message);
+ }
+ } catch (Exception e) {
+ LOG.error("Error receiving message " + e, e);
+ }
}
}
}
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1052259&r1=1052258&r2=1052259&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java Thu Dec 23 13:17:12 2010
@@ -20,13 +20,7 @@ package org.apache.activemq.web;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -297,10 +291,10 @@ public class MessageListenerServlet exte
response.setContentType("text/xml");
response.setHeader("Cache-Control", "no-cache");
- if (message == null) {
+ if (message == null && client.getListener().getUnconsumedMessages().size() == 0) {
Continuation continuation = ContinuationSupport.getContinuation(request);
- if (continuation.isExpired()) {
+ if (continuation.isExpired()) {
response.setStatus(HttpServletResponse.SC_OK);
StringWriter swriter = new StringWriter();
PrintWriter writer = new PrintWriter(swriter);
@@ -319,6 +313,7 @@ public class MessageListenerServlet exte
// Fetch the listeners
AjaxListener listener = client.getListener();
+ listener.access();
// register this continuation with our listener.
listener.setContinuation(continuation);
@@ -350,6 +345,18 @@ public class MessageListenerServlet exte
continue;
}
+ LinkedList<Message> unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages();
+ LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages");
+ for (Message msg : unconsumedMessages) {
+ messages++;
+ String id = consumerIdMap.get(consumer);
+ String destinationName = consumerDestinationNameMap.get(consumer);
+ writeMessageResponse(writer, msg, id, destinationName);
+ if (messages >= maximumMessages) {
+ break;
+ }
+ }
+
// Look for any available messages
while (messages < maximumMessages) {
message = consumer.receiveNoWait();
|