activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r703161 - in /activemq/trunk: activemq-web-demo/src/main/webapp/WEB-INF/ activemq-web/ activemq-web/src/main/java/org/apache/activemq/web/
Date Thu, 09 Oct 2008 13:17:18 GMT
Author: dejanb
Date: Thu Oct  9 06:17:17 2008
New Revision: 703161

URL: http://svn.apache.org/viewvc?rev=703161&view=rev
Log:
cumulative commit for issues AMQ-1955, AMQ-1453 and AMQ-1960

Modified:
    activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml
    activemq/trunk/activemq-web/pom.xml
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java

Modified: activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml?rev=703161&r1=703160&r2=703161&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml (original)
+++ activemq/trunk/activemq-web-demo/src/main/webapp/WEB-INF/web.xml Thu Oct  9 06:17:17 2008
@@ -52,6 +52,14 @@
         <servlet-name>MessageServlet</servlet-name>
         <servlet-class>org.apache.activemq.web.MessageServlet</servlet-class>
         <load-on-startup>1</load-on-startup>
+        <!--
+        Uncomment this parameter if you plan to use multiple consumers over REST
+        <init-param>
+                <param-name>destinationOptions</param-name>
+                <param-value>consumer.prefetchSize=1</param-value>
+        </init-param> 
+        -->
+
     </servlet>
 
     <!-- the queue browse servlet -->

Modified: activemq/trunk/activemq-web/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/pom.xml?rev=703161&r1=703160&r2=703161&view=diff
==============================================================================
--- activemq/trunk/activemq-web/pom.xml (original)
+++ activemq/trunk/activemq-web/pom.xml Thu Oct  9 06:17:17 2008
@@ -37,6 +37,10 @@
       <artifactId>activemq-core</artifactId>
     </dependency>
     <dependency>
+        <groupId>${pom.groupId}</groupId>
+        <artifactId>activemq-camel</artifactId>
+    </dependency>
+    <dependency>
       <groupId>${pom.groupId}</groupId>
       <artifactId>activemq-core</artifactId>
       <scope>test</scope>

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java?rev=703161&r1=703160&r2=703161&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java Thu Oct  9 06:17:17 2008
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -29,12 +30,20 @@
 import javax.jms.ObjectMessage;
 import javax.jms.TextMessage;
 import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.activemq.MessageAvailableConsumer;
 import org.apache.activemq.MessageAvailableListener;
+import org.apache.activemq.camel.converter.ActiveMQMessageConverter;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.mortbay.util.ajax.Continuation;
@@ -56,6 +65,9 @@
     private String readTimeoutParameter = "readTimeout";
     private long defaultReadTimeout = -1;
     private long maximumReadTimeout = 20000;
+    private long requestTimeout = 1000;
+    
+    private HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
 
     public void init() throws ServletException {
         ServletConfig servletConfig = getServletConfig();
@@ -67,6 +79,10 @@
         if (name != null) {
             maximumReadTimeout = asLong(name);
         }
+        name = servletConfig.getInitParameter("replyTimeout");
+        if (name != null) {
+        	requestTimeout = asLong(name);
+        }        
     }
 
     /**
@@ -80,7 +96,7 @@
     protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
         // lets turn the HTTP post into a JMS Message
         try {
-            WebClient client = WebClient.getWebClient(request);
+            WebClient client = getWebClient(request);
 
             String text = getPostedMessageBody(request);
 
@@ -94,12 +110,28 @@
                 LOG.debug("Sending message to: " + destination + " with text: " + text);
             }
 
+            boolean sync = isSync(request);
             TextMessage message = client.getSession().createTextMessage(text);
-            appendParametersToMessage(request, message);
-            boolean persistent = isSendPersistent(request);
-            int priority = getSendPriority(request);
-            long timeToLive = getSendTimeToLive(request);
-            client.send(destination, message, persistent, priority, timeToLive);
+
+            if (sync) {
+               String point = "activemq:" 
+            	   + ((ActiveMQDestination)destination).getPhysicalName().replace("//", "")
+            	   + "?requestTimeout=" + requestTimeout;
+               try {
+            	   String body = (String)client.getProducerTemplate().requestBody(point, text);
+                   ActiveMQTextMessage answer = new ActiveMQTextMessage();
+                   answer.setText(body);
+            	   writeMessageResponse(response.getWriter(), answer);
+               } catch (Exception e) {
+            	   throw new IOException(e);
+               }
+            } else {
+                appendParametersToMessage(request, message);
+                boolean persistent = isSendPersistent(request);
+                int priority = getSendPriority(request);
+                long timeToLive = getSendTimeToLive(request);            	
+                client.send(destination, message, persistent, priority, timeToLive);
+            }
 
             // lets return a unique URI for reliable messaging
             response.setHeader("messageID", message.getJMSMessageID());
@@ -137,7 +169,7 @@
 
         int messages = 0;
         try {
-            WebClient client = WebClient.getWebClient(request);
+            WebClient client = getWebClient(request);
             Destination destination = getDestination(client, request);
             if (destination == null) {
                 throw new NoDestinationSuppliedException();
@@ -224,8 +256,10 @@
                         }
 
                         // look for next message
-                        message = consumer.receiveNoWait();
                         messages++;
+                        if(maxMessages < 0 || messages < maxMessages) {
+                        	message = consumer.receiveNoWait();
+                        }
                     }
                 }
 
@@ -255,7 +289,7 @@
 
         int messages = 0;
         try {
-            WebClient client = WebClient.getWebClient(request);
+            WebClient client = getWebClient(request);
             Destination destination = getDestination(client, request);
             long timeout = getReadTimeout(request);
             boolean ajax = isRicoAjax(request);
@@ -317,8 +351,11 @@
                             }
 
                             // look for next message
-                            message = consumer.receiveNoWait();
                             messages++;
+                            if(maxMessages < 0 || messages < maxMessages) {
+                            	message = consumer.receiveNoWait();
+                            }
+
                         }
                     }
                 } finally {
@@ -362,6 +399,25 @@
         String rico = request.getParameter("rico");
         return rico != null && rico.equals("true");
     }
+    
+    public WebClient getWebClient(HttpServletRequest request) {
+    	String clientId = request.getParameter("clientId");
+    	if (clientId != null) {
+    		synchronized(this) {
+    			LOG.debug("Getting local client [" + clientId + "]");
+    			WebClient client = clients.get(clientId);
+    			if (client == null) {
+    				LOG.debug("Creating new client [" + clientId + "]");
+    				client = new WebClient();
+    				clients.put(clientId, client);
+    			}
+    			return client;
+    		}
+    		
+    	} else {
+    		return WebClient.getWebClient(request);
+    	}
+    }    
 
     protected String getContentType(HttpServletRequest request) {
         /*

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java?rev=703161&r1=703160&r2=703161&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java Thu Oct  9 06:17:17 2008
@@ -179,6 +179,14 @@
         return defaultMessagePersistent;
     }
 
+    protected boolean isSync(HttpServletRequest request) {
+        String text = request.getParameter("sync");
+        if (text != null) {
+            return true;
+        }
+        return false;
+    }    
+
     protected Destination asDestination(Object value) {
         if (value instanceof Destination) {
             return (Destination)value;

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java?rev=703161&r1=703160&r2=703161&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java Thu Oct  9 06:17:17 2008
@@ -47,9 +47,17 @@
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.MessageAvailableConsumer;
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.activemq.camel.component.ActiveMQConfiguration;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import sun.util.logging.resources.logging;
+
 /**
  * Represents a messaging client used from inside a web container typically
  * stored inside a HttpSession TODO controls to prevent DOS attacks with users
@@ -77,6 +85,9 @@
 
     private final Semaphore semaphore = new Semaphore(1);
 
+    private CamelContext camelContext;
+    private ProducerTemplate producerTemplate;
+
     public WebClient() {
         if (factory == null) {
             throw new IllegalStateException("initContext(ServletContext) not called");
@@ -111,6 +122,7 @@
 
     public static void initContext(ServletContext context) {
         initConnectionFactory(context);
+        context.setAttribute("webClients", new HashMap<String, WebClient>());
     }
 
     public int getDeliveryMode() {
@@ -143,12 +155,16 @@
             if (connection != null) {
                 connection.close();
             }
-        } catch (JMSException e) {
+            if (producerTemplate != null) {
+            	producerTemplate.stop();
+            }
+        } catch (Exception e) {
             LOG.debug("caught exception closing consumer", e);
         } finally {
             producer = null;
             session = null;
             connection = null;
+            producerTemplate = null;
             if (consumers != null) {
                 consumers.clear();
             }
@@ -256,6 +272,27 @@
             servletContext.setAttribute(CONNECTION_FACTORY_ATTRIBUTE, factory);
         }
     }
+    
+    public synchronized CamelContext getCamelContext() {
+    	if (camelContext == null) {
+    		LOG.debug("Creating camel context");
+    		camelContext = new DefaultCamelContext();
+    		ActiveMQConfiguration conf = new ActiveMQConfiguration();
+    		conf.setConnectionFactory(new PooledConnectionFactory((ActiveMQConnectionFactory)factory));
+    		ActiveMQComponent component = new ActiveMQComponent(conf);
+    		camelContext.addComponent("activemq", component);
+    	}
+    	return camelContext;
+    }
+    
+    public synchronized ProducerTemplate getProducerTemplate() throws Exception {
+    	if (producerTemplate == null) {
+    		LOG.debug("Creating producer template");
+    		producerTemplate = getCamelContext().createProducerTemplate();
+    		producerTemplate.start();
+    	}
+    	return producerTemplate;
+    }
 
     public synchronized MessageProducer getProducer() throws JMSException {
         if (producer == null) {



Mime
View raw message