From: Stephan Ewen <ewenstephan@gmail.com>
To: dev@flink.apache.org
Date: Thu, 30 Apr 2015 22:17:40 +0200
Subject: Re: Making state in streaming more explicit

I think your assumption (and the current Kafka source implementation) is that there is one state object that you update/mutate all the time. If you draw a snapshot of that state object at the time of the checkpoint, the source can continue, and that particular offset is remembered as the state of this checkpoint and can be committed to Kafka/ZooKeeper later.
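To make that pattern concrete, here is a minimal, illustrative Java sketch (the class and method names are hypothetical and not part of Flink's API): the source keeps mutating one offset, hands out an immutable copy keyed by checkpoint id when a checkpoint is drawn, and commits exactly that copy once the checkpoint completes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the actual Flink API: a source with one mutable
// state object (the current offset) that is snapshotted per checkpoint.
public class OffsetTrackingSource {

    // the live, continuously mutated state
    private long currentOffset;

    // immutable snapshots, keyed by the checkpoint that drew them
    private final Map<Long, Long> pendingCommits = new HashMap<>();

    public void recordProcessed() {
        currentOffset++;                          // mutate the running state
    }

    public void onCheckpointBarrier(long checkpointId) {
        // draw a snapshot of the state; the source keeps consuming afterwards
        pendingCommits.put(checkpointId, currentOffset);
    }

    public void onCheckpointComplete(long checkpointId) {
        // only the offset that belonged to this checkpoint is committed
        Long offset = pendingCommits.remove(checkpointId);
        if (offset != null) {
            commitOffsetToZooKeeper(offset);
        }
    }

    private void commitOffsetToZooKeeper(long offset) {
        // placeholder for the actual Kafka/ZooKeeper offset commit
    }
}
```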
On Thu, Apr 30, 2015 at 10:09 PM, Gyula Fóra wrote:

> Regarding the commits (for instance the Kafka offset):
>
> I don't exactly get how you mean to do this. If the source continues
> processing after the checkpoint and before the commit, it will not know
> what state has been committed exactly, so it would need to know the time
> of the checkpoint and store a local copy.
>
> Gyula
>
> On Thu, Apr 30, 2015 at 10:04 PM, Stephan Ewen wrote:
>
> > Thanks for the comments!
> >
> > Concerning acknowledging the checkpoint:
> >
> > The sinks definitely need to acknowledge it.
> > If we asynchronously write the state of an operator (and emit downstream
> > barriers before that is complete), then I think that we also need those
> > operators to acknowledge the checkpoint.
> >
> > For the commit messages:
> >
> > My first thought was to send commit messages simply as actor messages
> > from the JobManager to the vertices that require these messages. That
> > way, they are not stuck in the data flow with its possible latency.
> > Also, in the data flow, messages get duplicated (at all-to-all
> > connections).
> >
> > For iterative flows:
> >
> > Does the JobManager need to be aware of this, or can the IterationHead
> > handle that transparently for the JobManager?
> > From our last conversation, I recall:
> > - receive barriers, push out barriers
> > - snapshot its state
> > - wait for the barriers to come back through the backchannel
> > - write the state snapshot plus the backchannel buffers
> > - only then acknowledge the checkpoint
> >
> > My first impression is that this way the JobManager would not handle the
> > IterationHead any differently from all other stateful operators.
> >
> > Greetings,
> > Stephan
> >
> > On Thu, Apr 30, 2015 at 9:27 PM, Paris Carbone wrote:
> >
> > > I agree with all suggestions, thanks for summing it up Stephan.
> > >
> > > A few more points I have in mind at the moment:
> > >
> > > - Regarding the acknowledgements, indeed we don't need to make all
> > > operators commit back; we just have to make sure that all sinks have
> > > acknowledged a checkpoint to consider it complete back at the
> > > coordinator.
> > >
> > > - Do you think we should broadcast commit responses to sources that
> > > need them after every successful checkpoint? The checkpoint interval
> > > does not always match the frequency at which we want to initiate, for
> > > example, a compaction on Kafka. One alternative would be to make
> > > sources request a successful checkpoint id via a future, on demand.
> > >
> > > - We have to update the current checkpointing approach to cover
> > > iterative streams. We need to make sure we don't send checkpoint
> > > requests to iteration heads, and handle downstream backup for records
> > > in transit during checkpoints accordingly.
> > >
> > > cheers
> > > Paris
> > >
> > > > On 30 Apr 2015, at 20:47, Stephan Ewen wrote:
> > > >
> > > > I was looking into the handling of state in streaming operators, and
> > > > it is a bit hidden from the system.
> > > >
> > > > Right now, functions can (if they want) put some state into their
> > > > context. At runtime, state may occur or not. Before runtime, the
> > > > system cannot tell which operators are going to be stateful, and
> > > > which are going to be stateless.
> > > >
> > > > I think it is a good idea to expose that.
> > > > We can use that for optimizations, and we know which operators need
> > > > to checkpoint state and acknowledge the asynchronous checkpoint.
> > > >
> > > > At this point, we need to assume that all operators need to send a
> > > > confirmation message, which is unnecessary.
> > > >
> > > > Also, I think we should expose which operations want a "commit"
> > > > notification after the checkpoint has completed. Good examples are:
> > > >
> > > > - the KafkaConsumer source, which can then commit the offset that is
> > > > safe to ZooKeeper
> > > >
> > > > - a transactional KafkaProducer sink, which can commit a batch of
> > > > messages to the Kafka partition once the checkpoint is done (to get
> > > > exactly-once guarantees that include the sink)
> > > >
> > > > Comments welcome!
> > > >
> > > > Greetings,
> > > > Stephan
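For illustration, a minimal Java sketch of the "commit" notification idea discussed in the quoted mail above; the interface and class names are made up for this example and are not existing Flink interfaces. An operator that wants to be told when a checkpoint is globally complete implements the notification hook; a transactional sink, for instance, buffers records per checkpoint and only commits a batch once it learns that the corresponding checkpoint has completed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not an existing Flink interface: operators that want a
// "commit" notification after a checkpoint completes implement this hook, and
// the JobManager (or coordinator) calls it once the checkpoint is complete.
interface CheckpointCommitNotification {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

// Sketch of a transactional sink: records are buffered per checkpoint, and a
// batch is committed only after the checkpoint it belongs to has completed,
// which is what gives exactly-once guarantees that include the sink.
public class TransactionalSinkSketch implements CheckpointCommitNotification {

    private List<String> pending = new ArrayList<>();
    private final Map<Long, List<String>> uncommitted = new HashMap<>();

    public void invoke(String record) {
        pending.add(record);                     // buffer until the next barrier
    }

    public void onCheckpointBarrier(long checkpointId) {
        // everything received up to this barrier belongs to this checkpoint
        uncommitted.put(checkpointId, pending);
        pending = new ArrayList<>();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        List<String> batch = uncommitted.remove(checkpointId);
        if (batch != null) {
            commitBatch(batch);                  // safe to make visible only now
        }
    }

    private void commitBatch(List<String> batch) {
        // placeholder for the actual transactional write to Kafka
    }
}
```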