Subject: Re: Question about checkpointing with stateful operators and state recovery
From: Aljoscha Krettek
Date: Wed, 11 Oct 2017 17:10:42 +0200
To: Federico D'Ambrosio
Cc: user
Delivered-To: mailing list user@flink.apache.org

Hi Federico,

I'll try to give some answers:

1. Generally speaking, no. If you use keyed state, for example via the RuntimeContext, you don't need to implement CheckpointedFunction.

2. You don't have to set setCommitOffsetsOnCheckpoints(true); this only affects how offsets are committed to Kafka in case other systems want to check that offset. To get exactly-once semantics you have two general paths: 1) your sink is idempotent, meaning it doesn't matter whether you write output multiple times, or 2) the sink is integrated with Flink checkpointing and transactions. Path 2) was not easily possible for Kafka until Kafka 0.11 introduced transaction support. Flink 1.4 will have a Kafka 0.11 producer that supports transactions, so with that you can have end-to-end exactly-once.

3. The advantage of externalised checkpoints is that they don't get deleted when you cancel a job. This is different from regular checkpoints, which get deleted when you manually cancel a job. There are plans to make all checkpoints "externalised" in Flink 1.4.

4. Yes, you are correct. :-)

Best,
Aljoscha

> On 28. Sep 2017, at 11:46, Federico D'Ambrosio wrote:
>
> Hi, I've got a couple of questions concerning the topics in the subject:
>
> 1. If an operator is applied on a keyed stream, do I still have to implement the CheckpointedFunction trait and define the snapshotState and initializeState methods in order to successfully recover the state after a job failure?
>
> 2. While using a FlinkKafkaConsumer, enabling checkpointing allows exactly-once semantics end to end, provided that the sink is able to guarantee the same. Do I have to set setCommitOffsetsOnCheckpoints(true)? How would someone implement exactly-once semantics in a sink?
>
> 3. What are the advantages of externalized checkpoints, and in which cases would I want to use them?
>
> 4. Let's suppose a scenario where checkpointing is enabled every 10 seconds, I have a Kafka consumer which is set to start from the latest records, a sink providing at-least-once semantics, and a stateful keyed operator in between the consumer and the sink. Is it correct that, in case of task failure, the following happens?
>    - the Kafka consumer gets reverted to the latest offset (does it happen even if I don't set setCommitOffsetsOnCheckpoints(true)?)
>    - the operator state gets reverted to the latest checkpoint
>    - the sink is stateless so it doesn't really care about what happened
>    - the stream restarts and probably some of the events coming to the sink have already been processed before
>
> Thank you for your attention,
> Kind regards,
> Federico
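
To make the idempotent-sink path from answer 2 and the replay scenario from question 4 concrete, here is a minimal sketch in plain Java. It is only a simulation and uses no Flink or Kafka API: the class ReplayDemo, the Event record, the list-backed "append" sink, the map-backed "idempotent" sink, and the checkpointed offset of 2 are all hypothetical names and values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReplayDemo {
    /** Hypothetical record: a unique id plus a payload. */
    static final class Event {
        final String id;
        final int value;
        Event(String id, int value) { this.id = id; this.value = value; }
    }

    /** Reads events[from, to) and writes each one to both sinks. */
    static void process(List<Event> events, int from, int to,
                        List<Integer> appendSink, Map<String, Integer> idempotentSink) {
        for (int i = from; i < to; i++) {
            Event e = events.get(i);
            appendSink.add(e.value);           // plain append: a replayed event is written again
            idempotentSink.put(e.id, e.value); // keyed upsert: a replayed event overwrites itself
        }
    }

    /** Simulates one failure and recovery; returns {appendSink size, idempotentSink size}. */
    static int[] run() {
        List<Event> events = Arrays.asList(
            new Event("a", 1), new Event("b", 2), new Event("c", 3), new Event("d", 4));
        List<Integer> appendSink = new ArrayList<>();
        Map<String, Integer> idempotentSink = new LinkedHashMap<>();

        // Assumption: the last completed checkpoint recorded source offset 2.
        int checkpointedOffset = 2;

        // Before the failure, offsets 0..2 were already processed and written.
        process(events, 0, 3, appendSink, idempotentSink);

        // After the failure, the source rewinds to the checkpointed offset,
        // so offset 2 is processed a second time.
        process(events, checkpointedOffset, events.size(), appendSink, idempotentSink);

        return new int[] { appendSink.size(), idempotentSink.size() };
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("append sink writes: " + sizes[0]);
        System.out.println("idempotent sink records: " + sizes[1]);
    }
}
```

With four input events, the append sink ends up with five writes because the event at offset 2 is replayed, while the keyed sink holds exactly four records. This is the sense in which an idempotent sink can give an exactly-once result on top of at-least-once delivery.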