kafka-dev mailing list archives

From Ewen Cheslack-Postava <e...@confluent.io>
Subject Re: Kafka Connect key.converter and value.converter properties for Avro encoding
Date Tue, 08 Nov 2016 04:37:50 GMT
On Mon, Nov 7, 2016 at 7:14 AM, <david.franklin@bt.com> wrote:

> Hi Ewen,
>
> Sorry but I didn't understand much of that.
>
> I currently have an implementation of the Converter interface that uses
> Avro's BinaryEncoder/Decoder, SpecificDatumReader/Writer.
>
> The main mismatch I faced is that I need to use org.apache.avro.Schema for
> serialization whereas the Converter interface requires a
> org.apache.kafka.connect.data.Schema schema.
>

This was the main point I was trying to get at. The schemas that are used
in the Converter interface (parameter to fromConnectData, return value of
toConnectData) are Connect schemas. The values that go with those schemas
are *not* in a serialization-specific runtime format, i.e. you cannot just
use SpecificRecords. Instead they are in the format of the Connect data
API. The equivalent of Generic/SpecificRecord would be Struct
(http://docs.confluent.io/3.0.1/connect/javadocs/index.html?org/apache/kafka/connect/data/Struct.html).

Although the only requirement of the Converter interface is that you
convert:

byte[] <-> (Connect Schema, Connect data value)

in practice many Converters will be decomposed into the following:

byte[] <-> (serialization format specific schema, serialization format specific runtime data) <-> (Connect Schema, Connect data value)

i.e. in the case of Avro

byte[] <-> (Avro Schema, SpecificRecord) <-> (Connect Schema, Connect data value)
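
A minimal, JDK-only sketch of that two-step decomposition follows. Every type in it is a hypothetical stand-in. SchemaAndValue borrows the name of Connect's class but is a local type whose "schema" is just a name. A UTF-8 CSV line plays the role of the serialization-specific format, where the real thing would be Avro binary and a SpecificRecord. The sketch only shows how the two halves compose into one converter.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// JDK-only sketch; none of these types are the real Kafka Connect classes.
final class TwoStageConverterSketch {

    // Hypothetical stand-in for Connect's SchemaAndValue; the "schema" is only a name here.
    static final class SchemaAndValue {
        final String schema;
        final Object value;

        SchemaAndValue(String schema, Object value) {
            this.schema = schema;
            this.value = value;
        }
    }

    // One reversible step: forward maps A -> B, backward maps B -> A.
    static final class Step<A, B> {
        final Function<A, B> forward;
        final Function<B, A> backward;

        Step(Function<A, B> forward, Function<B, A> backward) {
            this.forward = forward;
            this.backward = backward;
        }

        // Chain A <-> B with B <-> C to get A <-> C.
        <C> Step<A, C> then(Step<B, C> next) {
            return new Step<A, C>(forward.andThen(next.forward), next.backward.andThen(backward));
        }
    }

    // Step 1, serialization-specific: bytes <-> format runtime data.
    // A UTF-8 CSV line "name,age" stands in for Avro binary <-> SpecificRecord.
    static final Step<byte[], String[]> BYTES_TO_FORMAT = new Step<byte[], String[]>(
            bytes -> new String(bytes, StandardCharsets.UTF_8).split(",", -1),
            fields -> String.join(",", fields).getBytes(StandardCharsets.UTF_8));

    // Step 2: format runtime data <-> Connect-style (schema, value).
    // A map stands in for a Connect Struct with fields "name" and "age".
    static final Step<String[], SchemaAndValue> FORMAT_TO_CONNECT = new Step<String[], SchemaAndValue>(
            fields -> {
                Map<String, Object> struct = new LinkedHashMap<>();
                struct.put("name", fields[0]);
                struct.put("age", Integer.parseInt(fields[1]));
                return new SchemaAndValue("Person", struct);
            },
            data -> {
                Map<?, ?> struct = (Map<?, ?>) data.value;
                return new String[] {String.valueOf(struct.get("name")), String.valueOf(struct.get("age"))};
            });

    // The full converter is just the composition of the two steps.
    static final Step<byte[], SchemaAndValue> CONVERTER = BYTES_TO_FORMAT.then(FORMAT_TO_CONNECT);

    // Same direction as Converter.toConnectData / fromConnectData (topic parameter omitted).
    static SchemaAndValue toConnectData(byte[] value) {
        return CONVERTER.forward.apply(value);
    }

    static byte[] fromConnectData(SchemaAndValue data) {
        return CONVERTER.backward.apply(data);
    }
}
```

With this split, swapping the serialization format only touches step 1, and the Connect-facing mapping in step 2 can be reused as is.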



>
> In the absence of a transformer to interconvert between these Schema
> representations (are any available?) I have, for now, gone for the slightly
> fragile approach of inferring the schema from the topic name (we currently
> have a topic per event type).  This means I ignore the schema parameter in
> fromConnectData and return a null schema in toConnectData.
>

Our AvroConverter has a class that does conversion of Avro <-> Connect:
https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java
This is combined with our normal Kafka de/serializers to perform the
complete conversion. Note, however, that this API is internal and not
guaranteed to be stable.
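
As a rough illustration of the kind of schema mapping such a class has to perform, here is a JDK-only sketch of the primitive type correspondence between Avro type names and Connect Schema.Type names. The class and helper names are hypothetical. The types are plain strings, not the real org.apache.avro.Schema.Type or org.apache.kafka.connect.data.Schema.Type enums. Records, arrays, maps, unions, enums, fixed, the Avro null type and logical types are deliberately left out, and those are where a real converter needs the most care.

```java
import java.util.Map;

// JDK-only sketch: type names are plain strings, not the real Avro or Connect enums.
final class AvroConnectTypeSketch {

    // Avro primitive type name -> Connect Schema.Type name.
    // The Avro "null" type and all complex/logical types are omitted here.
    static final Map<String, String> AVRO_TO_CONNECT = Map.of(
            "boolean", "BOOLEAN",
            "int", "INT32",
            "long", "INT64",
            "float", "FLOAT32",
            "double", "FLOAT64",
            "bytes", "BYTES",
            "string", "STRING");

    // Hypothetical helper: look up the Connect type for an Avro primitive.
    static String connectTypeFor(String avroType) {
        String connectType = AVRO_TO_CONNECT.get(avroType);
        if (connectType == null) {
            throw new IllegalArgumentException("not handled by this sketch: " + avroType);
        }
        return connectType;
    }

    // The reverse is not a perfect inverse: Connect's INT8 and INT16 have no Avro
    // counterpart and would also be written as Avro "int", so a plain lookup like
    // connectTypeFor reads them back as INT32.
    static String avroTypeFor(String connectType) {
        switch (connectType) {
            case "INT8":
            case "INT16":
                return "int";
            default:
                for (Map.Entry<String, String> e : AVRO_TO_CONNECT.entrySet()) {
                    if (e.getValue().equals(connectType)) {
                        return e.getKey();
                    }
                }
                throw new IllegalArgumentException("not handled by this sketch: " + connectType);
        }
    }
}
```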


>
> With this I can create a simple Kafka consumer that correctly reads these
> binary Avro encoded events generated by my Kafka Connect source, once I've
> set the Kafka value.deserializer property to my deserializer class, which
> implements Deserializer<SpecificRecord> and in turn (re)uses my Kafka
> Connect converter class internally.
>
> However, I've noticed something odd: the fromConnectData invocations come
> in 2 forms:
>
> 1. schema = null, record = null
> 2. schema = Schema{BYTES}, record = a JSON structure
>
> Schema{BYTES} is, I presume, because I specify Schema.BYTES_SCHEMA as the
> 4th argument to the SourceRecord constructor.
>
> Any idea why form 1 occurs?
>

Form 1 isn't even the only form that can occur. Connect also supports
schemaless data, where you would see (schema=null, record=<any valid
Connect data>). You will never see Structs there, since they require a
schema, but complex data can still be represented as maps and lists.
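
To make the schemaless case concrete: with schema=null, record-like data arrives as nested maps and lists. The JDK-only sketch below serializes such a value to JSON text, roughly the way a schemaless JSON-style converter might. The class is hypothetical, not Connect's JsonConverter. It handles only null, strings, numbers, booleans, maps and lists. Anything else, including byte[], is rejected, and non-finite doubles are not handled.

```java
import java.util.List;
import java.util.Map;

// Hypothetical JDK-only sketch, not Kafka's JsonConverter.
final class SchemalessJsonSketch {

    // Serialize a schemaless value (nested maps, lists, primitives) to JSON text.
    static String toJson(Object value) {
        StringBuilder sb = new StringBuilder();
        write(value, sb);
        return sb.toString();
    }

    private static void write(Object v, StringBuilder sb) {
        if (v == null) {
            sb.append("null");
        } else if (v instanceof String) {
            writeString((String) v, sb);
        } else if (v instanceof Number || v instanceof Boolean) {
            sb.append(v); // NaN/Infinity would produce invalid JSON; not handled here
        } else if (v instanceof Map) {
            sb.append('{');
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) v).entrySet()) {
                if (!first) sb.append(',');
                first = false;
                writeString(String.valueOf(e.getKey()), sb);
                sb.append(':');
                write(e.getValue(), sb);
            }
            sb.append('}');
        } else if (v instanceof List) {
            sb.append('[');
            boolean first = true;
            for (Object item : (List<?>) v) {
                if (!first) sb.append(',');
                first = false;
                write(item, sb);
            }
            sb.append(']');
        } else {
            throw new IllegalArgumentException("unsupported schemaless type: " + v.getClass());
        }
    }

    // Quote a string, escaping quotes, backslashes and control characters.
    private static void writeString(String s, StringBuilder sb) {
        sb.append('"');
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        sb.append('"');
    }
}
```

For example, a map {id=7, tags=[a, b], note=null} built with a LinkedHashMap becomes {"id":7,"tags":["a","b"],"note":null}.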

In your particular case, though, you are almost certainly seeing a null
value being converted. If the value in Kafka is null, there's no way to
know what the intended schema was, so it has to be left blank. And since
null is used in compacted topics for deletion, it is important that null
values in Connect are translated to true nulls in Kafka.
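
A minimal JDK-only sketch of that null handling follows. SchemaAndValue here is a local stand-in (Connect's real class provides a SchemaAndValue.NULL constant for the (null, null) case). The UTF-8 toString() encoding is a placeholder for whatever serialization a real converter would do.

```java
import java.nio.charset.StandardCharsets;

// JDK-only sketch of null handling in a converter; the types are local stand-ins,
// not Kafka Connect's classes.
final class NullAwareConverterSketch {

    // Hypothetical stand-in for Connect's SchemaAndValue.
    static final class SchemaAndValue {
        final String schema;
        final Object value;

        SchemaAndValue(String schema, Object value) {
            this.schema = schema;
            this.value = value;
        }
    }

    static final SchemaAndValue NULL = new SchemaAndValue(null, null);

    // Connect -> Kafka. A null Connect value must become a true null in Kafka,
    // so that on a compacted topic it acts as a delete marker rather than an
    // empty payload.
    static byte[] fromConnectData(String topic, String schema, Object value) {
        if (value == null) {
            return null;
        }
        // Placeholder encoding; a real converter would serialize according to the schema.
        return value.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Kafka -> Connect. A null Kafka value says nothing about the intended
    // schema, so both the schema and the value are left null.
    static SchemaAndValue toConnectData(String topic, byte[] value) {
        if (value == null) {
            return NULL;
        }
        return new SchemaAndValue("STRING", new String(value, StandardCharsets.UTF_8));
    }
}
```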


>
> Thanks again,
> David
>
>
>
>
>
>
>
> -----Original Message-----
> From: Ewen Cheslack-Postava [mailto:ewen@confluent.io]
> Sent: 07 November 2016 04:35
> To: dev@kafka.apache.org
> Subject: Re: Kafka Connect key.converter and value.converter properties
> for Avro encoding
>
> You won't be accepting/returning SpecificRecords directly when working
> with Connect's API. Connect intentionally uses an interface that is
> different from Kafka serializers because we deal with structured data that
> the connectors need to be able to understand. We define a generic runtime
> representation for data (under org.apache.kafka.connect.data) and
> Converters are responsible for taking the data all the way through any
> byte[] -> serialization-specific format (e.g. SpecificRecord) -> Connect
> Data API.
>
> Even though your approach to handling Avro isn't exactly the same, I'd
> still suggest taking a look at our implementation. You'll be able to see
> how we separate this into those two steps, utilizing our normal
> Avro(De)Serializer to do byte[] <-> Avro conversions and then a separate
> class to do Avro <-> Connect Data API conversions. You could probably reuse
> that Avro <-> Connect Data API conversion class directly and only use the
> small bit of code you included for doing the byte[] <-> Avro conversion.
>
> re: configure(), yes, it's safe for it to be a noop as long as your
> Converter really doesn't require *any* configuration. But I would guess it
> at least needs to know the SpecificRecord class or schema you are trying to
> (de)serialize.
>
> -Ewen
>
> On Thu, Nov 3, 2016 at 7:25 AM, <david.franklin@bt.com> wrote:
>
> > Thanks to Gwen and Tommy Baker for their helpful replies.
> >
> > Currently, the environment I need to work with doesn't use the Schema
> > Registry; hopefully one day it will but for now that's not an option.
> > Events are written to Kafka without the schema embedded and each side
> > of the interface assumes a given schema, with the consequent risks
> > accepted.
> >
> > To serialize a SpecificRecord for the
> > org.apache.kafka.connect.storage.Converter interface (in the absence of
> > access to the Confluent implementation classes) I was thinking of
> > something along these lines to Avro encode a SpecificRecord:
> >
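> >     import java.io.ByteArrayOutputStream;
> >     import java.io.IOException;
> >     import org.apache.avro.Schema;
> >     import org.apache.avro.io.BinaryEncoder;
> >     import org.apache.avro.io.EncoderFactory;
> >     import org.apache.avro.specific.SpecificDatumWriter;
> >     import org.apache.avro.specific.SpecificRecord;
> >
> >     private byte[] toAvro(Schema schema, SpecificRecord record) throws IOException {
> >         SpecificDatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(schema);
> >         ByteArrayOutputStream baos = new ByteArrayOutputStream();
> >         // Shared factory; null means no existing encoder is reused.
> >         BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null);
> >         writer.write(record, binaryEncoder);
> >         // The binary encoder buffers output; flush before reading the bytes.
> >         binaryEncoder.flush();
> >         return baos.toByteArray();
> >     }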
> >
> > To work with Kafka Connect I need to comply with the
> > org.apache.kafka.connect.storage.Converter interface. The Converter
> > interface defines the following methods:
> >
> >     void configure(Map<String, ?> configs, boolean isKey);
> >     byte[] fromConnectData(String topic, Schema schema, Object value);
> >     SchemaAndValue toConnectData(String topic, byte[] value);
> >
> > Is it safe to provide a no-op implementation for configure()?
> >
> > The toConnectData() method will presumably be achieved via a
> > corresponding SpecificDatumReader.
> >
> > Does this look a reasonable approach?
> >
> > Many thanks if you've read this far!
> >
> > Regards,
> > David
> >
> >
> > -----Original Message-----
> > From: Gwen Shapira [mailto:gwen@confluent.io]
> > Sent: 02 November 2016 21:18
> > To: dev@kafka.apache.org
> > Subject: Re: Kafka Connect key.converter and value.converter
> > properties for Avro encoding
> >
> > Both the Confluent Avro Converter and the Confluent Avro Serializer
> > use the Schema Registry. The reason is, as Tommy Becker mentioned
> > below, to avoid storing the entire schema in each record (which the
> > JSON serializer in Apache Kafka does). It has a few other benefits too,
> > such as schema validation.
> >
> > If you are interested in trying this approach, you will want to use
> > the Converter, since it was written specifically to integrate with
> > Connect.
> > If you prefer another approach, without the Schema Registry, you can
> > write your own Converter - that's why we made them pluggable. Feel
> > free to copy ours and modify it as fits your Avro approach.
> >
> > Gwen
> >
> > On Wed, Nov 2, 2016 at 2:48 AM, <david.franklin@bt.com> wrote:
> >
> > > I am using Kafka Connect in source mode i.e. using it to send events
> > > to Kafka topics.
> > >
> > > With the key.converter and value.converter properties set to
> > > org.apache.kafka.connect.storage.StringConverter I can attach a
> > > consumer to the topics and see the events in a readable form.  This
> > > is helpful and reassuring but it is not the desired representation
> > > for my downstream consumers - these require the events to be Avro
> > > encoded.
> > >
> > > It seems that to write the events to Kafka Avro encoded, these
> > > properties need to be set to
> > > io.confluent.kafka.serializers.KafkaAvroSerializer.  Is this correct?
> > >
> > > I am not using the Confluent platform, merely the standard Kafka 0.10
> > > download, and have been unable to find out how to get at these from
> > > a Maven repository jar.
> > > http://docs.confluent.io/3.0.0/app-development.html#java
> > > suggests that these are available via:
> > >
> > >     <dependency>
> > >         <groupId>io.confluent</groupId>
> > >         <artifactId>kafka-avro-serializer</artifactId>
> > >         <version>3.0.0</version>
> > >     </dependency>
> > >
> > > But it doesn't appear to be true. The class exists in
> > > https://raw.githubusercontent.com/confluentinc/schema-registry/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
> > > but this seems to use the Schema Registry, which is something I'd
> > > rather avoid.
> > >
> > > I'd be grateful for any pointers on the simplest way of getting Avro
> > > encoded events written to Kafka from a Kafka Connect source
> > > connector/task.
> > >
> > > Also in the task which creates SourceRecords, I'm choosing
> > > Schema.BYTES_SCHEMA for the 4th arg in the constructor.  But I'm not
> > > clear what this achieves - some light shed on that would also be
> > > helpful.
> > >
> > > Many thanks,
> > > David
> > >
> >
> >
> >
> > --
> > *Gwen Shapira*
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter <https://twitter.com/ConfluentInc> | blog <
> > http://www.confluent.io/blog>
> >
>
>
>
> --
> Thanks,
> Ewen
>



-- 
Thanks,
Ewen
