Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 68560 invoked from network); 20 Dec 2005 09:55:48 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 20 Dec 2005 09:55:48 -0000 Received: (qmail 3417 invoked by uid 500); 20 Dec 2005 09:55:48 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 3393 invoked by uid 500); 20 Dec 2005 09:55:47 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 3382 invoked by uid 99); 20 Dec 2005 09:55:47 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Dec 2005 01:55:47 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 20 Dec 2005 01:55:45 -0800 Received: (qmail 68409 invoked by uid 65534); 20 Dec 2005 09:55:25 -0000 Message-ID: <20051220095525.68408.qmail@minotaur.apache.org> Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r357943 - in /incubator/activemq/trunk/activemq-web/src: java/ main/ main/java/ main/java/org/ main/java/org/activemq/ main/java/org/activemq/web/ Date: Tue, 20 Dec 2005 09:55:22 -0000 To: activemq-commits@geronimo.apache.org From: foconer@apache.org X-Mailer: svnmailer-1.0.5 X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: foconer Date: Tue Dec 20 01:54:54 2005 New Revision: 357943 URL: http://svn.apache.org/viewcvs?rev=357943&view=rev Log: Issue: source is not compiled. The generated jar file is empty. Solution: Restructured the source to src/main/java. Added: incubator/activemq/trunk/activemq-web/src/main/ incubator/activemq/trunk/activemq-web/src/main/java/ incubator/activemq/trunk/activemq-web/src/main/java/org/ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html Removed: incubator/activemq/trunk/activemq-web/src/java/ Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java?rev=357943&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java (added) +++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java Tue Dec 20 01:54:54 2005 @@ -0,0 +1,50 @@ +/** + * + * Copyright 2004 Protique Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ + +package org.activemq.web; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.servlet.http.HttpSessionEvent; +import javax.servlet.http.HttpSessionListener; + +/** + * Listens to sessions closing to ensure that JMS connections are + * cleaned up nicely + * + * @version $Revision: 1.1.1.1 $ + */ +public class ConnectionManager implements HttpSessionListener { + private static final Log log = LogFactory.getLog(ConnectionManager.class); + + public void sessionCreated(HttpSessionEvent event) { + } + + public void sessionDestroyed(HttpSessionEvent event) { + /** TODO we can't use the session any more now! + WebClient client = WebClient.getWebClient(event.getSession()); + try { + client.stop(); + } + catch (JMSException e) { + log.warn("Error closing connection: " + e, e); + } + */ + } +} Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java?rev=357943&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java (added) +++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java Tue Dec 20 01:54:54 2005 @@ -0,0 +1,420 @@ +/** + * + * Copyright 2004 Protique Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ + +package org.activemq.web; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.LinkedList; +import java.util.List; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; +import javax.jms.TextMessage; +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.activemq.MessageAvailableConsumer; +import org.activemq.MessageAvailableListener; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.mortbay.util.ajax.Continuation; +import org.mortbay.util.ajax.ContinuationSupport; + +/** + * A servlet for sending and receiving messages to/from JMS destinations using + * HTTP POST for sending and HTTP GET for receiving.

You can specify the + * destination and whether it is a topic or queue via configuration details on + * the servlet or as request parameters.

For reading messages you can + * specify a readTimeout parameter to determine how long the servlet should + * block for. + * + * @version $Revision: 1.1.1.1 $ + */ +public class MessageServlet extends MessageServletSupport { + private static final Log log = LogFactory.getLog(MessageServlet.class); + + private String readTimeoutParameter = "readTimeout"; + private long defaultReadTimeout = -1; + private long maximumReadTimeout = 20000; + + public void init() throws ServletException { + ServletConfig servletConfig = getServletConfig(); + String name = servletConfig.getInitParameter("defaultReadTimeout"); + if (name != null) { + defaultReadTimeout = asLong(name); + } + name = servletConfig.getInitParameter("maximumReadTimeout"); + if (name != null) { + maximumReadTimeout = asLong(name); + } + } + + /** + * Sends a message to a destination + * + * @param request + * @param response + * @throws ServletException + * @throws IOException + */ + protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + // lets turn the HTTP post into a JMS Message + try { + WebClient client = getWebClient(request); + + String text = getPostedMessageBody(request); + + // lets create the destination from the URI? + Destination destination = getDestination(client, request); + + if (log.isDebugEnabled()) { + log.debug("Sending message to: " + destination + " with text: " + text); + } + + TextMessage message = client.getSession().createTextMessage(text); + appendParametersToMessage(request, message); + client.send(destination, message); + + // lets return a unique URI for reliable messaging + response.setHeader("messageID", message.getJMSMessageID()); + response.setStatus(HttpServletResponse.SC_OK); + } + catch (JMSException e) { + throw new ServletException("Could not post JMS message: " + e, e); + } + } + + /** + * Supports a HTTP DELETE to be equivlanent of consuming a singe message + * from a queue + */ + protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + doMessages(request, response, 1); + } + + /** + * Supports a HTTP DELETE to be equivlanent of consuming a singe message + * from a queue + */ + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + doMessages(request, response, -1); + } + + /** + * Reads a message from a destination up to some specific timeout period + * + * @param request + * @param response + * @throws ServletException + * @throws IOException + */ + protected void doMessages(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException { + + int messages = 0; + try { + WebClient client = getWebClient(request); + Destination destination = getDestination(client, request); + long timeout = getReadTimeout(request); + boolean ajax = isRicoAjax(request); + if (!ajax) + maxMessages = 1; + + if (log.isDebugEnabled()) { + log.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout); + } + + MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination); + Continuation continuation = null; + Listener listener = null; + Message message = null; + + synchronized (consumer) { + // Fetch the listeners + listener = (Listener) consumer.getAvailableListener(); + if (listener == null) { + listener = new Listener(consumer); + consumer.setAvailableListener(listener); + } + // Look for any available messages + message = consumer.receiveNoWait(); + + // Get an existing Continuation or create a new one if there are + // no events. + if (message == null) { + continuation = ContinuationSupport.getContinuation(request, consumer); + + // register this continuation with our listener. + listener.setContinuation(continuation); + + // Get the continuation object (may wait and/or retry + // request here). + continuation.suspend(timeout); + } + + // Try again now + if (message == null) + message = consumer.receiveNoWait(); + + // write a responds + response.setContentType("text/xml"); + PrintWriter writer = response.getWriter(); + + if (ajax) + writer.println(""); + + // handle any message(s) + if (message == null) { + // No messages so OK response of for ajax else no content. + response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); + } + else { + // We have at least one message so set up the response + response.setStatus(HttpServletResponse.SC_OK); + String type = getContentType(request); + if (type != null) + response.setContentType(type); + + // send a response for each available message (up to max + // messages) + while ((maxMessages < 0 || messages < maxMessages) && message != null) { + // System.err.println("message["+messages+"]="+message); + if (ajax) { + writer.print(""); + } + else + // only ever 1 message for non ajax! + setResponseHeaders(response, message); + + writeMessageResponse(writer, message); + + if (ajax) + writer.println(""); + + // look for next message + message = consumer.receiveNoWait(); + messages++; + } + } + + if (ajax) { + writer.println(""); + writer.println(""); + } + } + } + catch (JMSException e) { + throw new ServletException("Could not post JMS message: " + e, e); + } + finally { + if (log.isDebugEnabled()) { + log.debug("Received " + messages + " message(s)"); + } + } + } + + /** + * Reads a message from a destination up to some specific timeout period + * + * @param request + * @param response + * @throws ServletException + * @throws IOException + */ + protected void doMessagesWithoutContinuation(HttpServletRequest request, HttpServletResponse response, + int maxMessages) throws ServletException, IOException { + + int messages = 0; + try { + WebClient client = getWebClient(request); + Destination destination = getDestination(client, request); + long timeout = getReadTimeout(request); + boolean ajax = isRicoAjax(request); + if (!ajax) + maxMessages = 1; + + if (log.isDebugEnabled()) { + log.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout); + } + + MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination); + Continuation continuation = null; + Listener listener = null; + Message message = null; + + // write a responds + response.setContentType("text/xml"); + PrintWriter writer = response.getWriter(); + + if (ajax) + writer.println(""); + + // Only one client thread at a time should poll for messages. + if (client.getSemaphore().tryAcquire()) { + try { + // Look for any available messages + message = consumer.receive(timeout); + + // handle any message(s) + if (message == null) { + // No messages so OK response of for ajax else no + // content. + response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); + } else { + // We have at least one message so set up the + // response + response.setStatus(HttpServletResponse.SC_OK); + String type = getContentType(request); + if (type != null) + response.setContentType(type); + + // send a response for each available message (up to + // max + // messages) + while ((maxMessages < 0 || messages < maxMessages) && message != null) { + // System.err.println("message["+messages+"]="+message); + if (ajax) { + writer.print(""); + } else + // only ever 1 message for non ajax! + setResponseHeaders(response, message); + + writeMessageResponse(writer, message); + + if (ajax) + writer.println(""); + + // look for next message + message = consumer.receiveNoWait(); + messages++; + } + } + } finally { + client.getSemaphore().release(); + } + } else { + // Client is using us in another thread. + response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); + } + + if (ajax) { + writer.println(""); + writer.println(""); + } + + } catch (JMSException e) { + throw new ServletException("Could not post JMS message: " + e, e); + } finally { + if (log.isDebugEnabled()) { + log.debug("Received " + messages + " message(s)"); + } + } + } + + protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException { + if (message instanceof TextMessage) { + TextMessage textMsg = (TextMessage) message; + writer.print(textMsg.getText()); + } + else if (message instanceof ObjectMessage) { + ObjectMessage objectMsg = (ObjectMessage) message; + Object object = objectMsg.getObject(); + writer.print(object.toString()); + } + } + + protected boolean isRicoAjax(HttpServletRequest request) { + String rico = request.getParameter("rico"); + return rico != null && rico.equals("true"); + } + + protected String getContentType(HttpServletRequest request) { + /* + * log("Params: " + request.getParameterMap()); Enumeration iter = + * request.getHeaderNames(); while (iter.hasMoreElements()) { String + * name = (String) iter.nextElement(); log("Header: " + name + " = " + + * request.getHeader(name)); } + */ + String value = request.getParameter("xml"); + if (value != null && "true".equalsIgnoreCase(value)) { + return "text/xml"; + } + return null; + } + + protected void setResponseHeaders(HttpServletResponse response, Message message) throws JMSException { + response.setHeader("destination", message.getJMSDestination().toString()); + response.setHeader("id", message.getJMSMessageID()); + } + + /** + * @return the timeout value for read requests which is always >= 0 and <= + * maximumReadTimeout to avoid DoS attacks + */ + protected long getReadTimeout(HttpServletRequest request) { + long answer = defaultReadTimeout; + + String name = request.getParameter(readTimeoutParameter); + if (name != null) { + answer = asLong(name); + } + if (answer < 0 || answer > maximumReadTimeout) { + answer = maximumReadTimeout; + } + return answer; + } + + /* + * Listen for available messages and wakeup any continuations. + */ + private class Listener implements MessageAvailableListener { + MessageConsumer consumer; + Continuation continuation; + List queue = new LinkedList(); + + Listener(MessageConsumer consumer) { + this.consumer = consumer; + } + + public void setContinuation(Continuation continuation) { + synchronized (consumer) { + this.continuation = continuation; + } + } + + public void onMessageAvailable(MessageConsumer consumer) { + assert this.consumer == consumer; + + synchronized (this.consumer) { + if (continuation != null) + continuation.resume(); + continuation = null; + } + } + } + +} Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java?rev=357943&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java (added) +++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java Tue Dec 20 01:54:54 2005 @@ -0,0 +1,223 @@ +/** + * + * Copyright 2004 Protique Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ + +package org.activemq.web; + +import org.activemq.command.ActiveMQQueue; +import org.activemq.command.ActiveMQTopic; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.TextMessage; +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpSession; +import java.io.BufferedReader; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +/** + * A useful base class for any JMS related servlet; + * there are various ways to map JMS operations to web requests + * so we put most of the common behaviour in a reusable base class. + * + * @version $Revision: 1.1.1.1 $ + */ +public abstract class MessageServletSupport extends HttpServlet { + + private boolean defaultTopicFlag = true; + private Destination defaultDestination; + private String destinationParameter = "destination"; + private String topicParameter = "topic"; + private String bodyParameter = "body"; + + + public void init(ServletConfig servletConfig) throws ServletException { + super.init(servletConfig); + + String name = servletConfig.getInitParameter("topic"); + if (name != null) { + defaultTopicFlag = asBoolean(name); + } + + log("Defaulting to use topics: " + defaultTopicFlag); + + name = servletConfig.getInitParameter("destination"); + if (name != null) { + if (defaultTopicFlag) { + defaultDestination = new ActiveMQTopic(name); + } + else { + defaultDestination = new ActiveMQQueue(name); + } + } + + // lets check to see if there's a connection factory set + WebClient.initContext(getServletContext()); + } + + protected WebClient createWebClient(HttpServletRequest request) { + return new WebClient(getServletContext()); + } + + public static boolean asBoolean(String param) { + return asBoolean(param, false); + } + + public static boolean asBoolean(String param, boolean defaultValue) { + if (param == null) { + return defaultValue; + } + else { + return param.equalsIgnoreCase("true"); + } + } + + /** + * Helper method to get the client for the current session + * + * @param request is the current HTTP request + * @return the current client or a newly creates + */ + protected WebClient getWebClient(HttpServletRequest request) { + HttpSession session = request.getSession(true); + WebClient client = WebClient.getWebClient(session); + if (client == null) { + client = createWebClient(request); + session.setAttribute(WebClient.webClientAttribute, client); + } + return client; + } + + + protected void appendParametersToMessage(HttpServletRequest request, TextMessage message) throws JMSException { + for (Iterator iter = request.getParameterMap().entrySet().iterator(); iter.hasNext();) { + Map.Entry entry = (Map.Entry) iter.next(); + String name = (String) entry.getKey(); + if (!destinationParameter.equals(name) && !topicParameter.equals(name) && !bodyParameter.equals(name)) { + Object value = entry.getValue(); + if (value instanceof Object[]) { + Object[] array = (Object[]) value; + if (array.length == 1) { + value = array[0]; + } + else { + log("Can't use property: " + name + " which is of type: " + value.getClass().getName() + " value"); + value = null; + for (int i = 0, size = array.length; i < size; i++) { + log("value[" + i + "] = " + array[i]); + } + } + } + if (value != null) { + message.setObjectProperty(name, value); + } + } + } + } + + /** + * @return the destination to use for the current request + */ + protected Destination getDestination(WebClient client, HttpServletRequest request) throws JMSException, NoDestinationSuppliedException { + String destinationName = request.getParameter(destinationParameter); + if (destinationName == null) { + if (defaultDestination == null) { + return getDestinationFromURI(client, request); + } + else { + return defaultDestination; + } + } + + return getDestination(client, request, destinationName); + } + + /** + * @return the destination to use for the current request using the relative URI from + * where this servlet was invoked as the destination name + */ + protected Destination getDestinationFromURI(WebClient client, HttpServletRequest request) throws NoDestinationSuppliedException, JMSException { + String uri = request.getPathInfo(); + if (uri == null) { + throw new NoDestinationSuppliedException(); + } + // replace URI separator with JMS destination separator + if (uri.startsWith("/")) { + uri = uri.substring(1); + } + uri = uri.replace('/', '.'); + return getDestination(client, request, uri); + } + + /** + * @return the Destination object for the given destination name + */ + protected Destination getDestination(WebClient client, HttpServletRequest request, String destinationName) throws JMSException { + if (isTopic(request)) { + return client.getSession().createTopic(destinationName); + } + else { + return client.getSession().createQueue(destinationName); + } + } + + /** + * @return true if the current request is for a topic destination, else false if its for a queue + */ + protected boolean isTopic + (HttpServletRequest + request) { + boolean aTopic = defaultTopicFlag; + String aTopicText = request.getParameter(topicParameter); + if (aTopicText != null) { + aTopic = asBoolean(aTopicText); + } + return aTopic; + } + + protected long asLong(String name) { + return Long.parseLong(name); + } + + /** + * @return the text that was posted to the servlet which is used as the body + * of the message to be sent + */ + protected String getPostedMessageBody(HttpServletRequest request) throws IOException { + String answer = request.getParameter(bodyParameter); + if (answer == null) { + // lets read the message body instead + BufferedReader reader = request.getReader(); + StringBuffer buffer = new StringBuffer(); + while (true) { + String line = reader.readLine(); + if (line == null) { + break; + } + buffer.append(line); + buffer.append("\n"); + } + return buffer.toString(); + } + return answer; + } +} Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java?rev=357943&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java (added) +++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java Tue Dec 20 01:54:54 2005 @@ -0,0 +1,32 @@ +/** + * + * Copyright 2004 Protique Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.web; + +import javax.servlet.ServletException; + +/** + * Exception thrown if there was no destination available + * + * @version $Revision: 1.1.1.1 $ + */ +public class NoDestinationSuppliedException extends ServletException { + + public NoDestinationSuppliedException() { + super("Could not perform the JMS operation as no Destination was supplied"); + } +} Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java?rev=357943&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java (added) +++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java Tue Dec 20 01:54:54 2005 @@ -0,0 +1,138 @@ +/** + * + * Copyright 2004 Protique Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.web; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Hashtable; +import java.util.Map; + +/** + * A servlet which will publish dummy market data prices + * + * @version $Revision: 1.1.1.1 $ + */ +public class PortfolioPublishServlet extends MessageServletSupport { + + private static final int maxDeltaPercent = 1; + private static final Map lastPrices = new Hashtable(); + private boolean ricoStyle = true; + + + public void init() throws ServletException { + super.init(); + + ricoStyle = asBoolean(getServletConfig().getInitParameter("rico"), true); + } + + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + PrintWriter out = response.getWriter(); + String[] stocks = request.getParameterValues("stocks"); + if (stocks == null || stocks.length == 0) { + out.println("No stocks query parameter specified. Cannot publish market data"); + } + else { + Integer total=(Integer)request.getSession(true).getAttribute("total"); + if (total==null) + total=new Integer(0); + + + int count = getNumberOfMessages(request); + total=new Integer(total.intValue()+count); + request.getSession().setAttribute("total",total); + + try { + WebClient client = getWebClient(request); + for (int i = 0; i < count; i++) { + sendMessage(client, stocks); + } + out.print(""); + out.println("Published " + count + " of "+total+ " price messages. Refresh = "+refreshRate+"s"); + out.println(""); + + } + catch (JMSException e) { + out.println("Failed sending price messages due to " + e + ""); + log("Failed to send message: " + e, e); + } + } + } + + protected void sendMessage(WebClient client, String[] stocks) throws JMSException { + Session session = client.getSession(); + + int idx = 0; + while (true) { + idx = (int) Math.round(stocks.length * Math.random()); + if (idx < stocks.length) { + break; + } + } + String stock = stocks[idx]; + Destination destination = session.createTopic("STOCKS." + stock); + String stockText = createStockText(stock); + log("Sending: " + stockText + " on destination: " + destination); + Message message = session.createTextMessage(stockText); + client.send(destination, message); + } + + protected String createStockText(String stock) { + Double value = (Double) lastPrices.get(stock); + if (value == null) { + value = new Double(Math.random() * 100); + } + + // lets mutate the value by some percentage + double oldPrice = value.doubleValue(); + value = new Double(mutatePrice(oldPrice)); + lastPrices.put(stock, value); + double price = value.doubleValue(); + + double offer = price * 1.001; + + String movement = (price > oldPrice) ? "up" : "down"; + return ""; + } + + protected double mutatePrice(double price) { + double percentChange = (2 * Math.random() * maxDeltaPercent) - maxDeltaPercent; + + return price * (100 + percentChange) / 100; + } + + protected int getNumberOfMessages(HttpServletRequest request) { + String name = request.getParameter("count"); + if (name != null) { + return Integer.parseInt(name); + } + return 1; + } +} Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java?rev=357943&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java (added) +++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java Tue Dec 20 01:54:54 2005 @@ -0,0 +1,117 @@ +/** + * + * Copyright 2004 Protique Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.web; + +import org.activemq.broker.BrokerService; +import org.activemq.xbean.BrokerFactoryBean; +import org.springframework.core.io.Resource; +import org.springframework.web.context.support.ServletContextResource; + +import javax.servlet.ServletContext; +import javax.servlet.ServletContextEvent; +import javax.servlet.ServletContextListener; + +/** + * Used to configure and instance of ActiveMQ BrokerService using + * ActiveMQ/Spring's xml configuration.

The configuration file is specified + * via the context init parameter brokerURI, typically: + * <context-param> + * <param-name>brokerURI</param-name> + * <param-value>/WEB-INF/activemq.xml</param-value> + * </context-param> + * + * + * As a a default, if a brokerURI is not specified it will look up + * for activemq.xml + * + * @version $Revision: 1.1 $ + */ +public class SpringBrokerContextListener implements ServletContextListener { + + /** broker uri context parameter name: brokerURI */ + public static final String INIT_PARAM_BROKER_URI = "brokerURI"; + + /** the broker container instance */ + private BrokerService brokerContainer; + + /** + * Set the broker container to be used by this listener + * + * @param container + * the container to be used. + */ + protected void setBrokerService(BrokerService container) { + this.brokerContainer = container; + } + + /** + * Return the broker container. + */ + protected BrokerService getBrokerService() { + return this.brokerContainer; + } + + public void contextInitialized(ServletContextEvent event) { + ServletContext context = event.getServletContext(); + context.log("Creating ActiveMQ Broker..."); + brokerContainer = createBroker(context); + + context.log("Starting ActiveMQ Broker"); + try { + brokerContainer.start(); + + context.log("Started ActiveMQ Broker"); + } + catch (Exception e) { + context.log("Failed to start ActiveMQ broker: " + e, e); + } + } + + public void contextDestroyed(ServletContextEvent event) { + ServletContext context = event.getServletContext(); + if (brokerContainer != null) { + try { + brokerContainer.stop(); + } + catch (Exception e) { + context.log("Failed to stop the ActiveMQ Broker: " + e, e); + } + brokerContainer = null; + } + } + + /** + * Factory method to create a new ActiveMQ Broker + */ + protected BrokerService createBroker(ServletContext context) { + String brokerURI = context.getInitParameter(INIT_PARAM_BROKER_URI); + if (brokerURI == null) { + brokerURI = "activemq.xml"; + } + context.log("Loading ActiveMQ Broker configuration from: " + brokerURI); + Resource resource = new ServletContextResource(context, brokerURI); + BrokerFactoryBean factory = new BrokerFactoryBean(resource); + try { + factory.afterPropertiesSet(); + } + catch (Exception e) { + context.log("Failed to create broker: " + e, e); + } + return factory.getBroker(); + } +} Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java?rev=357943&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java (added) +++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java Tue Dec 20 01:54:54 2005 @@ -0,0 +1,257 @@ +/** + * + * Copyright 2004 Protique Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ + +package org.activemq.web; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.servlet.ServletContext; +import javax.servlet.http.HttpSession; +import javax.servlet.http.HttpSessionActivationListener; +import javax.servlet.http.HttpSessionEvent; + +import org.activemq.ActiveMQConnection; +import org.activemq.ActiveMQConnectionFactory; +import org.activemq.ActiveMQSession; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import edu.emory.mathcs.backport.java.util.concurrent.Semaphore; + +/** + * Represents a messaging client used from inside a web container + * typically stored inside a HttpSession + * + * @version $Revision: 1.1.1.1 $ + */ +public class WebClient implements HttpSessionActivationListener, Externalizable { + public static final String webClientAttribute = "org.activemq.webclient"; + public static final String connectionFactoryAttribute = "org.activemq.connectionFactory"; + public static final String queueConsumersAttribute = "org.activemq.queueConsumers"; + public static final String brokerUrlInitParam = "org.activemq.brokerURL"; + public static final String embeddedBrokerInitParam = "org.activemq.embeddedBroker"; + + private static final Log log = LogFactory.getLog(WebClient.class); + + private static transient ConnectionFactory factory; + private static transient Map queueConsumers; + + private transient ServletContext context; + private transient ActiveMQConnection connection; + private transient ActiveMQSession session; + private transient MessageProducer producer; + private transient Map topicConsumers = new ConcurrentHashMap(); + private int deliveryMode = DeliveryMode.NON_PERSISTENT; + + private final Semaphore semaphore = new Semaphore(1); + + + /** + * @return the web client for the current HTTP session or null if there is not a web client created yet + */ + public static WebClient getWebClient(HttpSession session) { + return (WebClient) session.getAttribute(webClientAttribute); + } + + + public static void initContext(ServletContext context) { + factory = initConnectionFactory(context); + if (factory == null) { + log.warn("No ConnectionFactory available in the ServletContext for: " + connectionFactoryAttribute); + factory = new ActiveMQConnectionFactory("vm://localhost"); + context.setAttribute(connectionFactoryAttribute, factory); + } + queueConsumers = initQueueConsumers(context); + } + + /** + * Only called by serialization + */ + public WebClient() { + } + + public WebClient(ServletContext context) { + this.context = context; + initContext(context); + } + + + public int getDeliveryMode() { + return deliveryMode; + } + + + public void setDeliveryMode(int deliveryMode) { + this.deliveryMode = deliveryMode; + } + + + public void start() throws JMSException { + } + + public void stop() throws JMSException { + System.out.println("Closing the WebClient!!! " + this); + + try { + connection.close(); + } + finally { + producer = null; + session = null; + connection = null; + topicConsumers.clear(); + } + } + + public void writeExternal(ObjectOutput out) throws IOException { + } + + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + topicConsumers = new HashMap(); + } + + public void send(Destination destination, Message message) throws JMSException { + if (producer == null) { + producer = getSession().createProducer(null); + producer.setDeliveryMode(deliveryMode ); + } + producer.send(destination, message); + if (log.isDebugEnabled()) { + log.debug("Sent! to destination: " + destination + " message: " + message); + } + } + + public Session getSession() throws JMSException { + if (session == null) { + session = createSession(); + } + return session; + } + + public ActiveMQConnection getConnection() throws JMSException { + if (connection == null) { + connection = (ActiveMQConnection) factory.createConnection(); + connection.start(); + } + return connection; + } + + public void sessionWillPassivate(HttpSessionEvent event) { + try { + stop(); + } + catch (JMSException e) { + log.warn("Could not close connection: " + e, e); + } + } + + public void sessionDidActivate(HttpSessionEvent event) { + // lets update the connection factory from the servlet context + context = event.getSession().getServletContext(); + initContext(context); + } + + public static Map initQueueConsumers(ServletContext context) { + Map answer = (Map) context.getAttribute(queueConsumersAttribute); + if (answer == null) { + answer = new HashMap(); + context.setAttribute(queueConsumersAttribute, answer); + } + return answer; + } + + + public static ConnectionFactory initConnectionFactory(ServletContext servletContext) { + ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute); + if (connectionFactory == null) { + String brokerURL = (String) servletContext.getInitParameter(brokerUrlInitParam); + + servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL); + + if (brokerURL == null) { + brokerURL = "vm://localhost"; + } + + boolean embeddedBroker = MessageServletSupport.asBoolean(servletContext.getInitParameter(embeddedBrokerInitParam)); + servletContext.log("Use embedded broker: " + embeddedBroker); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL); + factory.setUseEmbeddedBroker(embeddedBroker); + + connectionFactory = factory; + servletContext.setAttribute(connectionFactoryAttribute, connectionFactory); + } + return connectionFactory; + } + + public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException { + if (destination instanceof Topic) { + MessageConsumer consumer = (MessageConsumer) topicConsumers.get(destination); + if (consumer == null) { + consumer = getSession().createConsumer(destination); + topicConsumers.put(destination, consumer); + } + return consumer; + } + else { + synchronized (queueConsumers) { + SessionConsumerPair pair = (SessionConsumerPair) queueConsumers.get(destination); + if (pair == null) { + pair = createSessionConsumerPair(destination); + queueConsumers.put(destination, pair); + } + return pair.consumer; + } + } + } + + protected ActiveMQSession createSession() throws JMSException { + return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + protected SessionConsumerPair createSessionConsumerPair(Destination destination) throws JMSException { + SessionConsumerPair answer = new SessionConsumerPair(); + answer.session = createSession(); + answer.consumer = answer.session.createConsumer(destination); + return answer; + } + + protected static class SessionConsumerPair { + public Session session; + public MessageConsumer consumer; + } + + public Semaphore getSemaphore() { + return semaphore; + } +} Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html?rev=357943&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html (added) +++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html Tue Dec 20 01:54:54 2005 @@ -0,0 +1,13 @@ + + + + + +

+ Web Connectors so that messages can be sent via HTTP POST or read via + HTTP POST or GET as well as support for web streaming to we browser or + JavaScript clients. +

+ + +