camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1310989 - in /camel/trunk/components/camel-netty/src: main/java/org/apache/camel/component/netty/ main/java/org/apache/camel/component/netty/handlers/ test/java/org/apache/camel/component/netty/
Date Sun, 08 Apr 2012 12:46:56 GMT
Author: davsclaus
Date: Sun Apr  8 12:46:56 2012
New Revision: 1310989

URL: http://svn.apache.org/viewvc?rev=1310989&view=rev
Log:
CAMEL-4556: Netty producer now reuses connection.

Added:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
Modified:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
(original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
Sun Apr  8 12:46:56 2012
@@ -16,24 +16,18 @@
  */
 package org.apache.camel.component.netty;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
 
 public abstract class ClientPipelineFactory implements ChannelPipelineFactory {
     protected NettyProducer producer;
-    protected Exchange exchange;
-    protected AsyncCallback callback;
 
     public ClientPipelineFactory() {
     }
-    
-    public ClientPipelineFactory(NettyProducer producer, Exchange exchange, AsyncCallback
callback) {
+
+    public ClientPipelineFactory(NettyProducer producer) {
         this.producer = producer;
-        this.exchange = exchange;
-        this.callback = callback;
     }
     
     public ChannelPipeline getPipeline() throws Exception {
@@ -48,21 +42,4 @@ public abstract class ClientPipelineFact
     public void setProducer(NettyProducer producer) {
         this.producer = producer;
     }
-
-    public Exchange getExchange() {
-        return exchange;
-    }
-
-    public void setExchange(Exchange exchange) {
-        this.exchange = exchange;
-    }
-
-    public AsyncCallback getCallback() {
-        return callback;
-    }
-
-    public void setCallback(AsyncCallback callback) {
-        this.callback = callback;
-    }
-
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
(original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
Sun Apr  8 12:46:56 2012
@@ -17,12 +17,9 @@
 package org.apache.camel.component.netty;
 
 import java.util.List;
-
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
 import org.apache.camel.component.netty.handlers.ClientChannelHandler;
 import org.apache.camel.component.netty.ssl.SSLEngineFactory;
 import org.jboss.netty.channel.ChannelDownstreamHandler;
@@ -34,10 +31,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DefaultClientPipelineFactory extends ClientPipelineFactory {
-    private static final transient Logger LOG = LoggerFactory.getLogger(ClientPipelineFactory.class);
+    private static final transient Logger LOG = LoggerFactory.getLogger(DefaultClientPipelineFactory.class);
 
-    public DefaultClientPipelineFactory(NettyProducer producer, Exchange exchange, AsyncCallback
callback) {
-        super(producer, exchange, callback);
+    public DefaultClientPipelineFactory(NettyProducer producer) {
+        super(producer);
     }
 
     public ChannelPipeline getPipeline() throws Exception {
@@ -61,7 +58,7 @@ public class DefaultClientPipelineFactor
         }
 
         // our handler must be added last
-        channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange, callback));
+        channelPipeline.addLast("handler", new ClientChannelHandler(producer));
 
         return channelPipeline;
     }

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java?rev=1310989&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
(added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
Sun Apr  8 12:46:56 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ *
+ */
+public final class NettyCamelState {
+
+    private final Exchange exchange;
+    private final AsyncCallback callback;
+
+    public NettyCamelState(AsyncCallback callback, Exchange exchange) {
+        this.callback = callback;
+        this.exchange = exchange;
+    }
+
+    public AsyncCallback getCallback() {
+        return callback;
+    }
+
+    public Exchange getExchange() {
+        return exchange;
+    }
+}

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
(original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
Sun Apr  8 12:46:56 2012
@@ -27,7 +27,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.ServicePoolAware;
 import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
@@ -37,6 +36,7 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelLocal;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
@@ -57,6 +57,9 @@ public class NettyProducer extends Defau
     private CamelLogger noReplyLogger;
     private ExecutorService bossExecutor;
     private ExecutorService workerExecutor;
+    private final ChannelLocal<NettyCamelState> state = new ChannelLocal<NettyCamelState>();
+    private ChannelFuture channelFuture;
+    private Channel channel;
 
     public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
         super(nettyEndpoint);
@@ -157,11 +160,16 @@ public class NettyProducer extends Defau
             exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(getConfiguration().getCharsetName()));
         }
 
-        ChannelFuture channelFuture;
-        final Channel channel;
         try {
-            channelFuture = openConnection(exchange, callback);
-            channel = openChannel(channelFuture);
+            // allow to reuse channel, on this producer, to avoid creating a new connection
+            // for each message being sent
+            if (channelFuture == null || channel == null || !channel.isOpen()) {
+                channelFuture = openConnection();
+                channel = openChannel(channelFuture);
+            }
+            // setup state now we have the channel we can do this because
+            // this producer is not thread safe, but pooled using ServicePoolAware
+            state.set(channel, new NettyCamelState(callback, exchange));
         } catch (Exception e) {
             exchange.setException(e);
             callback.done(true);
@@ -218,6 +226,21 @@ public class NettyProducer extends Defau
         return false;
     }
 
+    /**
+     * To get the {@link NettyCamelState} from this producer.
+     */
+    public NettyCamelState getState(Channel channel) {
+        return state.get(channel);
+    }
+
+    /**
+     * To remove the {@link NettyCamelState} stored on this producer,
+     * when no longer needed
+     */
+    public void removeState(Channel channel) {
+        state.remove(channel);
+    }
+
     protected void setupTCPCommunication() throws Exception {
         if (channelFactory == null) {
             bossExecutor = context.getExecutorServiceManager().newCachedThreadPool(this,
"NettyTCPBoss");
@@ -233,19 +256,17 @@ public class NettyProducer extends Defau
         }
     }
 
-    private ChannelFuture openConnection(Exchange exchange, AsyncCallback callback) throws
Exception {
+    private ChannelFuture openConnection() throws Exception {
         ChannelFuture answer;
         ChannelPipeline clientPipeline;
 
         if (configuration.getClientPipelineFactory() != null) {
             // initialize user defined client pipeline factory
             configuration.getClientPipelineFactory().setProducer(this);
-            configuration.getClientPipelineFactory().setExchange(exchange);
-            configuration.getClientPipelineFactory().setCallback(callback);
             clientPipeline = configuration.getClientPipelineFactory().getPipeline();
         } else {
             // initialize client pipeline factory
-            ClientPipelineFactory clientPipelineFactory = new DefaultClientPipelineFactory(this,
exchange, callback);
+            ClientPipelineFactory clientPipelineFactory = new DefaultClientPipelineFactory(this);
             // must get the pipeline from the factory when opening a new connection
             clientPipeline = clientPipelineFactory.getPipeline();
         }
@@ -295,13 +316,10 @@ public class NettyProducer extends Defau
     }
 
     private void openAndCloseConnection() throws Exception {
-        ChannelFuture future = openConnection(new DefaultExchange(context), new AsyncCallback()
{
-            public void done(boolean doneSync) {
-                // noop
-            }
-        });
+        ChannelFuture future = openConnection();
         Channel channel = openChannel(future);
         NettyHelper.close(channel);
+        ALL_CHANNELS.remove(channel);
     }
 
     public NettyConfiguration getConfiguration() {

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
(original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
Sun Apr  8 12:46:56 2012
@@ -20,6 +20,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.component.netty.NettyCamelState;
 import org.apache.camel.component.netty.NettyConstants;
 import org.apache.camel.component.netty.NettyHelper;
 import org.apache.camel.component.netty.NettyPayloadHelper;
@@ -39,15 +40,11 @@ import org.slf4j.LoggerFactory;
 public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
     private static final transient Logger LOG = LoggerFactory.getLogger(ClientChannelHandler.class);
     private final NettyProducer producer;
-    private final Exchange exchange;
-    private final AsyncCallback callback;
     private boolean messageReceived;
     private volatile boolean exceptionHandled;
 
-    public ClientChannelHandler(NettyProducer producer, Exchange exchange, AsyncCallback
callback) {
+    public ClientChannelHandler(NettyProducer producer) {
         this.producer = producer;
-        this.exchange = exchange;
-        this.callback = callback;
     }
 
     @Override
@@ -73,20 +70,33 @@ public class ClientChannelHandler extend
         if (LOG.isDebugEnabled()) {
             LOG.debug("Closing channel as an exception was thrown from Netty", cause);
         }
-        // set the cause on the exchange
-        exchange.setException(cause);
 
-        // close channel in case an exception was thrown
-        NettyHelper.close(exceptionEvent.getChannel());
+        Exchange exchange = getExchange(ctx);
+        AsyncCallback callback = getAsyncCallback(ctx);
 
-        // signal callback
-        callback.done(false);
+        // the state may not be set
+        if (exchange != null && callback != null) {
+            // set the cause on the exchange
+            exchange.setException(cause);
+
+            // close channel in case an exception was thrown
+            NettyHelper.close(exceptionEvent.getChannel());
+
+            // signal callback
+            callback.done(false);
+        }
     }
 
     @Override
     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
{
         LOG.trace("Channel closed: {}", ctx.getChannel());
 
+        Exchange exchange = getExchange(ctx);
+        AsyncCallback callback = getAsyncCallback(ctx);
+
+        // remove state
+        producer.removeState(ctx.getChannel());
+
         if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled)
{
             // session was closed but no message received. This could be because the remote
server had an internal error
             // and could not return a response. We should count down to stop waiting for
a response
@@ -103,6 +113,9 @@ public class ClientChannelHandler extend
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws
Exception {
         messageReceived = true;
 
+        Exchange exchange = getExchange(ctx);
+        AsyncCallback callback = getAsyncCallback(ctx);
+
         Object body = messageEvent.getMessage();
         LOG.debug("Message received: {}", body);
 
@@ -150,4 +163,14 @@ public class ClientChannelHandler extend
         }
     }
 
+    private Exchange getExchange(ChannelHandlerContext ctx) {
+        NettyCamelState state = producer.getState(ctx.getChannel());
+        return state != null ? state.getExchange() : null;
+    }
+
+    private AsyncCallback getAsyncCallback(ChannelHandlerContext ctx) {
+        NettyCamelState state = producer.getState(ctx.getChannel());
+        return state != null ? state.getCallback() : null;
+    }
+
 }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
(original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
Sun Apr  8 12:46:56 2012
@@ -25,7 +25,6 @@ import org.apache.camel.component.netty.
 import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
-import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
@@ -35,9 +34,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Server handler which is shared
+ * Client handler which cannot be shared
  */
-@ChannelHandler.Sharable
 public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
     private static final transient Logger LOG = LoggerFactory.getLogger(ServerChannelHandler.class);
     private NettyConsumer consumer;

Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
(original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
Sun Apr  8 12:46:56 2012
@@ -97,10 +97,9 @@ public class NettyCustomPipelineFactoryA
             channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize,
true, Delimiters.lineDelimiter()));
             channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
             channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
           
-            channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange,
callback));
+            channelPipeline.addLast("handler", new ClientChannelHandler(producer));
 
             return channelPipeline;
-
         }
         
         public boolean isfactoryInvoked() {

Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
(original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
Sun Apr  8 12:46:56 2012
@@ -97,7 +97,7 @@ public class NettyCustomPipelineFactoryS
             channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize,
true, Delimiters.lineDelimiter()));
             channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
             channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
           
-            channelPipeline.addLast("handler", new ClientChannelHandler(producer, exchange,
callback));
+            channelPipeline.addLast("handler", new ClientChannelHandler(producer));
 
             return channelPipeline;
         }



Mime
View raw message