kafka-dev mailing list archives

From Michael André Pearce <michael.andre.pea...@me.com>
Subject Re: [DISCUSS] KIP 145 - Expose Record Headers in Kafka Connect
Date Wed, 13 Dec 2017 05:36:37 GMT
*concert = convert 

Sent from my iPhone

> On 13 Dec 2017, at 05:35, Michael André Pearce <michael.andre.pearce@me.com> wrote:
> 
> Hi Randall
> 
> What’s the main difference between this and my earlier alternative option PR
> https://github.com/apache/kafka/pull/2942/files
> 
> If none then +1.
> From what I can tell, the only difference is that the headers you support are able
> to cross-convert primitive types, e.g. if the value after conversion is an integer you
> can still ask for a float and it will type concert if possible.
> 
> Cheers
> Mike
> 
> 
> Sent from my iPhone
> 
>> On 13 Dec 2017, at 01:36, Randall Hauch <rhauch@gmail.com> wrote:
>> 
>> Trying to revive this after several months of inactivity....
>> 
>> I've spent quite a bit of time evaluating the current KIP-145 proposal and
>> several of the suggested PRs. The original KIP-145 proposal is relatively
>> minimalist (which is very nice), and it adopts Kafka's approach to headers
>> where header keys are strings and header values are byte arrays. IMO, this
>> places too much responsibility on the connector developers to know how to
>> serialize and deserialize, which means that it's going to be difficult to
>> assemble into pipelines connectors and stream processors that make
>> different, incompatible assumptions. It also makes Connect headers very
>> different than Connect's keys and values, which are generally structured
>> and describable with Connect schemas. I think we need Connect headers to do
>> more.
>> 
>> The other proposals attempt to do more, but even my first proposal doesn't
>> seem to really provide a solution that works for Connect users and
>> connector developers. After looking at this feature from a variety of
>> perspectives over several months, I now assert that Connect must solve two
>> orthogonal problems:
>> 
>> 1) Serialization: How different data types are (de)serialized as header
>> values
>> 2) Conversion: How values of one data type are converted to values of
>> another data type
>> 
>> For the serialization problem, Ewen suggested quite a while back that we
>> use something akin to `Converter` for header values. Unfortunately we can't
>> directly reuse `Converters` since the method signatures don't allow us to
>> supply the header name and the topic name, but we could define a
>> `HeaderConverter` that is similar to and compatible with `Converter` such
>> that a single class could implement both. This would align Connector
>> headers with how message keys and values are handled. Each connector could
>> define which converter it wants to use; for backward compatibility purposes
>> we use a header converter by default that serializes values to strings. If
>> you want something other than this default, you'd have to specify the
>> header converter options as part of the connector configuration; this
>> proposal changes the `StringConverter`, `ByteArrayConverter`, and
>> `JsonConverter` to all implement `HeaderConverter`, so these are all
>> options. This approach supposes that a connector will serialize all of its
>> headers in the same way -- with string-like representations by default. I
>> think this is a safe assumption for the short term, and if we need more
>> control to (de)serialize named headers differently for the same connector,
>> we can always implement a different `HeaderConverter` that gives users more
>> control.
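
As a rough illustration of the serialization idea described above, here is a minimal Java sketch of a converter that receives the topic and header name and represents every value as a string. The interface name, method names, and signatures are assumptions made for this example; they are not taken from the KIP or from the prototype PR.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch only: names and signatures are assumptions for
// illustration, not the API from the KIP or the prototype PR.
interface HeaderConverterSketch {
    // Serialize a header value; the topic and header name are both supplied.
    byte[] fromConnectHeader(String topic, String headerKey, Object value);

    // Deserialize raw header bytes back into a value.
    Object toConnectHeader(String topic, String headerKey, byte[] raw);
}

// A default-style converter that represents every header value as a UTF-8 string.
class StringHeaderConverterSketch implements HeaderConverterSketch {
    @Override
    public byte[] fromConnectHeader(String topic, String headerKey, Object value) {
        return value == null ? null : String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Object toConnectHeader(String topic, String headerKey, byte[] raw) {
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }
}

class HeaderConverterDemo {
    public static void main(String[] args) {
        HeaderConverterSketch converter = new StringHeaderConverterSketch();
        // The integer is written as its string form and read back as a String.
        byte[] raw = converter.fromConnectHeader("orders", "retry-count", 3);
        System.out.println(converter.toConnectHeader("orders", "retry-count", raw));
    }
}
```

Note that with a string-only converter the original type is lost on the way back, which is why the conversion problem discussed next still matters.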
>> 
>> So that would solve the serialization problem. How about connectors and
>> transforms that are implemented to expect a certain type of header value,
>> such as an integer or boolean or timestamp? We could solve this problem
>> (for the most part) by adding methods to the `Header` interface to get the
>> value in the desired type, and to support all of the sensible conversions
>> between Connect's primitives and logical types. So, a connector or
>> transform could always call `header.valueAsObject()` to get the raw
>> representation from the converter, but a connector or transform could also
>> get the string representation by calling `header.valueAsString()`, or the
>> INT64 representation by calling `header.valueAsLong()`, etc. We could even
>> have converting methods for the built-in logical types (e.g.,
>> `header.valueAsTimestamp()` to return a java.util.Date value that is
>> described by Connect's Timestamp logical type). We can convert between most
>> primitive and logical types (e.g., anything to a STRING, INT32 to FLOAT32,
>> etc.), but there are a few that don't make sense (e.g., ARRAY to FLOAT32,
>> INT32 to STRUCT, BYTE_ARRAY to anything, etc.), so these can throw a
>> `DataException`.
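
To make the conversion idea concrete, the following is a simplified Java sketch of a header value wrapper with typed accessors. The accessor names follow those mentioned above, but the class itself, the exception type (a local stand-in for Connect's `DataException`), and the exact conversion rules are assumptions for illustration only.

```java
import java.util.Arrays;

// Local stand-in for Connect's DataException so the sketch is self-contained.
class ConversionExceptionSketch extends RuntimeException {
    ConversionExceptionSketch(String message) { super(message); }
}

// Hypothetical wrapper; the conversion rules here are simplified assumptions.
class HeaderValueSketch {
    private final Object value;

    HeaderValueSketch(Object value) { this.value = value; }

    // Raw representation as produced by the converter.
    Object valueAsObject() { return value; }

    // Most values have a string form; raw byte arrays are rejected.
    String valueAsString() {
        if (value instanceof byte[]) {
            throw new ConversionExceptionSketch("Cannot convert byte[] to STRING");
        }
        return value == null ? null : value.toString();
    }

    // Numbers are widened or narrowed; strings are parsed; anything else fails.
    long valueAsLong() {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value instanceof String) {
            try {
                return Long.parseLong(((String) value).trim());
            } catch (NumberFormatException e) {
                throw new ConversionExceptionSketch("Not a valid INT64: " + value);
            }
        }
        throw new ConversionExceptionSketch("Cannot convert " + describe() + " to INT64");
    }

    float valueAsFloat() {
        if (value instanceof Number) {
            return ((Number) value).floatValue();
        }
        if (value instanceof String) {
            try {
                return Float.parseFloat(((String) value).trim());
            } catch (NumberFormatException e) {
                throw new ConversionExceptionSketch("Not a valid FLOAT32: " + value);
            }
        }
        throw new ConversionExceptionSketch("Cannot convert " + describe() + " to FLOAT32");
    }

    private String describe() {
        return value == null ? "null" : value.getClass().getSimpleName();
    }
}

class HeaderValueDemo {
    public static void main(String[] args) {
        HeaderValueSketch header = new HeaderValueSketch(42);
        System.out.println(header.valueAsLong());   // integer read as INT64
        System.out.println(header.valueAsFloat());  // integer read as FLOAT32
        try {
            // An array has no sensible FLOAT32 form, so this fails.
            new HeaderValueSketch(Arrays.asList(1, 2)).valueAsFloat();
        } catch (ConversionExceptionSketch e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```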
>> 
>> I've refined this approach over the last few months, and have a PR for a
>> complete prototype that demonstrates these concepts and techniques:
>> https://github.com/apache/kafka/pull/4319
>> 
>> This PR does *not* update the documentation, though I can add that if we
>> approve of this approach. And, we probably want to define (at least on the
>> KIP) some relatively obvious SMTs for copying header values into record
>> key/value fields, and extracting record key/value fields into header values.
>> 
>> @Michael, would you mind if I edited KIP-145 to reflect this proposal? I
>> would be happy to keep the existing proposal at the end of the document (or
>> remove it if you prefer, since it's already in the page history), and we
>> can revise as we choose a direction.
>> 
>> Comments? Thoughts?
>> 
>> Best regards,
>> 
>> Randall
>> 
>> 
>> On Thu, Oct 19, 2017 at 2:10 PM, Michael André Pearce <
>> michael.andre.pearce@me.com> wrote:
>> 
>>> @rhauch
>>> 
>>> Here is the previous discussion thread, just reigniting so we can discuss
>>> against the original kip thread
>>> 
>>> 
>>> Cheers
>>> 
>>> Mike
>>> 
>>> Sent from my iPhone
>>> 
>>>> On 5 May 2017, at 02:21, Michael Pearce <Michael.Pearce@ig.com> wrote:
>>>> 
>>>> Hi Ewen,
>>>> 
>>>> Did you get a chance to look at the updated sample showing the idea?
>>>> 
>>>> Did it help?
>>>> 
>>>> Cheers
>>>> Mike
>>>> 
>>>> Sent using OWA for iPhone
>>>> ________________________________________
>>>> From: Michael Pearce <Michael.Pearce@ig.com>
>>>> Sent: Wednesday, May 3, 2017 10:11:55 AM
>>>> To: dev@kafka.apache.org
>>>> Subject: Re: [DISCUSS] KIP 145 - Expose Record Headers in Kafka Connect
>>>> 
>>>> Hi Ewen,
>>>> 
>>>> I think code helps here, as I don’t think I explained what I meant very well.
>>>> 
>>>> I have pushed what I was thinking to the branch/PR:
>>>> https://github.com/apache/kafka/pull/2942
>>>> 
>>>> The key bits added on top here are:
>>>> 
>>>> A new ConnectHeader that holds the header key (as a string), the header
>>>> value object, and the header value schema.
>>>> 
>>>> A new SubjectConverter, which allows exposing a subject; in this case the
>>>> subject is the key. This can be used to register the header type in repos
>>>> like a schema registry, or in my case below in a property file.
>>>> 
>>>> We can default the subject converter to a String-based or byte-based one,
>>>> where all header values are treated safely as String or byte[] type.
>>>> 
>>>> But this way you could add in your own converter, which could be more
>>>> sophisticated and convert the header based on the key.
>>>> 
>>>> The main part is to have access to the key, so you can look up the
>>>> header value type based on the key from somewhere, e.g. a properties file
>>>> or some central repo (such as a schema repo), where the repo subject could
>>>> be the topic + key, or just the key if the key type is global, and the schema
>>>> could be primitive, String, byte[], or something more elaborate.
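
The lookup described above (topic + key first, then the key alone, then a safe default) could be sketched roughly as follows. This is an in-memory stand-in for a properties file or schema repo; the class name, method names, and the `.` separator used to build the topic-scoped subject are all assumptions for this example and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry sketch: maps a "subject" to a declared header type name.
class HeaderTypeRegistrySketch {
    private final Map<String, String> typesBySubject = new HashMap<>();
    private final String defaultType;

    HeaderTypeRegistrySketch(String defaultType) {
        this.defaultType = defaultType;
    }

    void register(String subject, String typeName) {
        typesBySubject.put(subject, typeName);
    }

    // The topic-scoped subject wins, then the global key, then the default type.
    String lookup(String topic, String headerKey) {
        String scoped = typesBySubject.get(topic + "." + headerKey);
        if (scoped != null) {
            return scoped;
        }
        return typesBySubject.getOrDefault(headerKey, defaultType);
    }
}

class HeaderTypeRegistryDemo {
    public static void main(String[] args) {
        HeaderTypeRegistrySketch registry = new HeaderTypeRegistrySketch("String");
        registry.register("JMSExpires", "long");        // global: same type on every topic
        registry.register("orders.priority", "int");    // only applies to the "orders" topic
        System.out.println(registry.lookup("orders", "priority"));
        System.out.println(registry.lookup("payments", "JMSExpires"));
        System.out.println(registry.lookup("payments", "priority"));
    }
}
```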
>>>> 
>>>> Cheers
>>>> Mike
>>>> 
>>>> On 03/05/2017, 06:00, "Ewen Cheslack-Postava" <ewen@confluent.io> wrote:
>>>> 
>>>>  Michael,
>>>> 
>>>>  Aren't JMS headers an example where the variety is a problem? Unless I'm
>>>>  misunderstanding, there's not even a fixed serialization format expected
>>>>  for them since JMS defines the runtime types, not the wire format. For
>>>>  example, we have JMSCorrelationID (String), JMSExpires (Long), and
>>>>  JMSReplyTo (Destination). These are simply runtime types, so we'd need
>>>>  either (a) a different serializer/deserializer for each or (b) a
>>>>  serializer/deserializer that can handle all of them (e.g. Avro, JSON, etc).
>>>> 
>>>>  What is the actual serialized format of the different fields? And if it's
>>>>  not specified anywhere in the KIP, why should using the well-known type for
>>>>  the header key (e.g. use StringSerializer, IntSerializer, etc) be better or
>>>>  worse than using a general serialization format (e.g. Avro, JSON)? And if
>>>>  the latter is the choice, how do you decide on the format?
>>>> 
>>>>  -Ewen
>>>> 
>>>>  On Tue, May 2, 2017 at 12:48 PM, Michael André Pearce <
>>>>  michael.andre.pearce@me.com> wrote:
>>>> 
>>>>> Hi Ewen,
>>>>> 
>>>>> So on the point of JMS, the predefined/standardised JMS and JMSX headers
>>>>> have predefined types, so these can be serialised/deserialised accordingly.
>>>>> 
>>>>> Custom JMS headers, agreed, could be a bit more difficult, but on the 80/20
>>>>> rule I would say they're mostly string values, and as you can hold bytes as
>>>>> a string anyhow, defaulting to that wouldn't cause any issue.
>>>>> 
>>>>> But I think we may easily be able to do one better.
>>>>> 
>>>>> Obviously you can override/configure the headers converter, but could we
>>>>> supply a default converter that takes a config file with a key-to-type mapping?
>>>>> 
>>>>> That would allow people to define/declare a header key with the expected
>>>>> type in some property file, supporting String, byte[] and primitives, with
>>>>> undefined headers just defaulting to either String or byte[].
>>>>> 
>>>>> We could also predefine known headers like the JMS ones mentioned above.
>>>>> 
>>>>> E.g
>>>>> 
>>>>> AwesomeHeader1=boolean
>>>>> AwesomeHeader2=long
>>>>> JMSCorrelationId=String
>>>>> JMSXGroupId=String
>>>>> 
>>>>> 
>>>>> What do you think?
>>>>> 
>>>>> 
>>>>> Cheers
>>>>> Mike
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> Sent from my iPhone
>>>>> 
>>>>>> On 2 May 2017, at 18:45, Ewen Cheslack-Postava <ewen@confluent.io> wrote:
>>>>>> 
>>>>>> A couple of thoughts:
>>>>>> 
>>>>>> First, agreed that we definitely want to expose header functionality. Thank
>>>>>> you Mike for starting the conversation! Even if Connect doesn't do anything
>>>>>> special with it, there's value in being able to access/set headers.
>>>>>> 
>>>>>> On motivation -- I think there are much broader use cases. When thinking
>>>>>> about exposing headers, I'd actually use Replicator as only a minor
>>>>>> supporting case. The reason is that it is a very uncommon case where there
>>>>>> is zero impedance mismatch between the source and sink of the data since
>>>>>> they are both Kafka. This means you don't need to think much about data
>>>>>> formats/serialization. I think the JMS use case is a better example since
>>>>>> JMS headers and Kafka headers don't quite match up. Here's a quick list of
>>>>>> use cases I can think of off the top of my head:
>>>>>> 
>>>>>> 1. Include headers from other systems that support them: JMS (or really any
>>>>>> MQ), HTTP
>>>>>> 2. Other connector-specific headers. For example, from JDBC maybe the table
>>>>>> the data comes from is a header; for a CDC connector you might include the
>>>>>> binlog offset as a header.
>>>>>> 3. Interceptor/SMT-style use cases for annotating things like provenance of
>>>>>> data:
>>>>>> 3a. Generically w/ user-supplied data like data center, host, app ID, etc.
>>>>>> 3b. Kafka Connect framework level info, such as the connector/task
>>>>>> generating the data
>>>>>> 
>>>>>> On deviation from Connect's model -- to be honest, the KIP-82 also deviates
>>>>>> quite substantially from how Kafka handles data already, so we may struggle
>>>>>> a bit to rectify the two. (In particular, headers specify some structure
>>>>>> and enforce strings specifically for header keys, but then require you to
>>>>>> do serialization of header values yourself...).
>>>>>> 
>>>>>> I think the use cases I mentioned above may also need different approaches
>>>>>> to how the data in headers are handled. As Gwen mentions, if we expose the
>>>>>> headers to Connectors, they need to have some idea of the format and the
>>>>>> reason for byte[] values in KIP-82 is to leave that decision up to the
>>>>>> organization using them. But without knowing the format, connectors can't
>>>>>> really do anything with them -- if a source connector assumes a format,
>>>>>> they may generate data incompatible with the format used by the rest of the
>>>>>> organization. On the other hand, I have a feeling most people will just use
>>>>>> <String, String> headers, so allowing connectors to embed arbitrarily
>>>>>> complex data may not work out well in practice. Or maybe we leave it
>>>>>> flexible, most people default to using StringConverter for the serializer
>>>>>> and Connectors will end up defaulting to that just for compatibility...
>>>>>> 
>>>>>> I'm not sure I have a real proposal yet, but I do think understanding the
>>>>>> impact of using a Converter for headers would be useful, and we might want
>>>>>> to think about how this KIP would fit in with transformations (or if that
>>>>>> is something that can be deferred, handled separately from the existing
>>>>>> transformations, etc).
>>>>>> 
>>>>>> -Ewen
>>>>>> 
>>>>>> On Mon, May 1, 2017 at 11:52 AM, Michael Pearce <Michael.Pearce@ig.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi Gwen,
>>>>>>> 
>>>>>>> The intent here was to allow tools that perform a similar role to
>>>>>>> MirrorMaker, replicating messages from one cluster to another. E.g.,
>>>>>>> like MirrorMaker, they should just be taking and transferring the headers
>>>>>>> as is.
>>>>>>> 
>>>>>>> We don't actually use this inside our company, so not exposing this isn't
>>>>>>> an issue for us. We just believe there are companies like Confluent who
>>>>>>> have tools like Replicator that do.
>>>>>>> 
>>>>>>> And as good citizens, we think we should complete the work and expose the
>>>>>>> headers the same as in the record, to at least allow them to replicate the
>>>>>>> messages as is. Note Steph seems to want it.
>>>>>>> 
>>>>>>> Cheers
>>>>>>> Mike
>>>>>>> 
>>>>>>> Sent using OWA for iPhone
>>>>>>> ________________________________________
>>>>>>> From: Gwen Shapira <gwen@confluent.io>
>>>>>>> Sent: Monday, May 1, 2017 2:36:34 PM
>>>>>>> To: dev@kafka.apache.org
>>>>>>> Subject: Re: [DISCUSS] KIP 145 - Expose Record Headers in Kafka Connect
>>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> I'm excited to see the community expanding Connect in this direction!
>>>>>>> Headers + Transforms == Fun message routing.
>>>>>>> 
>>>>>>> I like how clean the proposal is, but I'm concerned that it kinda deviates
>>>>>>> from how Connect handles data elsewhere.
>>>>>>> Unlike Kafka, Connect doesn't look at all data as byte-arrays; we have
>>>>>>> converters that take data in specific formats (JSON, Avro) and turn it
>>>>>>> into Connect data types (defined in the data API). I think it will be more
>>>>>>> consistent for connector developers to also get headers as some kind of
>>>>>>> structured or semi-structured data (and to expand the converters to handle
>>>>>>> header conversions as well).
>>>>>>> This will allow for Connect's separation of concerns - Connector developers
>>>>>>> don't worry about data formats (because they get the internal Connect
>>>>>>> objects) and Converters do all the data format work.
>>>>>>> 
>>>>>>> Another thing, in my experience, APIs work better if they are put into use
>>>>>>> almost immediately - so difficulties in using the APIs are immediately
>>>>>>> surfaced. Are you planning any connectors that will use this feature (not
>>>>>>> necessarily in Kafka, just in general)? Or perhaps we can think of a way to
>>>>>>> expand Kafka's file connectors so they'll use headers somehow (can't think
>>>>>>> of anything, but maybe?).
>>>>>>> 
>>>>>>> Gwen
>>>>>>> 
>>>>>>> On Sat, Apr 29, 2017 at 12:12 AM, Michael Pearce <Michael.Pearce@ig.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi All,
>>>>>>>> 
>>>>>>>> Now that KIP-82 is committed, I would like to discuss extending the work to
>>>>>>>> expose it in Kafka Connect, its primary focus being that connectors which
>>>>>>>> may do similar tasks to MirrorMaker, either Kafka->Kafka or JMS->Kafka,
>>>>>>>> would be able to replicate the headers.
>>>>>>>> It would be ideal but not mandatory for this to go in the 0.11 release so
>>>>>>>> it is available on day one of headers being available.
>>>>>>>> 
>>>>>>>> Please find the KIP here:
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect
>>>>>>>> 
>>>>>>>> Please find an initial implementation as a PR here:
>>>>>>>> https://github.com/apache/kafka/pull/2942
>>>>>>>> 
>>>>>>>> Kind Regards
>>>>>>>> Mike
>>>>>>>> The information contained in this email is strictly confidential and for
>>>>>>>> the use of the addressee only, unless otherwise indicated. If you are not
>>>>>>>> the intended recipient, please do not read, copy, use or disclose to others
>>>>>>>> this message or any attachment. Please also notify the sender by replying
>>>>>>>> to this email or by telephone (+44(020 7896 0011) and then delete the email
>>>>>>>> and any copies of it. Opinions, conclusion (etc) that do not relate to the
>>>>>>>> official business of this company shall be understood as neither given nor
>>>>>>>> endorsed by it. IG is a trading name of IG Markets Limited (a company
>>>>>>>> registered in England and Wales, company number 04008957) and IG Index
>>>>>>>> Limited (a company registered in England and Wales, company number
>>>>>>>> 01190902). Registered address at Cannon Bridge House, 25 Dowgate Hill,
>>>>>>>> London EC4R 2YA. Both IG Markets Limited (register number 195355) and IG
>>>>>>>> Index Limited (register number 114059) are authorised and regulated by the
>>>>>>>> Financial Conduct Authority.
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> *Gwen Shapira*
>>>>>>> Product Manager | Confluent
>>>>>>> 650.450.2760 | @gwenshap
>>>>>>> Follow us: Twitter <https://twitter.com/ConfluentInc> | blog
>>>>>>> <http://www.confluent.io/blog>
>>>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
