beam-user mailing list archives

From Raghu Angadi <rang...@google.com>
Subject Re: KafkaIO and Avro
Date Thu, 19 Oct 2017 17:54:13 GMT
Thanks Tim.

How about extending KafkaAvroDeserializer rather
than AbstractKafkaAvroDeserializer?

The TypedKafkaAvroDeserializer class below is useful, but not directly usable
yet. It needs to store the actual type in the Kafka consumer config so that it
can be retrieved at run time.
Even without storing the class, it is still useful. It simplifies user code:

public class EnvelopeKafkaAvroDeserializer extends
TypedKafkaAvroDeserializer<Envelope> {}

This should be part of the same package as KafkaAvroDeserializer (surprised it
is not there yet).
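
A rough, untested-against-Kafka sketch of the "store the type in the consumer
config" idea follows. To keep it self-contained it uses stand-ins rather than
the real classes: the local Deserializer interface mimics
org.apache.kafka.common.serialization.Deserializer, StandInAvroDeserializer
plays the role of KafkaAvroDeserializer, and the config key
"typed.deserializer.target.type" is made up for illustration. It wraps the
Object-typed deserializer instead of extending it, because a Java class cannot
implement the same generic interface with two different type arguments
(Deserializer<Object> and Deserializer<T>).

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Stand-in for Kafka's Deserializer interface (assumption: reduced to the
// two methods this sketch needs).
interface Deserializer<T> {
    void configure(Map<String, ?> configs, boolean isKey);
    T deserialize(String topic, byte[] data);
}

// Stand-in for KafkaAvroDeserializer: like the real class, it is typed as Object.
class StandInAvroDeserializer implements Deserializer<Object> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public Object deserialize(String topic, byte[] data) {
        // Placeholder decoding; the real class would decode Avro here.
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}

// Wraps the Object-typed deserializer and casts to a type read from the config.
class TypedDeserializer<T> implements Deserializer<T> {
    // Hypothetical config key; not a real Kafka or Confluent setting.
    static final String TARGET_TYPE_CONFIG = "typed.deserializer.target.type";

    private final Deserializer<Object> delegate = new StandInAvroDeserializer();
    private Class<T> targetType;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(configs, isKey);
        Object name = configs.get(TARGET_TYPE_CONFIG);
        if (name == null) {
            throw new IllegalArgumentException("Missing config: " + TARGET_TYPE_CONFIG);
        }
        try {
            // Look up the target class by the name stored in the config.
            targetType = (Class<T>) Class.forName(name.toString());
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown target type: " + name, e);
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // Class.cast throws ClassCastException if the decoded value has the wrong type.
        return targetType.cast(delegate.deserialize(topic, data));
    }
}
```

With something along these lines, the cast is checked against the configured
class at run time instead of being an unchecked (T) cast.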

On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <timrobertson100@gmail.com>
wrote:

> Happy to hear
>
> I wonder if we could do something like this (totally untested):
>
> public class TypedKafkaAvroDeserializer<T> extends
> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>    @Override
>     public T deserialize(String s, byte[] bytes) {
>         return (T) this.deserialize(bytes);
>     }
> }
>
> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
> andrew+beam@andrew-jones.com> wrote:
>
>> Thanks Tim, that works!
>>
>> Full code is:
>>
>> public class EnvelopeKafkaAvroDeserializer extends
>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>     @Override
>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>         configure(new KafkaAvroDeserializerConfig(configs));
>>     }
>>
>>     @Override
>>     public Envelope deserialize(String s, byte[] bytes) {
>>         return (Envelope) this.deserialize(bytes);
>>     }
>>
>>     @Override
>>     public void close() {}
>> }
>>
>> Nicer than my solution, so I think that is the one I'm going to go with
>> for now.
>>
>> Thanks,
>> Andrew
>>
>>
>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>
>> Hi Andrew,
>>
>> I also saw the same behaviour.
>>
>> It's not pretty, but perhaps try this? It was my last idea, and I ran out
>> of time to try it...
>>
>>
>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>> public class EnvelopeAvroDeserializer extends AbstractKafkaAvroDeserializer
>>     implements Deserializer<Envelope> {
>>
>>   ...
>>
>>   public Envelope deserialize(String s, byte[] bytes) {
>>     return (Envelope) this.deserialize(bytes);
>>   }
>>
>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>   }
>>
>>   ...
>>
>> }
>>
>>  Tim
>>
>>
>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
>> andrew+beam@andrew-jones.com> wrote:
>>
>>
>> Using Object doesn't work unfortunately. I get an 'Unable to
>> automatically infer a Coder' error at runtime.
>>
>> This is the code:
>>
>> p.apply(KafkaIO.<String, Object>read()
>>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>>
>> It compiles, but at runtime:
>>
>> Caused by: java.lang.RuntimeException: Unable to automatically infer a
>> Coder for the Kafka Deserializer class io.confluent.kafka.serializers.KafkaAvroDeserializer:
>> no coder registered for type class java.lang.Object
>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>
>> So far the only thing I've got working is this, where I use the
>> ByteArrayDeserializer and then parse Avro myself:
>>
>>     private static KafkaAvroDecoder avroDecoder;
>>     static {
>>         final Properties props = new Properties();
>>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>                 "http://registry:8081");
>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>                 true);
>>         VerifiableProperties vProps = new VerifiableProperties(props);
>>         avroDecoder = new KafkaAvroDecoder(vProps);
>>     }
>>
>>     public static void main(String[] args) {
>>
>>         PipelineOptions options = PipelineOptionsFactory.create();
>>         Pipeline p = Pipeline.create(options);
>>
>>         p.apply(KafkaIO.<byte[], byte[]>read()
>>                 .withBootstrapServers("kafka:9092")
>>                 .withTopic("dbserver1.inventory.customers")
>>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>                 .withoutMetadata()
>>         )
>>                 .apply(Values.<byte[]>create())
>>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>                     @ProcessElement
>>                     public void processElement(ProcessContext c) {
>>                         Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>                         c.output(data);
>>                     }
>>                 }))
>>
>> Thanks,
>> Andrew
>>
>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>
>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpichov@google.com>
>> wrote:
>>
>> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
>> though I suppose with proper configuration that Object will at run-time be
>> your desired type. Have you tried adding some Java type casts to make it
>> compile?
>>
>>
>> +1, cast might be the simplest fix. Alternatively, you can wrap or
>> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
>> returned by KafkaAvroDeserializer::deserialize() to Envelope at runtime.
>>
>>
>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <timrobertson100@gmail.com>
>> wrote:
>>
>> I just tried quickly and see the same as you Andrew.
>> Either we're missing something obvious, or extending KafkaAvroDeserializer
>> seems necessary, right?
>>
>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
>> andrew+beam@andrew-jones.com> wrote:
>>
>> Hi,
>>
>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
>> it should be as simple as:
>>
>> p.apply(KafkaIO.<String, Envelope>read()
>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>   AvroCoder.of(Envelope.class))
>>
>> Where Envelope is the name of the Avro class. However, that does not
>> compile and I get the following error:
>>
>> incompatible types:
>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>> cannot be converted to java.lang.Class<? extends
>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>
>> I've tried a number of variations on this theme but haven't yet worked
>> it out and am starting to run out of ideas...
>>
>> Has anyone successfully read Avro data from Kafka?
>>
>> The code I'm using can be found at
>> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
>> environment can be created with Docker.
>>
>> Thanks,
>> Andrew
>>
>>
>>
>>
>
