kafka-dev mailing list archives

From Randall Hauch <rha...@gmail.com>
Subject Re: [DISCUSS] KIP-416: Notify SourceTask of ACK'd offsets, metadata
Date Tue, 01 Oct 2019 16:09:05 GMT
Apologies for the late entry -- I entirely missed this KIP and discussion.
:-(

Thanks for creating the KIP and proposing this change. I do think it's
useful for source connector tasks to get more information about the
acknowledgement after the record was written.

However, given the KIP's suggestion that the two `commitRecord(...)` method
variants are disjoint, I'm a bit surprised that the WorkerSourceTask would
do the following:

    task.commitRecord(preTransformRecord);
    if (recordMetadata != null)
        task.commitRecord(preTransformRecord, recordMetadata);

rather than:

    if (recordMetadata != null)
        task.commitRecord(preTransformRecord, recordMetadata);
    else
        task.commitRecord(preTransformRecord);

But if this is the case, I would argue that it is better to simply have one
`commitRecord(SourceRecord record, RecordMetadata metadata)` method that
clearly denotes that the metadata may be null if the record was not written
(e.g., an SMT caused it to be dropped) or could not be written (after
giving up retrying after failures in the SMTs and/or the converter), and
let the implementation deal with the differences. Essentially, we'd be
deprecating the existing `commitRecord(SourceRecord)` method, changing the
framework to always use the new method, and having the new method by
default delegate to the existing method. (This is what Jun also suggested
on the pull request,
https://github.com/apache/kafka/pull/6295#discussion_r330097541). This is
backwards compatible for connector implementations that only override the
old method, yet provides a way for connectors that do implement the new API
to override the new method without having to implement the old method as
well.

IOW:

@Deprecated
public void commitRecord(SourceRecord sourceRecord) throws InterruptedException {
  // nop
}

/**
 * <p>
 * Commit an individual {@link SourceRecord} when the callback from the
 * producer client is received, or if a record is filtered by a
 * transformation and not sent to the producer.
 * By default, this method delegates to the {@link #commitRecord(SourceRecord)}
 * method to maintain backward compatibility. Tasks can choose to override
 * this method, override the {@link #commitRecord(SourceRecord)} method, or
 * override neither one.
 * </p>
 * <p>
 * SourceTasks are not required to implement this functionality; Kafka Connect
 * will record offsets automatically. This hook is provided for systems that
 * also need to store offsets internally in their own system.
 * </p>
 *
 * @param sourceRecord {@link SourceRecord} that was sent via the producer, or
 *        that was not sent (in which case {@code recordMetadata} is null)
 * @param recordMetadata the metadata from the producer's write
 *        acknowledgement, or null if the record was not sent to the producer
 *        because it was filtered by an SMT or could not be transformed and/or
 *        converted
 * @throws InterruptedException
 */
public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata)
    throws InterruptedException {
  // By default, delegate to the old variant for backward compatibility.
  commitRecord(sourceRecord);
}
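To make the compatibility argument concrete, here is a minimal, self-contained sketch of the delegation pattern above. It deliberately avoids the real Kafka classes: FakeSourceRecord, FakeRecordMetadata, SketchSourceTask, LegacyTask, MetadataAwareTask, and the commit() helper (standing in for WorkerSourceTask) are hypothetical names made up for illustration, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Kafka's SourceRecord and RecordMetadata, so the
// sketch compiles without Kafka on the classpath.
final class FakeSourceRecord {
    final String value;
    FakeSourceRecord(String value) { this.value = value; }
}

final class FakeRecordMetadata {
    final long offset;
    FakeRecordMetadata(long offset) { this.offset = offset; }
}

// Hypothetical stand-in for SourceTask with the two proposed variants.
abstract class SketchSourceTask {
    @Deprecated
    public void commitRecord(FakeSourceRecord record) throws InterruptedException {
        // nop
    }

    public void commitRecord(FakeSourceRecord record, FakeRecordMetadata metadata)
            throws InterruptedException {
        // Default delegates to the old variant for backward compatibility.
        commitRecord(record);
    }
}

// An existing connector that only overrides the old one-argument method.
class LegacyTask extends SketchSourceTask {
    final List<String> committed = new ArrayList<>();

    @Override
    @SuppressWarnings("deprecation")
    public void commitRecord(FakeSourceRecord record) {
        committed.add(record.value);
    }
}

// A new connector that overrides only the two-argument method; it must
// handle null metadata for records that were never written.
class MetadataAwareTask extends SketchSourceTask {
    final List<Long> ackedOffsets = new ArrayList<>();
    int notSent = 0;

    @Override
    public void commitRecord(FakeSourceRecord record, FakeRecordMetadata metadata) {
        if (metadata == null) {
            notSent++; // filtered by an SMT, or not written after failures
        } else {
            ackedOffsets.add(metadata.offset);
        }
    }
}

public class CommitRecordSketch {
    // Simplified stand-in for the framework: it always calls the two-argument
    // variant, passing null when the record was not written.
    static void commit(SketchSourceTask task, FakeSourceRecord record,
                       FakeRecordMetadata metadataOrNull) {
        try {
            task.commitRecord(record, metadataOrNull);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    public static void main(String[] args) {
        LegacyTask legacy = new LegacyTask();
        commit(legacy, new FakeSourceRecord("a"), new FakeRecordMetadata(41L));
        commit(legacy, new FakeSourceRecord("b"), null);
        System.out.println(legacy.committed); // prints [a, b]

        MetadataAwareTask aware = new MetadataAwareTask();
        commit(aware, new FakeSourceRecord("a"), new FakeRecordMetadata(41L));
        commit(aware, new FakeSourceRecord("b"), null);
        System.out.println(aware.ackedOffsets + " notSent=" + aware.notSent); // prints [41] notSent=1
    }
}
```

Because the framework in this sketch only ever calls the two-argument method, each connector overrides exactly one variant: the legacy task still sees every record through the default delegation, while the metadata-aware task distinguishes ACK'd records from unsent ones by the null check alone.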

Best regards,

Randall


On Thu, Jan 31, 2019 at 9:02 AM Ryanne Dolan <ryannedolan@gmail.com> wrote:

> Andrew, I have considered this, but I think passing null for RecordMetadata
> would be surprising and error-prone for anyone implementing SourceTask. I
> figure the only use-case for overriding this variant (and not the existing
> one) is to capture the RecordMetadata. If that's the case, every
> implementation would need to check for null. What worries me is that an
> implementation that does not check for null will seem to work until an SMT
> is configured to filter records, which I believe would be exceedingly rare.
> Moreover, the presence of the RecordMetadata parameter strongly implies
> that the record has been sent and ACK'd, and it would be surprising to
> discover otherwise.
>
> On the other hand, the current PR makes it difficult to distinguish between
> records that are filtered vs ACK'd. The implementing class would need to
> correlate across poll() and the two commitRecord() invocations in order to
> find records that were poll()'d but not ACK'd. In contrast, if we passed
> null to commitRecord, the method would trivially know that the record was
> filtered. I think this is probably not a common use-case, so I don't think
> we should worry about it. In fact, the existing commitRecord callback seems
> to purposefully hide this detail from the implementing class, and I don't
> know why we'd try to expose it in the new method.
>
> This sort of confusion is why I originally proposed a new method name for
> this callback, as does the similar KIP-381. I agree that overloading the
> existing method is all-around easier, and I think a casual reader would
> make the correct assumption that RecordMetadata in the parameter list
> implies that the record was sent and ACK'd.
>
> > the connector implementor would want to provide only a single variant of
> > commitRecord()
>
> I think this would be true either way. The only reason you'd implement both
> variants is to detect that a record has _not_ been ACK'd, which again I
> believe is a non-requirement.
>
> Would love to hear if you disagree.
>
> Thanks!
> Ryanne
>
>
> On Thu, Jan 31, 2019 at 3:47 AM Andrew Schofield <andrew_schofield@live.com> wrote:
>
> > As you might expect, I like the overloaded commitRecord() but I think the
> > overloaded method should be called in exactly the same situations as the
> > previous method. When it does not reflect an ACK, the second parameter
> > could be null. The text of the KIP says that the overloaded method is only
> > called when a record is ACKed and I would have thought that the connector
> > implementor would want to provide only a single variant of commitRecord().
> >
> > Andrew Schofield
> > IBM Event Streams
> >
> > On 31/01/2019, 03:00, "Ryanne Dolan" <ryannedolan@gmail.com> wrote:
> >
> >     I've updated the KIP and PR to overload commitRecord instead of adding a
> >     new method. Here's the PR:
> >
> >     https://github.com/apache/kafka/pull/6171
> >
> >     Ryanne
> >
> >     On Mon, Jan 21, 2019 at 6:29 PM Ryanne Dolan <ryannedolan@gmail.com> wrote:
> >
> >     > Andrew Schofield suggested we overload the commitRecord method instead of
> >     > adding a new one. Thoughts?
> >     >
> >     > Ryanne
> >     >
> >     > On Thu, Jan 17, 2019, 5:34 PM Ryanne Dolan <ryannedolan@gmail.com> wrote:
> >     >
> >     >> I had to change the KIP number (concurrency is hard!) so the link is now:
> >     >>
> >     >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-416%3A+Notify+SourceTask+of+ACK%27d+offsets%2C+metadata
> >     >>
> >     >> Ryanne
> >     >>
> >     >> On Fri, Jan 11, 2019 at 2:43 PM Ryanne Dolan <ryannedolan@gmail.com> wrote:
> >     >>
> >     >>> Hey y'all,
> >     >>>
> >     >>> Please review the following small KIP:
> >     >>>
> >     >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-414%3A+Notify+SourceTask+of+ACK%27d+offsets%2C+metadata
> >     >>>
> >     >>> Thanks!
> >     >>> Ryanne
> >     >>>
> >     >>
> >
> >
> >
>
