Date: Sat, 23 Sep 2017 16:48:00 +0000 (UTC)
From: "Randall Hauch (JIRA)"
To: jira@kafka.apache.org
Subject: [jira] [Commented] (KAFKA-5716) Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent

    [ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16177881#comment-16177881 ]

Randall Hauch commented on KAFKA-5716:
--------------------------------------

I agree that the current {{commit()}} method is incorrect and should be dealt with. However, my point was that you can do a lot with just {{commitRecord}} while ignoring the broken {{commit()}} method.

We do need to fix this, and I think [~ewencp] assigned this to you since you were already working on a fix, though you are obviously looking for guidance on the correct route. I see [~ewencp]'s point, too, that *correcting the behavior* of the {{commit()}} method will not be trivial or inexpensive, and why he considers removing the method altogether (initially via deprecation) an attractive resolution.

As I mentioned above, the most important thing is that {{commitRecord}} still tells you when a record has been written to Kafka.
Flushed offsets are almost certainly going to be behind what's written to Kafka, which is why I think source connectors shouldn't really have to deal with or be concerned with what has been flushed. WDYT?

> Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Assignee: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected recently. If so, I apologize. I found the "problem" by code-inspection alone, so I may be wrong. Have not had the time to write tests to confirm.
> According to java-doc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically. This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> As I read this, when the commit-method is called, the SourceTask-developer is "told" that everything returned from poll up until "now" has been sent/stored - both the outgoing messages and the associated connect-offsets. Looking at the implementation, it also seems that this is what it tries to "guarantee/achieve".
> But as I read the code, it is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask-object) in sendRecords and commitOffsets respectively, preventing the task-thread from adding to outstandingMessages and offsetWriter while the committer-thread is marking what has to be flushed in the offsetWriter and waiting for outstandingMessages to be empty. This means that the offsets committed will be consistent with what has been sent out, but not necessarily with what has been polled. At least I do not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks in and does its committing, while preventing the task-thread from adding the polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will end up calling task.commit (via WorkerSourceTask.commitSourceTask) without the records just polled from task.poll having been sent or the corresponding connector-offsets flushed.
> If I am right, I guess there are two ways to fix it
> * Either change the java-doc of SourceTask.commit to something along these lines (which I do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically. This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> * or, fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience.
> I would appreciate an explanation of where my code-inspection is not correct, and why it works even though I cannot see it. I will not expect such an explanation, though.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
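
For illustration only, here is a minimal sketch of the idea discussed in this thread: a task that relies on {{commitRecord}} acknowledgements, rather than on what poll has returned, to decide how far it may safely commit in its own system. The class AckTracker and its methods are hypothetical and not part of Kafka Connect. The sketch assumes offsets are non-negative longs handed out in increasing order, and it deliberately avoids the Connect API so that it runs on its own.

```java
import java.util.TreeSet;

// Hypothetical helper, NOT part of Kafka Connect. It records which polled
// offsets have been acknowledged (for example from a commitRecord callback).
class AckTracker {
    // Offsets returned from poll() that have not been acknowledged yet.
    private final TreeSet<Long> pending = new TreeSet<>();
    // Highest offset ever handed out by poll(); -1 means nothing polled yet.
    private long highestPolled = -1;

    // Call once for every record returned from poll().
    synchronized void polled(long offset) {
        pending.add(offset);
        highestPolled = Math.max(highestPolled, offset);
    }

    // Call when the record with this offset is confirmed as written.
    synchronized void acked(long offset) {
        pending.remove(offset);
    }

    // Largest value N such that every polled offset <= N has been
    // acknowledged. Returns -1 if nothing can be committed yet.
    synchronized long safeOffset() {
        return pending.isEmpty() ? highestPolled : pending.first() - 1;
    }

    public static void main(String[] args) {
        AckTracker tracker = new AckTracker();
        tracker.polled(0);
        tracker.polled(1);
        tracker.polled(2);
        tracker.acked(0);
        tracker.acked(2); // offset 1 is still in flight
        System.out.println(tracker.safeOffset()); // prints 0
        tracker.acked(1);
        System.out.println(tracker.safeOffset()); // prints 2
    }
}
```

In a real SourceTask, polled would presumably be called as records are returned from poll and acked from {{commitRecord}}. A task that stores offsets internally could then persist safeOffset() whenever it likes, without depending on the timing of {{commit()}}.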