Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 98876 invoked from network); 19 Apr 2010 04:53:10 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 19 Apr 2010 04:53:10 -0000 Received: (qmail 48639 invoked by uid 500); 19 Apr 2010 04:53:10 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 48618 invoked by uid 500); 19 Apr 2010 04:53:10 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 48611 invoked by uid 99); 19 Apr 2010 04:53:10 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Apr 2010 04:53:10 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Apr 2010 04:53:06 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 76B86238898B; Mon, 19 Apr 2010 04:52:23 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r935452 - in /camel/trunk/components/camel-netty/src: main/java/org/apache/camel/component/netty/ test/java/org/apache/camel/component/netty/ Date: Mon, 19 Apr 2010 04:52:23 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100419045223.76B86238898B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Mon Apr 19 04:52:23 2010 New Revision: 935452 URL: http://svn.apache.org/viewvc?rev=935452&view=rev Log: CAMEL-2657: Multiple encoder and decoders now possible. Thanks to Stephen Gargan for the patch. Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java (with props) Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=935452&r1=935451&r2=935452&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java Mon Apr 19 04:52:23 2010 @@ -16,14 +16,18 @@ */ package org.apache.camel.component.netty; +import java.util.List; + import javax.net.ssl.SSLEngine; import org.apache.camel.component.netty.handlers.ClientChannelHandler; import org.apache.camel.component.netty.ssl.SSLEngineFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.jboss.netty.channel.ChannelDownstreamHandler; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.Channels; import org.jboss.netty.handler.ssl.SslHandler; @@ -50,9 +54,17 @@ public class ClientPipelineFactory imple } channelPipeline.addLast("ssl", sslHandler); } - - channelPipeline.addLast("decoder", producer.getConfiguration().getDecoder()); - channelPipeline.addLast("encoder", producer.getConfiguration().getEncoder()); + + List decoders = producer.getConfiguration().getDecoders(); + for (int x = 0; x < decoders.size(); x++) { + channelPipeline.addLast("decoder-" + x, decoders.get(x)); + } + + List encoders = producer.getConfiguration().getEncoders(); + for (int x = 0; x < encoders.size(); x++) { + channelPipeline.addLast("encoder-" + x, encoders.get(x)); + } + if (producer.getConfiguration().getHandler() != null) { channelPipeline.addLast("handler", producer.getConfiguration().getHandler()); } else { Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=935452&r1=935451&r2=935452&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Mon Apr 19 04:52:23 2010 @@ -18,6 +18,8 @@ package org.apache.camel.component.netty import java.io.File; import java.net.URI; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.camel.util.URISupport; @@ -29,6 +31,7 @@ import org.jboss.netty.handler.codec.ser import org.jboss.netty.handler.codec.serialization.ObjectEncoder; import org.jboss.netty.handler.ssl.SslHandler; +@SuppressWarnings("unchecked") public class NettyConfiguration { private String protocol; private String host; @@ -44,8 +47,8 @@ public class NettyConfiguration { private File keyStoreFile; private File trustStoreFile; private SslHandler sslHandler; - private ChannelDownstreamHandler encoder; - private ChannelUpstreamHandler decoder; + private List encoders = new ArrayList(); + private List decoders = new ArrayList(); private ChannelHandler handler; private boolean ssl; private long sendBufferSize; @@ -69,63 +72,72 @@ public class NettyConfiguration { setCorePoolSize(10); setMaxPoolSize(100); } - + public void parseURI(URI uri, Map parameters, NettyComponent component) throws Exception { protocol = uri.getScheme(); - + if ((!protocol.equalsIgnoreCase("tcp")) && (!protocol.equalsIgnoreCase("udp"))) { throw new IllegalArgumentException("Unrecognized Netty protocol: " + protocol + " for uri: " + uri); } - + setHost(uri.getHost()); setPort(uri.getPort()); - + sslHandler = component.resolveAndRemoveReferenceParameter(parameters, "sslHandler", SslHandler.class, null); passphrase = component.resolveAndRemoveReferenceParameter(parameters, "passphrase", String.class, null); - keyStoreFormat = component.getAndRemoveParameter(parameters, "keyStoreFormat", String.class, "JKS"); + keyStoreFormat = component.getAndRemoveParameter(parameters, "keyStoreFormat", String.class, "JKS"); securityProvider = component.getAndRemoveParameter(parameters, "securityProvider", String.class, "SunX509"); keyStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "keyStoreFile", File.class, null); trustStoreFile = component.resolveAndRemoveReferenceParameter(parameters, "trustStoreFile", File.class, null); - encoder = component.resolveAndRemoveReferenceParameter(parameters, "encoder", ChannelDownstreamHandler.class, new ObjectEncoder()); - decoder = component.resolveAndRemoveReferenceParameter(parameters, "decoder", ChannelUpstreamHandler.class, new ObjectDecoder()); - handler = component.resolveAndRemoveReferenceParameter(parameters, "handler", SimpleChannelHandler.class, null); - + + List referencedEncoders = component.resolveAndRemoveReferenceParameter(parameters, "encoders", List.class, null); + addToHandlersList(encoders, referencedEncoders, ChannelDownstreamHandler.class); + List referencedDecoders = component.resolveAndRemoveReferenceParameter(parameters, "decoders", List.class, null); + addToHandlersList(decoders, referencedDecoders, ChannelUpstreamHandler.class); + + if (encoders.isEmpty() && decoders.isEmpty()) { + encoders.add(component.resolveAndRemoveReferenceParameter(parameters, "encoder", ChannelDownstreamHandler.class, new ObjectEncoder())); + decoders.add(component.resolveAndRemoveReferenceParameter(parameters, "decoder", ChannelUpstreamHandler.class, new ObjectDecoder())); + } + + handler = component.resolveAndRemoveReferenceParameter(parameters, "handler", SimpleChannelHandler.class, null); + Map settings = URISupport.parseParameters(uri); if (settings.containsKey("keepAlive")) { setKeepAlive(Boolean.valueOf((String) settings.get("keepAlive"))); - } + } if (settings.containsKey("tcpNoDelay")) { setTcpNoDelay(Boolean.valueOf((String) settings.get("tcpNoDelay"))); - } + } if (settings.containsKey("broadcast")) { setBroadcast(Boolean.valueOf((String) settings.get("broadcast"))); - } + } if (settings.containsKey("reuseAddress")) { setReuseAddress(Boolean.valueOf((String) settings.get("reuseAddress"))); } if (settings.containsKey("connectTimeoutMillis")) { - setConnectTimeoutMillis(Long.valueOf((String)settings.get("connectTimeoutMillis"))); + setConnectTimeoutMillis(Long.valueOf((String) settings.get("connectTimeoutMillis"))); } if (settings.containsKey("sync")) { setTcpNoDelay(Boolean.valueOf((String) settings.get("sync"))); } if (settings.containsKey("receiveTimeoutMillis")) { - setReceiveTimeoutMillis(Long.valueOf((String)settings.get("receiveTimeoutMillis"))); + setReceiveTimeoutMillis(Long.valueOf((String) settings.get("receiveTimeoutMillis"))); } if (settings.containsKey("sendBufferSize")) { - setSendBufferSize(Long.valueOf((String)settings.get("sendBufferSize"))); + setSendBufferSize(Long.valueOf((String) settings.get("sendBufferSize"))); } if (settings.containsKey("receiveBufferSize")) { - setReceiveBufferSize(Long.valueOf((String)settings.get("receiveBufferSize"))); - } + setReceiveBufferSize(Long.valueOf((String) settings.get("receiveBufferSize"))); + } if (settings.containsKey("ssl")) { setTcpNoDelay(Boolean.valueOf((String) settings.get("ssl"))); } if (settings.containsKey("corePoolSize")) { - setCorePoolSize(Integer.valueOf((String)settings.get("corePoolSize"))); + setCorePoolSize(Integer.valueOf((String) settings.get("corePoolSize"))); } if (settings.containsKey("maxPoolSize")) { - setMaxPoolSize(Integer.valueOf((String)settings.get("maxPoolSize"))); + setMaxPoolSize(Integer.valueOf((String) settings.get("maxPoolSize"))); } } @@ -168,7 +180,7 @@ public class NettyConfiguration { public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } - + public boolean isBroadcast() { return broadcast; } @@ -209,20 +221,41 @@ public class NettyConfiguration { this.sslHandler = sslHandler; } + + public List getEncoders() { + return encoders; + } + + public List getDecoders() { + return decoders; + } + public ChannelDownstreamHandler getEncoder() { - return encoder; + return encoders.isEmpty() ? null : encoders.get(0); } public void setEncoder(ChannelDownstreamHandler encoder) { - this.encoder = encoder; + if (!encoders.contains(encoder)) { + encoders.add(encoder); + } + } + + public void setEncoders(List encoders) { + this.encoders = encoders; } public ChannelUpstreamHandler getDecoder() { - return decoder; + return decoders.isEmpty() ? null : decoders.get(0); } public void setDecoder(ChannelUpstreamHandler decoder) { - this.decoder = decoder; + if (!decoders.contains(decoder)) { + decoders.add(decoder); + } + } + + public void setDecoders(List decoders) { + this.decoders = decoders; } public ChannelHandler getHandler() { @@ -248,7 +281,7 @@ public class NettyConfiguration { public void setSendBufferSize(long sendBufferSize) { this.sendBufferSize = sendBufferSize; } - + public boolean isSsl() { return ssl; } @@ -319,6 +352,17 @@ public class NettyConfiguration { public void setSecurityProvider(String securityProvider) { this.securityProvider = securityProvider; - } + } + + private void addToHandlersList(List configured, List handlers, Class handlerType) { + if (handlers != null) { + for (int x = 0; x < handlers.size(); x++) { + Object handler = handlers.get(x); + if (handlerType.isInstance(handler)) { + configured.add(handler); + } + } + } + } } Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java?rev=935452&r1=935451&r2=935452&view=diff ============================================================================== --- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java (original) +++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ServerPipelineFactory.java Mon Apr 19 04:52:23 2010 @@ -16,14 +16,18 @@ */ package org.apache.camel.component.netty; +import java.util.List; + import javax.net.ssl.SSLEngine; import org.apache.camel.component.netty.handlers.ServerChannelHandler; import org.apache.camel.component.netty.ssl.SSLEngineFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.jboss.netty.channel.ChannelDownstreamHandler; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.Channels; import org.jboss.netty.handler.ssl.SslHandler; @@ -45,8 +49,15 @@ public class ServerPipelineFactory imple } channelPipeline.addLast("ssl", sslHandler); } - channelPipeline.addLast("decoder", consumer.getConfiguration().getDecoder()); - channelPipeline.addLast("encoder", consumer.getConfiguration().getEncoder()); + List decoders = consumer.getConfiguration().getDecoders(); + for (int x = 0; x < decoders.size(); x++) { + channelPipeline.addLast("decoder-" + x, decoders.get(x)); + } + + List encoders = consumer.getConfiguration().getEncoders(); + for (int x = 0; x < encoders.size(); x++) { + channelPipeline.addLast("encoder-" + x, encoders.get(x)); + } if (consumer.getConfiguration().getHandler() != null) { channelPipeline.addLast("handler", consumer.getConfiguration().getHandler()); } else { Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java?rev=935452&view=auto ============================================================================== --- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java (added) +++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java Mon Apr 19 04:52:23 2010 @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.jboss.netty.channel.ChannelDownstreamHandler; +import org.jboss.netty.channel.ChannelUpstreamHandler; +import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; +import org.jboss.netty.handler.codec.frame.LengthFieldPrepender; +import org.jboss.netty.handler.codec.string.StringDecoder; +import org.jboss.netty.handler.codec.string.StringEncoder; +import org.junit.Test; + +public class MultipleCodecsTest extends CamelTestSupport { + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + + List decoders = new ArrayList(); + decoders.add(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)); + decoders.add(new StringDecoder()); + + List encoders = new ArrayList(); + encoders.add(new LengthFieldPrepender(4)); + encoders.add(new StringEncoder()); + + registry.bind("encoders", encoders); + registry.bind("decoders", decoders); + + return registry; + } + + @Test + public void canSupplyMultipleCodecsToEndpointPipeline() throws Exception { + String poem = new Poetry().getPoem(); + MockEndpoint mock = getMockEndpoint("mock:multiple-codec"); + mock.expectedBodiesReceived(poem); + sendBody("direct:mutliple-codec", poem); + mock.await(1, TimeUnit.SECONDS); + mock.assertIsSatisfied(); + + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("direct:mutliple-codec").to("netty:tcp://localhost:5150?encoders=#encoders"); + + from("netty:tcp://localhost:5150?decoders=#decoders").to("mock:multiple-codec"); + } + }; + } + +} Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/MultipleCodecsTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date