activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From foco...@apache.org
Subject svn commit: r357943 - in /incubator/activemq/trunk/activemq-web/src: java/ main/ main/java/ main/java/org/ main/java/org/activemq/ main/java/org/activemq/web/
Date Tue, 20 Dec 2005 09:55:22 GMT
Author: foconer
Date: Tue Dec 20 01:54:54 2005
New Revision: 357943

URL: http://svn.apache.org/viewcvs?rev=357943&view=rev
Log:
Issue: source is not compiled. The generated jar file is empty.
Solution: Restructured the source to src/main/java. 

Added:
    incubator/activemq/trunk/activemq-web/src/main/
    incubator/activemq/trunk/activemq-web/src/main/java/
    incubator/activemq/trunk/activemq-web/src/main/java/org/
    incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/
    incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/
    incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java
    incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java
    incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java
    incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java
    incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java
    incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java
    incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java
    incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html
Removed:
    incubator/activemq/trunk/activemq-web/src/java/

Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java?rev=357943&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java (added)
+++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/ConnectionManager.java Tue Dec 20 01:54:54 2005
@@ -0,0 +1,50 @@
+/** 
+ * 
+ * Copyright 2004 Protique Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+
+package org.activemq.web;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.servlet.http.HttpSessionEvent;
+import javax.servlet.http.HttpSessionListener;
+
+/**
+ * Session listener intended to ensure that JMS connections are cleaned up
+ * nicely when an HTTP session closes.
+ * <p/>
+ * Both callbacks are currently no-ops: the cleanup code in
+ * sessionDestroyed is disabled (see the TODO there), so the log field is
+ * only referenced from that disabled code.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ConnectionManager implements HttpSessionListener {
+    private static final Log log = LogFactory.getLog(ConnectionManager.class);
+
+    public void sessionCreated(HttpSessionEvent event) {
+    }
+
+    public void sessionDestroyed(HttpSessionEvent event) {
+        /** TODO we can't use the session any more now!
+         The intended cleanup is kept below for reference; it looks up the
+         WebClient stored in the session and stops it.
+
+         WebClient client = WebClient.getWebClient(event.getSession());
+         try {
+         client.stop();
+         }
+         catch (JMSException e) {
+         log.warn("Error closing connection: " + e, e);
+         }
+         */
+    }
+}

Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java?rev=357943&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java (added)
+++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServlet.java Tue Dec 20 01:54:54 2005
@@ -0,0 +1,420 @@
+/** 
+ *
+ * Copyright 2004 Protique Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+
+package org.activemq.web;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.activemq.MessageAvailableConsumer;
+import org.activemq.MessageAvailableListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.mortbay.util.ajax.Continuation;
+import org.mortbay.util.ajax.ContinuationSupport;
+
+/**
+ * A servlet for sending and receiving messages to/from JMS destinations using
+ * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
+ * destination and whether it is a topic or queue via configuration details on
+ * the servlet or as request parameters. <p/> For reading messages you can
+ * specify a readTimeout parameter to determine how long the servlet should
+ * block for.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class MessageServlet extends MessageServletSupport {
+    private static final Log log = LogFactory.getLog(MessageServlet.class);
+
+    private String readTimeoutParameter = "readTimeout"; // name of the request parameter read by getReadTimeout()
+    private long defaultReadTimeout = -1; // negative means getReadTimeout() falls back to maximumReadTimeout
+    private long maximumReadTimeout = 20000; // upper bound applied by getReadTimeout(); presumably milliseconds - TODO confirm
+
+    /**
+     * Reads the optional defaultReadTimeout and maximumReadTimeout servlet
+     * init parameters, keeping the built-in values when one is absent.
+     *
+     * @throws ServletException if a configured value is not a valid long
+     */
+    public void init() throws ServletException {
+        ServletConfig servletConfig = getServletConfig();
+        defaultReadTimeout = readLongInitParameter(servletConfig, "defaultReadTimeout", defaultReadTimeout);
+        maximumReadTimeout = readLongInitParameter(servletConfig, "maximumReadTimeout", maximumReadTimeout);
+    }
+
+    /**
+     * Returns the named init parameter parsed as a long, or the fallback
+     * when the parameter is not configured.
+     */
+    private long readLongInitParameter(ServletConfig servletConfig, String name, long fallback) throws ServletException {
+        String text = servletConfig.getInitParameter(name);
+        if (text == null) {
+            return fallback;
+        }
+        try {
+            return asLong(text);
+        }
+        catch (NumberFormatException e) {
+            // report the misconfiguration with the offending parameter
+            // instead of letting a bare runtime exception escape
+            throw new ServletException("Invalid value for init parameter " + name + ": " + text, e);
+        }
+    }
+
+    /**
+     * Turns an HTTP POST into a JMS text message and sends it to the
+     * destination resolved for the request. The JMS message ID is returned
+     * to the caller in the messageID response header.
+     * 
+     * @param request the current HTTP request
+     * @param response the response receiving the messageID header
+     * @throws ServletException if the JMS operations fail
+     * @throws IOException if the posted body cannot be read
+     */
+    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+        try {
+            WebClient webClient = getWebClient(request);
+            String body = getPostedMessageBody(request);
+            Destination target = getDestination(webClient, request);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Sending message to: " + target + " with text: " + body);
+            }
+
+            // build the message, copy the request parameters across, send it
+            TextMessage outgoing = webClient.getSession().createTextMessage(body);
+            appendParametersToMessage(request, outgoing);
+            webClient.send(target, outgoing);
+
+            // hand the message ID back to the caller for reliable messaging
+            response.setHeader("messageID", outgoing.getJMSMessageID());
+            response.setStatus(HttpServletResponse.SC_OK);
+        }
+        catch (JMSException e) {
+            throw new ServletException("Could not post JMS message: " + e, e);
+        }
+    }
+
+    /**
+     * Supports an HTTP DELETE as the equivalent of consuming a single
+     * message from a queue; delegates to doMessages with a limit of one
+     * message.
+     */
+    protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+        doMessages(request, response, 1);
+    }
+
+    /**
+     * Supports an HTTP GET as a request to receive messages; delegates to
+     * doMessages with no message limit (doMessages itself still restricts
+     * non-ajax requests to a single message).
+     */
+    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+        doMessages(request, response, -1);
+    }
+
+    /**
+     * Reads pending message(s) from a destination, suspending the request
+     * for up to the read timeout when none is immediately available.
+     * <p/>
+     * Non-ajax requests are always limited to one message. For Rico ajax
+     * requests the messages are wrapped in an ajax-response document and a
+     * negative maxMessages means no limit.
+     * 
+     * @param request the current HTTP request
+     * @param response the response the message(s) are written to
+     * @param maxMessages the maximum number of messages to write, or a
+     *            negative value for no limit (overridden to 1 for non-ajax
+     *            requests)
+     * @throws ServletException if no destination can be resolved or a JMS
+     *             operation fails
+     * @throws IOException if the response cannot be written
+     */
+    protected void doMessages(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException {
+
+        int messages = 0;
+        try {
+            WebClient client = getWebClient(request);
+            Destination destination = getDestination(client, request);
+            long timeout = getReadTimeout(request);
+            boolean ajax = isRicoAjax(request);
+            if (!ajax)
+                maxMessages = 1;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
+            }
+
+            MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
+            Continuation continuation = null;
+            Listener listener = null;
+            Message message = null;
+
+            // the consumer is the lock shared with the Listener callbacks
+            synchronized (consumer) {
+                // Fetch the listener, installing one on first use
+                listener = (Listener) consumer.getAvailableListener();
+                if (listener == null) {
+                    listener = new Listener(consumer);
+                    consumer.setAvailableListener(listener);
+                }
+                // Look for any available messages
+                message = consumer.receiveNoWait();
+
+                // Get an existing Continuation or create a new one if there are
+                // no events.
+                if (message == null) {
+                    continuation = ContinuationSupport.getContinuation(request, consumer);
+
+                    // register this continuation with our listener so that
+                    // onMessageAvailable can resume it.
+                    listener.setContinuation(continuation);
+
+                    // Get the continuation object (may wait and/or retry
+                    // request here).
+                    continuation.suspend(timeout);
+                }
+
+                // Try again now
+                if (message == null)
+                    message = consumer.receiveNoWait();
+
+                // write a response
+                response.setContentType("text/xml");
+                PrintWriter writer = response.getWriter();
+
+                if (ajax)
+                    writer.println("<ajax-response>");
+
+                // handle any message(s)
+                if (message == null) {
+                    // No messages: OK response for ajax, else no content.
+                    response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
+                }
+                else {
+                    // We have at least one message so set up the response
+                    response.setStatus(HttpServletResponse.SC_OK);
+                    String type = getContentType(request);
+                    if (type != null)
+                        response.setContentType(type);
+
+                    // send a response for each available message (up to max
+                    // messages)
+                    while ((maxMessages < 0 || messages < maxMessages) && message != null) {
+                        if (ajax) {
+                            // NOTE(review): the id request parameter is
+                            // echoed into the XML unescaped - confirm
+                            // whether it needs escaping
+                            writer.print("<response type='object' id='");
+                            writer.print(request.getParameter("id"));
+                            writer.println("'>");
+                        }
+                        else
+                            // only ever 1 message for non ajax!
+                            setResponseHeaders(response, message);
+
+                        writeMessageResponse(writer, message);
+
+                        if (ajax)
+                            writer.println("</response>");
+
+                        // look for next message
+                        message = consumer.receiveNoWait();
+                        messages++;
+                    }
+                }
+
+                if (ajax) {
+                    writer.println("<response type='object' id='poll'><ok/></response>");
+                    writer.println("</ajax-response>");
+                }
+            }
+        }
+        catch (JMSException e) {
+            // this path receives rather than posts, so say so in the error
+            throw new ServletException("Could not receive JMS message: " + e, e);
+        }
+        finally {
+            if (log.isDebugEnabled()) {
+                log.debug("Received " + messages + " message(s)");
+            }
+        }
+    }
+
+    /**
+     * Variant of doMessages that blocks in the consumer for up to the read
+     * timeout instead of suspending the request with a continuation.
+     * <p/>
+     * Only one thread per client polls at a time: when the client's
+     * semaphore cannot be acquired the request is answered immediately
+     * without any messages.
+     * 
+     * @param request the current HTTP request
+     * @param response the response the message(s) are written to
+     * @param maxMessages the maximum number of messages to write, or a
+     *            negative value for no limit (overridden to 1 for non-ajax
+     *            requests)
+     * @throws ServletException if no destination can be resolved or a JMS
+     *             operation fails
+     * @throws IOException if the response cannot be written
+     */
+    protected void doMessagesWithoutContinuation(HttpServletRequest request, HttpServletResponse response,
+            int maxMessages) throws ServletException, IOException {
+
+        int messages = 0;
+        try {
+            WebClient client = getWebClient(request);
+            Destination destination = getDestination(client, request);
+            long timeout = getReadTimeout(request);
+            boolean ajax = isRicoAjax(request);
+            if (!ajax)
+                maxMessages = 1;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
+            }
+
+            MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
+            Message message = null;
+
+            // write a response
+            response.setContentType("text/xml");
+            PrintWriter writer = response.getWriter();
+
+            if (ajax)
+                writer.println("<ajax-response>");
+
+            // Only one client thread at a time should poll for messages.
+            if (client.getSemaphore().tryAcquire()) {
+                try {
+                    // Wait up to the timeout for a message
+                    message = consumer.receive(timeout);
+
+                    // handle any message(s)
+                    if (message == null) {
+                        // No messages: OK response for ajax, else no
+                        // content.
+                        response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
+                    } else {
+                        // We have at least one message so set up the
+                        // response
+                        response.setStatus(HttpServletResponse.SC_OK);
+                        String type = getContentType(request);
+                        if (type != null)
+                            response.setContentType(type);
+
+                        // send a response for each available message (up to
+                        // max messages)
+                        while ((maxMessages < 0 || messages < maxMessages) && message != null) {
+                            if (ajax) {
+                                // NOTE(review): the id request parameter is
+                                // echoed into the XML unescaped - confirm
+                                // whether it needs escaping
+                                writer.print("<response type='object' id='");
+                                writer.print(request.getParameter("id"));
+                                writer.println("'>");
+                            } else
+                                // only ever 1 message for non ajax!
+                                setResponseHeaders(response, message);
+
+                            writeMessageResponse(writer, message);
+
+                            if (ajax)
+                                writer.println("</response>");
+
+                            // look for next message
+                            message = consumer.receiveNoWait();
+                            messages++;
+                        }
+                    }
+                } finally {
+                    client.getSemaphore().release();
+                }
+            } else {
+                // Client is using us in another thread.
+                response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
+            }
+
+            if (ajax) {
+                writer.println("<response type='object' id='poll'><ok/></response>");
+                writer.println("</ajax-response>");
+            }
+
+        } catch (JMSException e) {
+            // this path receives rather than posts, so say so in the error
+            throw new ServletException("Could not receive JMS message: " + e, e);
+        } finally {
+            if (log.isDebugEnabled()) {
+                log.debug("Received " + messages + " message(s)");
+            }
+        }
+    }
+
+    /**
+     * Writes the payload of a message to the response: the text of a
+     * TextMessage, or the string form of the object carried by an
+     * ObjectMessage. Other message types produce no output.
+     *
+     * @param writer the response writer
+     * @param message the message whose payload is written
+     * @throws JMSException if the payload cannot be read from the message
+     * @throws IOException declared for overriding implementations
+     */
+    protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException {
+        if (message instanceof TextMessage) {
+            TextMessage textMsg = (TextMessage) message;
+            writer.print(textMsg.getText());
+        }
+        else if (message instanceof ObjectMessage) {
+            ObjectMessage objectMsg = (ObjectMessage) message;
+            Object object = objectMsg.getObject();
+            // print(Object) goes through String.valueOf, so a null payload
+            // is written as "null" (as a null text already is) instead of
+            // failing with a NullPointerException
+            writer.print(object);
+        }
+    }
+
+    /**
+     * @return true only when the rico request parameter is exactly "true"
+     */
+    protected boolean isRicoAjax(HttpServletRequest request) {
+        return "true".equals(request.getParameter("rico"));
+    }
+
+    /**
+     * @return "text/xml" when the xml request parameter is "true" (ignoring
+     *         case), otherwise null to leave the content type unchanged
+     */
+    protected String getContentType(HttpServletRequest request) {
+        String xmlFlag = request.getParameter("xml");
+        if ("true".equalsIgnoreCase(xmlFlag)) {
+            return "text/xml";
+        }
+        return null;
+    }
+
+    /**
+     * Copies the JMS destination and message ID of a message into the
+     * destination and id response headers.
+     *
+     * @param response the response receiving the headers
+     * @param message the message being returned to the caller
+     * @throws JMSException if the message headers cannot be read
+     */
+    protected void setResponseHeaders(HttpServletResponse response, Message message) throws JMSException {
+        Destination destination = message.getJMSDestination();
+        // a message without a destination must not break the response
+        if (destination != null) {
+            response.setHeader("destination", destination.toString());
+        }
+        response.setHeader("id", message.getJMSMessageID());
+    }
+
+    /**
+     * Works out the read timeout for a request from the readTimeout request
+     * parameter, falling back to the configured default when the parameter
+     * is missing or is not a valid number.
+     *
+     * @param request the current HTTP request
+     * @return the timeout value for read requests, which is never negative
+     *         and never more than maximumReadTimeout, to avoid DoS attacks
+     */
+    protected long getReadTimeout(HttpServletRequest request) {
+        long answer = defaultReadTimeout;
+
+        String name = request.getParameter(readTimeoutParameter);
+        if (name != null) {
+            try {
+                answer = asLong(name);
+            }
+            catch (NumberFormatException e) {
+                // the value comes straight from the client, so a malformed
+                // number must not fail the whole request
+                if (log.isDebugEnabled()) {
+                    log.debug("Ignoring invalid " + readTimeoutParameter + " value: " + name);
+                }
+            }
+        }
+        // negative or excessive values are clamped to the maximum
+        if (answer < 0 || answer > maximumReadTimeout) {
+            answer = maximumReadTimeout;
+        }
+        return answer;
+    }
+
+    /*
+     * Listens for available messages and wakes up any waiting continuation.
+     * All access to the continuation is guarded by the consumer's monitor,
+     * the same lock doMessages holds while it registers a continuation.
+     * The class is static because it never touches the enclosing servlet.
+     */
+    private static class Listener implements MessageAvailableListener {
+        private final MessageConsumer consumer;
+        private Continuation continuation;
+
+        Listener(MessageConsumer consumer) {
+            this.consumer = consumer;
+        }
+
+        /* Registers the continuation to resume when a message arrives. */
+        public void setContinuation(Continuation continuation) {
+            synchronized (consumer) {
+                this.continuation = continuation;
+            }
+        }
+
+        /* Resumes the registered continuation, if any, exactly once. */
+        public void onMessageAvailable(MessageConsumer consumer) {
+            assert this.consumer == consumer;
+
+            synchronized (this.consumer) {
+                if (continuation != null)
+                    continuation.resume();
+                continuation = null;
+            }
+        }
+    }
+
+}

Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java?rev=357943&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java (added)
+++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/MessageServletSupport.java Tue Dec 20 01:54:54 2005
@@ -0,0 +1,223 @@
+/** 
+ * 
+ * Copyright 2004 Protique Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+
+package org.activemq.web;
+
+import org.activemq.command.ActiveMQQueue;
+import org.activemq.command.ActiveMQTopic;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpSession;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A useful base class for any JMS related servlet;
+ * there are various ways to map JMS operations to web requests
+ * so we put most of the common behaviour in a reusable base class.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public abstract class MessageServletSupport extends HttpServlet {
+
+    private boolean defaultTopicFlag = true; // used by isTopic() when the request has no topic parameter; overridden by the "topic" init parameter
+    private Destination defaultDestination; // built from the "destination" init parameter; stays null when not configured
+    private String destinationParameter = "destination"; // request parameter naming the destination
+    private String topicParameter = "topic"; // request parameter choosing topic ("true") or queue
+    private String bodyParameter = "body"; // request parameter holding the message body
+
+
+    /**
+     * Configures the default topic flag and default destination from the
+     * topic and destination init parameters, then initialises the
+     * WebClient context for this web application.
+     *
+     * @param servletConfig the servlet configuration supplying the parameters
+     * @throws ServletException if the superclass initialisation fails
+     */
+    public void init(ServletConfig servletConfig) throws ServletException {
+        super.init(servletConfig);
+
+        String topicSetting = servletConfig.getInitParameter("topic");
+        if (topicSetting != null) {
+            defaultTopicFlag = asBoolean(topicSetting);
+        }
+
+        log("Defaulting to use topics: " + defaultTopicFlag);
+
+        String destinationName = servletConfig.getInitParameter("destination");
+        if (destinationName != null) {
+            // the default destination takes the kind chosen just above
+            if (!defaultTopicFlag) {
+                defaultDestination = new ActiveMQQueue(destinationName);
+            }
+            else {
+                defaultDestination = new ActiveMQTopic(destinationName);
+            }
+        }
+
+        // lets check to see if there's a connection factory set
+        WebClient.initContext(getServletContext());
+    }
+
+    protected WebClient createWebClient(HttpServletRequest request) { // called by getWebClient() when the session has no client yet
+        return new WebClient(getServletContext()); // the request itself is not used here
+    }
+
+    /**
+     * Converts a parameter to a boolean, treating a missing value as false.
+     *
+     * @param param the text to convert, may be null
+     * @return true only when the text equals "true", ignoring case
+     */
+    public static boolean asBoolean(String param) {
+        return asBoolean(param, false);
+    }
+
+    /**
+     * Converts a parameter to a boolean.
+     *
+     * @param param the text to convert, may be null
+     * @param defaultValue the result when param is null
+     * @return defaultValue for null, else whether the text equals "true",
+     *         ignoring case
+     */
+    public static boolean asBoolean(String param, boolean defaultValue) {
+        return param == null ? defaultValue : "true".equalsIgnoreCase(param);
+    }
+
+    /**
+     * Looks up the client stored in the HTTP session of the request,
+     * creating the session and the client on first use.
+     *
+     * @param request is the current HTTP request
+     * @return the client already held by the session, or a newly created
+     *         one that has just been stored in it
+     */
+    protected WebClient getWebClient(HttpServletRequest request) {
+        HttpSession httpSession = request.getSession(true);
+        WebClient existing = WebClient.getWebClient(httpSession);
+        if (existing != null) {
+            return existing;
+        }
+        WebClient created = createWebClient(request);
+        httpSession.setAttribute(WebClient.webClientAttribute, created);
+        return created;
+    }
+
+
+    /**
+     * Copies the request parameters onto the message as JMS properties,
+     * skipping the destination, topic and body parameters. A parameter with
+     * exactly one value is set as that value; any other array value is
+     * logged and left out.
+     *
+     * @param request the current HTTP request
+     * @param message the message receiving the properties
+     * @throws JMSException if a property cannot be set on the message
+     */
+    protected void appendParametersToMessage(HttpServletRequest request, TextMessage message) throws JMSException {
+        Iterator entries = request.getParameterMap().entrySet().iterator();
+        while (entries.hasNext()) {
+            Map.Entry parameter = (Map.Entry) entries.next();
+            String key = (String) parameter.getKey();
+            boolean reserved = destinationParameter.equals(key) || topicParameter.equals(key) || bodyParameter.equals(key);
+            if (reserved) {
+                continue;
+            }
+
+            Object propertyValue = parameter.getValue();
+            if (propertyValue instanceof Object[]) {
+                Object[] values = (Object[]) propertyValue;
+                if (values.length == 1) {
+                    propertyValue = values[0];
+                }
+                else {
+                    log("Can't use property: " + key + " which is of type: " + propertyValue.getClass().getName() + " value");
+                    propertyValue = null;
+                    for (int index = 0; index < values.length; index++) {
+                        log("value[" + index + "] = " + values[index]);
+                    }
+                }
+            }
+            if (propertyValue != null) {
+                message.setObjectProperty(key, propertyValue);
+            }
+        }
+    }
+
+    /**
+     * Resolves the destination for a request: the destination request
+     * parameter wins, then the configured default destination, and finally
+     * the path the servlet was invoked with.
+     *
+     * @return the destination to use for the current request
+     */
+    protected Destination getDestination(WebClient client, HttpServletRequest request) throws JMSException, NoDestinationSuppliedException {
+        String requestedName = request.getParameter(destinationParameter);
+        if (requestedName != null) {
+            return getDestination(client, request, requestedName);
+        }
+        if (defaultDestination != null) {
+            return defaultDestination;
+        }
+        return getDestinationFromURI(client, request);
+    }
+
+    /**
+     * Derives the destination from the path info of the request: a single
+     * leading slash is dropped and the remaining slashes become the dots
+     * used as the JMS destination separator.
+     *
+     * @return the destination to use for the current request using the relative URI from
+     *         where this servlet was invoked as the destination name
+     * @throws NoDestinationSuppliedException if the request has no path
+     *         info, or nothing follows the leading slash
+     */
+    protected Destination getDestinationFromURI(WebClient client, HttpServletRequest request) throws NoDestinationSuppliedException, JMSException {
+        String uri = request.getPathInfo();
+        if (uri == null) {
+            throw new NoDestinationSuppliedException();
+        }
+        if (uri.startsWith("/")) {
+            uri = uri.substring(1);
+        }
+        // a bare "/" names no destination at all, so report it as missing
+        // rather than asking the session for an empty destination name
+        if (uri.length() == 0) {
+            throw new NoDestinationSuppliedException();
+        }
+        // replace URI separator with JMS destination separator
+        uri = uri.replace('/', '.');
+        return getDestination(client, request, uri);
+    }
+
+    /**
+     * Creates a topic or a queue with the given name from the client's
+     * session, depending on what isTopic reports for the request.
+     *
+     * @return the Destination object for the given destination name
+     */
+    protected Destination getDestination(WebClient client, HttpServletRequest request, String destinationName) throws JMSException {
+        if (!isTopic(request)) {
+            return client.getSession().createQueue(destinationName);
+        }
+        return client.getSession().createTopic(destinationName);
+    }
+
+    /**
+     * Decides between topic and queue from the topic request parameter,
+     * using the configured default when the parameter is absent.
+     *
+     * @return true if the current request is for a topic destination, else false if its for a queue
+     */
+    protected boolean isTopic(HttpServletRequest request) {
+        String topicText = request.getParameter(topicParameter);
+        if (topicText == null) {
+            return defaultTopicFlag;
+        }
+        return asBoolean(topicText);
+    }
+
+    protected long asLong(String name) { // despite its name, the argument is the text to parse
+        return Long.parseLong(name); // a NumberFormatException for malformed text propagates to the caller
+    }
+
+    /**
+     * Obtains the message body from the body request parameter or, when
+     * that parameter is absent, from the lines of the request body, each
+     * followed by a newline character.
+     *
+     * @return the text that was posted to the servlet which is used as the body
+     *         of the message to be sent
+     * @throws IOException if the request body cannot be read
+     */
+    protected String getPostedMessageBody(HttpServletRequest request) throws IOException {
+        String parameterBody = request.getParameter(bodyParameter);
+        if (parameterBody != null) {
+            return parameterBody;
+        }
+
+        // no body parameter, so lets read the message body instead
+        BufferedReader in = request.getReader();
+        StringBuffer text = new StringBuffer();
+        for (String line = in.readLine(); line != null; line = in.readLine()) {
+            text.append(line);
+            text.append("\n");
+        }
+        return text.toString();
+    }
+}

Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java?rev=357943&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java (added)
+++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/NoDestinationSuppliedException.java Tue Dec 20 01:54:54 2005
@@ -0,0 +1,32 @@
+/** 
+ * 
+ * Copyright 2004 Protique Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.web;
+
+import javax.servlet.ServletException;
+
+/**
+ * Exception thrown if there was no destination available for a JMS
+ * operation. It is a ServletException carrying a fixed message, so servlet
+ * methods can let it propagate unchanged.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class NoDestinationSuppliedException extends ServletException {
+
+    public NoDestinationSuppliedException() {
+        super("Could not perform the JMS operation as no Destination was supplied");
+    }
+}

Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java?rev=357943&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java (added)
+++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/PortfolioPublishServlet.java Tue Dec 20 01:54:54 2005
@@ -0,0 +1,138 @@
+/** 
+ * 
+ * Copyright 2004 Protique Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.web;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Hashtable;
+import java.util.Map;
+
+/**
+ * A servlet which will publish dummy market data prices
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class PortfolioPublishServlet extends MessageServletSupport {
+
+    private static final int maxDeltaPercent = 1;
+    private static final Map lastPrices = new Hashtable();
+    private boolean ricoStyle = true;
+
+    
+    public void init() throws ServletException {
+        super.init();
+        
+        ricoStyle = asBoolean(getServletConfig().getInitParameter("rico"), true);
+    }
+
+    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+        PrintWriter out = response.getWriter();
+        String[] stocks = request.getParameterValues("stocks");
+        if (stocks == null || stocks.length == 0) {
+            out.println("<html><body>No <b>stocks</b> query parameter specified. Cannot publish market data</body></html>");
+        }
+        else {
+            Integer total=(Integer)request.getSession(true).getAttribute("total");
+            if (total==null)
+                total=new Integer(0);
+            
+            
+            int count = getNumberOfMessages(request);
+            total=new Integer(total.intValue()+count);
+            request.getSession().setAttribute("total",total);
+            
+            try {
+                WebClient client = getWebClient(request);
+                for (int i = 0; i < count; i++) {
+                    sendMessage(client, stocks);
+                }
+                out.print("<html><head><meta http-equiv='refresh' content='");
+                String refreshRate = request.getParameter("refresh");
+                if (refreshRate == null || refreshRate.length() == 0) {
+                    refreshRate = "1";
+                }
+                out.print(refreshRate);
+                out.println("'/></head>");
+                out.println("<body>Published <b>" + count + "</b> of "+total+ " price messages.  Refresh = "+refreshRate+"s");
+                out.println("</body></html>");
+
+            }
+            catch (JMSException e) {
+                out.println("<html><body>Failed sending price messages due to <b>" + e + "</b></body></html>");
+                log("Failed to send message: " + e, e);
+            }
+        }
+    }
+
+    protected void sendMessage(WebClient client, String[] stocks) throws JMSException {
+        Session session = client.getSession();
+
+        int idx = 0;
+        while (true) {
+            idx = (int) Math.round(stocks.length * Math.random());
+            if (idx < stocks.length) {
+                break;
+            }
+        }
+        String stock = stocks[idx];
+        Destination destination = session.createTopic("STOCKS." + stock);
+        String stockText = createStockText(stock);
+        log("Sending: " + stockText + " on destination: " + destination);
+        Message message = session.createTextMessage(stockText);
+        client.send(destination, message);
+    }
+
+    protected String createStockText(String stock) {
+        Double value = (Double) lastPrices.get(stock);
+        if (value == null) {
+            value = new Double(Math.random() * 100);
+        }
+
+        // lets mutate the value by some percentage
+        double oldPrice = value.doubleValue();
+        value = new Double(mutatePrice(oldPrice));
+        lastPrices.put(stock, value);
+        double price = value.doubleValue();
+
+        double offer = price * 1.001;
+
+        String movement = (price > oldPrice) ? "up" : "down";
+        return "<price stock='" + stock + "' bid='" + price + "' offer='" + offer + "' movement='" + movement + "'/>";
+    }
+
+    protected double mutatePrice(double price) {
+        double percentChange = (2 * Math.random() * maxDeltaPercent) - maxDeltaPercent;
+
+        return price * (100 + percentChange) / 100;
+    }
+
+    protected int getNumberOfMessages(HttpServletRequest request) {
+        String name = request.getParameter("count");
+        if (name != null) {
+            return Integer.parseInt(name);
+        }
+        return 1;
+    }
+}

Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java?rev=357943&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java (added)
+++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/SpringBrokerContextListener.java Tue Dec 20 01:54:54 2005
@@ -0,0 +1,117 @@
+/** 
+ * 
+ * Copyright 2004 Protique Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.web;
+
+import org.activemq.broker.BrokerService;
+import org.activemq.xbean.BrokerFactoryBean;
+import org.springframework.core.io.Resource;
+import org.springframework.web.context.support.ServletContextResource;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+
+/**
+ * Used to configure an instance of ActiveMQ <tt>BrokerService</tt> using
+ * ActiveMQ/Spring's xml configuration. <p/> The configuration file is specified
+ * via the context init parameter <tt>brokerURI</tt>, typically: <code>
+ * &lt;context-param&gt;
+ * &lt;param-name&gt;brokerURI&lt;/param-name&gt;
+ * &lt;param-value&gt;/WEB-INF/activemq.xml&lt;/param-value&gt;
+ * &lt;/context-param&gt;
+ * </code>
+ * 
+ * As a default, if a <tt>brokerURI</tt> is not specified it will look
+ * for <tt>activemq.xml</tt>
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class SpringBrokerContextListener implements ServletContextListener {
+
+    /** broker uri context parameter name: <tt>brokerURI</tt> */
+    public static final String INIT_PARAM_BROKER_URI = "brokerURI";
+
+    /** the broker container instance */
+    private BrokerService brokerContainer;
+
+    /**
+     * Set the broker container to be used by this listener
+     * 
+     * @param container
+     *            the container to be used.
+     */
+    protected void setBrokerService(BrokerService container) {
+        this.brokerContainer = container;
+    }
+
+    /**
+     * Return the broker container.
+     */
+    protected BrokerService getBrokerService() {
+        return this.brokerContainer;
+    }
+
+    /**
+     * Creates the broker through {@link #createBroker(ServletContext)} and
+     * starts it. Failures are written to the servlet context log and are not
+     * propagated to the container.
+     * 
+     * @param event
+     *            the event carrying the servlet context being initialised
+     */
+    public void contextInitialized(ServletContextEvent event) {
+        ServletContext context = event.getServletContext();
+        context.log("Creating ActiveMQ Broker...");
+        brokerContainer = createBroker(context);
+
+        // createBroker() logs and continues when creation fails, so it may
+        // hand back no broker at all; report that instead of failing on start()
+        if (brokerContainer == null) {
+            context.log("Failed to start ActiveMQ broker: no broker was created");
+            return;
+        }
+
+        context.log("Starting ActiveMQ Broker");
+        try {
+            brokerContainer.start();
+
+            context.log("Started ActiveMQ Broker");
+        }
+        catch (Exception e) {
+            context.log("Failed to start ActiveMQ broker: " + e, e);
+        }
+    }
+
+    /**
+     * Stops the broker, if there is one, and drops the reference to it. A
+     * failure to stop is logged and not propagated.
+     * 
+     * @param event
+     *            the event carrying the servlet context being destroyed
+     */
+    public void contextDestroyed(ServletContextEvent event) {
+        ServletContext context = event.getServletContext();
+        if (brokerContainer != null) {
+            try {
+                brokerContainer.stop();
+            }
+            catch (Exception e) {
+                context.log("Failed to stop the ActiveMQ Broker: " + e, e);
+            }
+            brokerContainer = null;
+        }
+    }
+
+    /**
+     * Factory method to create a new ActiveMQ Broker from the configuration
+     * named by the <tt>brokerURI</tt> context init parameter, or from
+     * <tt>activemq.xml</tt> when that parameter is not set.
+     * 
+     * @param context
+     *            the servlet context used to resolve the configuration
+     *            resource and to log
+     * @return whatever the factory bean holds after initialisation; a failed
+     *         initialisation is only logged, so this is presumably
+     *         <tt>null</tt> in that case (verify against BrokerFactoryBean)
+     */
+    protected BrokerService createBroker(ServletContext context) {
+        String brokerURI = context.getInitParameter(INIT_PARAM_BROKER_URI);
+        if (brokerURI == null) {
+            brokerURI = "activemq.xml";
+        }
+        context.log("Loading ActiveMQ Broker configuration from: " + brokerURI);
+        Resource resource = new ServletContextResource(context, brokerURI);
+        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
+        try {
+            factory.afterPropertiesSet();
+        }
+        catch (Exception e) {
+            context.log("Failed to create broker: " + e, e);
+        }
+        return factory.getBroker();
+    }
+}

Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java?rev=357943&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java (added)
+++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/WebClient.java Tue Dec 20 01:54:54 2005
@@ -0,0 +1,257 @@
+/**
+ * 
+ * Copyright 2004 Protique Ltd
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+
+package org.activemq.web;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.HttpSessionActivationListener;
+import javax.servlet.http.HttpSessionEvent;
+
+import org.activemq.ActiveMQConnection;
+import org.activemq.ActiveMQConnectionFactory;
+import org.activemq.ActiveMQSession;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
+
+/**
+ * Represents a messaging client used from inside a web container
+ * typically stored inside an HttpSession
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class WebClient implements HttpSessionActivationListener, Externalizable {
+    public static final String webClientAttribute = "org.activemq.webclient";
+    public static final String connectionFactoryAttribute = "org.activemq.connectionFactory";
+    public static final String queueConsumersAttribute = "org.activemq.queueConsumers";
+    public static final String brokerUrlInitParam = "org.activemq.brokerURL";
+    public static final String embeddedBrokerInitParam = "org.activemq.embeddedBroker";
+
+    private static final Log log = LogFactory.getLog(WebClient.class);
+
+    // Shared by every WebClient; (re)assigned from the servlet context by initContext()
+    private static ConnectionFactory factory;
+    private static Map queueConsumers;
+
+    private transient ServletContext context;
+    private transient ActiveMQConnection connection;
+    private transient ActiveMQSession session;
+    private transient MessageProducer producer;
+    private transient Map topicConsumers = new ConcurrentHashMap();
+    private int deliveryMode = DeliveryMode.NON_PERSISTENT;
+
+    private final Semaphore semaphore = new Semaphore(1);
+
+
+    /**
+     * @return the web client for the current HTTP session or null if there is not a web client created yet
+     */
+    public static WebClient getWebClient(HttpSession session) {
+        return (WebClient) session.getAttribute(webClientAttribute);
+    }
+
+
+    /**
+     * Binds the shared connection factory and queue consumer map to the ones
+     * held by the given servlet context, creating them there if necessary.
+     *
+     * @param context the servlet context of the web application
+     */
+    public static void initContext(ServletContext context) {
+        // initConnectionFactory() always returns a factory, creating one on demand
+        factory = initConnectionFactory(context);
+        queueConsumers = initQueueConsumers(context);
+    }
+
+    /**
+     * Only called by serialization
+     */
+    public WebClient() {
+    }
+
+    /**
+     * Creates a client bound to the given servlet context.
+     *
+     * @param context the servlet context of the web application
+     */
+    public WebClient(ServletContext context) {
+        this.context = context;
+        initContext(context);
+    }
+
+
+    /**
+     * @return the JMS delivery mode given to the producer when it is created
+     */
+    public int getDeliveryMode() {
+        return deliveryMode;
+    }
+
+
+    /**
+     * Sets the JMS delivery mode. It is applied when the producer is created,
+     * so a producer that already exists keeps its current mode.
+     *
+     * @param deliveryMode one of the <tt>javax.jms.DeliveryMode</tt> constants
+     */
+    public void setDeliveryMode(int deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
+
+
+    /**
+     * Does nothing; the connection is opened and started on first use by
+     * {@link #getConnection()}.
+     */
+    public void start() throws JMSException {
+    }
+
+    /**
+     * Closes the connection, if one was opened, and forgets the session,
+     * producer and topic consumers created from it.
+     *
+     * @throws JMSException if closing the connection fails; the cached state
+     *             is cleared even then
+     */
+    public void stop() throws JMSException {
+        if (log.isDebugEnabled()) {
+            log.debug("Closing the WebClient: " + this);
+        }
+
+        try {
+            // the connection is created lazily, so there may be nothing to close
+            if (connection != null) {
+                connection.close();
+            }
+        }
+        finally {
+            producer = null;
+            session = null;
+            connection = null;
+            topicConsumers.clear();
+        }
+    }
+
+    /**
+     * Writes nothing: no state of this client is persisted.
+     */
+    public void writeExternal(ObjectOutput out) throws IOException {
+    }
+
+    /**
+     * Reads nothing and starts the restored client with an empty set of topic
+     * consumers.
+     */
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // same map type as a freshly constructed client
+        topicConsumers = new ConcurrentHashMap();
+    }
+
+    /**
+     * Sends a message to a destination, creating the producer on first use
+     * with the current delivery mode.
+     *
+     * @param destination where to send the message
+     * @param message the message to send
+     * @throws JMSException if the producer cannot be created or the send fails
+     */
+    public void send(Destination destination, Message message) throws JMSException {
+        if (producer == null) {
+            // a null destination here means the destination is given on each send
+            producer = getSession().createProducer(null);
+            producer.setDeliveryMode(deliveryMode);
+        }
+        producer.send(destination, message);
+        if (log.isDebugEnabled()) {
+            log.debug("Sent! to destination: " + destination + " message: " + message);
+        }
+    }
+
+    /**
+     * @return the session of this client, created on first use
+     * @throws JMSException if the session cannot be created
+     */
+    public Session getSession() throws JMSException {
+        if (session == null) {
+            session = createSession();
+        }
+        return session;
+    }
+
+    /**
+     * @return the connection of this client, created from the shared factory
+     *         and started on first use
+     * @throws JMSException if the connection cannot be created or started
+     */
+    public ActiveMQConnection getConnection() throws JMSException {
+        if (connection == null) {
+            connection = (ActiveMQConnection) factory.createConnection();
+            connection.start();
+        }
+        return connection;
+    }
+
+    /**
+     * Stops the client before its HTTP session is passivated; a failure to
+     * close the connection is logged and not propagated.
+     */
+    public void sessionWillPassivate(HttpSessionEvent event) {
+        try {
+            stop();
+        }
+        catch (JMSException e) {
+            log.warn("Could not close connection: " + e, e);
+        }
+    }
+
+    /**
+     * Rebinds the client to the servlet context of the activated session.
+     */
+    public void sessionDidActivate(HttpSessionEvent event) {
+        // lets update the connection factory from the servlet context
+        context = event.getSession().getServletContext();
+        initContext(context);
+    }
+
+    /**
+     * Returns the queue consumer map stored in the servlet context, creating
+     * and storing an empty one when the attribute is not set.
+     *
+     * @param context the servlet context of the web application
+     * @return the map stored under {@link #queueConsumersAttribute}
+     */
+    public static Map initQueueConsumers(ServletContext context) {
+        Map answer = (Map) context.getAttribute(queueConsumersAttribute);
+        if (answer == null) {
+            answer = new HashMap();
+            context.setAttribute(queueConsumersAttribute, answer);
+        }
+        return answer;
+    }
+
+
+    /**
+     * Returns the connection factory stored in the servlet context. When the
+     * attribute is not set, a factory is created from the
+     * {@link #brokerUrlInitParam} and {@link #embeddedBrokerInitParam} init
+     * parameters (broker URL defaulting to <tt>vm://localhost</tt>) and stored
+     * in the context.
+     *
+     * @param servletContext the servlet context of the web application
+     * @return the connection factory, never null
+     */
+    public static ConnectionFactory initConnectionFactory(ServletContext servletContext) {
+        ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
+        if (connectionFactory == null) {
+            String brokerURL = servletContext.getInitParameter(brokerUrlInitParam);
+
+            servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL);
+
+            if (brokerURL == null) {
+                brokerURL = "vm://localhost";
+            }
+
+            boolean embeddedBroker = MessageServletSupport.asBoolean(servletContext.getInitParameter(embeddedBrokerInitParam));
+            servletContext.log("Use embedded broker: " + embeddedBroker);
+
+            ActiveMQConnectionFactory activeMQFactory = new ActiveMQConnectionFactory(brokerURL);
+            activeMQFactory.setUseEmbeddedBroker(embeddedBroker);
+
+            connectionFactory = activeMQFactory;
+            servletContext.setAttribute(connectionFactoryAttribute, connectionFactory);
+        }
+        return connectionFactory;
+    }
+
+    /**
+     * Returns the consumer for a destination, creating it on first use. Topic
+     * consumers are kept per client; queue consumers are kept in the map
+     * stored in the servlet context.
+     *
+     * @param destination the topic or queue to consume from
+     * @return the consumer for the destination
+     * @throws JMSException if the consumer cannot be created
+     */
+    public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
+        if (destination instanceof Topic) {
+            MessageConsumer consumer = (MessageConsumer) topicConsumers.get(destination);
+            if (consumer == null) {
+                consumer = getSession().createConsumer(destination);
+                topicConsumers.put(destination, consumer);
+            }
+            return consumer;
+        }
+        else {
+            // NOTE(review): the pair is created from this client's connection but is
+            // stored in the context-wide map, and stop() does not remove it; confirm
+            // whether other clients may be handed a consumer of a closed connection
+            synchronized (queueConsumers) {
+                SessionConsumerPair pair = (SessionConsumerPair) queueConsumers.get(destination);
+                if (pair == null) {
+                    pair = createSessionConsumerPair(destination);
+                    queueConsumers.put(destination, pair);
+                }
+                return pair.consumer;
+            }
+        }
+    }
+
+    /**
+     * Creates a non-transacted, auto-acknowledging session on this client's
+     * connection.
+     */
+    protected ActiveMQSession createSession() throws JMSException {
+        return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Creates a new session together with a consumer on the given destination.
+     *
+     * @param destination the destination to consume from
+     * @return the new session and its consumer
+     * @throws JMSException if the session or the consumer cannot be created
+     */
+    protected SessionConsumerPair createSessionConsumerPair(Destination destination) throws JMSException {
+        SessionConsumerPair answer = new SessionConsumerPair();
+        answer.session = createSession();
+        answer.consumer = answer.session.createConsumer(destination);
+        return answer;
+    }
+
+    /** Holds a consumer together with the session it was created from. */
+    protected static class SessionConsumerPair {
+        public Session session;
+        public MessageConsumer consumer;
+    }
+
+    /**
+     * @return the single-permit semaphore owned by this client
+     */
+    public Semaphore getSemaphore() {
+        return semaphore;
+    }
+}

Added: incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html?rev=357943&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html (added)
+++ incubator/activemq/trunk/activemq-web/src/main/java/org/activemq/web/package.html Tue Dec 20 01:54:54 2005
@@ -0,0 +1,13 @@
+<html>
+<head>
+</head>
+<body>
+
+<p>
+	Web Connectors so that messages can be sent via HTTP POST or read via
+	HTTP POST or GET, as well as support for web streaming to web browsers or
+	JavaScript clients.
+</p>
+
+</body>
+</html>



Mime
View raw message