camel-users mailing list archives

From SteveR <>
Subject camel-netty4: MessageToMessageDecoder<DatagramPacket> gives only first 2048 octets
Date Mon, 12 Oct 2015 00:41:16 GMT
I'm using *Camel 2.15.3* and *camel-netty4*, and since upgrading from
*camel-netty3* I'm having problems receiving complete JSON messages over UDP.
Each JSON message is about 3 to 5 KB, but my
*MessageToMessageDecoder<DatagramPacket>* implementation only receives the
first 2048 bytes (i.e. 2 KB). From a test program I send a single UDP
message, and the debug output from my
*MessageToMessageDecoder<DatagramPacket>* shows that the *decode()*
method is called only once.

I'm currently reading through *Netty In Action*, but I see this in my log
file: *UnpooledUnsafeDirectByteBuf(ridx: 0, widx: 2048, cap: 2048))*
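
The *cap: 2048* in that log line suggests the receive buffer itself holds only 2048 bytes. If so, the rest of a larger datagram would be dropped before *decode()* ever runs, which would also explain the single call. The following is a minimal sketch of that effect using plain *java.net* sockets rather than Netty or Camel; the class and method names (*UdpTruncationDemo*, *receivedLength*) are hypothetical, and whether camel-netty4 behaves this way internally is an assumption, not something the log proves.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

// Hypothetical demo class: shows how a fixed-size receive buffer truncates a UDP datagram.
public class UdpTruncationDemo {

    /**
     * Sends one datagram of payloadSize bytes over loopback and returns how many
     * bytes a receiver with a bufferSize-byte buffer actually gets.
     */
    static int receivedLength(int payloadSize, int bufferSize) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);
             DatagramSocket sender = new DatagramSocket()) {
            receiver.setSoTimeout(2000); // fail instead of blocking forever

            byte[] payload = new byte[payloadSize];
            sender.send(new DatagramPacket(payload, payload.length,
                    loopback, receiver.getLocalPort()));

            // A datagram is delivered in a single receive; bytes that do not fit
            // in the buffer are discarded, not delivered by a later receive.
            DatagramPacket received = new DatagramPacket(new byte[bufferSize], bufferSize);
            receiver.receive(received);
            return received.getLength();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(receivedLength(4000, 2048)); // truncated to the buffer size
        System.out.println(receivedLength(4000, 8192)); // whole datagram fits
    }
}
```

With a 4000-byte payload, the 2048-byte buffer yields exactly one receive of 2048 bytes, matching the symptom above, while the 8192-byte buffer yields the full 4000 bytes.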

I desperately need to get this fixed in production, and just need to be able

My route looks like the below. It consumes from *netty4:udp* and produces to
a SEDA queue, just for now while testing:

	<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
	<route startupOrder="104" customId="true" id="ROUTE_ID_RAW_CQMS_EVENTS"
		<setProperty propertyName="CamelCharsetName" id="setProperty1">
		<threads poolSize="7" maxPoolSize="14"
threadName="threads_ROUTE_ID_RAW_CQMS_EVENTS" callerRunsWhenRejected="true"
			<setProperty propertyName="CamelCharsetName" id="setProperty2">

I print out the received *DatagramPacket*, which shows this: 
*UnpooledUnsafeDirectByteBuf(ridx: 0, widx: 2048, cap: 2048))*

*Here is my MessageToMessageDecoder<DatagramPacket> implementation:*

	package com.mission.mplr.multiprotocollistenerrouter;

	import com.vonage.mplr.utils.MiscUtils;
	import; // Represents the "binding" between a ChannelHandler and the ChannelPipeline.
	import io.netty.handler.codec.MessageToMessageDecoder;
	import java.nio.charset.Charset;
	import java.util.List;
	import org.slf4j.Logger;        // The org.slf4j.Logger interface is the main user entry point of SLF4J API.
	import org.slf4j.LoggerFactory; // Utility class producing Loggers for various logging APIs, most notably for log4j.

	public class UdpDatagramDecoder extends MessageToMessageDecoder<DatagramPacket> {
		private static final Logger logger      =
		private static final Logger errorLogger =
		private final String CHARSET_NAME;

		UdpDatagramDecoder(String charsetName) {
			this.CHARSET_NAME = charsetName;


		public boolean acceptInboundMessage(Object msg) throws Exception {
			return true;

		protected void decode(ChannelHandlerContext chc, DatagramPacket packet,
List out) throws Exception {"decode(): ENTER");"decode(): Received datagram = {}", packet);

			String packetAsString =
			if(packetAsString == null) {
				return; // Nothing to do
			} else {
			}"decode(): bodyBytesAsString[size={}] = {}",
packetAsString.length(), packetAsString);

			String bodyBytesAsHex = MiscUtils.stringAsHex(packetAsString,
CHARSET_NAME);"decode(): bodyBytesAsHex[size={}] = {}",
bodyBytesAsHex.length(), bodyBytesAsHex);"decode(): EXIT");
	// ------------- end --------------

*My server pipeline has this initChannel() implementation:*

    protected void initChannel(Channel ch) throws Exception {
        logger.trace("initChannel(): ENTER");

        ChannelPipeline channelPipeline = ch.pipeline();
        serverInvoked = true;  

        String theSourceRouteId = consumer.getRoute().getId();
        logger.debug("initChannel(): consumer = {}, theSourceRouteId = {}", consumer.toString(), theSourceRouteId);

        // Here we add the custom UDP datagram decoder. Decoders are
        // stateful, thus we create a new instance with every pipeline.
        String udpPacketDecoderName = "CQMS_UDP_DATAGRAM_DECODER_" +
        logger.debug("initChannel(): Adding {}", udpPacketDecoderName);
        channelPipeline.addLast(udpPacketDecoderName, new

        // Default Camel ServerChannelHandler for the consumer, to allow
        // Camel to route the message.
        String serverChannelHandlerName = "CQMS_SERVER_CHANNEL_HANDLER_" +
        logger.debug("initChannel(): Adding {}", serverChannelHandlerName);
        channelPipeline.addLast(serverChannelHandlerName, new

        logger.trace("initChannel(): EXIT");
