From: vipul singh <neoeahit@gmail.com>
Date: Mon, 23 Oct 2017 23:22:58 -0700
Subject: Re: Questions about checkpoints/savepoints
To: Tony Wei <tony19920430@gmail.com>
Cc: Aljoscha Krettek <aljoscha@apache.org>, Stefan Richter <s.richter@data-artisans.com>, user <user@flink.apache.org>

Thanks Tony, that was the issue. I was thinking that when we use RocksDB
and provide an S3 path, it would use externalized checkpoints by default.
Thanks so much!

I have one follow-up question. Say, in the above case, I terminate the
cluster. Since the metadata is on S3 and not on local storage, does Flink
run into the read-after-write consistency limitations of S3? Is that a
valid concern, or do externalized checkpoints handle that case as well and
avoid file system listing operations when retrieving externalized
checkpoints from S3?

Thanks,
Vipul
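For reference, explicitly enabling externalized checkpoints with the Flink 1.3
Scala API looks roughly like the sketch below; the checkpoint interval, cleanup
mode, and S3 path are placeholders rather than values taken from this thread:

    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Regular checkpointing only covers automatic failure recovery;
    // the interval here is a placeholder.
    env.enableCheckpointing(60000L)

    // Externalized checkpoints are not on by default; they must be enabled explicitly.
    // RETAIN_ON_CANCELLATION keeps the checkpoint (including its _metadata file) after
    // a manual cancel, DELETE_ON_CANCELLATION discards it.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // The metadata target directory is taken from state.checkpoints.dir in
    // flink-conf.yaml, e.g. (placeholder path):
    //   state.checkpoints.dir: s3://<bucket>/<base-path>/checkpoints/metadata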
On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei <tony19920430@gmail.com> wrote:

> Hi,
>
> Did you enable externalized checkpoints? [1]
>
> Best,
> Tony Wei
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#externalized-checkpoints
>
> 2017-10-24 13:07 GMT+08:00 vipul singh <neoeahit@gmail.com>:
>
>> Thanks Aljoscha for the answer above.
>>
>> I am experimenting with savepoints and checkpoints on my end, so that we
>> can build a fault-tolerant application with exactly-once semantics.
>>
>> I have been able to test various scenarios, but have doubts about one use
>> case.
>>
>> My app is running on an EMR cluster, and I am trying to test the case
>> when an EMR cluster is terminated. I have read that
>> *state.checkpoints.dir* is responsible for storing the metadata
>> information and links to the data files in *state.backend.fs.checkpointdir*.
>>
>> For my application I have configured both
>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir*.
>>
>> I also have the following in my main app:
>>
>> env.enableCheckpointing(CHECKPOINT_TIME_MS)
>>
>> val CHECKPOINT_LOCATION = s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"
>>
>> val backend: RocksDBStateBackend =
>>   new RocksDBStateBackend(CHECKPOINT_LOCATION)
>>
>> env.setStateBackend(backend)
>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
>> env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)
>>
>> In the application startup logs I can see the
>> *state.backend.fs.checkpointdir* and *state.checkpoints.dir* values
>> being loaded. However, when a checkpoint happens I don't see any content in
>> the metadata dir. Is there something I am missing? Please let me know. I am
>> using Flink version 1.3.
>>
>> Thanks,
>> Vipul
>>
>> On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> Flink does not rely on file system operations to list contents; all
>>> necessary file paths are stored in the metadata file, as you guessed. This
>>> is the reason savepoints also work with file systems that "only" have
>>> read-after-write consistency.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 10. Oct 2017, at 03:01, vipul singh <neoeahit@gmail.com> wrote:
>>>
>>> Thanks Stefan for the answers above. These are really helpful.
>>>
>>> I have a few follow-up questions:
>>>
>>> 1. I see my savepoints are created in a folder which has a _metadata
>>>    file and another file. Looking at the code, it seems like the
>>>    _metadata file contains the task states, operator states and master
>>>    states. What is the purpose of the other file in the savepoint
>>>    folder? My guess is it should be a checkpoint file?
>>> 2. I am planning to use S3 as my state backend, so I want to ensure
>>>    that application restarts are not affected by the read-after-write
>>>    consistency of S3 (if I use S3 as a savepoint backend). I am curious
>>>    how Flink restores data from the _metadata file and the other file.
>>>    Does the _metadata file contain paths to these other files, or would
>>>    it do a listing on the S3 folder?
>>>
>>> Please let me know,
>>>
>>> Thanks,
>>> Vipul
>>>
>>> On Tue, Sep 26, 2017 at 2:36 AM, Stefan Richter <
>>> s.richter@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have answered your questions inline:
>>>>
>>>> 1. It seems to me that checkpoints can be treated as Flink's internal
>>>>    recovery mechanism, and savepoints act more as user-defined recovery
>>>>    points. Would that be a correct assumption?
>>>>
>>>> You could see it that way, but I would describe savepoints more as
>>>> user-defined *restart* points than *recovery* points. Please take a look at
>>>> my answers in this thread, because they cover most of your question:
>>>>
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/difference-between-checkpoints-amp-savepoints-td14787.html
>>>>
>>>> 2. While cancelling an application with the -s option, it specifies the
>>>>    savepoint location. Is there a way during application startup to identify
>>>>    the last known savepoint from a folder by itself, and restart from there?
>>>>    Since I am saving my savepoints on S3, I want to avoid issues arising from
>>>>    an *ls* command on S3 due to the read-after-write consistency of S3.
>>>>
>>>> I don't think that this feature exists; you have to specify the
>>>> savepoint.
>>>>
>>>> 3. Suppose my application has a checkpoint at point t1, and say I
>>>>    cancel this application sometime in the future before the next available
>>>>    checkpoint (say t1+x). If I start the application without specifying the
>>>>    savepoint, it will start from the last known checkpoint (at t1), which won't
>>>>    have the application state saved, since I had cancelled the application.
>>>>    Would this be a correct assumption?
>>>>
>>>> If you restart a cancelled application it will not consider checkpoints.
>>>> They are only considered in recovery on failure. You need to specify a
>>>> savepoint or externalized checkpoint for restarts to make explicit that you
>>>> intend to restart a job, and not to run a new instance of the job.
>>>>
>>>> 4. Would using ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
>>>>    be the same as manually saving regular savepoints?
>>>>
>>>> Not the same, because checkpoints and savepoints are different in
>>>> certain aspects, but both methods leave you with something that survives
>>>> job cancellation and can be used to restart from a certain state.
>>>>
>>>> Best,
>>>> Stefan
>>>
>>>
>>> --
>>> Thanks,
>>> Vipul
>>
>>
>> --
>> Thanks,
>> Vipul
>
>

--
Thanks,
Vipul
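For the cancel-with-savepoint and restart flow discussed above, the Flink 1.3
command line usage is roughly the following sketch; the job ID, jar name, and
S3 paths are placeholders:

    # Trigger a savepoint and cancel the running job in one step.
    bin/flink cancel -s s3://<bucket>/<base-path>/savepoints <jobId>

    # Or take a savepoint without cancelling the job.
    bin/flink savepoint <jobId> s3://<bucket>/<base-path>/savepoints

    # Resume a new run of the job from a specific savepoint
    # (or from a retained externalized checkpoint's metadata path).
    bin/flink run -s s3://<bucket>/<base-path>/savepoints/savepoint-<id> my-job.jar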