Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 64899 invoked from network); 23 Dec 2010 13:17:37 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 23 Dec 2010 13:17:37 -0000 Received: (qmail 30595 invoked by uid 500); 23 Dec 2010 13:17:37 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 30529 invoked by uid 500); 23 Dec 2010 13:17:36 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 30522 invoked by uid 99); 23 Dec 2010 13:17:35 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Dec 2010 13:17:35 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Dec 2010 13:17:35 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8205223888CE; Thu, 23 Dec 2010 13:17:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1052259 - in /activemq/trunk: activemq-web-demo/ activemq-web-demo/src/test/java/org/apache/activemq/web/ activemq-web/src/main/java/org/apache/activemq/web/ Date: Thu, 23 Dec 2010 13:17:12 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101223131712.8205223888CE@eris.apache.org> Author: dejanb Date: Thu Dec 23 13:17:12 2010 New Revision: 1052259 URL: http://svn.apache.org/viewvc?rev=1052259&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3094 - ajax does not receive all messages Added: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java Modified: activemq/trunk/activemq-web-demo/pom.xml activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java Modified: activemq/trunk/activemq-web-demo/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/pom.xml?rev=1052259&r1=1052258&r2=1052259&view=diff ============================================================================== --- activemq/trunk/activemq-web-demo/pom.xml (original) +++ activemq/trunk/activemq-web-demo/pom.xml Thu Dec 23 13:17:12 2010 @@ -131,6 +131,14 @@ org.apache.derby derby + + + commons-httpclient + commons-httpclient + 3.1 + test + + Added: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java?rev=1052259&view=auto ============================================================================== --- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java (added) +++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java Thu Dec 23 13:17:12 2010 @@ -0,0 +1,55 @@ +package org.apache.activemq.web; + + +import java.util.Enumeration; +import javax.jms.TextMessage; +import javax.jms.MessageProducer; +import javax.management.ObjectName; + +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.jmx.SubscriptionViewMBean; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.httpclient.*; +import org.apache.commons.httpclient.methods.*; +import java.util.Set; + +public class AjaxTest extends JettyTestSupport { + private static final Log LOG = LogFactory.getLog(AjaxTest.class); + + private String expectedResponse = "\n" + + "test one\n" + + "test two\n" + + "test three\n" + + ""; + + public void testReceiveMultipleMessagesFromQueue() throws Exception { + + MessageProducer local_producer = session.createProducer(session.createQueue("test")); + + HttpClient httpClient = new HttpClient(); + PostMethod post = new PostMethod( "http://localhost:8080/amq" ); + post.addParameter( "destination", "queue://test" ); + post.addParameter( "type", "listen" ); + post.addParameter( "message", "handler" ); + httpClient.executeMethod( post ); + + // send message + TextMessage msg1 = session.createTextMessage("test one"); + producer.send(msg1); + TextMessage msg2 = session.createTextMessage("test two"); + producer.send(msg2); + TextMessage msg3 = session.createTextMessage("test three"); + producer.send(msg3); + + HttpMethod get = new GetMethod( "http://localhost:8080/amq?timeout=5000" ); + httpClient.executeMethod( get ); + byte[] responseBody = get.getResponseBody(); + String response = new String( responseBody ); + + LOG.info("Poll response: " + response); + assertEquals("Poll response not right", expectedResponse.trim(), response.trim()); + } + +} Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java?rev=1052259&r1=1052258&r2=1052259&view=diff ============================================================================== --- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java (original) +++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java Thu Dec 23 13:17:12 2010 @@ -27,6 +27,8 @@ import org.apache.commons.logging.LogFac import org.apache.activemq.MessageAvailableListener; +import java.util.LinkedList; + /* * Listen for available messages and wakeup any continuations. */ @@ -37,10 +39,12 @@ public class AjaxListener implements Mes private AjaxWebClient client; private long lastAccess; private Continuation continuation; + private LinkedList unconsumedMessages = new LinkedList(); AjaxListener(AjaxWebClient client, long maximumReadTimeout) { this.client = client; this.maximumReadTimeout = maximumReadTimeout; + access(); } public void access() { @@ -51,9 +55,13 @@ public class AjaxListener implements Mes this.continuation = continuation; } + public LinkedList getUnconsumedMessages() { + return unconsumedMessages; + } + public synchronized void onMessageAvailable(MessageConsumer consumer) { if (LOG.isDebugEnabled()) { - LOG.debug("message for " + consumer + "continuation=" + continuation); + LOG.debug("message for " + consumer + " continuation=" + continuation); } if (continuation != null) { try { @@ -70,6 +78,15 @@ public class AjaxListener implements Mes client.closeConsumers(); }; }.start(); + } else { + try { + Message message = consumer.receive(10); + if (message != null) { + unconsumedMessages.addLast(message); + } + } catch (Exception e) { + LOG.error("Error receiving message " + e, e); + } } } } Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1052259&r1=1052258&r2=1052259&view=diff ============================================================================== --- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java (original) +++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java Thu Dec 23 13:17:12 2010 @@ -20,13 +20,7 @@ package org.apache.activemq.web; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Date; -import java.util.Iterator; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import javax.jms.Destination; import javax.jms.JMSException; @@ -297,10 +291,10 @@ public class MessageListenerServlet exte response.setContentType("text/xml"); response.setHeader("Cache-Control", "no-cache"); - if (message == null) { + if (message == null && client.getListener().getUnconsumedMessages().size() == 0) { Continuation continuation = ContinuationSupport.getContinuation(request); - if (continuation.isExpired()) { + if (continuation.isExpired()) { response.setStatus(HttpServletResponse.SC_OK); StringWriter swriter = new StringWriter(); PrintWriter writer = new PrintWriter(swriter); @@ -319,6 +313,7 @@ public class MessageListenerServlet exte // Fetch the listeners AjaxListener listener = client.getListener(); + listener.access(); // register this continuation with our listener. listener.setContinuation(continuation); @@ -350,6 +345,18 @@ public class MessageListenerServlet exte continue; } + LinkedList unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages(); + LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages"); + for (Message msg : unconsumedMessages) { + messages++; + String id = consumerIdMap.get(consumer); + String destinationName = consumerDestinationNameMap.get(consumer); + writeMessageResponse(writer, msg, id, destinationName); + if (messages >= maximumMessages) { + break; + } + } + // Look for any available messages while (messages < maximumMessages) { message = consumer.receiveNoWait();