kafka-jira mailing list archives

From "Randall Hauch (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5568) Transformations that mutate topic-partitions break sink connectors that manage their own configuration
Date Fri, 07 Jul 2017 02:17:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16077473#comment-16077473 ]

Randall Hauch commented on KAFKA-5568:
--------------------------------------

Option #2 seems like it's the cleanest, IMO. It still allows connectors to know the original
topic and partition for each record, though connectors that rely upon this behavior would need
to change. The memory impact is slight, especially if we take a bit of care to ensure the
fields refer to the same objects when the topic and partition were not changed.

Option #3 is a bit more flexible and a bit more efficient (memory- and GC-wise), at the cost
of making it harder for connectors to correlate the two `Collection<SinkRecord>`s. And technically,
the API uses `Collection`, which doesn't imply any ordering.

Option #4 is better than #3 IMO.
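
A rough sketch of what option #4's identity-map lookup might look like, using a simplified stand-in `Record` class rather than the real Kafka Connect `SinkRecord` (the class, field, and method names here are illustrative assumptions, not the actual API). The point is that `IdentityHashMap` keys on object identity, so the transformed record object maps back to the record as consumed, and a connector pays for the original only if it asks for it:

```java
import java.util.IdentityHashMap;
import java.util.Map;

public class OriginalRecordMapDemo {
    // Simplified stand-in for SinkRecord; real records carry schemas, values, etc.
    static final class Record {
        final String topic;
        final int partition;
        final long offset;
        Record(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Sketch of what a context.originalPutRecords() lookup could return.
    static String lookupDemo() {
        Record original = new Record("metrics", 0, 42L);
        // An SMT such as a regex router allocates a new record with a new topic.
        Record transformed = new Record("routed-metrics", 0, 42L);

        // Identity-based map from transformed records back to originals.
        Map<Record, Record> originals = new IdentityHashMap<>();
        originals.put(transformed, original);

        // The connector recovers the pre-transform coordinates for rewind().
        Record orig = originals.get(transformed);
        return orig.topic + "@" + orig.offset;
    }

    public static void main(String[] args) {
        System.out.println(lookupDemo()); // metrics@42
    }
}
```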

Option #5 could be done, but it'd be a complicated migration.

Option #6: How about a slight variation of #2: instead, have `SinkRecord` expose an `originalRecord()`
method that returns the original record. The overhead is a single field, and since the transformations
are already creating new `SinkRecord`s there's almost no additional GC impact or computational
cost.
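
A minimal sketch of that `originalRecord()` idea, again using a simplified stand-in class rather than the real `SinkRecord` (the `withTopic` helper is a hypothetical stand-in for what an SMT's record-copying path could do). Each copy threads the same original reference through, so a chain of transformations still resolves to the record as read from Kafka:

```java
public class OriginalRecordFieldDemo {
    // Simplified stand-in for SinkRecord; real records carry schemas, values, etc.
    static final class SinkRecord {
        final String topic;
        final int partition;
        final long kafkaOffset;
        private final SinkRecord original; // null for the record as consumed

        SinkRecord(String topic, int partition, long kafkaOffset) {
            this(topic, partition, kafkaOffset, null);
        }
        private SinkRecord(String topic, int partition, long kafkaOffset,
                           SinkRecord original) {
            this.topic = topic;
            this.partition = partition;
            this.kafkaOffset = kafkaOffset;
            this.original = original;
        }

        // Stand-in for an SMT creating a mutated copy: carry the original along.
        SinkRecord withTopic(String newTopic) {
            return new SinkRecord(newTopic, partition, kafkaOffset,
                                  original != null ? original : this);
        }

        // Option #6: always resolves to the record as consumed from Kafka.
        SinkRecord originalRecord() {
            return original != null ? original : this;
        }
    }

    static String demo() {
        SinkRecord consumed = new SinkRecord("metrics", 3, 100L);
        // Two chained "transformations" each allocate a new record.
        SinkRecord routed = consumed.withTopic("metrics-2017-07")
                                    .withTopic("renamed-again");
        SinkRecord orig = routed.originalRecord();
        return orig.topic + "-" + orig.partition + "@" + orig.kafkaOffset;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // metrics-3@100
    }
}
```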

> Transformations that mutate topic-partitions break sink connectors that manage their
own configuration
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5568
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5568
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
>            Reporter: Ewen Cheslack-Postava
>              Labels: needs-discussion, needs-kip
>
> KAFKA-5567 describes how offset commits for sink connectors are broken if a record's
topic-partition is mutated by an SMT, e.g. RegexRouter or TimestampRouter.
> This is also a problem for sink connectors that manage their own offsets, i.e. those
that store offsets elsewhere and call SinkTaskContext.rewind(). In this case, the transformation
has already been applied by the time the SinkTask sees the record, so there is no way it could
correctly track offsets and call rewind() with valid values. For example, this would break
the offset tracking that Confluent's HDFS connector does by encoding offsets in filenames. Even
if the offsets were stored separately in a file rather than encoded in filenames, the connector
would never have had the correct offsets to write to that file in the first place.
> There are a couple of options:
> 1. Decide that this is an acceptable consequence of combining SMTs with sink connectors
and a limitation we accept. You can either transform the data via Kafka Streams instead
or accept that you can't do these "routing" type operations in the sink connector unless it
supports them natively. This *might* not be the wrong choice since we think there are very few
connectors that track their own offsets. In the case of HDFS, we might rarely hit this issue
because the connector supports its own file/directory partitioning schemes anyway, so doing this via SMTs
isn't as necessary there.
> 2. Try to expose the original record information to the sink connector via the records.
I can think of 2 ways this could be done. The first is to attach the original record to each
SinkRecord. The cost here is relatively high in terms of memory, especially for sink connectors
that need to buffer data. The second is to add fields to SinkRecords for originalTopic() and
originalPartition(). This feels a bit ugly to me but might be the least intrusive change API-wise
and we can guarantee those fields aren't overwritten by not allowing public constructors to
set them.
> 3. Try to expose the original record information to the sink connector via a new pre-processing
callback. The idea is similar to preCommit, but instead would happen before any processing
occurs. Taken to its logical conclusion this turns into a sort of interceptor interface (preConversion,
preTransformation, put, and preCommit).
> 4. Add something to the Context that allows the connector to get back at the original
information. Maybe some sort of IdentityMap<Record, Record> originalPutRecords() that
would let you get a mapping back to the original records. One nice aspect of this is that
the connector can hold onto the original only if it needs it.
> 5. A very intrusive change/extension to the SinkTask API that passes in pairs of <original,
transformed> records. Accomplishes the same as 2 but requires what I think are more complicated
changes. Mentioned for completeness.
> 6. Something else I haven't thought of?
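
The `originalTopic()`/`originalPartition()` variant from option #2 above could be sketched roughly like this, once more with a simplified stand-in class rather than the real API (the `route` helper is a hypothetical stand-in for an SMT). Only a non-public constructor sets the original fields, so transformations can't overwrite them, and when nothing is mutated the fields alias the same objects, keeping the memory cost to two references per record:

```java
public class OriginalCoordinatesDemo {
    // Simplified stand-in for SinkRecord; real records carry schemas, values, etc.
    static final class SinkRecord {
        final String topic;
        final int partition;
        private final String originalTopic;
        private final int originalPartition;

        // Public construction: a record as consumed, so original == current.
        SinkRecord(String topic, int partition) {
            this(topic, partition, topic, partition);
        }
        // Non-public: only the framework's copy path can carry originals forward,
        // which is how the fields are guaranteed not to be overwritten.
        private SinkRecord(String topic, int partition,
                           String originalTopic, int originalPartition) {
            this.topic = topic;
            this.partition = partition;
            this.originalTopic = originalTopic;
            this.originalPartition = originalPartition;
        }

        // Stand-in for an SMT rewriting the topic-partition.
        SinkRecord route(String newTopic, int newPartition) {
            return new SinkRecord(newTopic, newPartition,
                                  originalTopic, originalPartition);
        }

        String originalTopic() { return originalTopic; }
        int originalPartition() { return originalPartition; }
    }

    static String demo() {
        SinkRecord consumed = new SinkRecord("logs", 5);
        SinkRecord routed = consumed.route("logs-2017-07-07", 0);
        // The connector still sees the pre-transform coordinates.
        return routed.originalTopic() + "/" + routed.originalPartition();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // logs/5
    }
}
```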



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
