camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1420052 - in /camel/branches/camel-2.10.x: ./ components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
Date Tue, 11 Dec 2012 09:40:06 GMT
Author: davsclaus
Date: Tue Dec 11 09:40:04 2012
New Revision: 1420052

URL: http://svn.apache.org/viewvc?rev=1420052&view=rev
Log:
CAMEL-5862: Use attachments instead of StateLocal to store state on netty producer

Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
    camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1420051

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=1420052&r1=1420051&r2=1420052&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
(original)
+++ camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
Tue Dec 11 09:40:04 2012
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
 public class DefaultClientPipelineFactory extends ClientPipelineFactory  {
     private static final transient Logger LOG = LoggerFactory.getLogger(DefaultClientPipelineFactory.class);
 
-    private NettyProducer producer;
+    private final NettyProducer producer;
 
     public DefaultClientPipelineFactory(NettyProducer producer) {
         this.producer = producer;

Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1420052&r1=1420051&r2=1420052&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
(original)
+++ camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
Tue Dec 11 09:40:04 2012
@@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExec
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelException;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.impl.DefaultAsyncProducer;
@@ -39,7 +40,6 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelLocal;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
@@ -60,7 +60,6 @@ public class NettyProducer extends Defau
     private CamelLogger noReplyLogger;
     private ExecutorService bossExecutor;
     private ExecutorService workerExecutor;
-    private final ChannelLocal<NettyCamelState> state = new ChannelLocal<NettyCamelState>();
     private ObjectPool<Channel> pool;
 
     public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
@@ -205,13 +204,19 @@ public class NettyProducer extends Defau
             return true;
         }
 
+        // we must have a channel
+        if (existing == null) {
+            exchange.setException(new CamelExchangeException("Cannot get channel from pool",
exchange));
+            callback.done(true);
+            return true;
+        }
+
         // need to declare as final
         final Channel channel = existing;
         final AsyncCallback producerCallback = new NettyProducerCallback(channel, callback);
 
-        // setup state now we have the channel we can do this because
-        // this producer is not thread safe, but pooled using ServicePoolAware
-        state.set(channel, new NettyCamelState(producerCallback, exchange));
+        // setup state as attachment on the channel, so we can access the state later when
needed
+        channel.setAttachment(new NettyCamelState(producerCallback, exchange));
 
         // write body
         NettyHelper.writeBodyAsync(LOG, channel, null, body, exchange, new ChannelFutureListener()
{
@@ -259,18 +264,18 @@ public class NettyProducer extends Defau
     }
 
     /**
-     * To get the {@link NettyCamelState} from this producer.
+     * To get the {@link NettyCamelState} from the given channel.
      */
     public NettyCamelState getState(Channel channel) {
-        return state.get(channel);
+        return (NettyCamelState) channel.getAttachment();
     }
 
     /**
-     * To remove the {@link NettyCamelState} stored on this producer,
+     * To remove the {@link NettyCamelState} stored on the channel,
      * when no longer needed
      */
     public void removeState(Channel channel) {
-        state.remove(channel);
+        channel.setAttachment(null);
     }
 
     protected void setupTCPCommunication() throws Exception {
@@ -300,6 +305,7 @@ public class NettyProducer extends Defau
         ChannelFuture answer;
 
         if (isTcp()) {
+            // its okay to create a new bootstrap for each new channel
             ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
             clientBootstrap.setOption("keepAlive", configuration.isKeepAlive());
             clientBootstrap.setOption("tcpNoDelay", configuration.isTcpNoDelay());
@@ -312,6 +318,7 @@ public class NettyProducer extends Defau
             LOG.trace("Created new TCP client bootstrap connecting to {}:{}", configuration.getHost(),
configuration.getPort());
             return answer;
         } else {
+            // its okay to create a new bootstrap for each new channel
             ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
             connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
             connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());



Mime
View raw message