activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r356666 - in /incubator/activemq/trunk/activemq-web/src/java/org/activemq/web: MessageServlet.java WebClient.java
Date Wed, 14 Dec 2005 01:07:10 GMT
Author: chirino
Date: Tue Dec 13 17:07:08 2005
New Revision: 356666

URL: http://svn.apache.org/viewcvs?rev=356666&view=rev
Log:
Was having some weird issues with the consumer. Disabled continuations for now (but I don't
think that was the issue).
Added a semaphore to avoid multiple blocking threads (I've got a feeling this might be what
fixed it).


Modified:
    incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/MessageServlet.java
    incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/WebClient.java

Modified: incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/MessageServlet.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/MessageServlet.java?rev=356666&r1=356665&r2=356666&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/MessageServlet.java (original)
+++ incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/MessageServlet.java Tue Dec 13 17:07:08 2005
@@ -18,12 +18,10 @@
 
 package org.activemq.web;
 
-import org.activemq.MessageAvailableConsumer;
-import org.activemq.MessageAvailableListener;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.mortbay.util.ajax.Continuation;
-import org.mortbay.util.ajax.ContinuationSupport;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.LinkedList;
+import java.util.List;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -36,10 +34,12 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.LinkedList;
-import java.util.List;
+import org.activemq.MessageAvailableConsumer;
+import org.activemq.MessageAvailableListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.mortbay.util.ajax.Continuation;
+import org.mortbay.util.ajax.ContinuationSupport;
 
 /**
  * A servlet for sending and receiving messages to/from JMS destinations using
@@ -110,7 +110,7 @@
      * from a queue
      */
     protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
-        doMessages(request, response, 1);
+        doMessagesWithoutContinuation(request, response, 1);
     }
 
     /**
@@ -118,7 +118,7 @@
      * from a queue
      */
     protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
-        doMessages(request, response, -1);
+        doMessagesWithoutContinuation(request, response, -1);
     }
 
     /**
@@ -229,6 +229,106 @@
             throw new ServletException("Could not post JMS message: " + e, e);
         }
         finally {
+            if (log.isDebugEnabled()) {
+                log.debug("Received " + messages + " message(s)");
+            }
+        }
+    }
+
+    /**
+     * Reads a message from a destination up to some specific timeout period
+     * 
+     * @param request
+     * @param response
+     * @throws ServletException
+     * @throws IOException
+     */
+    protected void doMessagesWithoutContinuation(HttpServletRequest request, HttpServletResponse response,
+            int maxMessages) throws ServletException, IOException {
+
+        int messages = 0;
+        try {
+            WebClient client = getWebClient(request);
+            Destination destination = getDestination(client, request);
+            long timeout = getReadTimeout(request);
+            boolean ajax = isRicoAjax(request);
+            if (!ajax)
+                maxMessages = 1;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
+            }
+
+            MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
+            Continuation continuation = null;
+            Listener listener = null;
+            Message message = null;
+
+            // write a responds
+            response.setContentType("text/xml");
+            PrintWriter writer = response.getWriter();
+
+            if (ajax)
+                writer.println("<ajax-response>");
+
+            // Only one client thread at a time should poll for messages.
+            if (client.getSemaphore().tryAcquire()) {
+                try {
+                    // Look for any available messages
+                    message = consumer.receive(timeout);
+
+                    // handle any message(s)
+                    if (message == null) {
+                        // No messages so OK response of for ajax else no
+                        // content.
+                        response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
+                    } else {
+                        // We have at least one message so set up the
+                        // response
+                        response.setStatus(HttpServletResponse.SC_OK);
+                        String type = getContentType(request);
+                        if (type != null)
+                            response.setContentType(type);
+
+                        // send a response for each available message (up to
+                        // max
+                        // messages)
+                        while ((maxMessages < 0 || messages < maxMessages) && message != null) {
+                            // System.err.println("message["+messages+"]="+message);
+                            if (ajax) {
+                                writer.print("<response type='object' id='");
+                                writer.print(request.getParameter("id"));
+                                writer.println("'>");
+                            } else
+                                // only ever 1 message for non ajax!
+                                setResponseHeaders(response, message);
+
+                            writeMessageResponse(writer, message);
+
+                            if (ajax)
+                                writer.println("</response>");
+
+                            // look for next message
+                            message = consumer.receiveNoWait();
+                            messages++;
+                        }
+                    }
+                } finally {
+                    client.getSemaphore().release();
+                }
+            } else {
+                // Client is using us in another thread.
+                response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
+            }
+
+            if (ajax) {
+                writer.println("<response type='object' id='poll'><ok/></response>");
+                writer.println("</ajax-response>");
+            }
+
+        } catch (JMSException e) {
+            throw new ServletException("Could not post JMS message: " + e, e);
+        } finally {
             if (log.isDebugEnabled()) {
                 log.debug("Received " + messages + " message(s)");
             }

Modified: incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/WebClient.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/WebClient.java?rev=356666&r1=356665&r2=356666&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/WebClient.java (original)
+++ incubator/activemq/trunk/activemq-web/src/java/org/activemq/web/WebClient.java Tue Dec 13 17:07:08 2005
@@ -18,13 +18,12 @@
 
 package org.activemq.web;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-
-import org.activemq.ActiveMQConnection;
-import org.activemq.ActiveMQConnectionFactory;
-import org.activemq.ActiveMQSession;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
 
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
@@ -40,12 +39,14 @@
 import javax.servlet.http.HttpSessionActivationListener;
 import javax.servlet.http.HttpSessionEvent;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.HashMap;
-import java.util.Map;
+import org.activemq.ActiveMQConnection;
+import org.activemq.ActiveMQConnectionFactory;
+import org.activemq.ActiveMQSession;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
 
 /**
  * Represents a messaging client used from inside a web container
@@ -72,6 +73,8 @@
     private transient Map topicConsumers = new ConcurrentHashMap();
     private int deliveryMode = DeliveryMode.NON_PERSISTENT;
 
+    private final Semaphore semaphore = new Semaphore(1);
+
 
     /**
      * @return the web client for the current HTTP session or null if there is not a web client created yet
@@ -246,5 +249,9 @@
     protected static class SessionConsumerPair {
         public Session session;
         public MessageConsumer consumer;
+    }
+
+    public Semaphore getSemaphore() {
+        return semaphore;
     }
 }



Mime
View raw message