incubator-kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jun Rao <jun...@gmail.com>
Subject Re: Help with encoding issue.
Date Wed, 04 Apr 2012 01:18:49 GMT
I think the issue is that you used the wrong API of Message. A client
should only use this(bytes: Array[Byte]), instead of this(buffer:
ByteBuffer). The latter is for internal use only and includes kafka
metadata in it. We probably should restrict the visibility of the latter
api.

Thanks,

Jun

On Tue, Apr 3, 2012 at 3:22 PM, Patricio Echagüe <patricioe@gmail.com>wrote:

> Hi, I noticed that String Serializer somehow doesn't do well encoding
> special characters such as "ü".
>
> I tried to create a ByteBufferEncoder this way:
>
> import java.nio.ByteBuffer;
>
> import kafka.message.Message;
>
> import kafka.serializer.Encoder;
>
>
> public class ByteBufferEncoder implements Encoder<ByteBuffer> {
>
>   public Message toMessage(ByteBuffer buffer) {
>
>     return new Message(buffer);
>
>   }
>
> }
>
>
> but I get this exception [1]
>
>
> Could you guys please advice on how to fix my encoding issue?
>
> Thanks
>
>
> [1]
>
> Exception in thread "main" java.lang.RuntimeException: Invalid magic byte
> 34
>
> at kafka.message.Message.compressionCodec(Message.scala:144)
>
> at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(
> ByteBufferMessageSet.scala:112)
>
> at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(
> ByteBufferMessageSet.scala:138)
>
> at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(
> ByteBufferMessageSet.scala:82)
>
> at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>
> at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$verifyMessageSize(
> SyncProducer.scala:139)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>
> at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(
> ProducerPool.scala:116)
>
> at
> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
>
> at
> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
>
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:57)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>
> at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
>
> at kafka.producer.Producer.zkSend(Producer.scala:143)
>
> at kafka.producer.Producer.send(Producer.scala:105)
>
> at kafka.javaapi.producer.Producer.send(Producer.scala:104)
>
> at com.lucid.dao.queue.impl.kafka.KafkaProducer.send(KafkaProducer.java:63)
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message