camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From SteveR <srichard...@vonage.com>
Subject camel-netty4: MessageToMessageDecoder<DatagramPacket> gives only first 2048 octets
Date Mon, 12 Oct 2015 00:41:16 GMT
I'm using *Camel 2.15.3* and *camel-netty4*, and since upgrading from
*camel-netty3*, I'm having problems receiving full JSON messages via UDP. 
Each JSON message is about 3 to 5 kbytes, but my
*MessageToMessageDecoder<DatagramPacket>* implementation is only giving me
the first 2048 (i.e. 2k bytes).  From a test program, I send in one UDP
message, and from my debug prints within my
*MessageToMessageDecoder<DatagramPacket>* it shows that the *decode()*
method is only called once.

I'm currently reading through *Netty In Action*, but i see this in my log
file: *UnpooledUnsafeDirectByteBuf(ridx: 0, widx: 2048, cap: 2048))*

I desperately need to get this fixed in production, and just need to be able
to 

My route looks like the below. It consumes from *netty4:udp* and produces to
a SEDA queue, just for now while testing:

	<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
	<route startupOrder="104" customId="true" id="ROUTE_ID_RAW_CQMS_EVENTS"
xmlns="http://camel.apache.org/schema/spring">
		<from
uri="netty4:udp://devserver-09.dev.s.mission.net:11111?serverPipelineFactory=#CQMS_SERVER_PIPELINE_FACTORY_ROUTE_ID_RAW_CQMS_EVENTS&amp;keepAlive=true&amp;sync=false&amp;receiveBufferSize=26214400&amp;sendBufferSize=26214400&amp;allowDefaultCodec=false&amp;disconnectOnNoReply=false&amp;receiveBufferSizePredictor=8192"/>
		<setProperty propertyName="CamelCharsetName" id="setProperty1">
			<expressionDefinition>iso-8859-1</expressionDefinition>
		</setProperty>
		<threads poolSize="7" maxPoolSize="14"
threadName="threads_ROUTE_ID_RAW_CQMS_EVENTS" callerRunsWhenRejected="true"
id="threads1">
			<to
uri="seda:SEDA_INPUT_QUEUE_102?size=200000&amp;concurrentConsumers=10&amp;waitForTaskToComplete=Never&amp;failIfNoConsumers=true&amp;timeout=10000"
id="to1"/>
			<setProperty propertyName="CamelCharsetName" id="setProperty2">
				<expressionDefinition>iso-8859-1</expressionDefinition>
			</setProperty>
		</threads>
	</route>


I print out the received *DatagramPacket*, which shows this: 
*UnpooledUnsafeDirectByteBuf(ridx: 0, widx: 2048, cap: 2048))*

*Here is my *MessageToMessageDecoder<DatagramPacket>* implementation:*

	package com.mission.mplr.multiprotocollistenerrouter;

	import com.vonage.mplr.utils.MiscUtils;
	import io.netty.channel.ChannelHandlerContext; // Represents the "binding"
between a ChannelHandler and the ChannelPipeline.
	import io.netty.channel.socket.DatagramPacket;
	import io.netty.handler.codec.MessageToMessageDecoder;
	import java.nio.charset.Charset;
	import java.util.List;
	import org.slf4j.Logger;        // The org.slf4j.Logger interface is the
main user entry point of SLF4J API.
	import org.slf4j.LoggerFactory; // Utility class producing Loggers for
various logging APIs, most notably for log4j.


	public class UdpDatagramDecoder extends
MessageToMessageDecoder<DatagramPacket> {
		private static final Logger logger      =
LoggerFactory.getLogger(UdpDatagramDecoder.class);
		private static final Logger errorLogger =
LoggerFactory.getLogger("ERROR_LOGGER");
		private final String CHARSET_NAME;

		UdpDatagramDecoder(String charsetName) {
			this.CHARSET_NAME = charsetName;
		}

		@Override

		public boolean acceptInboundMessage(Object msg) throws Exception {
			return true;
		}

		@Override
		protected void decode(ChannelHandlerContext chc, DatagramPacket packet,
List out) throws Exception {
			logger.info("decode(): ENTER");

			logger.info("decode(): Received datagram = {}", packet);

			String packetAsString =
packet.content().toString(Charset.forName(CHARSET_NAME));
			
			if(packetAsString == null) {
				return; // Nothing to do
			} else {
				out.add(packetAsString);
				packet.retain();
			}

			logger.info("decode(): bodyBytesAsString[size={}] = {}",
packetAsString.length(), packetAsString);

			String bodyBytesAsHex = MiscUtils.stringAsHex(packetAsString,
CHARSET_NAME);
			logger.info("decode(): bodyBytesAsHex[size={}] = {}",
bodyBytesAsHex.length(), bodyBytesAsHex);

			logger.info("decode(): EXIT");
		}
	}
	// ------------- end --------------

*My server pipeline has this initChannel() implementation:*

    @Override
    protected void initChannel(Channel ch) throws Exception {
        logger.trace("initChannel(): ENTER");

        ChannelPipeline channelPipeline = ch.pipeline();
        serverInvoked = true;  

        String theSourceRouteId = consumer.getRoute().getId();
        logger.debug("initChannel(): consumer = {}, theSourceRouteId = {}",
consumer.toString(), theSourceRouteId);

        //
-------------------------------------------------------------------
        // Here we add the custom UDP datagram decoder. Decoders are
typically
        // stateful, thus we create a new instance with every pipeline.
        //
-------------------------------------------------------------------
        String udpPacketDecoderName = "CQMS_UDP_DATAGRAM_DECODER_" +
theSourceRouteId;
        logger.debug("initChannel(): Adding {}", udpPacketDecoderName);
        channelPipeline.addLast(udpPacketDecoderName, new
UdpDatagramDecoder(CHARSET_NAME));

        //
-----------------------------------------------------------------------------------------
        // Default Camel ServerChannelHandler for the consumer, to allow
Camel to route the message.
        //
-----------------------------------------------------------------------------------------
        String serverChannelHandlerName = "CQMS_SERVER_CHANNEL_HANDLER_" +
theSourceRouteId;
        logger.debug("initChannel(): Adding {}", serverChannelHandlerName);
        channelPipeline.addLast(serverChannelHandlerName, new
ServerChannelHandler(consumer));

        logger.trace("initChannel(): EXIT");
    }











--
View this message in context: http://camel.465427.n5.nabble.com/camel-netty4-MessageToMessageDecoder-DatagramPacket-gives-only-first-2048-octets-tp5772540.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Mime
View raw message