kafka-dev mailing list archives

From: <david.frank...@bt.com>
Subject: RE: Kafka Connect key.converter and value.converter properties for Avro encoding
Date: Tue, 08 Nov 2016 12:11:23 GMT
Hi Ewen,

Thanks for the additional insight.

Because I have no Connect schema (only an Avro schema), am I safe to just use the byte[] <->
(Avro Schema, SpecificRecord) conversion?  This seems to work with the, admittedly limited,
testing I've done so far.  Without converting my Avro schema into a Connect schema I don't
see that I have any other option.

I'm a bit puzzled and concerned about the Kafka null values.  Am I unwittingly generating
them or do they arise from the Kafka framework?  You mentioned compaction but I thought that
was a purely internal operation on the commit log that didn't give rise to events.  I've
never encountered null events in my previous experience with Kafka.

Regards,
David

-----Original Message-----
From: Ewen Cheslack-Postava [mailto:ewen@confluent.io] 
Sent: 08 November 2016 04:38
To: dev@kafka.apache.org
Subject: Re: Kafka Connect key.converter and value.converter properties for Avro encoding

On Mon, Nov 7, 2016 at 7:14 AM, <david.franklin@bt.com> wrote:

> Hi Ewen,
>
> Sorry but I didn't understand much of that.
>
> I currently have an implementation of the Converter interface that 
> uses Avro's BinaryEncoder/Decoder, SpecificDatumReader/Writer.
>
> The main mismatch I faced is that I need to use org.apache.avro.Schema 
> for serialization whereas the Converter interface requires an 
> org.apache.kafka.connect.data.Schema schema.
>

This was the main point I was trying to get at. The schemas that are used in the Converter
interface (parameter to fromConnectData, return value of
toConnectData) are Connect schemas. The values that go with those schemas are *not* in a serialization-specific
runtime format, i.e. you cannot just use SpecificRecords. Instead they are in the format of
the Connect data API. The equivalent of Generic/SpecificRecord would be Struct ( http://docs.confluent.io/3.0.1/connect/javadocs/index.html?org/apache/kafka/connect/data/Struct.html
).
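
To make that concrete, here is a minimal sketch of building a value with the Connect data
API (the event schema and field names are invented for illustration, not from your connector):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    // Hypothetical event type, purely for illustration.
    Schema eventSchema = SchemaBuilder.struct().name("com.example.Event")
            .field("id", Schema.STRING_SCHEMA)
            .field("timestamp", Schema.INT64_SCHEMA)
            .build();

    // Values are built against the Connect schema, not an Avro class.
    Struct event = new Struct(eventSchema)
            .put("id", "evt-1")
            .put("timestamp", 1478563200000L);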

Although the only requirement of the Converter interface is that you
convert:

byte[] <-> (Connect Schema, Connect data value)

in practice many Converters will be decomposed into the following:

byte[] <-> (serialization format specific schema, serialization format specific runtime
data) <-> (Connect Schema, Connect data value)

i.e. in the case of Avro

byte[] <-> (Avro Schema, SpecificRecord) <-> (Connect Schema, Connect data
value)
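
A rough skeleton of that decomposition, just to show the shape (the four abstract helper
methods are placeholders I've made up for the two halves of the conversion, not part of any
existing API):

    import java.util.Map;
    import org.apache.avro.specific.SpecificRecord;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.Converter;

    public abstract class TwoStepAvroConverter implements Converter {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // Read any converter-specific settings here.
        }

        @Override
        public byte[] fromConnectData(String topic, Schema schema, Object value) {
            SpecificRecord record = connectToAvro(schema, value); // Connect -> Avro runtime
            return avroToBytes(record);                           // Avro runtime -> byte[]
        }

        @Override
        public SchemaAndValue toConnectData(String topic, byte[] bytes) {
            SpecificRecord record = bytesToAvro(bytes);           // byte[] -> Avro runtime
            return avroToConnect(record);                         // Avro runtime -> Connect
        }

        protected abstract SpecificRecord connectToAvro(Schema schema, Object value);
        protected abstract byte[] avroToBytes(SpecificRecord record);
        protected abstract SpecificRecord bytesToAvro(byte[] bytes);
        protected abstract SchemaAndValue avroToConnect(SpecificRecord record);
    }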



>
> In the absence of a transformer to interconvert between these Schema 
> representations (are any available?) I have, for now, gone for the 
> slightly fragile approach of inferring the schema from the topic name 
> (we currently have a topic per event type).  This means I ignore the 
> schema parameter in fromConnectData and return a null schema in toConnectData.
>

Our AvroConverter has a class that does conversion of Avro <-> Connect:
https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java
This is combined with our normal Kafka de/serializers to perform the complete conversion.
Note, however, that this API is internal and not guaranteed to be stable.
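
If you do go down that road despite the stability caveat, usage looks roughly like this
(signatures taken from the current source and subject to change; avroSchema and avroRecord
stand for your own Avro schema and runtime object):

    import io.confluent.connect.avro.AvroData;
    import org.apache.kafka.connect.data.SchemaAndValue;

    AvroData avroData = new AvroData(100);  // constructor arg is a schema cache size

    // Avro -> Connect: pass the Avro schema plus the Avro runtime value.
    SchemaAndValue connectData = avroData.toConnectData(avroSchema, avroRecord);

    // Connect -> Avro: returns the Avro runtime value for a Connect schema/value pair.
    Object avroValue = avroData.fromConnectData(connectData.schema(), connectData.value());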


>
> With this I can create a simple Kafka consumer that correctly reads 
> these binary Avro encoded events generated by my Kafka Connect source, 
> once I've set the Kafka value.deserializer property to my deserializer 
> class which implements Deserializer<SpecificRecord>, which in turn 
> (re)uses my Kafka Connect converter class internally.
>
> However, I've noticed something odd: the fromConnectData  invocations 
> come in 2 forms:
>
> 1. schema = null, record = null
> 2. schema = Schema{BYTES}, record = a JSON structure
>
> Schema{BYTES} is, I presume, because I specify Schema.BYTES_SCHEMA as 
> the 4th arg to the SourceRecord constructor.
>
> Any idea why form 1 occurs?
>

Form 1 isn't even the only form that can occur. Connect actually supports schemaless data
as well, where you would see (schema=null, record=<any valid connect data>). (Though
you will never see Structs since they require a schema; however complex data can be represented
as maps & lists.)

However, for that particular case you are almost definitely seeing this from converting a
null value in Kafka. If the value in Kafka is null, there's no way to know what the intended
schema was, so it has to be left blank. However, since null is used in compacted topics for
deletion, it is important to actually translate null values in Connect to be true nulls in
Kafka.
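
In converter code this usually comes down to special-casing null on both paths; a sketch:

    // In fromConnectData: a null Connect value must stay a true null in Kafka,
    // since compacted topics treat null as a delete marker (tombstone).
    if (value == null) {
        return null;
    }

    // In toConnectData: a null byte[] from Kafka carries no schema information,
    // so all we can return is the empty SchemaAndValue.
    if (bytes == null) {
        return SchemaAndValue.NULL;
    }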


>
> Thanks again,
> David
>
>
>
>
>
>
>
> -----Original Message-----
> From: Ewen Cheslack-Postava [mailto:ewen@confluent.io]
> Sent: 07 November 2016 04:35
> To: dev@kafka.apache.org
> Subject: Re: Kafka Connect key.converter and value.converter 
> properties for Avro encoding
>
> You won't be accepting/returning SpecificRecords directly when working 
> with Connect's API. Connect intentionally uses an interface that is 
> different from Kafka serializers because we deal with structured data 
> that the connectors need to be able to understand. We define a generic 
> runtime representation for data (under org.apache.kafka.connect.data) 
> and Converters are responsible for taking the data all the way through 
> the chain byte[] -> serialization-specific format (e.g. SpecificRecord) -> 
> Connect Data API.
>
> Even though your approach to handling Avro isn't exactly the same, I'd 
> still suggest taking a look at our implementation. You'll be able to 
> see how we separate this into those two steps, utilizing our normal 
> Avro(De)Serializer to do byte[] <-> Avro conversions and then a 
> separate class to do Avro <-> Connect Data API conversions. You could 
> probably reuse the Avro <-> Connect Data API conversion directly and only use the 
> small bit of code you included for doing the byte[] <-> Avro conversion.
>
> re: configure(), yes, it's safe for it to be a noop as long as your 
> Converter really doesn't require *any* configuration. But I would 
> guess it at least needs to know the SpecificRecord class or schema you 
> are trying to (de)serialize.
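>
> For example, a sketch of a configure() that loads the target record class from a
> made-up property name ("avro.specific.class" is invented here, not a real Connect
> setting):
>
>     private Class<? extends SpecificRecord> recordClass;
>
>     @Override
>     public void configure(Map<String, ?> configs, boolean isKey) {
>         // "avro.specific.class" is a hypothetical property name for this sketch.
>         String className = (String) configs.get("avro.specific.class");
>         try {
>             recordClass = Class.forName(className).asSubclass(SpecificRecord.class);
>         } catch (ClassNotFoundException e) {
>             throw new ConnectException("Cannot load Avro record class " + className, e);
>         }
>     }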
>
> -Ewen
>
> On Thu, Nov 3, 2016 at 7:25 AM, <david.franklin@bt.com> wrote:
>
> > Thanks to Gwen and Tommy Becker for their helpful replies.
> >
> > Currently, the environment I need to work with doesn't use the 
> > Schema Registry; hopefully one day it will but for now that's not an option.
> > Events are written to Kafka without the schema embedded and each 
> > side of the interface assumes a given schema, with the consequent 
> > risks accepted.
> >
> > To serialize a SpecificRecord for the 
> > org.apache.kafka.connect.storage.Converter
> > interface (in the absence of access to the Confluent implementation
> > classes) I was thinking of something along these lines to Avro 
> > encode a
> > SpecificRecord:
> >
> >     private byte[] toAvro(Schema schema, SpecificRecord record) throws IOException {
> >         SpecificDatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(schema);
> >         ByteArrayOutputStream baos = new ByteArrayOutputStream();
> >         BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null);
> >         writer.write(record, binaryEncoder);
> >         binaryEncoder.flush();  // without the flush, buffered bytes can be lost
> >         return baos.toByteArray();
> >     }
> >
> > To work with Kafka Connect I need to comply with the 
> > org.apache.kafka.connect.storage.Converter interface.  The Converter 
> > interface defines the following methods:
> >
> >     void configure(Map<String, ?> configs, boolean isKey);
> >     byte[] fromConnectData(String topic, Schema schema, Object value);
> >     SchemaAndValue toConnectData(String topic, byte[] value);
> >
> > Is it safe to provide a no-op implementation for configure()?
> >
> > The toConnectData() method will presumably be implemented via a 
> > corresponding SpecificDatumReader.
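> >
> > Something along these lines would mirror the encoder above (an untested sketch
> > using the same Avro APIs):
> >
> >     private SpecificRecord fromAvro(Schema schema, byte[] bytes) throws IOException {
> >         SpecificDatumReader<SpecificRecord> reader = new SpecificDatumReader<>(schema);
> >         BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
> >         return reader.read(null, decoder);
> >     }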
> >
> > Does this look like a reasonable approach?
> >
> > Many thanks if you've read this far!
> >
> > Regards,
> > David
> >
> >
> > -----Original Message-----
> > From: Gwen Shapira [mailto:gwen@confluent.io]
> > Sent: 02 November 2016 21:18
> > To: dev@kafka.apache.org
> > Subject: Re: Kafka Connect key.converter and value.converter 
> > properties for Avro encoding
> >
> > Both the Confluent Avro Converter and the Confluent Avro Serializer 
> > use the Schema Registry. The reason is, as Tommy Becker mentioned 
> > below, to avoid storing the entire schema in each record (which the 
> > JSON serializer in Apache Kafka does). It has a few other benefits,
> > such as schema validation.
> >
> > If you are interested in trying this approach, you will want to use 
> > the Converter, since it was written specifically to integrate with Connect.
> > If you prefer another approach, without the Schema Registry, you can 
> > write your own Converter - that's why we made them pluggable. Feel 
> > free to copy ours and modify it as fits your Avro approach.
> >
> > Gwen
> >
> > On Wed, Nov 2, 2016 at 2:48 AM, <david.franklin@bt.com> wrote:
> >
> > > I am using Kafka Connect in source mode, i.e. using it to send 
> > > events to Kafka topics.
> > >
> > > With the key.converter and value.converter properties set to 
> > > org.apache.kafka.connect.storage.StringConverter I can attach a 
> > > consumer to the topics and see the events in a readable form.  
> > > This is helpful and reassuring but it is not the desired 
> > > representation for my downstream consumers - these require the 
> > > events to be Avro encoded.
> > >
> > > It seems that to write the events to Kafka Avro encoded, these 
> > > properties need to be set to 
> > > io.confluent.kafka.serializers.KafkaAvroSerializer.  Is this correct?
> > >
> > > I am not using the Confluent platform, merely the standard Kafka 
> > > 0.10 download, and have been unable to find out how to get at these 
> > > classes from a Maven repository jar.
> > > http://docs.confluent.io/3.0.0/app-development.html#java
> > > suggests that these are available via:
> > >
> > >     <dependency>
> > >         <groupId>io.confluent</groupId>
> > >         <artifactId>kafka-avro-serializer</artifactId>
> > >         <version>3.0.0</version>
> > >     </dependency>
> > >
> > > But that doesn't appear to be true.  The class exists in
> > > https://raw.githubusercontent.com/confluentinc/schema-registry/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
> > > but this seems to use the Schema Registry, which is something I'd rather avoid.
> > >
> > > I'd be grateful for any pointers on the simplest way of getting 
> > > Avro encoded events written to Kafka from a Kafka Connect source connector/task.
> > >
> > > Also in the task which creates SourceRecords, I'm choosing 
> > > Schema.BYTES_SCHEMA for the 4th arg in the constructor.  But I'm 
> > > not clear what this achieves - some light shed on that would also be helpful.
> > >
> > > Many thanks,
> > > David
> > >
> >
> >
> >
> > --
> > *Gwen Shapira*
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter <https://twitter.com/ConfluentInc> | blog <http://www.confluent.io/blog>
> >
>
>
>
> --
> Thanks,
> Ewen
>



--
Thanks,
Ewen