Subject: Re: Avoid duplicate messages while restarting a job for an application upgrade
From: Antoine Philippot <antoine.philippot@teads.tv>
Date: Mon, 02 Oct 2017 15:30:53 +0000
To: Piotr Nowojski <piotr@data-artisans.com>
Cc: user@flink.apache.org

Thanks Piotr for your answer. Sadly we can't use Kafka 0.11 for now (and won't be able to for a while).

We cannot afford tens of thousands of duplicated messages for each application upgrade; can I help by working on this feature?
Do you have any hints or details on this part of that "TODO" list?

On Mon, 2 Oct 2017 at 16:50, Piotr Nowojski <piotr@data-artisans.com> wrote:
Hi,

For failure recovery with Kafka 0.9 it is not possible to avoid duplicated messages. Using Flink 1.4 (not yet released) combined with Kafka 0.11, it will be possible to achieve exactly-once end-to-end semantics when writing to Kafka. However, this is still a work in progress:

https://issues.apache.org/jira/browse/FLINK-6988

Note that this is a superset of the functionality you are asking for. Exactly-once just for clean shutdowns is also on our "TODO" list (it would/could support Kafka 0.9), but it is not currently being actively developed.
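As a rough sketch of the API shape being developed under FLINK-6988 (a hypothetical example only: names and the constructor signature may still change before the 1.4 release; broker address and topic are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceSinkExample {
    public static void attachSink(DataStream<String> stream) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder

        // EXACTLY_ONCE uses Kafka 0.11 transactions via a two-phase commit
        // protocol coordinated with Flink's checkpoints.
        FlinkKafkaProducer011<String> sink = new FlinkKafkaProducer011<>(
                "output-topic", // placeholder
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

        stream.addSink(sink);
    }
}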

Piotr Nowojski
On Oct 2, 2017, at 3:35 PM, Antoine Philippot <antoine.philippot@teads.tv> wrote:

Hi,

I'm working on a Flink streaming app with a Kafka 0.9 to Kafka 0.9 use case which handles around 100k messages per second.

To upgrade our application we run a "flink cancel" with savepoint command, followed by a "flink run" with the previously saved savepoint and the new application fat jar as parameters. We noticed that we can have more than 50k duplicated messages in the Kafka sink, which is not idempotent.
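For reference, the upgrade procedure looks roughly like this (a sketch assuming the Flink 1.x CLI; the job ID, savepoint directory, and jar name are placeholders):

# Take a savepoint and cancel the running job in one step
bin/flink cancel -s hdfs:///flink/savepoints <jobID>

# Resume the new version of the application from that savepoint
bin/flink run -s hdfs:///flink/savepoints/savepoint-xxxx my-app-assembly.jar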
This behaviour is problematic for this project, and I am trying to find a solution / workaround to avoid these duplicated messages.

The JobManager clearly indicates that the cancel call is triggered once the savepoint is finished, but during the savepoint execution the Kafka source continues to poll new messages, which will not be part of the savepoint and will be replayed on the next application start.

I tried to find a solution with the stop command-line argument, but the Kafka source doesn't implement StoppableFunction (https://issues.apache.org/jira/browse/FLINK-3404), and savepoint generation is not available with stop, unlike with cancel.
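To illustrate what FLINK-3404 is about, here is a minimal sketch of a source that supports graceful stopping (a hypothetical source, not the actual FlinkKafkaConsumer09, which is precisely the class missing this interface):

import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical source implementing both interfaces; the real
// FlinkKafkaConsumer09 implements SourceFunction but not StoppableFunction.
public class StoppableExampleSource implements SourceFunction<String>, StoppableFunction {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // Emit under the checkpoint lock so records and offsets stay
            // consistent with checkpoints/savepoints.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("record"); // placeholder payload
            }
        }
    }

    @Override
    public void cancel() {
        // Hard cancel: terminate without any end-of-stream guarantees.
        running = false;
    }

    @Override
    public void stop() {
        // Graceful stop: no more records are emitted after this returns,
        // leaving the pipeline in a consistent state.
        running = false;
    }
}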

Is there another solution to avoid processing duplicated messages on each application upgrade or rescaling?

If not, has anyone planned to implement it? Otherwise, I can propose a pull request after some architecture advice.

The final goal is to stop polling the source and trigger a savepoint once polling has stopped.

Thanks
