kafka-jira mailing list archives

From "Ewen Cheslack-Postava (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
Date Sun, 24 Sep 2017 23:42:01 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16178406#comment-16178406 ]

Ewen Cheslack-Postava commented on KAFKA-5716:
----------------------------------------------

Sorry, I was just doing a round of cleanup on the JIRAs to help track in-progress JIRAs. Submitting
PRs doesn't automatically mark these as in progress, so I was updating status and assignee
for all Connect JIRAs based on what had been submitted so far. This helps us find JIRAs that
could use review, especially as releases near.

I think in retrospect the naming of commitRecord is probably unfortunate. commit() tells
you when the data has been flushed to Kafka and the offsets committed, whereas commitRecord only
guarantees the record was written to Kafka; you might then restart the task and read committed
offsets that are earlier than that record. A better design would probably be a flushedRecord or
ackedRecord for what commitRecord currently does, plus a commitRecord for which the framework saves
the list of records and invokes the callbacks once the offset commit has succeeded.
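The split proposed above can be sketched from the framework side. This is a hypothetical illustration, not Kafka Connect code: AckThenCommitTracker, onProducerAck, beginOffsetFlush and completeOffsetFlush are invented names, and the sketch assumes a flush covers exactly the records acked before it began, with at most one flush in progress at a time. The "acked" callback corresponds to what commitRecord does today; the "committed" callback only fires after the offset flush succeeds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical framework-side helper sketching the proposed split.
// "Acked" = Kafka acknowledged the write (today's commitRecord semantics);
// "committed" = the offset flush covering the record succeeded.
// Simplification: a flush covers exactly the records acked before it began,
// and at most one flush is in progress at a time.
final class AckThenCommitTracker<R> {
    private final Consumer<R> ackedCallback;     // e.g. a hypothetical ackedRecord(R)
    private final Consumer<R> committedCallback; // e.g. a post-commit commitRecord(R)
    private final List<R> ackedSinceLastFlush = new ArrayList<>();
    private List<R> inFlush = new ArrayList<>();

    AckThenCommitTracker(Consumer<R> ackedCallback, Consumer<R> committedCallback) {
        this.ackedCallback = ackedCallback;
        this.committedCallback = committedCallback;
    }

    // Producer callback: the record has been written to Kafka.
    synchronized void onProducerAck(R record) {
        ackedCallback.accept(record);
        ackedSinceLastFlush.add(record);
    }

    // Offset flush starts: remember which records this flush will cover.
    synchronized void beginOffsetFlush() {
        inFlush = new ArrayList<>(ackedSinceLastFlush);
        ackedSinceLastFlush.clear();
    }

    // Offset flush ends: notify on success, otherwise carry the records to the next flush.
    synchronized void completeOffsetFlush(boolean succeeded) {
        if (succeeded) {
            inFlush.forEach(committedCallback);
        } else {
            ackedSinceLastFlush.addAll(0, inFlush);
        }
        inFlush = new ArrayList<>();
    }
}
```

A record acked after a flush has started is reported as acked immediately but only reported as committed by a later successful flush, which is the distinction the current single commitRecord hook cannot express.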

The motivating use case for commitRecord was really systems that only support individual
message acks anyway, e.g. message queues. In that case, any sort of bulk ack from the Connect
framework, i.e. commit(), isn't all that helpful and in fact requires more state tracking
on the part of the connector. The current behavior also lets you avoid duplicates in the
periods between offset commits, since you can selectively delete data that you know has made
it to Kafka even if the offsets haven't been committed yet. Even if we just removed commit(),
whether you'd want something other than commitRecord() would depend on how fine-grained you
are OK with acking data being and whether you need a collective/bulk ack.
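The message-queue case described above can be modeled without any Connect dependencies. In this sketch ToyQueue, QueueSourceTaskSketch and Polled are hypothetical stand-ins (not a real queue client or the Connect API); poll() and commitRecord() only mirror the roles of SourceTask.poll() and SourceTask.commitRecord(). Each record is acked on the queue as soon as Kafka has it, so only unacked messages would be redelivered to a restarted task, regardless of when offsets were last committed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a message queue with per-message acks;
// not a real client API. Unacked messages are redelivered to the next reader,
// which is what a restarted task would see.
final class ToyQueue {
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 0;

    void publish(String payload) { unacked.put(nextTag++, payload); }

    Map<Long, String> deliverUnacked() { return new LinkedHashMap<>(unacked); }

    void ack(long deliveryTag) { unacked.remove(deliveryTag); }

    int pendingCount() { return unacked.size(); }
}

// Minimal model of a source task that relies only on per-record acks.
final class QueueSourceTaskSketch {
    // Stand-in for a SourceRecord that remembers the queue's delivery tag.
    static final class Polled {
        final long deliveryTag;
        final String payload;

        Polled(long deliveryTag, String payload) {
            this.deliveryTag = deliveryTag;
            this.payload = payload;
        }
    }

    private final ToyQueue queue;

    QueueSourceTaskSketch(ToyQueue queue) { this.queue = queue; }

    // Plays the role of SourceTask.poll().
    List<Polled> poll() {
        List<Polled> batch = new ArrayList<>();
        queue.deliverUnacked().forEach((tag, payload) -> batch.add(new Polled(tag, payload)));
        return batch;
    }

    // Plays the role of SourceTask.commitRecord(): Kafka has the record, so it can be
    // removed from the queue now, without waiting for the next offset commit.
    void commitRecord(Polled record) { queue.ack(record.deliveryTag); }
}
```

No bulk commit() is needed here: the per-record hook already gives the connector everything it needs to delete delivered messages.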

> Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Assignee: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> I am not looking at the very latest code, so the "problem" may have been corrected recently. If so, I apologize. I found the "problem" by code inspection alone, so I may be wrong, and I have not had the time to write tests to confirm it.
> According to the java-doc on SourceTask.commit:
> {quote}
> Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically. This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> As I read this, when the commit method is called, the SourceTask developer is "told" that everything returned from poll up until "now" has been sent/stored: both the outgoing messages and the associated connect-offsets. Looking at the implementation, this also seems to be what it tries to "guarantee/achieve".
> But as I read the code, this is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running WorkerSourceTask.execute.
> * Committer-thread: From time to time, SourceTaskOffsetCommitter is scheduled to call WorkerSourceTask.commitOffsets (from a different thread).
> The two threads synchronize (on the WorkerSourceTask object) in sendRecords and commitOffsets respectively, preventing the task-thread from adding to outstandingMessages and offsetWriter while the committer-thread is marking what has to be flushed in the offsetWriter and waiting for outstandingMessages to be empty. This means that the offsets committed will be consistent with what has been sent out, but not necessarily with what has been polled. At least I do not see why the following is not possible:
> * The task-thread polls records from task.poll.
> * Before the task-thread gets to add (all) the polled records to outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks in and does its committing, blocking the task-thread from adding the polled records to outstandingMessages and offsetWriter.
> * Consistency will not have been compromised, but the committer-thread will end up calling task.commit (via WorkerSourceTask.commitSourceTask) even though the records just polled from task.poll have not been sent and their connector-offsets have not been flushed.
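The interleaving in the bullets above can be replayed deterministically with a deliberately simplified model. WorkerModel is a hypothetical class, not the real WorkerSourceTask: offsets are plain longs, and where the real committer waits for outstandingMessages to drain, this model simply requires them to be acked already. The method names only echo sendRecords and commitOffsets to make the correspondence clear.

```java
import java.util.ArrayList;
import java.util.List;

// Deliberately simplified model of the two synchronized code paths described
// above; it is NOT the real WorkerSourceTask. Offsets are plain longs, and the
// real committer waits for outstanding messages to drain, whereas this model
// simply requires them to be acked already.
final class WorkerModel {
    private final List<Long> outstandingMessages = new ArrayList<>();
    private long lastOffsetInWriter = -1; // highest offset handed to the offset writer
    long offsetSeenByTaskCommit = -1;     // what the task's commit() hook can rely on

    // Task-thread: hand a polled batch to the producer and offset writer.
    synchronized void sendRecords(List<Long> polledOffsets) {
        for (long offset : polledOffsets) {
            outstandingMessages.add(offset);
            lastOffsetInWriter = offset;
        }
    }

    // Producer callback: Kafka acknowledged the record.
    synchronized void ack(long offset) {
        outstandingMessages.remove(Long.valueOf(offset));
    }

    // Committer-thread: flush offsets, then call the task's commit() hook.
    synchronized void commitOffsets() {
        if (!outstandingMessages.isEmpty()) {
            throw new IllegalStateException("model assumes sent records are already acked");
        }
        offsetSeenByTaskCommit = lastOffsetInWriter;
    }
}
```

Replaying the steps in order (offsets 0 and 1 sent and acked, poll returns 2 and 3, commitOffsets runs, then sendRecords) leaves offsetSeenByTaskCommit at 1: the commit hook is invoked after poll has returned offsets 2 and 3, yet none of them is covered. The model stays consistent in exactly the sense the report describes; it only diverges from what the java-doc promises.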
> If I am right, I guess there are two ways to fix it:
> * Either change the java-doc of SourceTask.commit to something like the following (which I do believe is true):
> {quote}
> Commit the offsets, up to the offsets that have been returned by {@link #poll()}
> *and confirmed by a call to {@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically. This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> * Or fix the "problem" so that it actually does what the java-doc says :-)
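If the java-doc were changed as in the first option, a task that mirrors offsets into its own system would need to persist only what commitRecord has confirmed, not the highest offset returned by poll. The following is a minimal connector-side sketch of that bookkeeping. ConfirmedOffsetTracker and its methods are hypothetical names, and the sketch assumes the source uses consecutive long offsets starting at 0; acks may arrive out of order, so it persists the contiguous confirmed prefix rather than the maximum confirmed offset.

```java
import java.util.TreeSet;

// Hypothetical connector-side bookkeeping, assuming consecutive long source
// offsets starting at 0. Under the proposed java-doc, commit() only vouches for
// records confirmed via commitRecord(), so only the contiguous confirmed prefix
// is persisted.
final class ConfirmedOffsetTracker {
    private long contiguousConfirmed = -1;                         // every offset <= this is confirmed
    private final TreeSet<Long> confirmedAhead = new TreeSet<>(); // confirmed past a gap
    private long persisted = -1;                                   // stand-in for the external system's stored offset

    // Plays the role of commitRecord(): Kafka acknowledged the record with this offset.
    synchronized void onCommitRecord(long offset) {
        if (offset <= contiguousConfirmed) {
            return;
        }
        confirmedAhead.add(offset);
        // Advance over any run of confirmed offsets that is now contiguous.
        while (!confirmedAhead.isEmpty() && confirmedAhead.first() == contiguousConfirmed + 1) {
            contiguousConfirmed = confirmedAhead.pollFirst();
        }
    }

    // Plays the role of commit(): store only offsets known to be in Kafka.
    synchronized void onCommit() { persisted = contiguousConfirmed; }

    synchronized long persistedOffset() { return persisted; }
}
```

For example, if poll returned offsets 0 to 3 and only 0, 1 and 3 have been confirmed when commit() is called, the sketch persists 1; once 2 is confirmed, the next commit() persists 3.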
> If I am not right, I of course apologize for the inconvenience. I would appreciate an explanation of where my code inspection is wrong and why it works even though I cannot see it, though I do not expect one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
