camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Willem Jiang <willem.ji...@gmail.com>
Subject Re: Camel 2.14/Netty: How to add ByteArrayDecoder to ServerChannelPipeline?
Date Tue, 30 Jun 2015 08:03:08 GMT
camel-netty is based on Netty3.x which page name starts with org.jboss.netty

It looks like you are using Netty 4.x (page name starts with io.netty), I suggest you use
camel-netty4 instead of camel-netty component.

--  
Willem Jiang

Red Hat, Inc.
Web: http://www.redhat.com
Blog: http://willemjiang.blogspot.com (English)
http://jnn.iteye.com (Chinese)
Twitter: willemjiang  
Weibo: 姜宁willem



On June 29, 2015 at 2:58:07 PM, SteveR (srichardson@vonage.com) wrote:
> I have a linux Java7 stand-alone application using Camel 2.14 and the
> camel-netty component. I have a route that receives via netty:udp and then
> attempts to mirror the received packets also via netty:udp. I'm also using
> the *LengthFieldBasedFrameDecoder *to decode the UDP packets into message
> frames based on a 2-byte length field within each UDP message.
>  
> The UDP messages I'm receiving contain certain characters that don't decode
> in UTF-8 (e.g. 0xa1 0xb2, 0xc3 ), so I've been trying to use the
> *iso-8859-1* charset. I'm thinking that what I want in my
> ServerPipelineFactory subclass is the
> *io.netty.handler.codec.bytes.ByteArrayDecoder*, but I'm unable to get it to
> compile with the *addlast()* method.
>  
> I've been using the approach outlined here
>  
> thus far, but then I run into this compile issue with *io.netty *versus
> *org.jboss.netty*.
>  
> Which leads me to believe that I'm confused wrt imports for *io.netty*
> versus *org.jboss.netty*? Below is my extended *ServerPipelineFactory
> *class which I'd like to switch the StringDecoder/StringEncoder for
> ByteArrayDecoder/ByteArrayEncoder.
>  
> Any help is greatly appreciated!
>  
> Thanks, SteveR
>  
>  
> package multiprotocollistenerrouter;
>  
> //import io.netty.handler.codec.bytes.ByteArrayDecoder;
> //import io.netty.handler.codec.bytes.ByteArrayEncoder;
> //import io.netty.channel.ChannelPipeline;
> //import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
>  
> import io.netty.util.CharsetUtil;
> import org.apache.camel.component.netty.NettyConsumer;
> import org.apache.camel.component.netty.ServerPipelineFactory;
> import org.apache.camel.component.netty.handlers.ServerChannelHandler;
> import org.jboss.netty.channel.ChannelHandler;
> import org.jboss.netty.channel.ChannelPipeline;
> import org.jboss.netty.channel.Channels;
> import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;  
> import org.jboss.netty.handler.codec.string.StringDecoder;
> import org.jboss.netty.handler.codec.string.StringEncoder;
>  
> //import org.jboss.netty.handler.logging.LoggingHandler;
> //import org.jboss.netty.logging.InternalLogLevel;
>  
> import org.slf4j.Logger; // The org.slf4j.Logger interface is the
> main user entry point of SLF4J API.
> import org.slf4j.LoggerFactory; // Utility class producing Loggers for
> various logging APIs, most notably for log4j.
>  
> /**
> * Consumer linked channel pipeline factory for the MCQ source provider.
> * Provides custom support for reception/parsing/processing of MCQ.
> *
> * @author steve
> *
> * @see http://camel.apache.org/netty.html
> * @see
> http://opensourceknowledge.blogspot.com/2010/08/customizing-netty-endpoints-using.html#
 
> * @see
> http://seeallhearall.blogspot.com/2012/06/netty-tutorial-part-15-on-channel.html  
> * @see
> https://github.com/apache/camel/blob/master/components/camel-netty/src/test/  
> *
> java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java  
> */
> public class McqServerPipelineFactory extends ServerPipelineFactory {
> private final static Logger logger =
> LoggerFactory.getLogger(McqServerPipelineFactory.class);
> private static final String NEW_LINE =
> System.getProperty("line.separator");
>  
> // -------------------------------------------------------------------
> // Stateless, singleton handler instances...re-used across connections
> // -------------------------------------------------------------------
> private static final ChannelHandler STR_ENCODER = new
> StringEncoder(CharsetUtil.ISO_8859_1);
> private static final ChannelHandler STR_DECODER = new
> StringDecoder(CharsetUtil.ISO_8859_1);
>  
> // private static final ChannelHandler LOG_HANDLER = new
> LoggingHandler(InternalLogLevel.INFO);
>  
> // private static final ByteArrayDecoder BYTES_DECODER = new
> ByteArrayDecoder();
> // private static final ByteArrayEncoder BYTES_ENCODER = new
> ByteArrayEncoder();
>  
> private final NettyConsumer consumer;
>  
> private final int maxFrameLength;
> private final int lengthFieldOffset;
> private final int lengthFieldLength;
> private final int lengthAdjustment;
> private final int initialBytesToStrip;
> private boolean invoked;
> private String routeId;
>  
> @Override
> public ChannelPipeline getPipeline() throws Exception {
> logger.trace("getPipeline(): ENTER");
>  
> invoked = true;
> ChannelPipeline channelPipeline = Channels.pipeline();
>  
> String theRouteId = consumer.getRoute().getId();
> logger.info("getPipeline(): {}, routeId = {}", consumer.toString(),
> theRouteId);
>  
> // -----------------------------------------------
> // Add logger to print incoming and outgoing data.
> // This is both an upstream/downstream handler.
> // -----------------------------------------------
> // String loggerName = "MCQ_LOGGING_HANDLER_" + theRouteId;
> // channelPipeline.addLast(loggerName, LOG_HANDLER);
>  
> //
> ----------------------------------------------------------------------------------- 

> // Here we add the custom message decoder. Frame decoders are
> typically
> // stateful, thus we create a new instance with every pipeline.
> // See
> https://docs.jboss.org/netty/3.1/api/org/jboss/netty/buffer/ChannelBuffer.html  
> // See
> https://docs.jboss.org/netty/3.1/api/org/jboss/netty/buffer/ChannelBuffers.html  
> //
> ----------------------------------------------------------------------------------- 

> String lengthFieldBasedFrameDecoderName =
> "MCQ_LENGTH_FIELD_FRAME_BASED_DECODER_" + theRouteId;
> channelPipeline.addLast(lengthFieldBasedFrameDecoderName,
> new LengthFieldBasedFrameDecoder(maxFrameLength,
> lengthFieldOffset,
> lengthFieldLength,
> lengthAdjustment,
> initialBytesToStrip));
>  
> if(logger.isInfoEnabled()) {
> StringBuilder sb = new StringBuilder();
> sb.append("getPipeline(): Added
> ").append(lengthFieldBasedFrameDecoderName).append(":").
> append(NEW_LINE).
> append("maxFrameLength = ").append(maxFrameLength)
> .append(NEW_LINE).
> append("lengthFieldOffset =
> ").append(lengthFieldOffset).append(NEW_LINE).
> append("lengthFieldLength =
> ").append(lengthFieldLength).append(NEW_LINE).
> append("lengthAdjustment = ").append(lengthAdjustment)
> .append(NEW_LINE).
> append("initialBytesToStrip = ").append(initialBytesToStrip);
> logger.info(sb.toString());
> }
>  
> // -----------------------------------------------------------
> // Add string encoder (downstream) / string decoder (upstream)
> // -----------------------------------------------------------
> String stringDecoderName = "MCQ_STRING_DECODER_" + theRouteId;
> channelPipeline.addLast(stringDecoderName, STR_DECODER);
> String stringEncoderName = "MCQ_STRING_ENCODER_" + theRouteId;
> channelPipeline.addLast(stringEncoderName, STR_ENCODER);
>  
> // -----------------------------------------------------------
> // Add byte[] encoder (downstream) / byte[] decoder (upstream)
> // -----------------------------------------------------------
> // String bytesDecoderName = "MCQ_BYTES_DECODER_" + theRouteId;
> // channelPipeline.addLast(bytesDecoderName, BYTES_DECODER);
> // String bytesEncoderName = "MCQ_BYTES_ENCODER_" + theRouteId;
> // channelPipeline.addLast(bytesEncoderName, BYTES_ENCODER);
>  
> // ------------------------------------------------------
> // Here we add the default Camel ServerChannelHandler for
> // the consumer, to allow Camel to route the message, etc.
> // ------------------------------------------------------
> String serverChannelHandlerName = "MCQ_SERVER_CHANNEL_HANDLER_" +
> theRouteId;
> channelPipeline.addLast(serverChannelHandlerName, new
> ServerChannelHandler(consumer));
> logger.info("getPipeline(): Added default Camel {}",
> serverChannelHandlerName);
>  
> logger.trace("getPipeline(): EXIT");
> return channelPipeline;
> }
>  
> /**
> * Create the server pipeline factory.
> *
> * @param consumer
> * @return ServerPipelineFactory
> */
> @Override
> public ServerPipelineFactory createPipelineFactory(NettyConsumer
> consumer) {
> logger.trace("createPipelineFactory(NettyConsumer): ENTER with {}",
> (consumer == null) ? "null" : consumer);
>  
> if(consumer != null) {
> // -------------------------------------------------------------
> // - Get routeId from NettyConsumer input.
> // - Get serverPipelineFactoryName from routeId.
> // - Get LengthFieldBasedFrameDecoderBean from RouteManager.
> // - Construct McqServerPipelineFactory from
> // NettyConsumer and LengthFieldBasedFrameDecoderBean members.
> // -------------------------------------------------------------
> String theRouteId = consumer.getRoute().getId();
>  
> String serverPipelineFactoryName =
> RouteManager.getServerPipelineFactoryName(theRouteId);
>  
> LengthFieldBasedFrameDecoderBean
> lengthFieldBasedFrameDecoderBean =
>  
> RouteManager.getLengthFieldBasedFrameDecoderInfo(serverPipelineFactoryName);  
>  
> Integer theMaxFrameLength =
> lengthFieldBasedFrameDecoderBean.getMaxFrameLengthInBytes();
> Integer theLengthFieldOffset =
> lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthFieldOffset();
> Integer theLengthFieldLength =
> lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthFieldLength();
> Integer theLengthAdjustment =
> lengthFieldBasedFrameDecoderBean.getFrameDecoderLengthAdjustment();
> Integer theInitialBytesToStrip =
> lengthFieldBasedFrameDecoderBean.getFrameDecoderInitialBytesToStrip();
>  
> if(logger.isInfoEnabled()) {
> StringBuilder sb = new StringBuilder();
> sb.append("Constructing McqServerPipelineFactory:")
> .append(NEW_LINE).
> append("consumer = ").append(consumer)
> .append(NEW_LINE).
> append("maxFrameLength =
> ").append(theMaxFrameLength) .append(NEW_LINE).
> append("lengthFieldOffset =
> ").append(theLengthFieldOffset) .append(NEW_LINE).
> append("lengthFieldLength =
> ").append(theLengthFieldLength) .append(NEW_LINE).
> append("lengthAdjustment =
> ").append(theLengthAdjustment) .append(NEW_LINE).
> append("initialBytesToStrip =
> ").append(theInitialBytesToStrip).append(NEW_LINE).
> append("routeId = ").append(theRouteId);
> logger.info(sb.toString());
> }
>  
> logger.trace("createPipelineFactory(): EXIT");
> return new McqServerPipelineFactory(consumer,
> theMaxFrameLength,
> theLengthFieldOffset,
> theLengthFieldLength,
> theLengthAdjustment,
> theInitialBytesToStrip,
> theRouteId);
> }
>  
> logger.trace("createPipelineFactory(): EXIT");
> return null;
> }
>  
> public boolean isfactoryInvoked() {
> return invoked;
> }
>  
> public McqServerPipelineFactory(NettyConsumer consumer) {
> logger.trace("McqServerPipelineFactory(NettyConsumer): ENTER with
> {}", (consumer == null) ? "null" : consumer);
>  
> this.maxFrameLength = 0;
> this.lengthFieldOffset = 0;
> this.lengthFieldLength = 0;
> this.lengthAdjustment = 0;
> this.initialBytesToStrip = 0;
> this.consumer = consumer;
>  
> logger.trace("McqServerPipelineFactory(NettyConsumer): EXIT");
> }
>  
> public McqServerPipelineFactory(
> NettyConsumer consumer,
> Integer maxFrameLength,
> Integer lengthFieldOffset,
> Integer lengthFieldLength,
> Integer lengthAdjustment,
> Integer initialBytesToStrip,
> String routeId) {
> if(logger.isTraceEnabled()) {
> StringBuilder sb = new StringBuilder();
> sb.append("McqServerPipelineFactory(): ENTER with:").
> append(NEW_LINE).
> append("consumer = ").append(consumer)
> .append(NEW_LINE).
> append("maxFrameLength = ").append(maxFrameLength)
> .append(NEW_LINE).
> append("lengthFieldOffset = ").append(lengthFieldOffset)
> .append(NEW_LINE).
> append("lengthFieldLength = ").append(lengthFieldLength)
> .append(NEW_LINE).
> append("lengthAdjustment = ").append(lengthAdjustment)
> .append(NEW_LINE).
> append("initialBytesToStrip =
> ").append(initialBytesToStrip).append(NEW_LINE).
> append("routeId = ").append(routeId);
> logger.trace(sb.toString());
> }
>  
> this.consumer = consumer;
> this.maxFrameLength = maxFrameLength;
> this.lengthFieldOffset = lengthFieldOffset;
> this.lengthFieldLength = lengthFieldLength;
> this.lengthAdjustment = lengthAdjustment;
> this.initialBytesToStrip = initialBytesToStrip;
> this.routeId = routeId;
>  
> logger.trace("McqServerPipelineFactory(): EXIT");
> }
> }
> // ------------- end --------------
>  
>  
>  
>  
>  
> --
> View this message in context: http://camel.465427.n5.nabble.com/Camel-2-14-Netty-How-to-add-ByteArrayDecoder-to-ServerChannelPipeline-tp5768638.html
 
> Sent from the Camel - Users mailing list archive at Nabble.com.
>  


Mime
View raw message