camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmoulli...@apache.org
Subject svn commit: r1342565 - in /camel/trunk/components/camel-websocket: ./ src/main/java/org/apache/camel/component/websocket/ src/test/java/org/apache/camel/component/websocket/
Date Fri, 25 May 2012 10:40:56 GMT
Author: cmoulliard
Date: Fri May 25 10:40:55 2012
New Revision: 1342565

URL: http://svn.apache.org/viewvc?rev=1342565&view=rev
Log:
camel-5280 - Refactor of component - not yet finished

Added:
    camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentRouteExampleTest.java
Modified:
    camel/trunk/components/camel-websocket/pom.xml
    camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
    camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
    camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
    camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
    camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
    camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentTest.java
    camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketEndpointTest.java
    camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java

Modified: camel/trunk/components/camel-websocket/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/pom.xml?rev=1342565&r1=1342564&r2=1342565&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/pom.xml (original)
+++ camel/trunk/components/camel-websocket/pom.xml Fri May 25 10:40:55 2012
@@ -96,4 +96,18 @@
         </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>**/WebsocketEndpointTest.java</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+
+    </build>
 </project>
\ No newline at end of file

Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java?rev=1342565&r1=1342564&r2=1342565&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
(original)
+++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponent.java
Fri May 25 10:40:55 2012
@@ -24,11 +24,14 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.GeneralSecurityException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.spi.ManagementAgent;
+import org.apache.camel.spi.ManagementStrategy;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
@@ -36,9 +39,12 @@ import org.apache.camel.util.jsse.SSLCon
 import org.eclipse.jetty.http.ssl.SslContextFactory;
 import org.eclipse.jetty.jmx.MBeanContainer;
 import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.SessionManager;
 import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.server.handler.HandlerWrapper;
 import org.eclipse.jetty.server.nio.SelectChannelConnector;
 import org.eclipse.jetty.server.session.HashSessionManager;
 import org.eclipse.jetty.server.session.SessionHandler;
@@ -47,9 +53,12 @@ import org.eclipse.jetty.server.ssl.SslS
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.MBeanServer;
 import javax.servlet.Servlet;
 
 public class WebsocketComponent extends DefaultComponent {
@@ -57,16 +66,18 @@ public class WebsocketComponent extends 
     protected static final Logger LOG = LoggerFactory.getLogger(WebsocketComponent.class);
     protected static final HashMap<String, ConnectorRef> CONNECTORS = new HashMap<String,
ConnectorRef>();
 
-    protected ServletContextHandler context;
     protected SSLContextParameters sslContextParameters;
-    protected Server server;
     protected MBeanContainer mbContainer;
+    protected ThreadPool threadPool;
+    protected ServletContextHandler context;
 
     protected Integer port;
-    protected String host;
+    protected Integer minThreads;
+    protected Integer maxThreads;
 
     protected boolean enableJmx;
 
+    protected String host;
     protected String staticResources;
     protected String sslKeyPassword;
     protected String sslPassword;
@@ -75,10 +86,10 @@ public class WebsocketComponent extends 
     class ConnectorRef {
         Server server;
         Connector connector;
-        Servlet servlet;
+        WebsocketComponentServlet servlet;
         int refCount;
 
-        public ConnectorRef(Server server, Connector connector, Servlet servlet) {
+        public ConnectorRef(Server server, Connector connector, WebsocketComponentServlet
servlet) {
             this.server = server;
             this.connector = connector;
             this.servlet = servlet;
@@ -106,72 +117,6 @@ public class WebsocketComponent extends 
     public WebsocketComponent() {
     }
 
-    @Override
-    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object>
parameters) throws Exception {
-
-        Map<String, Object> websocketParameters = new HashMap<String, Object>(parameters);
-
-        Boolean enableJmx = getAndRemoveParameter(parameters, "enableJmx", Boolean.class);
-        SSLContextParameters sslContextParameters = resolveAndRemoveReferenceParameter(parameters,
"sslContextParametersRef", SSLContextParameters.class);
-        int port = extractPortNumber(remaining);
-        String host = extractHostName(remaining);
-
-        WebsocketEndpoint endpoint = new WebsocketEndpoint(this, uri, remaining, parameters);
-
-        if (enableJmx != null) {
-            endpoint.setEnableJmx(enableJmx);
-        } else {
-            endpoint.setEnableJmx(isEnableJmx());
-        }
-
-        if (sslContextParameters == null) {
-            sslContextParameters = this.sslContextParameters;
-        }
-
-        endpoint.setSslContextParameters(sslContextParameters);
-        endpoint.setPort(port);
-        endpoint.setHost(host);
-
-        setProperties(endpoint, parameters);
-        return endpoint;
-    }
-
-    public String getStaticResources() {
-        return staticResources;
-    }
-
-    /**
-     * Set a resource path for static resources (such as .html files etc).
-     * <p/>
-     * The resources can be loaded from classpath, if you prefix with <tt>classpath:</tt>,
-     * otherwise the resources is loaded from file system or from JAR files.
-     * <p/>
-     * For example to load from root classpath use <tt>classpath:.</tt>, or
-     * <tt>classpath:WEB-INF/static</tt>
-     * <p/>
-     * If not configured (eg <tt>null</tt>) then no static resource is in use.
-     *
-     * @param staticResources the base path
-     */
-    public void setStaticResources(String staticResources) {
-        this.staticResources = staticResources;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-    }
-
-    public Integer getPort() {
-        return port;
-    }
-
-    public void setPort(Integer port) {
-        this.port = port;
-    }
 
     /**
      * Connects the URL specified on the endpoint to the specified processor.
@@ -179,11 +124,10 @@ public class WebsocketComponent extends 
     public void connect(WebsocketProducerConsumer prodcon) throws Exception {
 
         Server server = null;
-        DefaultServlet defaultServlet = null;
         String baseResource = null;
         WebsocketEndpoint endpoint = prodcon.getEndpoint();
 
-        String connectorKey = "websocket" + ":" + endpoint.getHost() + ":" + endpoint.getPort();
+        String connectorKey = getConnectorKey(endpoint);
 
         synchronized (CONNECTORS) {
             ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
@@ -211,14 +155,11 @@ public class WebsocketComponent extends 
 
                 connector.setHost(endpoint.getHost());
 
-                // Define Context and SessionManager
-                context.setContextPath("/");
+                // TODO -  Is it the right place to define static resources ?
+                if (endpoint.getHome() != null) {
 
-                SessionManager sm = new HashSessionManager();
-                SessionHandler sh = new SessionHandler(sm);
-                context.setSessionHandler(sh);
+                    ServletContextHandler context = new ServletContextHandler(server, "/",
ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS);
 
-                if (endpoint.getHome() != null) {
                     if (endpoint.getHome().startsWith("classpath:")) {
                         baseResource = ObjectHelper.after(endpoint.getHome(), "classpath:");
                         LOG.debug("Using base resource from classpath: {}", baseResource);
@@ -227,7 +168,7 @@ public class WebsocketComponent extends 
                         LOG.debug("Using base resource: {}", baseResource);
                         context.setResourceBase(baseResource);
                     }
-                    defaultServlet = new DefaultServlet();
+                    DefaultServlet defaultServlet = new DefaultServlet();
                     ServletHolder holder = new ServletHolder(defaultServlet);
 
                     // avoid file locking on windows
@@ -237,11 +178,25 @@ public class WebsocketComponent extends 
                 }
 
                 // Create Server and add connector
-                server = new Server();
+                server = createServer();
+                if (endpoint.isEnableJmx()) {
+                    enableJmx(server);
+                }
+
                 server.addConnector(connector);
-                server.setHandler(context);
 
-                connectorRef = new ConnectorRef(server, connector, defaultServlet);
+                // Create ServletContextHandler
+                context = createContext(server,connector,endpoint.getHandlers());
+
+                // Don't provide a Servlet object as Producer/Consumer will create them later
on
+                connectorRef = new ConnectorRef(server, connector, null);
+
+                // must enable session before we start
+                if (endpoint.isSessionSupport()) {
+                    enableSessionSupport(connectorRef.server, connectorKey);
+                }
+                connectorRef.server.start();
+
                 CONNECTORS.put(connectorKey, connectorRef);
 
                 LOG.debug("Jetty Server started for host : " + connector.getHost() + ", on
port : " + connector.getPort());
@@ -251,6 +206,16 @@ public class WebsocketComponent extends 
                 connectorRef.increment();
             }
 
+            // check the session support
+            if (endpoint.isSessionSupport()) {
+                enableSessionSupport(connectorRef.server, connectorKey);
+            }
+
+            // TODO - As we can define WebSocket for Consumer/Producer
+            // This part of the code must be adapted compare to camel-jetty where we only
use
+            // Jetty as a server = Consumer
+            // connectorRef.servlet.connect(consumer);
+
         }
 
     }
@@ -260,9 +225,9 @@ public class WebsocketComponent extends 
      * processor.
      */
     public void disconnect(WebsocketProducerConsumer prodcon) throws Exception {
-
+        // If the connector is not needed anymore then stop it
         WebsocketEndpoint endpoint = prodcon.getEndpoint();
-        String connectorKey = "websocket" + ":" + endpoint.getHost() + ":" + endpoint.getPort();
+        String connectorKey = getConnectorKey(endpoint);
 
         synchronized (CONNECTORS) {
             ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
@@ -271,76 +236,180 @@ public class WebsocketComponent extends 
                     connectorRef.server.removeConnector(connectorRef.connector);
                     connectorRef.connector.stop();
                     connectorRef.server.stop();
-                    CONNECTORS.remove(CONNECTORS);
+                    CONNECTORS.remove(connectorKey);
+                    // Camel controls the lifecycle of these entities so remove the
+                    // registered MBeans when Camel is done with the managed objects.
+                    if (mbContainer != null) {
+                        mbContainer.removeBean(connectorRef.server);
+                        mbContainer.removeBean(connectorRef.connector);
+                    }
                 }
             }
         }
-
     }
 
-    /*protected Server createServer(ServletContextHandler context, String host, int port,
String home) {
+    public synchronized MBeanContainer getMbContainer() {
+        // If null, provide the default implementation.
+        if (mbContainer == null) {
+            MBeanServer mbs = null;
+
+            final ManagementStrategy mStrategy = this.getCamelContext().getManagementStrategy();
+            final ManagementAgent mAgent = mStrategy.getManagementAgent();
+            if (mAgent != null) {
+                mbs = mAgent.getMBeanServer();
+            }
 
-        String connectorKey = "websocket" + ":" + host + ":" + port;
-        Server server = null;
-        DefaultServlet defaultServlet = null;
-        // WebsocketComponent websocketComponent = (WebsocketComponent) this.getCamelContext().getEndpoint(connectorKey);
+            if (mbs != null) {
+                mbContainer = new MBeanContainer(mbs);
+                startMbContainer();
+            } else {
+                LOG.warn("JMX disabled in CamelContext. Jetty JMX extensions will remain
disabled.");
+            }
+        }
 
-        synchronized (CONNECTORS) {
-            ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
-            if (connectorRef == null) {
-                Connector connector;
-                if (sslContextParameters != null) {
-                    connector = getSslSocketConnector();
-                } else {
-                    connector = new SelectChannelConnector();
-                }
+        return this.mbContainer;
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object>
parameters) throws Exception {
 
-                connector.setHost(host);
-                connector.setPort(port);
+        Map<String, Object> websocketParameters = new HashMap<String, Object>(parameters);
 
-                // Define Context and SessionManager
-                context.setContextPath("/");
+        Boolean enableJmx = getAndRemoveParameter(parameters, "enableJmx", Boolean.class);
+        SSLContextParameters sslContextParameters = resolveAndRemoveReferenceParameter(parameters,
"sslContextParametersRef", SSLContextParameters.class);
+        int port = extractPortNumber(remaining);
+        String host = extractHostName(remaining);
 
-                SessionManager sm = new HashSessionManager();
-                SessionHandler sh = new SessionHandler(sm);
-                context.setSessionHandler(sh);
-
-                if (home != null) {
-                    if (home.startsWith("classpath:")) {
-                        home = ObjectHelper.after(home, "classpath:");
-                        LOG.debug("Using base resource from classpath: {}", home);
-                        context.setBaseResource(new JettyClassPathResource(getCamelContext().getClassResolver(),
home));
-                    } else {
-                        LOG.debug("Using base resource: {}", home);
-                        context.setResourceBase(home);
-                    }
-                    defaultServlet = new DefaultServlet();
-                    ServletHolder holder = new ServletHolder(defaultServlet);
+        WebsocketEndpoint endpoint = new WebsocketEndpoint(this, uri, remaining, parameters);
 
-                    // avoid file locking on windows
-                    // http://stackoverflow.com/questions/184312/how-to-make-jetty-dynamically-load-static-pages
-                    holder.setInitParameter("useFileMappedBuffer", "false");
-                    context.addServlet(holder, "/");
-                }
+        if (enableJmx != null) {
+            endpoint.setEnableJmx(enableJmx);
+        } else {
+            endpoint.setEnableJmx(isEnableJmx());
+        }
 
-                // Create Server and add connector
-                server = new Server();
-                server.addConnector(connector);
-                server.setHandler(context);
-                connectorRef = new ConnectorRef(server, connector, defaultServlet);
+        if (sslContextParameters == null) {
+            sslContextParameters = this.sslContextParameters;
+        }
 
-                CONNECTORS.put(connectorKey, connectorRef);
+        endpoint.setSslContextParameters(sslContextParameters);
+        endpoint.setPort(port);
+        endpoint.setHost(host);
 
-            } else {
-                connectorRef.increment();
+        setProperties(endpoint, parameters);
+        return endpoint;
+    }
+
+    protected Server createServer() throws Exception {
+        Server server = new Server();
+        ContextHandlerCollection collection = new ContextHandlerCollection();
+        server.setHandler(collection);
+
+        // configure thread pool if min/max given
+        if (minThreads != null || maxThreads != null) {
+            if (getThreadPool() != null) {
+                throw new IllegalArgumentException("You cannot configure both minThreads/maxThreads
and a custom threadPool on JettyHttpComponent: " + this);
+            }
+            QueuedThreadPool qtp = new QueuedThreadPool();
+            if (minThreads != null) {
+                qtp.setMinThreads(minThreads.intValue());
             }
+            if (maxThreads != null) {
+                qtp.setMaxThreads(maxThreads.intValue());
+            }
+            // let the thread names indicate they are from the server
+            qtp.setName("CamelJettyWebSocketServer(" + ObjectHelper.getIdentityHashCode(server)
+ ")");
+            try {
+                qtp.start();
+            } catch (Exception e) {
+                throw new RuntimeCamelException("Error starting JettyWebSocketServer thread
pool: " + qtp, e);
+            }
+            server.setThreadPool(qtp);
+        }
 
+        if (getThreadPool() != null) {
+            server.setThreadPool(getThreadPool());
         }
 
         return server;
     }
-*/
-    protected SslConnector getSslSocketConnector(SSLContextParameters sslContextParameters)
{
+
+    protected WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketConsumer
consumer, String remaining) {
+        String pathSpec = createPathSpec(remaining);
+        WebsocketComponentServlet servlet = servlets.get(pathSpec);
+        if (servlet == null) {
+            servlet = createServlet(sync, pathSpec, servlets, context);
+        }
+        if (servlet.getConsumer() == null && consumer != null) {
+            // TODO Do we have to call connect(consumer) or setConsumer on the Consumer endpoint
+            servlet.setConsumer(consumer);
+        }
+        return servlet;
+    }
+
+    protected WebsocketComponentServlet createServlet(NodeSynchronization sync, String pathSpec,
Map<String, WebsocketComponentServlet> servlets, ServletContextHandler handler) {
+        WebsocketComponentServlet servlet = new WebsocketComponentServlet(sync);
+        servlets.put(pathSpec, servlet);
+        handler.addServlet(new ServletHolder(servlet), pathSpec);
+        LOG.debug("WebSocket servlet added for the following path : " + pathSpec);
+        return servlet;
+    }
+
+
+    protected ServletContextHandler createContext(Server server, Connector connector, List<Handler>
handlers) throws Exception {
+        ServletContextHandler context = new ServletContextHandler(server, "/", ServletContextHandler.NO_SECURITY
| ServletContextHandler.NO_SESSIONS);
+        context.setConnectorNames(new String[] {connector.getName()});
+
+        if (handlers != null && !handlers.isEmpty()) {
+            for (Handler handler : handlers) {
+                if (handler instanceof HandlerWrapper) {
+                    ((HandlerWrapper) handler).setHandler(server.getHandler());
+                    server.setHandler(handler);
+                } else {
+                    HandlerCollection handlerCollection = new HandlerCollection();
+                    handlerCollection.addHandler(server.getHandler());
+                    handlerCollection.addHandler(handler);
+                    server.setHandler(handlerCollection);
+                }
+            }
+        }
+
+        this.context = context;
+        return context;
+
+    }
+
+    /**
+     * Starts {@link #mbContainer} and registers the container with itself as a managed bean
+     * logging an error if there is a problem starting the container.
+     * Does nothing if {@link #mbContainer} is {@code null}.
+     */
+    protected void startMbContainer() {
+        if (mbContainer != null && !mbContainer.isStarted()) {
+            try {
+                mbContainer.start();
+                // Publish the container itself for consistency with
+                // traditional embedded Jetty configurations.
+                mbContainer.addBean(mbContainer);
+            } catch (Throwable e) {
+                LOG.warn("Could not start JettyWebSocket MBeanContainer. Jetty JMX extensions
will remain disabled.", e);
+            }
+        }
+    }
+
+    private void enableSessionSupport(Server server, String connectorKey) throws Exception
{
+        ServletContextHandler context = server.getChildHandlerByClass(ServletContextHandler.class);
+        if (context.getSessionHandler() == null) {
+            SessionHandler sessionHandler = new SessionHandler();
+            if (context.isStarted()) {
+                throw new IllegalStateException("Server has already been started. Cannot
enabled sessionSupport on " + connectorKey);
+            } else {
+                context.setSessionHandler(sessionHandler);
+            }
+        }
+    }
+
+    private SslConnector getSslSocketConnector(SSLContextParameters sslContextParameters)
{
         SslSelectChannelConnector sslSocketConnector = null;
         if (sslContextParameters != null) {
             SslContextFactory sslContextFactory = new WebSocketComponentSslContextFactory();
@@ -404,30 +473,6 @@ public class WebsocketComponent extends 
         return false;
     }
 
-    public WebsocketComponentServlet addServlet(NodeSynchronization sync, WebsocketConsumer
consumer, String remaining) {
-        String pathSpec = createPathSpec(remaining);
-        WebsocketComponentServlet servlet = servlets.get(pathSpec);
-        if (servlet == null) {
-            servlet = createServlet(sync, pathSpec, servlets, context);
-        }
-        if (servlet.getConsumer() == null && consumer != null) {
-            servlet.setConsumer(consumer);
-        }
-        return servlet;
-    }
-
-    WebsocketComponentServlet createServlet(NodeSynchronization sync, String pathSpec, Map<String,
WebsocketComponentServlet> servlets, ServletContextHandler handler) {
-        WebsocketComponentServlet servlet = new WebsocketComponentServlet(sync);
-        servlets.put(pathSpec, servlet);
-        handler.addServlet(new ServletHolder(servlet), pathSpec);
-        LOG.debug("WebSocket servlet added for the following path : " + pathSpec);
-        return servlet;
-    }
-
-    ServletContextHandler createContext() {
-        return new ServletContextHandler(ServletContextHandler.SESSIONS);
-    }
-
     private static String createPathSpec(String remaining) {
         // Is not correct as it does not support to add port in the URI
         //return String.format("/%s/*", remaining);
@@ -463,6 +508,60 @@ public class WebsocketComponent extends 
 
     }
 
+    private String getConnectorKey(WebsocketEndpoint endpoint) {
+        return endpoint.getProtocol() + ":" + endpoint.getHost() + ":" + endpoint.getPort();
+    }
+
+    private void enableJmx(Server server) {
+        MBeanContainer containerToRegister = getMbContainer();
+        if (containerToRegister != null) {
+            LOG.info("Jetty JMX Extensions is enabled");
+            server.getContainer().addEventListener(containerToRegister);
+            // Since we may have many Servers running, don't tie the MBeanContainer
+            // to a Server lifecycle or we end up closing it while it is still in use.
+            //server.addBean(mbContainer);
+        }
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+
+    public String getStaticResources() {
+        return staticResources;
+    }
+
+    /**
+     * Set a resource path for static resources (such as .html files etc).
+     * <p/>
+     * The resources can be loaded from classpath, if you prefix with <tt>classpath:</tt>,
+     * otherwise the resources is loaded from file system or from JAR files.
+     * <p/>
+     * For example to load from root classpath use <tt>classpath:.</tt>, or
+     * <tt>classpath:WEB-INF/static</tt>
+     * <p/>
+     * If not configured (eg <tt>null</tt>) then no static resource is in use.
+     *
+     * @param staticResources the base path
+     */
+    public void setStaticResources(String staticResources) {
+        this.staticResources = staticResources;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public Integer getPort() {
+        return port;
+    }
+
+    public void setPort(Integer port) {
+        this.port = port;
+    }
 
     public String getSslKeyPassword() {
         return sslKeyPassword;
@@ -496,6 +595,30 @@ public class WebsocketComponent extends 
         return enableJmx;
     }
 
+    public Integer getMinThreads() {
+        return minThreads;
+    }
+
+    public void setMinThreads(Integer minThreads) {
+        this.minThreads = minThreads;
+    }
+
+    public Integer getMaxThreads() {
+        return maxThreads;
+    }
+
+    public void setMaxThreads(Integer maxThreads) {
+        this.maxThreads = maxThreads;
+    }
+
+    public ThreadPool getThreadPool() {
+        return threadPool;
+    }
+
+    public void setThreadPool(ThreadPool threadPool) {
+        this.threadPool = threadPool;
+    }
+
     public SSLContextParameters getSslContextParameters() {
         return sslContextParameters;
     }
@@ -504,11 +627,14 @@ public class WebsocketComponent extends 
         this.sslContextParameters = sslContextParameters;
     }
 
+    public ServletContextHandler getContext() {
+        return context;
+    }
+
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        context = createContext();
         //LOG.info("Starting server {}:{}; static resources: {}", new Object[]{host, port,
staticResources});
         //server = createServer(context, host, port, staticResources);
     }
@@ -516,12 +642,18 @@ public class WebsocketComponent extends 
     @Override
     public void doStop() throws Exception {
         super.doStop();
-        for (ConnectorRef connectorRef : CONNECTORS.values()) {
-            connectorRef.server.removeConnector(connectorRef.connector);
-            connectorRef.connector.stop();
-            connectorRef.server.stop();
+        if (CONNECTORS.size() > 0) {
+            for (String connectorKey : CONNECTORS.keySet()) {
+                ConnectorRef connectorRef = CONNECTORS.get(connectorKey);
+                if (connectorRef != null && connectorRef.getRefCount() == 0) {
+                    connectorRef.server.removeConnector(connectorRef.connector);
+                    connectorRef.connector.stop();
+                    connectorRef.server.stop();
+                }
+                CONNECTORS.remove(connectorKey);
+            }
         }
-        CONNECTORS.clear();
-    }
 
+    }
 }
+

Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java?rev=1342565&r1=1342564&r2=1342565&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
(original)
+++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketComponentServlet.java
Fri May 25 10:40:55 2012
@@ -20,13 +20,21 @@ import javax.servlet.http.HttpServletReq
 
 import org.eclipse.jetty.websocket.WebSocket;
 import org.eclipse.jetty.websocket.WebSocketServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 public class WebsocketComponentServlet extends WebSocketServlet {
     private static final long serialVersionUID = 1L;
+    private final transient Logger log = LoggerFactory.getLogger(getClass());
 
     private final NodeSynchronization sync;
     private WebsocketConsumer consumer;
 
+    private ConcurrentMap<String, WebsocketConsumer> consumers = new ConcurrentHashMap<String,
WebsocketConsumer>();
+
     public WebsocketComponentServlet(NodeSynchronization sync) {
         this.sync = sync;
     }
@@ -39,6 +47,16 @@ public class WebsocketComponentServlet e
         this.consumer = consumer;
     }
 
+    public void connect(WebsocketConsumer consumer) {
+        log.debug("Connecting consumer: {}", consumer);
+        consumers.put(consumer.getPath(), consumer);
+    }
+
+    public void disconnect(WebsocketConsumer consumer) {
+        log.debug("Disconnecting consumer: {}", consumer);
+        consumers.remove(consumer.getPath());
+    }
+
     @Override
     public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
         return new DefaultWebsocket(sync, consumer);

Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java?rev=1342565&r1=1342564&r2=1342565&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
(original)
+++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketConsumer.java
Fri May 25 10:40:55 2012
@@ -33,8 +33,8 @@ public class WebsocketConsumer extends D
 
     @Override
     public void start() throws Exception {
-        endpoint.connect(this);
         super.start();
+        endpoint.connect(this);
     }
 
     @Override
@@ -47,6 +47,10 @@ public class WebsocketConsumer extends D
         return endpoint;
     }
 
+    public String getPath() {
+        return endpoint.getPath();
+    }
+
     public void sendMessage(final String connectionKey, final String message) {
 
 /*        if (!endpoint.isStarted()) {

Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java?rev=1342565&r1=1342564&r2=1342565&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
(original)
+++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketEndpoint.java
Fri May 25 10:40:55 2012
@@ -23,9 +23,11 @@ import org.apache.camel.impl.DefaultEndp
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.jsse.SSLContextParameters;
+import org.eclipse.jetty.server.Handler;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.List;
 import java.util.Map;
 
 public class WebsocketEndpoint extends DefaultEndpoint {
@@ -35,9 +37,11 @@ public class WebsocketEndpoint extends D
     private WebsocketComponent component;
     private SSLContextParameters sslContextParameters;
     private URI uri;
+    private List<Handler> handlers;
 
     private Boolean sendToAll;
     private boolean enableJmx;
+    private boolean sessionSupport;
 
     private String remaining;
     private String host;
@@ -69,22 +73,28 @@ public class WebsocketEndpoint extends D
     public Consumer createConsumer(Processor processor) throws Exception {
         ObjectHelper.notNull(component, "component");
         WebsocketConsumer consumer = new WebsocketConsumer(this, processor);
-        getComponent().addServlet(sync, consumer, remaining);
+        // We will create the servlet when we
+        // will call connect method and Jetty Server created
+        // getComponent().addServlet(sync, consumer, remaining);
         return consumer;
     }
 
     @Override
     public Producer createProducer() throws Exception {
-        getComponent().addServlet(sync, null, remaining);
+        // We will create the servlet when we
+        // will call connect method and Jetty Server created
+        // getComponent().addServlet(sync, null, remaining);
         return new WebsocketProducer(this, memoryStore);
     }
 
-    public void connect(WebsocketProducerConsumer prodcons) throws Exception {
-        component.connect(prodcons);
+    public void connect(WebsocketConsumer consumer) throws Exception {
+        component.connect(consumer);
+        getComponent().addServlet(sync, consumer, remaining);
     }
 
-    public void disconnect(WebsocketProducerConsumer prodcons) throws Exception {
-        component.disconnect(prodcons);
+    public void disconnect(WebsocketConsumer consumer) throws Exception {
+        component.disconnect(consumer);
+        getComponent().addServlet(sync, consumer, remaining);
     }
 
     @Override
@@ -128,6 +138,31 @@ public class WebsocketEndpoint extends D
         this.sendToAll = sendToAll;
     }
 
+    public String getProtocol() {
+        return uri.getScheme();
+    }
+
+    public String getPath() {
+        return uri.getPath();
+    }
+
+    public void setSessionSupport(boolean support) {
+        sessionSupport = support;
+    }
+
+    public boolean isSessionSupport() {
+        return sessionSupport;
+    }
+
+    public List<Handler> getHandlers() {
+        return handlers;
+    }
+
+    public void setHandlers(List<Handler> handlers) {
+        this.handlers = handlers;
+    }
+
+
     public SSLContextParameters getSslContextParameters() {
         return sslContextParameters;
     }

Modified: camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java?rev=1342565&r1=1342564&r2=1342565&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
(original)
+++ camel/trunk/components/camel-websocket/src/main/java/org/apache/camel/component/websocket/WebsocketProducer.java
Fri May 25 10:40:55 2012
@@ -68,13 +68,13 @@ public class WebsocketProducer extends D
 
     @Override
     public void start() throws Exception {
-        endpoint.connect(this);
         super.start();
+        // endpoint.connect(this);
     }
 
     @Override
     public void stop() throws Exception {
-        endpoint.disconnect(this);
+        // endpoint.disconnect(this);
         super.stop();
     }
 

Added: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentRouteExampleTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentRouteExampleTest.java?rev=1342565&view=auto
==============================================================================
--- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentRouteExampleTest.java
(added)
+++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentRouteExampleTest.java
Fri May 25 10:40:55 2012
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.websocket;
+
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.websocket.WebSocket;
+import com.ning.http.client.websocket.WebSocketTextListener;
+import com.ning.http.client.websocket.WebSocketUpgradeHandler;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class WebsocketComponentRouteExampleTest extends CamelTestSupport {
+
+    private static List<String> received = new ArrayList<String>();
+    private static CountDownLatch latch = new CountDownLatch(1);
+
+    @Test
+    public void testWSHttpCall() throws Exception {
+        AsyncHttpClient c = new AsyncHttpClient();
+
+        WebSocket websocket = c.prepareGet("ws://localhost:9494/echo").execute(
+            new WebSocketUpgradeHandler.Builder()
+                .addWebSocketListener(new WebSocketTextListener() {
+                    @Override
+                    public void onMessage(String message) {
+                        received.add(message);
+                        log.info("received --> " + message);
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void onFragment(String fragment, boolean last) {
+                    }
+
+                    @Override
+                    public void onOpen(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onClose(WebSocket websocket) {
+                    }
+
+                    @Override
+                    public void onError(Throwable t) {
+                        t.printStackTrace();
+                    }
+                }).build()).get();
+
+        websocket.sendTextMessage("Beer");
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        assertEquals(1, received.size());
+        assertEquals("BeerBeer", received.get(0));
+
+        websocket.close();
+        c.close();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+
+                WebsocketComponent websocketComponent = getContext().getComponent("websocket",
WebsocketComponent.class);
+                websocketComponent.setHost("localhost");
+                websocketComponent.setPort(9494);
+
+                from("websocket://echo")
+                    .log(">>> Message received from WebSocket Client : ${body}")
+                    .transform().simple("${body}${body}")
+                    .to("websocket://echo");
+
+            }
+        };
+    }
+}

Modified: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentTest.java?rev=1342565&r1=1342564&r2=1342565&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentTest.java
(original)
+++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketComponentTest.java
Fri May 25 10:40:55 2012
@@ -16,14 +16,20 @@
  */
 package org.apache.camel.component.websocket;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -66,16 +72,18 @@ public class WebsocketComponentTest {
     private CamelContext camelContext;
 
     private WebsocketComponent component;
+    private Server server;
+    private ServletContextHandler context;
 
     @Before
     public void setUp() throws Exception {
         component = new WebsocketComponent();
+        setUpJettyServer();
     }
 
-    @Test
-    public void testCreateContext() {
-        ServletContextHandler handler = component.createContext();
-        assertNotNull(handler);
+    @After
+    public void shutdown() throws Exception {
+        server.stop();
     }
 
     // TODO - Update tests to use endpoint instead of createServer - chm - 22/05/2012
@@ -83,8 +91,6 @@ public class WebsocketComponentTest {
 
     @Test
     public void testCreateServerWithoutStaticContent() throws Exception {
-        ServletContextHandler handler = component.createContext();
-        Server server = component.createServer(handler, "localhost", 1988, null);
         assertEquals(1, server.getConnectors().length);
         assertEquals("localhost", server.getConnectors()[0].getHost());
         assertEquals(1988, server.getConnectors()[0].getPort());
@@ -150,7 +156,6 @@ public class WebsocketComponentTest {
     @Test
     public void testAddServletProducersOnly() throws Exception {
         component.setCamelContext(camelContext);
-        component.setPort(0);
         component.doStart();
         WebsocketComponentServlet s1 = component.addServlet(sync, null, PATH_ONE);
         WebsocketComponentServlet s2 = component.addServlet(sync, null, PATH_TWO);
@@ -165,7 +170,6 @@ public class WebsocketComponentTest {
     @Test
     public void testAddServletConsumersOnly() throws Exception {
         component.setCamelContext(camelContext);
-        component.setPort(0);
         component.doStart();
         WebsocketComponentServlet s1 = component.addServlet(sync, consumer, PATH_ONE);
         WebsocketComponentServlet s2 = component.addServlet(sync, consumer, PATH_TWO);
@@ -180,7 +184,6 @@ public class WebsocketComponentTest {
     @Test
     public void testAddServletProducerAndConsumer() throws Exception {
         component.setCamelContext(camelContext);
-        component.setPort(0);
         component.doStart();
         WebsocketComponentServlet s1 = component.addServlet(sync, null, PATH_ONE);
         WebsocketComponentServlet s2 = component.addServlet(sync, consumer, PATH_ONE);
@@ -205,4 +208,15 @@ public class WebsocketComponentTest {
         component.doStop();
     }
 
+    private void setUpJettyServer() throws Exception {
+        server = component.createServer();
+        Connector connector = new SelectChannelConnector();
+        connector.setHost("localhost");
+        connector.setPort(1988);
+        context = component.createContext(server,connector,null);
+        server.addConnector(connector);
+        server.setHandler(context);
+        server.start();
+    }
+
 }

Modified: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketEndpointTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketEndpointTest.java?rev=1342565&r1=1342564&r2=1342565&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketEndpointTest.java
(original)
+++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketEndpointTest.java
Fri May 25 10:40:55 2012
@@ -58,6 +58,10 @@ public class WebsocketEndpointTest {
     @Before
     public void setUp() throws Exception {
         websocketEndpoint = new WebsocketEndpoint(component, URI, REMAINING, null);
+        component = new WebsocketComponent();
+        component.setPort(1988);
+        component.setHost("localhost");
+        component.createServer();
     }
 
     /**
@@ -66,6 +70,8 @@ public class WebsocketEndpointTest {
     @Test
     public void testCreateConsumer() throws Exception {
         Consumer consumer = websocketEndpoint.createConsumer(processor);
+        component.connect((WebsocketProducerConsumer) consumer);
+
         assertNotNull(consumer);
         assertEquals(WebsocketConsumer.class, consumer.getClass());
         InOrder inOrder = inOrder(component, processor);
@@ -85,6 +91,8 @@ public class WebsocketEndpointTest {
     @Test
     public void testCreateProducer() throws Exception {
         Producer producer = websocketEndpoint.createProducer();
+        component.connect((WebsocketProducerConsumer) producer);
+
         assertNotNull(producer);
         assertEquals(WebsocketProducer.class, producer.getClass());
         InOrder inOrder = inOrder(component, processor);

Modified: camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java?rev=1342565&r1=1342564&r2=1342565&view=diff
==============================================================================
--- camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java
(original)
+++ camel/trunk/components/camel-websocket/src/test/java/org/apache/camel/component/websocket/WebsocketTwoRoutesExampleTest.java
Fri May 25 10:40:55 2012
@@ -131,7 +131,7 @@ public class WebsocketTwoRoutesExampleTe
                 from("websocket://localhost:9393/echo")
                         .log(">>> Message received from WebSocket Client : ${body}")
                         .transform().simple("${body}${body}")
-                        .to("websocket://localhost:9292/echo");
+                        .to("websocket://localhost:9393/echo");
             }
         };
     }



Mime
View raw message