From: Rong Rong <walterddr@gmail.com>
Date: Mon, 8 Jul 2019 19:23:30 -0700
Subject: Re: How are kafka consumer offsets handled if sink fails?
To: John Smith <java.dev.mtl@gmail.com>
Cc: Konstantin Knauf <konstantin@ververica.com>, user <user@flink.apache.org>
Hi John,

I think what Konstantin is trying to say is: Flink's Kafka consumer does not start consuming from the Kafka committed offset when starting the consumer; it actually starts with the offset that was last checkpointed to the external DFS. (That is, the consumer's starting point has no relevance to the Kafka committed offset whatsoever - if checkpointing is enabled.)

To quote:

"the Flink Kafka Consumer only commits offsets back to Kafka on a best-effort basis after every checkpoint. Internally Flink "commits" the [checkpoints]->[current Kafka offset] as part of its periodic checkpoints."

However, if you do not enable checkpointing, I think your consumer will by default restart from the default Kafka offset (which I think is your committed group offset).
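For illustration, a minimal sketch of how the start position interacts with checkpointing (a snippet rather than a complete program; it assumes the Flink 1.8 universal Kafka connector and mirrors John's snippet further down, with placeholder broker/group values):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
props.setProperty("group.id", "my-group");                // placeholder group

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
// Only consulted on a fresh start with no checkpoint/savepoint:
consumer.setStartFromGroupOffsets(); // the default: resume from the committed group offset
// When recovering from a checkpoint, Flink ignores the start position
// and restores the checkpointed offsets instead.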

--
Rong

On Mon, Jul 8, 2019 at 6:39 AM John Smith <java.dev.mtl@gmail.com> wrote:
So when we say a sink is at-least-once, it's because internally it's not checking any kind of state and it sends what it has regardless, correct? I ask because I will build a sink that calls stored procedures.
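(For illustration, a minimal sketch of such a sink - the JDBC URL and procedure name here are made up, and error handling is omitted. It is at-least-once precisely because it keeps no Flink state and has no transaction tied to checkpoints: after a failure, replayed records are simply executed again.)

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class StoredProcSink extends RichSinkFunction<String> {
    private transient Connection conn;
    private transient CallableStatement stmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:postgresql://db:5432/app"); // assumed URL
        stmt = conn.prepareCall("{call upsert_event(?)}"); // assumed procedure name
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // No state, no checkpoint-coordinated transaction: a record replayed
        // after recovery is executed a second time -> at-least-once.
        stmt.setString(1, value);
        stmt.execute();
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) stmt.close();
        if (conn != null) conn.close();
    }
}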

On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf, <konstantin@ververica.com> wrote:
Hi John,

in case of a failure (e.g. in the SQL sink) the Flink job will be restarted from the last checkpoint. This means the offsets of all Kafka partitions will be reset to that point in the stream, along with the state of all operators. To enable checkpointing you need to call StreamExecutionEnvironment#enableCheckpointing(). If you use the JDBCSinkFunction (which is an at-least-once sink), the output will be duplicated in the case of failures.
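A minimal sketch of enabling it (snippet only; the 10-second interval is illustrative, not a recommendation):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Without this call nothing is checkpointed, so a restart cannot restore
// the exact Kafka offsets and falls back to the consumer's start position.
env.enableCheckpointing(10_000); // checkpoint every 10s (illustrative value)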

To answer your questions:

* For this, the FlinkKafkaConsumer handles the offsets manually (no auto-commit).
* No, the Flink Kafka Consumer only commits offsets back to Kafka on a best-effort basis after every checkpoint. Internally Flink "commits" the current Kafka offsets as part of its periodic checkpoints (see the sketch after this list).
* Yes, along with all other events between the last checkpoint and the failure.
* It will continue from the last checkpoint.
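(As a sketch of the best-effort commit mentioned above, assuming the consumer variable from John's snippet below:

// Purely for external monitoring tools; Flink's own recovery uses checkpoints.
consumer.setCommitOffsetsOnCheckpoints(true); // the default when checkpointing is enabled

Setting this to false stops Flink from writing any offsets back to Kafka at all.)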

Hope this helps.

Cheers,

Konstantin

On Fri, Jul 5, 2019 at 8:37 PM John Smith <java.dev.mtl@gmail.com> wrote:
Hi, I'm using Apache Flink 1.8.0.

I'm consuming events from Kafka using nothing fancy...

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", kafkaGroup);

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);

Do some JSON transforms and then push to my SQL database using JDBC and a stored procedure. Let's assume the SQL sink fails.

We know that Kafka can either periodically commit offsets or it can be done manually based on the consumer's logic.

- How are the source Kafka consumer offsets handled?
- Does the Flink Kafka consumer commit the offset per event/record?
- Will that single event that failed be retried?
- So if we had 5 incoming events and say on the 3rd one it failed, will it continue on the 3rd or will the job restart and try those 5 events?




--

Konstantin Knauf | Solutions Architect

+49 160 91394525


Planned Absences: 10.08.2019 - 31.08.2019, 05.09.2019 - 06.09.2019


--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
