apex-users mailing list archives

From Raja.Aravapalli <Raja.Aravapa...@target.com>
Subject Re: avro deserialization fails when using kafka
Date Mon, 06 Jun 2016 18:52:14 GMT

Thanks a lot Thomas & Ramanath.

Your suggestions helped! My issue is fixed. Thank you.


From: Thomas Weise <thomas.weise@gmail.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Monday, June 6, 2016 at 12:21 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: avro deserialization fails when using kafka

Since you are creating the decoder in setup(), please mark the property transient. No need
to checkpoint it.
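The suggestion above can be sketched with the standard library alone (hypothetical class and field names; Apex actually checkpoints operator state with Kryo, but plain java.io serialization shows the same transient behavior):

```java
import java.io.*;

// Minimal sketch with hypothetical names. A transient field is skipped during
// checkpointing and must be rebuilt afterwards, which is what setup() does.
class ConversionOp implements Serializable {
    String schemaRegURL = "http://localhost:8081"; // plain state: checkpointed
    transient StringBuilder decoder;               // stand-in for KafkaAvroDecoder

    void setup() { decoder = new StringBuilder(schemaRegURL); } // rebuilt after restore
}

public class TransientDemo {
    public static void main(String[] args) throws Exception {
        ConversionOp op = new ConversionOp();
        op.setup();

        // Round-trip through serialization, analogous to checkpoint/restore.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(op);
        oos.flush();
        ConversionOp restored = (ConversionOp) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(restored.decoder == null);   // transient field was skipped
        restored.setup();                               // Apex calls setup() after restore
        System.out.println(restored.decoder.toString().equals(restored.schemaRegURL));
    }
}
```

Because the decoder is recreated in setup(), nothing is lost by excluding it from the checkpoint, and the serializer never has to instantiate it.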


On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <ram@datatorrent.com> wrote:

Please try the suggestions at the above link.

It appears from the stack trace that the class does not have a default constructor.
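For illustration, Kryo's default instantiator strategy creates field values reflectively through a no-arg constructor; a stdlib-only sketch (with a hypothetical class standing in for KafkaAvroDecoder) of the failure mode:

```java
import java.lang.reflect.Constructor;

// Hypothetical class mirroring KafkaAvroDecoder's shape: only a one-arg constructor.
class DecoderLike {
    private final String url;
    DecoderLike(String url) { this.url = url; }
}

public class NoArgDemo {
    public static void main(String[] args) {
        try {
            // Kryo's DefaultInstantiatorStrategy does essentially this lookup:
            Constructor<DecoderLike> c = DecoderLike.class.getDeclaredConstructor();
            c.newInstance();
            System.out.println("created");
        } catch (NoSuchMethodException e) {
            // Same root cause as the KryoException in the log below
            System.out.println("missing no-arg constructor");
        } catch (ReflectiveOperationException e) {
            System.out.println("other reflective failure");
        }
    }
}
```

Marking the field transient sidesteps this entirely, since Kryo then never tries to instantiate the decoder.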


On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:


I am trying to read data from Kafka, where the incoming messages are Avro-encoded.

I am using the class "KafkaSinglePortByteArrayInputOperator" to emit the records from Kafka,
and in the next operator I read the input as "byte[]" and deserialize the message.

But tuple deserialization is failing with the error below in the log.

Can someone please share your thoughts and help me fix this?

Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg
constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
Serialization trace:
decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
        at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)

Code FYR:

Application.java file:

public void populateDAG(DAG dag, Configuration conf)
{
  //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);

  KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());

  AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator("schemaRegURL"));

  HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);

  //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
  dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
  dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
}


Operator Code:

public class AvroBytesConversionOperator extends BaseOperator {

    private String schemaRegURL;
    private KafkaAvroDecoder decoder;

    public AvroBytesConversionOperator() {
    }

    public AvroBytesConversionOperator(String schemaRegURL) {
        this.schemaRegURL = schemaRegURL;
    }

    /**
     * Defines Input Port - DefaultInputPort
     * Accepts data from the upstream operator
     * Type byte[]
     */
    public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
        @Override
        public void process(byte[] tuple) {
            processTuple(tuple);
        }
    };

    /**
     * Defines Output Port - DefaultOutputPort
     * Sends data to the downstream operator which can consume this data
     * Type String
     */
    public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();

    /**
     * Setup call
     */
    @Override
    public void setup(OperatorContext context) {
        Properties props = new Properties();
        props.setProperty("schema.registry.url", this.schemaRegURL);
        this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
    }

    /**
     * Begin window call for the operator.
     * @param windowId
     */
    @Override
    public void beginWindow(long windowId) {
    }

    /**
     * Defines what should be done with each incoming tuple
     */
    protected void processTuple(byte[] tuple) {
        GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
        output.emit(record.toString());
    }

    /**
     * End window call for the operator
     * If sending per window, emit the updated counts here.
     */
    @Override
    public void endWindow() {
    }
}

