apex-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Raja.Aravapalli <Raja.Aravapa...@target.com>
Subject Re: avrò deserialization fails when using kafka
Date Mon, 06 Jun 2016 18:54:34 GMT

After making the variable transient, it worked fine.

Raja.

From: "Raja.Aravapalli" <Raja.Aravapalli@target.com<mailto:Raja.Aravapalli@target.com>>
Date: Monday, June 6, 2016 at 1:52 PM
To: "users@apex.apache.org<mailto:users@apex.apache.org>" <users@apex.apache.org<mailto:users@apex.apache.org>>
Subject: Re: avrò deserialization fails when using kafka


Thanks a lot Thomas & Ramanath.

Your suggestions helped!! My issue is fixed. Thank you.


Regards,
Raja.

From: Thomas Weise <thomas.weise@gmail.com<mailto:thomas.weise@gmail.com>>
Reply-To: "users@apex.apache.org<mailto:users@apex.apache.org>" <users@apex.apache.org<mailto:users@apex.apache.org>>
Date: Monday, June 6, 2016 at 12:21 PM
To: "users@apex.apache.org<mailto:users@apex.apache.org>" <users@apex.apache.org<mailto:users@apex.apache.org>>
Subject: Re: avrò deserialization fails when using kafka

Since you are creating the decoder in setup(), please mark the property transient. No need
to checkpoint it.

Thanks,
Thomas


On Mon, Jun 6, 2016 at 10:06 AM, Munagala Ramanath <ram@datatorrent.com<mailto:ram@datatorrent.com>>
wrote:
http://docs.datatorrent.com/troubleshooting/#application-throwing-following-kryo-exception

Please try the suggestions at the above link.

It appears from
  https://github.com/uber/confluent-schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDecoder.java
that the class does not have a default constructor.

Ram

On Mon, Jun 6, 2016 at 9:40 AM, Raja.Aravapalli <Raja.Aravapalli@target.com<mailto:Raja.Aravapalli@target.com>>
wrote:

Hi,

I am trying to read data from kafka, and my input in kafka is avro messages.

So I am using class “KafkaSinglePortByteArrayInputOperator” to emit records from kafka..
And in the next operator I am reading input as "byte[]” and deserializing the message!!

But the tuple deserialization is failing with below error in the log…

Can someone pls share your thoughts and help me fix this?



Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg
constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
Serialization trace:
decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
        at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)



Code FYR:


Application.java file:

public void populateDAG(DAG dag, Configuration conf)
{
  //KafkaSinglePortStringInputOperator kafkaInput =  dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);

  KafkaSinglePortByteArrayInputOperator kafkaInput =  dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());

  AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(“schemaRegURL"));

  HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);

  //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
  dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
  dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);

}


Operator Code:

public class AvroBytesConversionOperator extends BaseOperator {

    private String schemaRegURL;
    private KafkaAvroDecoder decoder;

    public AvroBytesConversionOperator(){

    }

    public AvroBytesConversionOperator(String schemaRegURL){
        this.schemaRegURL = schemaRegURL;
    }

    /**
     * Defines Input Port - DefaultInputPort
     * Accepts data from the upstream operator
     * Type byte[]
     */
    public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
{
        @Override
        public void process(byte[] tuple)
        {
            processTuple(tuple);
        }
    };


    /**
     * Defines Output Port - DefaultOutputPort
     * Sends data to the down stream operator which can consume this data
     * Type String
     */
    public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();


    /**
     * Setup call
     */
    @Override
    public void setup(OperatorContext context)
    {
        Properties props = new Properties();
        props.setProperty("schema.registry.url", this.schemaRegURL);
        this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
    }

    /**
     * Begin window call for the operator.
     * @param windowId
     */
    public void beginWindow(long windowId)
    {

    }

    /**
     * Defines what should be done with each incoming tuple
     */
    protected void processTuple(byte[] tuple)
    {
        GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
        output.emit(record.toString());
    }

    /**
     * End window call for the operator
     * If sending per window, emit the updated counts here.
     */
    @Override
    public void endWindow()
    {

    }

}


Mime
View raw message