Date: Sat, 23 Sep 2017 09:42:00 +0000 (UTC)
From: "Per Steffensen (JIRA)"
To: jira@kafka.apache.org
Subject: [jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

    [ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16177688#comment-16177688 ]

Per Steffensen commented on KAFKA-5716:
---------------------------------------

bq. Yeah, this makes sense. The existing SourceTask.commitRecord(...) method is called after each source record has been written to Kafka. Can you use this to keep track of the offsets that have been written?

Yeah, that is what I am doing today, because it calculates something closer to the truth than just assuming that everything polled has been written. But it is not necessarily true either. Some of the records that have been polled but not included in the offsets-write/flush (which I have demonstrated can happen) may also have been sent to the outgoing Kafka AND been acknowledged, leading to the call to {{SourceTask.commitRecord(...)}}.
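A minimal sketch of the {{commitRecord(...)}}-based bookkeeping described above, assuming each record carries a string source partition and a numeric source offset. The class {{CommitRecordOffsetTracker}}, its methods and the offsets used in {{main}} are hypothetical, not part of Kafka Connect; in a real task, {{onCommitRecord}} would be called from an override of {{SourceTask.commitRecord(SourceRecord)}} and {{snapshot}} from {{SourceTask.commit()}}. The demo shows how the acknowledged offsets can run ahead of an offset flush that covered fewer records.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper, not part of the Kafka Connect API: remembers, per source
 * partition, the highest source offset whose record has been acknowledged.
 */
class CommitRecordOffsetTracker {
    // Source partition -> highest acknowledged source offset.
    private final Map<String, Long> acknowledged = new HashMap<>();

    /** Intended to be called from an override of SourceTask.commitRecord(...). */
    synchronized void onCommitRecord(String sourcePartition, long sourceOffset) {
        // Keep the highest offset acknowledged so far for this partition.
        acknowledged.merge(sourcePartition, sourceOffset, Long::max);
    }

    /** Intended to be called from SourceTask.commit(); returns a copy of the current state. */
    synchronized Map<String, Long> snapshot() {
        return new HashMap<>(acknowledged);
    }

    public static void main(String[] args) {
        CommitRecordOffsetTracker tracker = new CommitRecordOffsetTracker();
        // Offsets 0..3 of partition "p0" were polled, sent and acknowledged.
        for (long offset = 0; offset <= 3; offset++) {
            tracker.onCommitRecord("p0", offset);
        }
        // Hypothetical: the framework's offset flush only covered up to offset 1,
        // because records 2..3 were handed to the producer after the flush was marked.
        long flushedByFramework = 1L;
        long acknowledgedSoFar = tracker.snapshot().get("p0");
        System.out.println("flushed=" + flushedByFramework + ", acknowledged=" + acknowledgedSoFar);
        // prints "flushed=1, acknowledged=3"
    }
}
```

Under these assumptions, the task's own view (3) is closer to the truth than "everything polled", yet it can still differ from what the framework actually flushed (1).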
So at the time of {{SourceTask.commit()}}, I claim that {{SourceTask.commitRecord(...)}} may already have been called for some records that are not included in the offsets-write/flush.

You suggest several things that I could do, and I am doing some of them. I believe I know very well how Kafka and Kafka Connect work in this area. I can do a lot of things to work around the fact that I do not really know which offsets have been written/flushed. The point of KAFKA-5716 is that I really ought to be told. Why not? Or at least the JavaDoc should not tell me that I am told, when the truth is that I am not :-)

> Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Assignee: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> I am not looking at the very latest code, so the "problem" may have been corrected recently. If so, I apologize. I found the "problem" by code inspection alone, so I may be wrong. I have not had the time to write tests to confirm it.
> According to the JavaDoc on SourceTask.commit:
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically. This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> As I read this, when the commit method is called, the SourceTask developer is "told" that everything returned from poll up until "now" has been sent/stored, both the outgoing messages and the associated connect-offsets.
> Looking at the implementation, it also seems that this is what it tries to "guarantee/achieve".
> But as I read the code, it is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running WorkerSourceTask.execute.
> * Committer-thread: From time to time, SourceTaskOffsetCommitter is scheduled to call WorkerSourceTask.commitOffsets (from a different thread).
> The two threads synchronize (on the WorkerSourceTask object) in sendRecords and commitOffsets respectively. This prevents the task-thread from adding to outstandingMessages and offsetWriter while the committer-thread is marking what has to be flushed in the offsetWriter and waiting for outstandingMessages to be empty. As a result, the offsets committed will be consistent with what has been sent out, but not necessarily with what has been polled. At least I do not see why the following is not possible:
> * The task-thread polls something from task.poll.
> * Before the task-thread gets to add (all) the polled records to outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks in and does its committing, while preventing the task-thread from adding the polled records to outstandingMessages and offsetWriter.
> * Consistency will not have been compromised, but the committer-thread will end up calling task.commit (via WorkerSourceTask.commitSourceTask) without the records just polled from task.poll having been sent or the corresponding connector-offsets having been flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the JavaDoc of SourceTask.commit to something like the following (which I do believe is true):
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically.
> This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> * Or fix the "problem", so that it actually does what the JavaDoc says :-)
> If I am not right, I of course apologize for the inconvenience. I would appreciate an explanation of where my code inspection is wrong, and why it works even though I cannot see it. I do not expect such an explanation, though.
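The interleaving described in the issue can be illustrated with a small, self-contained simulation. This is not WorkerSourceTask: {{RaceSketch}} and its methods are hypothetical stand-ins that only mirror the locking pattern described above, where {{sendRecords}} and {{commitOffsets}} synchronize on the same object while {{poll}} runs outside the lock. The two latches are an assumption used to force the unlucky ordering deterministically; in a real worker, the ordering would depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Hypothetical, heavily simplified stand-in for the locking pattern described in
 * KAFKA-5716; it is not the WorkerSourceTask implementation.
 */
class RaceSketch {
    volatile long polledUpTo = -1; // last offset returned by poll (written by the task-thread)
    private long sentUpTo = -1;    // last offset handed to the producer, guarded by this

    /** Like task.poll(): runs outside the lock. */
    List<Long> poll(long from, int n) {
        List<Long> batch = new ArrayList<>();
        for (long offset = from; offset < from + n; offset++) {
            batch.add(offset);
        }
        polledUpTo = from + n - 1;
        return batch;
    }

    /** Like sendRecords: records (and their offsets) are registered under the lock. */
    synchronized void sendRecords(List<Long> batch) {
        for (long offset : batch) {
            sentUpTo = offset;
        }
    }

    /** Like commitOffsets: snapshots what was sent so far; task.commit() would follow. */
    synchronized long commitOffsets() {
        return sentUpTo;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RaceSketch worker = new RaceSketch();
        CountDownLatch polled = new CountDownLatch(1);
        CountDownLatch committed = new CountDownLatch(1);
        long[] seenByCommit = new long[1];

        // Task-thread: polls, then (forced here by a latch) loses the race to the committer.
        Thread task = new Thread(() -> {
            List<Long> batch = worker.poll(0, 5);
            polled.countDown();
            awaitQuietly(committed);
            worker.sendRecords(batch);
        });
        // Committer-thread: runs between poll() and sendRecords().
        Thread committer = new Thread(() -> {
            awaitQuietly(polled);
            seenByCommit[0] = worker.commitOffsets();
            committed.countDown();
        });
        task.start();
        committer.start();
        task.join();
        committer.join();
        System.out.println("polled up to " + worker.polledUpTo + ", commit saw up to " + seenByCommit[0]);
        // prints "polled up to 4, commit saw up to -1"
    }
}
```

In this model, the commit observes offsets that are consistent with what was sent (-1, nothing yet) but not with what was polled (up to 4), which is the gap between the current JavaDoc and the behavior the issue describes.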