camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [2/2] camel git commit: CAMEL-1077 Added remote address support to camel-mina2
Date Thu, 12 Feb 2015 12:58:10 GMT
CAMEL-1077 Added remote address support to camel-mina2


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49c27d30
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49c27d30
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49c27d30

Branch: refs/heads/master
Commit: 49c27d30f5a649d9fe1931a961c3d348d6cfba18
Parents: c6e0496
Author: Willem Jiang <willem.jiang@gmail.com>
Authored: Thu Feb 12 20:56:19 2015 +0800
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Thu Feb 12 20:57:07 2015 +0800

----------------------------------------------------------------------
 .../component/mina2/Mina2Configuration.java     | 10 +++
 .../camel/component/mina2/Mina2Consumer.java    | 88 +++++++++++++++----
 ...Mina2ClientModeTcpTextlineDelimiterTest.java | 90 ++++++++++++++++++++
 3 files changed, 171 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/49c27d30/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
index 53b5f33..e5de9f3 100644
--- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
+++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Configuration.java
@@ -82,6 +82,8 @@ public class Mina2Configuration implements Cloneable {
     private boolean orderedThreadPoolExecutor = true;
     @UriParam(defaultValue = "true")
     private boolean cachedAddress = true;
+    @UriParam(defaultValue = "false")
+    private boolean clientMode;
 
     /**
      * Returns a copy of this configuration
@@ -300,6 +302,14 @@ public class Mina2Configuration implements Cloneable {
     public boolean isCachedAddress() {
         return cachedAddress;
     }
+    
+    public void setClientMode(boolean clientMode) {
+        this.clientMode = clientMode;
+    }
+    
+    public boolean isClientMode() {
+        return clientMode;
+    }
 
     // here we just shows the option setting of host, port, protocol 
     public String getUriString() {

http://git-wip-us.apache.org/repos/asf/camel/blob/49c27d30/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
index 1df470b..86cc421 100644
--- a/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
+++ b/components/camel-mina2/src/main/java/org/apache/camel/component/mina2/Mina2Consumer.java
@@ -31,7 +31,10 @@ import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
 import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.future.CloseFuture;
+import org.apache.mina.core.future.ConnectFuture;
 import org.apache.mina.core.service.IoAcceptor;
+import org.apache.mina.core.service.IoConnector;
 import org.apache.mina.core.service.IoHandlerAdapter;
 import org.apache.mina.core.service.IoService;
 import org.apache.mina.core.session.IoSession;
@@ -46,6 +49,7 @@ import org.apache.mina.filter.logging.LoggingFilter;
 import org.apache.mina.filter.ssl.SslFilter;
 import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.slf4j.Logger;
@@ -59,6 +63,8 @@ import org.slf4j.LoggerFactory;
 public class Mina2Consumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(Mina2Consumer.class);
+    private IoSession session;
+    private IoConnector connector;
     private SocketAddress address;
     private IoAcceptor acceptor;
     private Mina2Configuration configuration;
@@ -75,7 +81,11 @@ public class Mina2Consumer extends DefaultConsumer {
 
         String protocol = configuration.getProtocol();
         if (protocol.equals("tcp")) {
-            setupSocketProtocol(protocol, configuration);
+            if (configuration.isClientMode()) {
+                setupClientSocketProtocol(protocol, configuration);
+            } else {
+                setupSocketProtocol(protocol, configuration);
+            }
         } else if (configuration.isDatagramProtocol()) {
             setupDatagramProtocol(protocol, configuration);
         } else if (protocol.equals("vm")) {
@@ -86,25 +96,41 @@ public class Mina2Consumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-
-        acceptor.setHandler(new ReceiveHandler());
-        acceptor.bind(address);
-        LOG.info("Bound to server address: {} using acceptor: {}", address, acceptor);
+        if (configuration.isClientMode() && configuration.getProtocol().equals("tcp"))
{
+            connector.setHandler(new ReceiveHandler());
+            ConnectFuture future = connector.connect(address);
+            future.awaitUninterruptibly();
+            session = future.getSession();
+            LOG.info("Connected to server address: {} using connector: {} timeout: {} millis.",
new Object[]{address, connector, configuration.getTimeout()});
+        } else {
+            acceptor.setHandler(new ReceiveHandler());
+            acceptor.bind(address);
+            LOG.info("Bound to server address: {} using acceptor: {}", address, acceptor);
+        }
     }
 
     @Override
     protected void doStop() throws Exception {
-        LOG.info("Unbinding from server address: {} using acceptor: {}", address, acceptor);
-        if (address instanceof InetSocketAddress) {
-            // need to check if the address is IPV4 all network address
-            if ("0.0.0.0".equals(((InetSocketAddress)address).getAddress().getHostAddress()))
{
-                LOG.info("Unbind the server address {}", acceptor.getLocalAddresses());
-                acceptor.unbind(acceptor.getLocalAddresses());
+        if (configuration.isClientMode() && configuration.getProtocol().equals("tcp"))
{
+            LOG.info("Disconnect from server address: {} using connector: {}", address, connector);
+            if (session != null) {
+                CloseFuture closeFuture = session.close(true);
+                closeFuture.awaitUninterruptibly();
+            }
+            connector.dispose(true);
+        } else {
+            LOG.info("Unbinding from server address: {} using acceptor: {}", address, acceptor);
+            if (address instanceof InetSocketAddress) {
+                // need to check if the address is IPV4 all network address
+                if ("0.0.0.0".equals(((InetSocketAddress)address).getAddress().getHostAddress()))
{
+                    LOG.info("Unbind the server address {}", acceptor.getLocalAddresses());
+                    acceptor.unbind(acceptor.getLocalAddresses());
+                } else {
+                    acceptor.unbind(address);
+                }   
             } else {
                 acceptor.unbind(address);
-            }   
-        } else {
-            acceptor.unbind(address);
+            }
         }
         super.doStop();
     }
@@ -116,8 +142,7 @@ public class Mina2Consumer extends DefaultConsumer {
         }
         super.doShutdown();
     }
-
-
+   
     // Implementation methods
     //-------------------------------------------------------------------------
     protected void setupVmProtocol(String uri, Mina2Configuration configuration) {
@@ -171,6 +196,35 @@ public class Mina2Consumer extends DefaultConsumer {
             acceptor.getFilterChain().addFirst("sslFilter", filter);
         }
     }
+    
+    protected void setupClientSocketProtocol(String uri, Mina2Configuration configuration)
throws Exception {
+        boolean minaLogger = configuration.isMinaLogger();
+        long timeout = configuration.getTimeout();
+        List<IoFilter> filters = configuration.getFilters();
+
+        address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
+
+        final int processorCount = Runtime.getRuntime().availableProcessors() + 1;
+        connector = new NioSocketConnector(processorCount);
+
+        if (configuration.isOrderedThreadPoolExecutor()) {
+            workerPool = new OrderedThreadPoolExecutor(configuration.getMaximumPoolSize());
+        } else {
+            workerPool = new UnorderedThreadPoolExecutor(configuration.getMaximumPoolSize());
+        }
+        connector.getFilterChain().addLast("threadPool", new ExecutorFilter(workerPool));
+        if (minaLogger) {
+            connector.getFilterChain().addLast("logger", new LoggingFilter());
+        }
+        appendIoFiltersToChain(filters, connector.getFilterChain());
+        if (configuration.getSslContextParameters() != null) {
+            SslFilter filter = new SslFilter(configuration.getSslContextParameters().createSSLContext(),
configuration.isAutoStartTls());
+            filter.setUseClientMode(true);
+            connector.getFilterChain().addFirst("sslFilter", filter);
+        }
+        configureCodecFactory("Mina2Consumer", connector, configuration);
+        connector.setConnectTimeoutMillis(timeout);
+    }
 
     protected void configureCodecFactory(String type, IoService service, Mina2Configuration
configuration) {
         if (configuration.getCodec() != null) {
@@ -213,7 +267,7 @@ public class Mina2Consumer extends DefaultConsumer {
         acceptor = new NioDatagramAcceptor();
 
         // acceptor connectorConfig
-        configureDataGramCodecFactory("MinaConsumer", acceptor, configuration);
+        configureDataGramCodecFactory("Mina2Consumer", acceptor, configuration);
         acceptor.setCloseOnDeactivation(true);
         // reuse address is default true for datagram
         if (configuration.isOrderedThreadPoolExecutor()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/49c27d30/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
new file mode 100644
index 0000000..cf5cfda
--- /dev/null
+++ b/components/camel-mina2/src/test/java/org/apache/camel/component/mina2/Mina2ClientModeTcpTextlineDelimiterTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina2;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.mina.core.service.IoAcceptor;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.textline.LineDelimiter;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.junit.Test;
+
+public class Mina2ClientModeTcpTextlineDelimiterTest extends BaseMina2Test {
+
+    @Test
+    public void testMinaRoute() throws Exception {
+        MockEndpoint endpoint = getMockEndpoint("mock:result");
+        Object body = "Hello there!";
+        endpoint.expectedBodiesReceived(body);
+        // need to start the server first
+        Server server = new Server(getPort());
+        server.startup();
+        // start the camel route to connect to the server
+        context.startRoute("minaRoute");
+        endpoint.assertIsSatisfied();
+        server.shutdown();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            public void configure() {
+                from(String.format("mina2:tcp://localhost:%1$s?sync=false&textline=true&textlineDelimiter=UNIX&clientMode=true",
getPort()))
+                    .id("minaRoute")
+                    .noAutoStartup()
+                    .to("log:before?showAll=true")
+                    .to("mock:result")
+                    .to("log:after?showAll=true");
+            }
+        };
+    }
+    
+    private class Server {
+        private final int port;
+        private IoAcceptor acceptor;
+        public Server(int port) {
+            this.port = port;
+        }
+        
+        public void startup() throws Exception {
+            acceptor = new NioSocketAcceptor();
+            Mina2TextLineCodecFactory codecFactory = new Mina2TextLineCodecFactory(Charset.forName("UTF-8"),
LineDelimiter.UNIX);
+            acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));
+            acceptor.setHandler(new ServerHandler());
+            acceptor.bind(new InetSocketAddress(port));
+        }
+        
+        public void shutdown() throws Exception {
+            acceptor.unbind();
+            acceptor.dispose();
+        }
+    }
+    
+    private class ServerHandler extends IoHandlerAdapter {
+        public void sessionOpened(IoSession session) throws Exception {
+            session.write("Hello there!\n");
+            session.close(true);
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message