camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [5/6] CAMEL-6555: camel-netty4 for Netty 4.x based component. Work in progress.
Date Wed, 07 Aug 2013 13:45:54 GMT
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
new file mode 100644
index 0000000..37e28f9
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.net.SocketAddress;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class used internally by camel-netty4 when working with Netty.
+ *
+ * @version 
+ */
+public final class NettyHelper {
+
+    public static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
+    private static final Logger LOG = LoggerFactory.getLogger(NettyHelper.class);
+
+    private NettyHelper() {
+        // Utility class
+    }
+
+    /**
+     * Gets the string body to be used when sending with the textline codec.
+     *
+     * @param body                 the current body
+     * @param exchange             the exchange, whose context type converter is used for the conversion
+     * @param delimiter            the textline delimiter; anything other than LINE is handled as the null delimiter
+     * @param autoAppendDelimiter  whether absent delimiter should be auto appended
+     * @return the string body to send
+     * @throws NoTypeConversionAvailableException is thrown if the current body could not be converted to a String type
+     */
+    public static String getTextlineBody(Object body, Exchange exchange, TextLineDelimiter delimiter, boolean autoAppendDelimiter) throws NoTypeConversionAvailableException {
+        String s = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
+
+        // auto append delimiter if missing?
+        if (autoAppendDelimiter) {
+            if (TextLineDelimiter.LINE.equals(delimiter)) {
+                // line delimiter so ensure it ends with newline
+                if (!s.endsWith("\n")) {
+                    LOG.trace("Auto appending missing newline delimiter to body");
+                    s = s + "\n";
+                }
+            } else {
+                // any other delimiter is handled as the null delimiter, so ensure it ends with the NUL character
+                if (!s.endsWith("\u0000")) {
+                    LOG.trace("Auto appending missing null delimiter to body");
+                    s = s + "\u0000";
+                }
+            }
+        }
+
+        return s;
+    }
+
+    /**
+     * Writes the given body to Netty channel. Will <b>not</b> wait until the body has been written.
+     *
+     * @param log             logger to use
+     * @param channel         the Netty channel
+     * @param remoteAddress   the remote address when using UDP, or <tt>null</tt> to write without an explicit address
+     * @param body            the body to write (send)
+     * @param exchange        the exchange (not used by this method)
+     * @param listener        listener with work to be executed when the operation is complete, may be <tt>null</tt>
+     */
+    public static void writeBodyAsync(Logger log, Channel channel, SocketAddress remoteAddress, Object body,
+                                      Exchange exchange, ChannelFutureListener listener) {
+        ChannelFuture future;
+        if (remoteAddress != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Channel: {} remote address: {} writing body: {}", new Object[]{channel, remoteAddress, body});
+            }
+            future = channel.write(body, remoteAddress);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Channel: {} writing body: {}", new Object[]{channel, body});
+            }
+            future = channel.write(body);
+        }
+
+        if (listener != null) {
+            future.addListener(listener);
+        }
+    }
+
+    /**
+     * Closes the given channel without waiting for the close to complete.
+     *
+     * @param channel the channel to close, may be <tt>null</tt> in which case nothing happens
+     */
+    public static void close(Channel channel) {
+        if (channel != null) {
+            LOG.trace("Closing channel: {}", channel);
+            channel.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
new file mode 100644
index 0000000..dca12b2
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+
+/**
+ * Helper to get and set the correct payload when transferring data using camel-netty4.
+ * Always use this helper instead of direct access on the exchange object.
+ * <p/>
+ * This helper ensures that we can also transfer exchange objects over the wire using the
+ * <tt>transferExchange=true</tt> option.
+ *
+ * @version 
+ */
+public final class NettyPayloadHelper {
+    
+    private NettyPayloadHelper() {
+        // Helper class, only static methods
+    }
+
+    public static Object getIn(NettyEndpoint endpoint, Exchange exchange) {
+        if (endpoint.getConfiguration().isTransferExchange()) {
+            // we should transfer the entire exchange over the wire (includes in/out)
+            return DefaultExchangeHolder.marshal(exchange);
+        } else {
+            // normal transfer using the body of the IN message only
+            return exchange.getIn().getBody();
+        }
+    }
+
+    public static Object getOut(NettyEndpoint endpoint, Exchange exchange) {
+        if (endpoint.getConfiguration().isTransferExchange()) {
+            // we should transfer the entire exchange over the wire (includes in/out)
+            return DefaultExchangeHolder.marshal(exchange);
+        } else {
+            // normal transfer using the body of the OUT message only
+            return exchange.getOut().getBody();
+        }
+    }
+
+    public static void setIn(Exchange exchange, Object payload) {
+        if (payload instanceof DefaultExchangeHolder) { // a marshalled exchange was received
+            DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
+        } else {
+            // plain payload, so use it as the body of the IN message
+            exchange.getIn().setBody(payload);
+        }
+    }
+
+    public static void setOut(Exchange exchange, Object payload) {
+        if (payload instanceof DefaultExchangeHolder) { // a marshalled exchange was received
+            DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
+        } else {
+            // plain payload: preserve the IN headers on OUT (NOTE(review): the IN header map itself is passed, confirm setHeaders copies it)
+            exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+            exchange.getOut().setBody(payload);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
new file mode 100644
index 0000000..e584b58
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -0,0 +1,535 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelException;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.IOHelper;
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.PoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyProducer extends DefaultAsyncProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyProducer.class);
+    private static final ChannelGroup ALL_CHANNELS = new DefaultChannelGroup("NettyProducer"); // NOTE(review): static, so shared by every NettyProducer; doStop() closes the whole group - confirm intended
+    private CamelContext context;
+    private NettyConfiguration configuration;
+    private ChannelFactory channelFactory;
+    private DatagramChannelFactory datagramChannelFactory;
+    private ClientPipelineFactory pipelineFactory;
+    private CamelLogger noReplyLogger; // logs at the configured noReplyLogLevel, used when there is no payload to send
+    private BossPool bossPool; // only assigned when this producer creates the pool itself
+    private WorkerPool workerPool; // only assigned when this producer creates the pool itself
+    private ObjectPool<Channel> pool; // channel pool, created in doStart() and closed in doStop()
+
+    public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
+        super(nettyEndpoint);
+        this.configuration = configuration;
+        this.context = this.getEndpoint().getCamelContext();
+        this.noReplyLogger = new CamelLogger(LOG, configuration.getNoReplyLogLevel());
+    }
+
+    @Override
+    public NettyEndpoint getEndpoint() {
+        return (NettyEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public CamelContext getContext() {
+        return context;
+    }
+
+    protected boolean isTcp() {
+        return configuration.getProtocol().equalsIgnoreCase("tcp"); // callers treat every other protocol value as UDP
+    }
+
+    /** Creates the channel pool and pipeline factory, sets up the TCP or UDP channel factory and, unless lazy, checks a channel can be opened. */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        if (configuration.isProducerPoolEnabled()) {
+            // setup a pool sized by the configured limits; idle eviction allows the pool to shrink on no demand
+            GenericObjectPool.Config config = new GenericObjectPool.Config();
+            config.maxActive = configuration.getProducerPoolMaxActive();
+            config.minIdle = configuration.getProducerPoolMinIdle();
+            config.maxIdle = configuration.getProducerPoolMaxIdle();
+            // we should test on borrow to ensure the channel is still valid
+            config.testOnBorrow = true;
+            // only evict channels which are no longer valid
+            config.testWhileIdle = true;
+            // run eviction every 30 seconds
+            config.timeBetweenEvictionRunsMillis = 30 * 1000L;
+            config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle();
+            config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
+            pool = new GenericObjectPool<Channel>(new NettyProducerPoolableObjectFactory(), config);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created NettyProducer pool[maxActive={}, minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}",
+                        new Object[]{config.maxActive, config.minIdle, config.maxIdle, config.minEvictableIdleTimeMillis, pool});
+            }
+        } else {
+            // pooling is disabled, so fall back to the shared singleton pool
+            pool = new SharedSingletonObjectPool<Channel>(new NettyProducerPoolableObjectFactory());
+            LOG.debug("Created NettyProducer shared singleton pool -> {}", pool);
+        }
+
+        // setup pipeline factory
+        ClientPipelineFactory factory = configuration.getClientPipelineFactory();
+        if (factory != null) {
+            pipelineFactory = factory.createPipelineFactory(this);
+        } else {
+            pipelineFactory = new DefaultClientPipelineFactory(this);
+        }
+
+        if (isTcp()) {
+            setupTCPCommunication();
+        } else {
+            setupUDPCommunication();
+        }
+
+        if (!configuration.isLazyChannelCreation()) {
+            // ensure the connection can be established when we start up
+            Channel channel = pool.borrowObject();
+            pool.returnObject(channel);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        LOG.debug("Stopping producer at address: {}", configuration.getAddress());
+        // close all channels (NOTE(review): ALL_CHANNELS is static, so channels of other NettyProducer instances are closed too - confirm intended)
+        LOG.trace("Closing {} channels", ALL_CHANNELS.size());
+        ChannelGroupFuture future = ALL_CHANNELS.close();
+        future.awaitUninterruptibly();
+
+        // and then release other resources (NOTE(review): channelFactory is not reset to null, so setupTCPCommunication() would reuse it after a restart - confirm)
+        if (channelFactory != null) {
+            channelFactory.releaseExternalResources();
+        }
+
+        // and then shutdown the thread pools which this producer created itself
+        if (bossPool != null) {
+            bossPool.shutdown();
+            bossPool = null;
+        }
+        if (workerPool != null) {
+            workerPool.shutdown();
+            workerPool = null;
+        }
+
+        if (pool != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
+            }
+            pool.close();
+            pool = null;
+        }
+
+        super.doStop();
+    }
+
+    public boolean process(final Exchange exchange, AsyncCallback callback) {
+        if (!isRunAllowed()) {
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException());
+            }
+            callback.done(true);
+            return true;
+        }
+
+        Object body;
+        try {
+            body = getRequestBody(exchange);
+            if (body == null) {
+                noReplyLogger.log("No payload to send for exchange: " + exchange);
+                callback.done(true);
+                return true;
+            }
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
+        // set the exchange encoding property, if a charset is configured
+        if (getConfiguration().getCharsetName() != null) {
+            exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(getConfiguration().getCharsetName()));
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
+        }
+
+        // get a channel from the pool; a failure to borrow ends the exchange synchronously
+        Channel existing;
+        try {
+            existing = pool.borrowObject();
+            if (existing != null) {
+                LOG.trace("Got channel from pool {}", existing);
+            }
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
+        // we must have a channel
+        if (existing == null) {
+            exchange.setException(new CamelExchangeException("Cannot get channel from pool", exchange));
+            callback.done(true);
+            return true;
+        }
+
+        // need to declare as final, as both are used from the anonymous listener below
+        final Channel channel = existing;
+        final AsyncCallback producerCallback = new NettyProducerCallback(channel, callback);
+
+        // setup state as attachment on the channel, so we can access the state later when needed
+        channel.setAttachment(new NettyCamelState(producerCallback, exchange));
+
+        // write body without waiting; the listener below runs when the write operation is complete
+        NettyHelper.writeBodyAsync(LOG, channel, null, body, exchange, new ChannelFutureListener() {
+            public void operationComplete(ChannelFuture channelFuture) throws Exception {
+                LOG.trace("Operation complete {}", channelFuture);
+                if (!channelFuture.isSuccess()) {
+                    // no success, so set the caused exception, signal the callback and stop here
+                    exchange.setException(channelFuture.getCause());
+                    producerCallback.done(false);
+                    return;
+                }
+
+                // if we do not expect any reply then signal callback to continue routing (when sync, this listener does not signal it)
+                if (!configuration.isSync()) {
+                    try {
+                        // should channel be closed after complete?
+                        Boolean close;
+                        if (ExchangeHelper.isOutCapable(exchange)) {
+                            close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+                        } else {
+                            close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+                        }
+
+                        // should we disconnect, the header can override the configuration
+                        boolean disconnect = getConfiguration().isDisconnect();
+                        if (close != null) {
+                            disconnect = close;
+                        }
+                        if (disconnect) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Closing channel when complete at address: {}", getEndpoint().getConfiguration().getAddress());
+                            }
+                            NettyHelper.close(channel);
+                        }
+                    } finally {
+                        // signal callback to continue routing, even if closing the channel failed
+                        producerCallback.done(false);
+                    }
+                }
+            }
+        });
+
+        // continue routing asynchronously
+        return false;
+    }
+
+    /**
+     * Gets the object we want to use as the request object for sending to netty.
+     *
+     * @param exchange the exchange
+     * @return the object to use as request, or <tt>null</tt> if there is no payload
+     * @throws Exception is thrown if error getting the request body
+     */
+    protected Object getRequestBody(Exchange exchange) throws Exception {
+        Object body = NettyPayloadHelper.getIn(getEndpoint(), exchange);
+        if (body == null) {
+            return null;
+        }
+
+        // if textline is enabled then convert to a String, which must be used for textline
+        if (getConfiguration().isTextline()) {
+            body = NettyHelper.getTextlineBody(body, exchange, getConfiguration().getDelimiter(), getConfiguration().isAutoAppendDelimiter());
+        }
+
+        return body;
+    }
+
+    /**
+     * Gets the {@link NettyCamelState} which is stored as the attachment of the given channel.
+     */
+    public NettyCamelState getState(Channel channel) {
+        return (NettyCamelState) channel.getAttachment();
+    }
+
+    /**
+     * Removes the {@link NettyCamelState} stored on the channel, by clearing
+     * the channel attachment, when no longer needed
+     */
+    public void removeState(Channel channel) {
+        channel.setAttachment(null);
+    }
+
+    protected void setupTCPCommunication() throws Exception {
+        if (channelFactory == null) {
+            // prefer using explicitly configured thread pools
+            BossPool bp = configuration.getBossPool();
+            WorkerPool wp = configuration.getWorkerPool();
+
+            if (bp == null) {
+                // create a new pool, kept in the field so doStop() shuts it down, as it is not shared
+                bossPool = new NettyClientBossPoolBuilder()
+                        .withBossCount(configuration.getBossCount())
+                        .withName("NettyClientTCPBoss")
+                        .build();
+                bp = bossPool;
+            }
+            if (wp == null) {
+                // create a new pool, kept in the field so doStop() shuts it down, as it is not shared
+                workerPool = new NettyWorkerPoolBuilder()
+                        .withWorkerCount(configuration.getWorkerCount())
+                        .withName("NettyClientTCPWorker")
+                        .build();
+                wp = workerPool;
+            }
+            channelFactory = new NioClientSocketChannelFactory(bp, wp);
+        }
+    }
+
+    protected void setupUDPCommunication() throws Exception {
+        if (datagramChannelFactory == null) { // only create the datagram factory once
+            int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS;
+            workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count); // NOTE(review): configuration.getWorkerPool() is only consulted for TCP
+            datagramChannelFactory = new NioDatagramChannelFactory(workerPool);
+        }
+    }
+
+    protected ChannelFuture openConnection() throws Exception {
+        ChannelFuture answer;
+
+        if (isTcp()) {
+            // it's okay to create a new bootstrap for each new channel
+            ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
+            clientBootstrap.setOption("keepAlive", configuration.isKeepAlive());
+            clientBootstrap.setOption("tcpNoDelay", configuration.isTcpNoDelay());
+            clientBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
+            clientBootstrap.setOption("connectTimeoutMillis", configuration.getConnectTimeout());
+
+            // set any additional netty options, which are applied last and so can override the options above
+            if (configuration.getOptions() != null) {
+                for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
+                    clientBootstrap.setOption(entry.getKey(), entry.getValue());
+                }
+            }
+
+            // set the pipeline factory, which creates the pipeline for each newly created channel
+            clientBootstrap.setPipelineFactory(pipelineFactory);
+            answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created new TCP client bootstrap connecting to {}:{} with options: {}",
+                        new Object[]{configuration.getHost(), configuration.getPort(), clientBootstrap.getOptions()});
+            }
+            return answer;
+        } else {
+            // it's okay to create a new bootstrap for each new channel
+            ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
+            connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+            connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+            connectionlessClientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+            connectionlessClientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
+            connectionlessClientBootstrap.setOption("child.broadcast", configuration.isBroadcast());
+            connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
+            connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
+
+            // set any additional netty options, which are applied last and so can override the options above
+            if (configuration.getOptions() != null) {
+                for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
+                    connectionlessClientBootstrap.setOption(entry.getKey(), entry.getValue());
+                }
+            }
+
+            // set the pipeline factory, which creates the pipeline for each newly created channel
+            connectionlessClientBootstrap.setPipelineFactory(pipelineFactory);
+            // bind and store channel so we can close it when stopping (NOTE(review): this bound channel is not used by the connect() below - confirm it is needed)
+            Channel channel = connectionlessClientBootstrap.bind(new InetSocketAddress(0));
+            ALL_CHANNELS.add(channel);
+            answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created new UDP client bootstrap connecting to {}:{} with options: {}",
+                       new Object[]{configuration.getHost(), configuration.getPort(), connectionlessClientBootstrap.getOptions()});
+            }
+            return answer;
+        }
+    }
+
+    /**
+     * Waits, for at most the configured connect timeout, until the given connect operation is complete.
+     * @param channelFuture the pending connect operation
+     * @return the connected channel, which has also been added to the channel group
+     * @throws Exception if interrupted, or if the operation is not done in time or was not successful
+     */
+    protected Channel openChannel(ChannelFuture channelFuture) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout());
+        }
+        // the listener is invoked when the operation is complete, so block this thread on a latch until then
+        final CountDownLatch channelLatch = new CountDownLatch(1);
+        channelFuture.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture cf) throws Exception {
+                channelLatch.countDown();
+            }
+        });
+        try {
+            channelLatch.await(configuration.getConnectTimeout(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ex) {
+            // restore the interrupt flag for code further up the stack, and keep the original cause
+            Thread.currentThread().interrupt();
+            throw new CamelException("Interrupted while waiting for connection to " + configuration.getAddress(), ex);
+        }
+        if (!channelFuture.isDone() || !channelFuture.isSuccess()) {
+            throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
+        }
+        Channel answer = channelFuture.getChannel();
+        // to keep track of all channels in use
+        ALL_CHANNELS.add(answer);
+        LOG.debug("Creating connector to address: {}", configuration.getAddress());
+        return answer;
+    }
+
+    public NettyConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(NettyConfiguration configuration) {
+        this.configuration = configuration; // NOTE(review): noReplyLogger still uses the log level of the configuration given to the constructor
+    }
+
+    public ChannelFactory getChannelFactory() {
+        return channelFactory;
+    }
+
+    public void setChannelFactory(ChannelFactory channelFactory) {
+        this.channelFactory = channelFactory; // when set before start, setupTCPCommunication() does not create its own factory
+    }
+
+    public ChannelGroup getAllChannels() {
+        return ALL_CHANNELS; // the static group, so not limited to channels of this producer
+    }
+
+    /**
+     * Callback that returns the channel to the pool when we are done, and then always invokes the delegated callback.
+     */
+    private final class NettyProducerCallback implements AsyncCallback {
+
+        private final Channel channel;
+        private final AsyncCallback callback;
+
+        private NettyProducerCallback(Channel channel, AsyncCallback callback) {
+            this.channel = channel;
+            this.callback = callback;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            try {
+                LOG.trace("Putting channel back to pool {}", channel);
+                pool.returnObject(channel);
+            } catch (Exception e) {
+                // best effort only, but pass the exception to the logger so the cause is not lost
+                LOG.warn("Error returning channel to pool {}. This exception will be ignored.", channel, e);
+            } finally {
+                // ensure we call the delegated callback
+                callback.done(doneSync);
+            }
+        }
+    }
+
+    /**
+     * Object factory used by the pool to create, validate and destroy {@link Channel}s.
+     */
+    private final class NettyProducerPoolableObjectFactory implements PoolableObjectFactory<Channel> {
+
+        @Override
+        public Channel makeObject() throws Exception {
+            ChannelFuture channelFuture = openConnection();
+            Channel answer = openChannel(channelFuture); // blocks until connected, or throws
+            LOG.trace("Created channel: {}", answer);
+            return answer;
+        }
+
+        @Override
+        public void destroyObject(Channel channel) throws Exception {
+            LOG.trace("Destroying channel: {}", channel);
+            NettyHelper.close(channel);
+            ALL_CHANNELS.remove(channel); // no longer tracked in the group once destroyed
+        }
+
+        @Override
+        public boolean validateObject(Channel channel) {
+            // we need a connected channel to be valid
+            boolean answer = channel.isConnected();
+            LOG.trace("Validating channel: {} -> {}", channel, answer);
+            return answer;
+        }
+
+        @Override
+        public void activateObject(Channel channel) throws Exception {
+            // noop - nothing to do on activation
+        }
+
+        @Override
+        public void passivateObject(Channel channel) throws Exception {
+            // noop - nothing to do on passivation
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
new file mode 100644
index 0000000..22effc8
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.camel.util.jsse.SSLContextParameters;
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+/**
+ * Holds the settings used for bootstrapping a Netty server listener: the network endpoint
+ * (protocol, host, port), socket options, thread pools and the SSL related options.
+ * <p/>
+ * Use {@link #compatible(NettyServerBootstrapConfiguration)} to check whether two configurations
+ * carry the same settings.
+ */
+public class NettyServerBootstrapConfiguration implements Cloneable {
+
+    protected String protocol;
+    protected String host;
+    protected int port;
+    protected boolean broadcast;
+    protected long sendBufferSize = 65536;
+    protected long receiveBufferSize = 65536;
+    protected int receiveBufferSizePredictor;
+    protected int bossCount = 1;
+    protected int workerCount;
+    protected boolean keepAlive = true;
+    protected boolean tcpNoDelay = true;
+    protected boolean reuseAddress = true;
+    protected long connectTimeout = 10000;
+    protected int backlog;
+    protected ServerPipelineFactory serverPipelineFactory;
+    protected NettyServerBootstrapFactory nettyServerBootstrapFactory;
+    protected Map<String, Object> options;
+    // SSL options is also part of the server bootstrap as the server listener on port X is either plain or SSL
+    protected boolean ssl;
+    protected boolean sslClientCertHeaders;
+    protected SslHandler sslHandler;
+    protected SSLContextParameters sslContextParameters;
+    protected boolean needClientAuth;
+    protected File keyStoreFile;
+    protected File trustStoreFile;
+    protected String keyStoreResource;
+    protected String trustStoreResource;
+    protected String keyStoreFormat;
+    protected String securityProvider;
+    protected String passphrase;
+    protected BossPool bossPool;
+    protected WorkerPool workerPool;
+    protected String networkInterface;
+
+    /**
+     * Returns the address in the form <tt>host:port</tt>.
+     */
+    public String getAddress() {
+        return host + ":" + port;
+    }
+
+    /**
+     * Whether the configured protocol is <tt>tcp</tt> (case insensitive).
+     *
+     * @return <tt>true</tt> for tcp, <tt>false</tt> otherwise, including when no protocol has been set
+     */
+    public boolean isTcp() {
+        // literal first so a protocol that has not been configured yet does not cause a NPE
+        return "tcp".equalsIgnoreCase(protocol);
+    }
+
+    public String getProtocol() {
+        return protocol;
+    }
+
+    public void setProtocol(String protocol) {
+        this.protocol = protocol;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public boolean isBroadcast() {
+        return broadcast;
+    }
+
+    public void setBroadcast(boolean broadcast) {
+        this.broadcast = broadcast;
+    }
+
+    public long getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize(long sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    public long getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(long receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public int getReceiveBufferSizePredictor() {
+        return receiveBufferSizePredictor;
+    }
+
+    public void setReceiveBufferSizePredictor(int receiveBufferSizePredictor) {
+        this.receiveBufferSizePredictor = receiveBufferSizePredictor;
+    }
+
+    public int getWorkerCount() {
+        return workerCount;
+    }
+
+    public void setWorkerCount(int workerCount) {
+        this.workerCount = workerCount;
+    }
+
+    public int getBossCount() {
+        return bossCount;
+    }
+
+    public void setBossCount(int bossCount) {
+        this.bossCount = bossCount;
+    }
+
+    public boolean isKeepAlive() {
+        return keepAlive;
+    }
+
+    public void setKeepAlive(boolean keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public boolean isReuseAddress() {
+        return reuseAddress;
+    }
+
+    public void setReuseAddress(boolean reuseAddress) {
+        this.reuseAddress = reuseAddress;
+    }
+
+    public long getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public void setConnectTimeout(long connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+
+    public int getBacklog() {
+        return backlog;
+    }
+
+    public void setBacklog(int backlog) {
+        this.backlog = backlog;
+    }
+
+    public boolean isSsl() {
+        return ssl;
+    }
+
+    public void setSsl(boolean ssl) {
+        this.ssl = ssl;
+    }
+
+    public boolean isSslClientCertHeaders() {
+        return sslClientCertHeaders;
+    }
+
+    public void setSslClientCertHeaders(boolean sslClientCertHeaders) {
+        this.sslClientCertHeaders = sslClientCertHeaders;
+    }
+
+    public SslHandler getSslHandler() {
+        return sslHandler;
+    }
+
+    public void setSslHandler(SslHandler sslHandler) {
+        this.sslHandler = sslHandler;
+    }
+
+    public SSLContextParameters getSslContextParameters() {
+        return sslContextParameters;
+    }
+
+    public void setSslContextParameters(SSLContextParameters sslContextParameters) {
+        this.sslContextParameters = sslContextParameters;
+    }
+
+    public boolean isNeedClientAuth() {
+        return needClientAuth;
+    }
+
+    public void setNeedClientAuth(boolean needClientAuth) {
+        this.needClientAuth = needClientAuth;
+    }
+
+    @Deprecated
+    public File getKeyStoreFile() {
+        return keyStoreFile;
+    }
+
+    @Deprecated
+    public void setKeyStoreFile(File keyStoreFile) {
+        this.keyStoreFile = keyStoreFile;
+    }
+
+    @Deprecated
+    public File getTrustStoreFile() {
+        return trustStoreFile;
+    }
+
+    @Deprecated
+    public void setTrustStoreFile(File trustStoreFile) {
+        this.trustStoreFile = trustStoreFile;
+    }
+
+    public String getKeyStoreResource() {
+        return keyStoreResource;
+    }
+
+    public void setKeyStoreResource(String keyStoreResource) {
+        this.keyStoreResource = keyStoreResource;
+    }
+
+    public String getTrustStoreResource() {
+        return trustStoreResource;
+    }
+
+    public void setTrustStoreResource(String trustStoreResource) {
+        this.trustStoreResource = trustStoreResource;
+    }
+
+    public String getKeyStoreFormat() {
+        return keyStoreFormat;
+    }
+
+    public void setKeyStoreFormat(String keyStoreFormat) {
+        this.keyStoreFormat = keyStoreFormat;
+    }
+
+    public String getSecurityProvider() {
+        return securityProvider;
+    }
+
+    public void setSecurityProvider(String securityProvider) {
+        this.securityProvider = securityProvider;
+    }
+
+    public String getPassphrase() {
+        return passphrase;
+    }
+
+    public void setPassphrase(String passphrase) {
+        this.passphrase = passphrase;
+    }
+
+    public ServerPipelineFactory getServerPipelineFactory() {
+        return serverPipelineFactory;
+    }
+
+    public void setServerPipelineFactory(ServerPipelineFactory serverPipelineFactory) {
+        this.serverPipelineFactory = serverPipelineFactory;
+    }
+
+    public NettyServerBootstrapFactory getNettyServerBootstrapFactory() {
+        return nettyServerBootstrapFactory;
+    }
+
+    public void setNettyServerBootstrapFactory(NettyServerBootstrapFactory nettyServerBootstrapFactory) {
+        this.nettyServerBootstrapFactory = nettyServerBootstrapFactory;
+    }
+
+    public Map<String, Object> getOptions() {
+        return options;
+    }
+
+    public void setOptions(Map<String, Object> options) {
+        this.options = options;
+    }
+
+    public BossPool getBossPool() {
+        return bossPool;
+    }
+
+    public void setBossPool(BossPool bossPool) {
+        this.bossPool = bossPool;
+    }
+
+    public WorkerPool getWorkerPool() {
+        return workerPool;
+    }
+
+    public void setWorkerPool(WorkerPool workerPool) {
+        this.workerPool = workerPool;
+    }
+
+    public String getNetworkInterface() {
+        return networkInterface;
+    }
+
+    public void setNetworkInterface(String networkInterface) {
+        this.networkInterface = networkInterface;
+    }
+
+    /**
+     * Checks if the other {@link NettyServerBootstrapConfiguration} is compatible
+     * with this, as a Netty listener bound on port X shares the same common
+     * {@link NettyServerBootstrapConfiguration}, which must be identical.
+     * <p/>
+     * Plain values (strings, numbers, flags, files and the options map) are compared by value
+     * in a null-safe and symmetric manner. Collaborators (pipeline factory, bootstrap factory,
+     * SSL handler, SSL context parameters, boss and worker pools) must be the very same instance.
+     * <p/>
+     * Notice that <tt>sslClientCertHeaders</tt> is not part of the comparison.
+     *
+     * @param other the configuration to compare with, may be <tt>null</tt>
+     * @return <tt>true</tt> if all compared settings match, <tt>false</tt> otherwise
+     */
+    public boolean compatible(NettyServerBootstrapConfiguration other) {
+        if (other == null) {
+            return false;
+        }
+
+        // where the listener is bound
+        boolean sameEndpoint = isEqual(protocol, other.protocol)
+                && isEqual(host, other.host)
+                && port == other.port
+                && broadcast == other.broadcast
+                && isEqual(networkInterface, other.networkInterface);
+
+        // socket and threading options
+        boolean sameSocketOptions = sendBufferSize == other.sendBufferSize
+                && receiveBufferSize == other.receiveBufferSize
+                && receiveBufferSizePredictor == other.receiveBufferSizePredictor
+                && workerCount == other.workerCount
+                && bossCount == other.bossCount
+                && keepAlive == other.keepAlive
+                && tcpNoDelay == other.tcpNoDelay
+                && reuseAddress == other.reuseAddress
+                && connectTimeout == other.connectTimeout
+                && backlog == other.backlog;
+
+        // collaborators must be the same instance; the options must hold identical key/value pairs
+        boolean sameCollaborators = serverPipelineFactory == other.serverPipelineFactory
+                && nettyServerBootstrapFactory == other.nettyServerBootstrapFactory
+                && bossPool == other.bossPool
+                && workerPool == other.workerPool
+                && isEqual(options, other.options);
+
+        // SSL options
+        boolean sameSsl = ssl == other.ssl
+                && sslHandler == other.sslHandler
+                && sslContextParameters == other.sslContextParameters
+                && needClientAuth == other.needClientAuth
+                && isEqual(keyStoreFile, other.keyStoreFile)
+                && isEqual(trustStoreFile, other.trustStoreFile)
+                && isEqual(keyStoreResource, other.keyStoreResource)
+                && isEqual(trustStoreResource, other.trustStoreResource)
+                && isEqual(keyStoreFormat, other.keyStoreFormat)
+                && isEqual(securityProvider, other.securityProvider)
+                && isEqual(passphrase, other.passphrase);
+
+        return sameEndpoint && sameSocketOptions && sameCollaborators && sameSsl;
+    }
+
+    /**
+     * Null-safe equality check, where two <tt>null</tt> values are regarded as equal.
+     */
+    private static boolean isEqual(Object a, Object b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    /**
+     * Returns a textual representation of the bootstrap settings, intended for logging and
+     * error messages.
+     * <p/>
+     * The passphrase is masked, so the secret does not end up in logs or exception messages.
+     */
+    public String toStringBootstrapConfiguration() {
+        // only reveal whether a passphrase has been configured, never its value
+        String maskedPassphrase = passphrase == null ? null : "xxxxxx";
+        return "NettyServerBootstrapConfiguration{"
+                + "protocol='" + protocol + '\''
+                + ", host='" + host + '\''
+                + ", port=" + port
+                + ", broadcast=" + broadcast
+                + ", sendBufferSize=" + sendBufferSize
+                + ", receiveBufferSize=" + receiveBufferSize
+                + ", receiveBufferSizePredictor=" + receiveBufferSizePredictor
+                + ", workerCount=" + workerCount
+                + ", bossCount=" + bossCount
+                + ", keepAlive=" + keepAlive
+                + ", tcpNoDelay=" + tcpNoDelay
+                + ", reuseAddress=" + reuseAddress
+                + ", connectTimeout=" + connectTimeout
+                + ", backlog=" + backlog
+                + ", serverPipelineFactory=" + serverPipelineFactory
+                + ", nettyServerBootstrapFactory=" + nettyServerBootstrapFactory
+                + ", options=" + options
+                + ", ssl=" + ssl
+                + ", sslHandler=" + sslHandler
+                + ", sslContextParameters='" + sslContextParameters + '\''
+                + ", needClientAuth=" + needClientAuth
+                + ", keyStoreFile=" + keyStoreFile
+                + ", trustStoreFile=" + trustStoreFile
+                + ", keyStoreResource='" + keyStoreResource + '\''
+                + ", trustStoreResource='" + trustStoreResource + '\''
+                + ", keyStoreFormat='" + keyStoreFormat + '\''
+                + ", securityProvider='" + securityProvider + '\''
+                + ", passphrase='" + maskedPassphrase + '\''
+                + ", bossPool=" + bossPool
+                + ", workerPool=" + workerPool
+                + ", networkInterface='" + networkInterface + '\''
+                + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java
new file mode 100644
index 0000000..250366f
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Service;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+
+/**
+ * Factory for setting up Netty {@link org.jboss.netty.bootstrap.ServerBootstrap} and all
+ * the needed logic for doing that.
+ * <p/>
+ * This factory allows consumers to reuse an existing {@link org.jboss.netty.bootstrap.ServerBootstrap},
+ * which makes it possible to share the same port among multiple consumers.
+ * <p/>
+ * One of the two <tt>init</tt> methods is used, depending on whether the factory is shared or not.
+ */
+public interface NettyServerBootstrapFactory extends Service {
+
+    /**
+     * Initializes this <b>non-shared</b> {@link NettyServerBootstrapFactory}.
+     *
+     * @param camelContext     the {@link CamelContext} for non-shared bootstrap factory
+     * @param configuration    the bootstrap configuration
+     * @param pipelineFactory  the pipeline factory
+     */
+    void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory);
+
+    /**
+     * Initializes this <b>shared</b> {@link NettyServerBootstrapFactory}.
+     *
+     * @param threadFactory    the thread factory to use for shared bootstrap factory
+     * @param configuration    the bootstrap configuration
+     * @param pipelineFactory  the pipeline factory
+     */
+    void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory);
+
+    /**
+     * Callback when a new {@link Channel} is opened.
+     *
+     * @param channel the opened channel
+     */
+    void addChannel(Channel channel);
+
+    /**
+     * Callback when a {@link Channel} is closed.
+     *
+     * @param channel the closed channel
+     */
+    void removeChannel(Channel channel);
+
+    /**
+     * Callback when a {@link NettyConsumer} is added and uses this bootstrap factory.
+     *
+     * @param consumer the consumer now using this factory
+     */
+    void addConsumer(NettyConsumer consumer);
+
+    /**
+     * Callback when a {@link NettyConsumer} is removed and no longer using this bootstrap factory.
+     *
+     * @param consumer the consumer no longer using this factory
+     */
+    void removeConsumer(NettyConsumer consumer);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
new file mode 100644
index 0000000..c90565f
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.NioServerBossPool;
+
+/**
+ * A builder to create Netty {@link BossPool} which can be used for sharing boss pools
+ * with multiple Netty {@link NettyServerBootstrapFactory} server bootstrap configurations.
+ * <p/>
+ * Configure the builder using either the setters or the fluent <tt>withXXX</tt> methods,
+ * and then call {@link #build()}.
+ */
+public final class NettyServerBossPoolBuilder {
+
+    private String name = "NettyServerBoss";
+    private String pattern;
+    private int bossCount = 1;
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setPattern(String pattern) {
+        this.pattern = pattern;
+    }
+
+    public void setBossCount(int bossCount) {
+        this.bossCount = bossCount;
+    }
+
+    public NettyServerBossPoolBuilder withName(String name) {
+        setName(name);
+        return this;
+    }
+
+    public NettyServerBossPoolBuilder withPattern(String pattern) {
+        setPattern(pattern);
+        return this;
+    }
+
+    public NettyServerBossPoolBuilder withBossCount(int bossCount) {
+        setBossCount(bossCount);
+        return this;
+    }
+
+    /**
+     * Creates a new boss pool, backed by a new cached thread pool, using the configured
+     * boss count, thread name pattern and name.
+     * <p/>
+     * This method is public so the builder can be used from outside this package as well.
+     *
+     * @return the newly created boss pool
+     */
+    public BossPool build() {
+        return new NioServerBossPool(Executors.newCachedThreadPool(), bossCount, new CamelNettyThreadNameDeterminer(pattern, name));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
new file mode 100644
index 0000000..0179d2b
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+
+/**
+ * A builder to create Netty {@link WorkerPool} which can be used for sharing worker pools
+ * with multiple Netty {@link NettyServerBootstrapFactory} server bootstrap configurations.
+ */
+public final class NettyWorkerPoolBuilder {
+
+    private String name = "NettyWorker";
+    private String pattern;
+    private int workerCount;
+    private volatile WorkerPool workerPool;
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setPattern(String pattern) {
+        this.pattern = pattern;
+    }
+
+    public void setWorkerCount(int workerCount) {
+        this.workerCount = workerCount;
+    }
+
+    public NettyWorkerPoolBuilder withName(String name) {
+        setName(name);
+        return this;
+    }
+
+    public NettyWorkerPoolBuilder withPattern(String pattern) {
+        setPattern(pattern);
+        return this;
+    }
+
+    public NettyWorkerPoolBuilder withWorkerCount(int workerCount) {
+        setWorkerCount(workerCount);
+        return this;
+    }
+
+    /**
+     * Creates a new worker pool.
+     */
+    WorkerPool build() {
+        int count = workerCount > 0 ? workerCount : NettyHelper.DEFAULT_IO_THREADS;
+        workerPool = new NioWorkerPool(Executors.newCachedThreadPool(), count, new CamelNettyThreadNameDeterminer(pattern, name));
+        return workerPool;
+    }
+
+    /**
+     * Shutdown the created worker pool
+     */
+    public void destroy() {
+        if (workerPool != null) {
+            workerPool.shutdown();
+            workerPool = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
new file mode 100644
index 0000000..e8ffe11
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+
+/**
+ * Factory to create {@link ChannelPipeline} for servers, eg {@link NettyConsumer}.
+ * <p/>
+ * Implementors must support creating a new instance of this factory which is associated
+ * to the given {@link NettyConsumer} using the {@link #createPipelineFactory(NettyConsumer)}
+ * method.
+ *
+ * @see ChannelPipelineFactory
+ */
+public abstract class ServerPipelineFactory implements ChannelPipelineFactory {
+
+    /**
+     * Creates a new {@link ServerPipelineFactory} using the given {@link NettyConsumer}
+     *
+     * @param consumer the associated consumer
+     * @return the {@link ServerPipelineFactory} associated to the given consumer.
+     */
+    public abstract ServerPipelineFactory createPipelineFactory(NettyConsumer consumer);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
new file mode 100644
index 0000000..68f3e40
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.jboss.netty.channel.ChannelHandler;
+
+/**
+ * A {@link ChannelHandlerFactory} which hands out the very same {@link ChannelHandler}
+ * instance on every call, for handlers that can be shared.
+ */
+public class ShareableChannelHandlerFactory implements ChannelHandlerFactory {
+
+    private final ChannelHandler sharedHandler;
+
+    /**
+     * @param channelHandler the handler instance to return from {@link #newChannelHandler()}
+     */
+    public ShareableChannelHandlerFactory(ChannelHandler channelHandler) {
+        sharedHandler = channelHandler;
+    }
+
+    @Override
+    public ChannelHandler newChannelHandler() {
+        // no new instance is created; the handler given at construction is returned as-is
+        return sharedHandler;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java
new file mode 100644
index 0000000..9121838
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.NoSuchElementException;
+
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.PoolableObjectFactory;
+
+/**
+ * An {@link org.apache.commons.pool.ObjectPool} that uses a single shared instance.
+ * <p/>
+ * This implementation will always return <tt>1</tt> in {@link #getNumActive()} and
+ * return <tt>0</tt> in {@link #getNumIdle()}.
+ */
+public class SharedSingletonObjectPool<T> implements ObjectPool<T> {
+
+    private final PoolableObjectFactory<T> factory;
+    private volatile T t;
+
+    public SharedSingletonObjectPool(PoolableObjectFactory<T> factory) {
+        this.factory = factory;
+    }
+
+    @Override
+    public synchronized T borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
+        if (t == null) {
+            t = factory.makeObject();
+        }
+        return t;
+    }
+
+    @Override
+    public void returnObject(T obj) throws Exception {
+        // noop
+    }
+
+    @Override
+    public void invalidateObject(T obj) throws Exception {
+        t = null;
+    }
+
+    @Override
+    public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
+        // noop
+    }
+
+    @Override
+    public int getNumIdle() throws UnsupportedOperationException {
+        return 0;
+    }
+
+    @Override
+    public int getNumActive() throws UnsupportedOperationException {
+        return 1;
+    }
+
+    @Override
+    public void clear() throws Exception, UnsupportedOperationException {
+        t = null;
+    }
+
+    @Override
+    public void close() throws Exception {
+        t = null;
+    }
+
+    @Override
+    public void setFactory(PoolableObjectFactory<T> factory) throws IllegalStateException, UnsupportedOperationException {
+        // noop
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
new file mode 100644
index 0000000..9806a38
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.support.ServiceSupport;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link NettyServerBootstrapFactory} which is used by a single consumer (not shared).
+ */
+public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(SingleTCPNettyServerBootstrapFactory.class);
+    private final ChannelGroup allChannels;
+    private CamelContext camelContext;
+    private ThreadFactory threadFactory;
+    private NettyServerBootstrapConfiguration configuration;
+    private ChannelPipelineFactory pipelineFactory;
+    private ChannelFactory channelFactory;
+    private ServerBootstrap serverBootstrap;
+    private Channel channel;
+    private BossPool bossPool;
+    private WorkerPool workerPool;
+
+    public SingleTCPNettyServerBootstrapFactory() {
+        this.allChannels = new DefaultChannelGroup(SingleTCPNettyServerBootstrapFactory.class.getName());
+    }
+
+    public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+        this.camelContext = camelContext;
+        this.configuration = configuration;
+        this.pipelineFactory = pipelineFactory;
+    }
+
+    public void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+        this.threadFactory = threadFactory;
+        this.configuration = configuration;
+        this.pipelineFactory = pipelineFactory;
+    }
+
+    public void addChannel(Channel channel) {
+        allChannels.add(channel);
+    }
+
+    public void removeChannel(Channel channel) {
+        allChannels.remove(channel);
+    }
+
+    public void addConsumer(NettyConsumer consumer) {
+        // does not allow sharing
+    }
+
+    public void removeConsumer(NettyConsumer consumer) {
+        // does not allow sharing
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (camelContext == null && threadFactory == null) {
+            throw new IllegalArgumentException("Either CamelContext or ThreadFactory must be set on " + this);
+        }
+        startServerBootstrap();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        stopServerBootstrap();
+    }
+
+    protected void startServerBootstrap() {
+        // prefer using explicit configured thread pools
+        BossPool bp = configuration.getBossPool();
+        WorkerPool wp = configuration.getWorkerPool();
+
+        if (bp == null) {
+            // create new pool which we should shutdown when stopping as its not shared
+            bossPool = new NettyServerBossPoolBuilder()
+                    .withBossCount(configuration.getBossCount())
+                    .withName("NettyServerTCPBoss")
+                    .build();
+            bp = bossPool;
+        }
+        if (wp == null) {
+            // create new pool which we should shutdown when stopping as its not shared
+            workerPool = new NettyWorkerPoolBuilder()
+                    .withWorkerCount(configuration.getWorkerCount())
+                    .withName("NettyServerTCPWorker")
+                    .build();
+            wp = workerPool;
+        }
+
+        channelFactory = new NioServerSocketChannelFactory(bp, wp);
+
+        serverBootstrap = new ServerBootstrap(channelFactory);
+        serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+        serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+        serverBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
+        serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+        serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
+        if (configuration.getBacklog() > 0) {
+            serverBootstrap.setOption("backlog", configuration.getBacklog());
+        }
+
+        // set any additional netty options
+        if (configuration.getOptions() != null) {
+            for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
+                serverBootstrap.setOption(entry.getKey(), entry.getValue());
+            }
+        }
+
+        LOG.debug("Created ServerBootstrap {} with options: {}", serverBootstrap, serverBootstrap.getOptions());
+
+        // set the pipeline factory, which creates the pipeline for each newly created channels
+        serverBootstrap.setPipelineFactory(pipelineFactory);
+
+        LOG.info("ServerBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
+        channel = serverBootstrap.bind(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+        // to keep track of all channels in use
+        allChannels.add(channel);
+    }
+
+    protected void stopServerBootstrap() {
+        // close all channels
+        LOG.info("ServerBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort());
+
+        LOG.trace("Closing {} channels", allChannels.size());
+        ChannelGroupFuture future = allChannels.close();
+        future.awaitUninterruptibly();
+
+        // close server external resources
+        if (channelFactory != null) {
+            channelFactory.releaseExternalResources();
+            channelFactory = null;
+        }
+
+        // and then shutdown the thread pools
+        if (bossPool != null) {
+            bossPool.shutdown();
+            bossPool = null;
+        }
+        if (workerPool != null) {
+            workerPool.shutdown();
+            workerPool = null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
new file mode 100644
index 0000000..1a8d984
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.support.ServiceSupport;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.DatagramChannel;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.jboss.netty.handler.ipfilter.IpV4Subnet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link NettyServerBootstrapFactory} for UDP that is used by a single consumer (not shared).
+ * <p/>
+ * Starting the service binds a {@link ConnectionlessBootstrap} to the configured host and port; when
+ * the host lies in the multicast subnet the channel also joins that multicast group. Stopping the
+ * service closes every tracked channel and releases the resources this factory created.
+ */
+public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class);
+    /** Network interface name used for multicast when the configuration does not name one. */
+    private static final String LOOPBACK_INTERFACE = "lo";
+    /** A configured host inside this subnet is treated as a multicast group address. */
+    private static final String MULTICAST_SUBNET = "224.0.0.0/4";
+    /** Every channel in use, so they can be closed together on stop. */
+    private final ChannelGroup allChannels;
+    private CamelContext camelContext;
+    private ThreadFactory threadFactory;
+    private NettyServerBootstrapConfiguration configuration;
+    private ChannelPipelineFactory pipelineFactory;
+    private DatagramChannelFactory datagramChannelFactory;
+    private ConnectionlessBootstrap connectionlessBootstrap;
+    private WorkerPool workerPool;
+
+    public SingleUDPNettyServerBootstrapFactory() {
+        this.allChannels = new DefaultChannelGroup(SingleUDPNettyServerBootstrapFactory.class.getName());
+    }
+
+    /**
+     * Prepares the factory with a {@link CamelContext}.
+     *
+     * @param camelContext    the camel context
+     * @param configuration   the bootstrap settings (host, port, socket options)
+     * @param pipelineFactory creates the pipeline for each newly created channel
+     */
+    public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+        this.camelContext = camelContext;
+        this.configuration = configuration;
+        this.pipelineFactory = pipelineFactory;
+    }
+
+    /**
+     * Prepares the factory with a {@link ThreadFactory} instead of a {@link CamelContext}.
+     *
+     * @param threadFactory   the thread factory
+     * @param configuration   the bootstrap settings (host, port, socket options)
+     * @param pipelineFactory creates the pipeline for each newly created channel
+     */
+    public void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+        this.threadFactory = threadFactory;
+        this.configuration = configuration;
+        this.pipelineFactory = pipelineFactory;
+    }
+
+    /**
+     * Starts tracking the channel so it gets closed when this factory stops.
+     */
+    public void addChannel(Channel channel) {
+        allChannels.add(channel);
+    }
+
+    /**
+     * Stops tracking the channel.
+     */
+    public void removeChannel(Channel channel) {
+        allChannels.remove(channel);
+    }
+
+    public void addConsumer(NettyConsumer consumer) {
+        // does not allow sharing
+    }
+
+    public void removeConsumer(NettyConsumer consumer) {
+        // does not allow sharing
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // one of the two init methods must have been called beforehand
+        if (camelContext == null && threadFactory == null) {
+            throw new IllegalArgumentException("Either CamelContext or ThreadFactory must be set on " + this);
+        }
+        startServerBootstrap();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        stopServerBootstrap();
+    }
+
+    /**
+     * Creates the worker pool, channel factory and connectionless bootstrap, applies the configured
+     * options and binds to the configured host and port. If the host is a multicast address the bound
+     * channel additionally joins the multicast group on the configured network interface.
+     *
+     * @throws UnknownHostException     if the configured host cannot be checked against the multicast subnet
+     * @throws SocketException          if the network interface lookup fails with an I/O error
+     * @throws IllegalArgumentException if no network interface exists with the name to use for multicast
+     */
+    protected void startServerBootstrap() throws UnknownHostException, SocketException {
+        // create non-shared worker pool
+        // NOTE(review): the ThreadFactory passed to init is not used here; the threads come from a default cached thread pool
+        int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS;
+        workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count);
+
+        datagramChannelFactory = new NioDatagramChannelFactory(workerPool);
+
+        connectionlessBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
+        connectionlessBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+        connectionlessBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+        connectionlessBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
+        connectionlessBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+        connectionlessBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
+        connectionlessBootstrap.setOption("child.broadcast", configuration.isBroadcast());
+        connectionlessBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
+        connectionlessBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
+        // only set this if user has specified
+        if (configuration.getReceiveBufferSizePredictor() > 0) {
+            connectionlessBootstrap.setOption("receiveBufferSizePredictorFactory",
+                    new FixedReceiveBufferSizePredictorFactory(configuration.getReceiveBufferSizePredictor()));
+        }
+        if (configuration.getBacklog() > 0) {
+            connectionlessBootstrap.setOption("backlog", configuration.getBacklog());
+        }
+
+        // additional netty options are applied last, so they can replace the options above
+        if (configuration.getOptions() != null) {
+            for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
+                connectionlessBootstrap.setOption(entry.getKey(), entry.getValue());
+            }
+        }
+
+        LOG.debug("Created ConnectionlessBootstrap {} with options: {}", connectionlessBootstrap, connectionlessBootstrap.getOptions());
+
+        // set the pipeline factory, which creates the pipeline for each newly created channel
+        connectionlessBootstrap.setPipelineFactory(pipelineFactory);
+
+        InetSocketAddress hostAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort());
+        IpV4Subnet multicastSubnet = new IpV4Subnet(MULTICAST_SUBNET);
+
+        if (multicastSubnet.contains(configuration.getHost())) {
+            // resolve the interface before binding, so a wrong name fails without leaving a bound socket behind
+            String networkInterface = configuration.getNetworkInterface() == null ? LOOPBACK_INTERFACE : configuration.getNetworkInterface();
+            NetworkInterface multicastNetworkInterface = NetworkInterface.getByName(networkInterface);
+            if (multicastNetworkInterface == null) {
+                // getByName returns null rather than throwing when no interface has that name
+                throw new IllegalArgumentException("No network interface found for '" + networkInterface + "'");
+            }
+
+            DatagramChannel channel = (DatagramChannel)connectionlessBootstrap.bind(hostAddress);
+            // track the channel right after binding, so it is closed on stop even if joining the group fails
+            allChannels.add(channel);
+            LOG.info("ConnectionlessBootstrap joining {}:{} using network interface: {}", new Object[]{configuration.getHost(), configuration.getPort(), multicastNetworkInterface.getName()});
+            channel.joinGroup(hostAddress, multicastNetworkInterface);
+        } else {
+            LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
+            Channel channel = connectionlessBootstrap.bind(hostAddress);
+            allChannels.add(channel);
+        }
+    }
+
+    /**
+     * Closes all tracked channels, waits for that to complete, and then releases the datagram channel
+     * factory and the worker pool created by this factory.
+     */
+    protected void stopServerBootstrap() {
+        // close all channels
+        LOG.info("ConnectionlessBootstrap disconnecting from {}:{}", configuration.getHost(), configuration.getPort());
+
+        LOG.trace("Closing {} channels", allChannels.size());
+        ChannelGroupFuture future = allChannels.close();
+        future.awaitUninterruptibly();
+
+        // close server external resources
+        if (datagramChannelFactory != null) {
+            datagramChannelFactory.releaseExternalResources();
+            datagramChannelFactory = null;
+        }
+
+        // and then shutdown the thread pools
+        if (workerPool != null) {
+            workerPool.shutdown();
+            workerPool = null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/TextLineDelimiter.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/TextLineDelimiter.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/TextLineDelimiter.java
new file mode 100644
index 0000000..9791e57
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/TextLineDelimiter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+/**
+ * Possible text line delimiters to be used with the textline codec.
+ *
+ * @version 
+ */
+public enum TextLineDelimiter {
+    LINE, NULL;
+}


Mime
View raw message