From: Fabian Hueske
Date: Thu, 19 Jan 2017 21:36:32 +0100
Subject: Re: Operational concerns with state (was Re: Window limitations on groupBy)
To: user@flink.apache.org

Hi Raman,

Checkpoints are used to recover from task or process failures and, when checkpointing is enabled, are taken automatically at periodic intervals.
Checkpoints are usually removed once a more recent checkpoint completes (the exact retention policy can be configured).

Savepoints are used to restart a job that was previously shut down, to migrate a job to another cluster (e.g., when upgrading Flink), to update the job itself, etc., so they are intended more for planned maintenance.
Nonetheless, they can also be used for coarser-grained fault tolerance, and it is common practice to trigger a savepoint periodically.

These blog posts might be helpful to understand the potential of savepoints [1] [2].
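
As a concrete sketch (the interval and mode below are illustrative, not recommendations), periodic checkpointing is enabled on the execution environment in the Java API:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointedJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // take a checkpoint every 60 seconds; older checkpoints are
            // discarded according to the configured retention policy
            env.enableCheckpointing(60_000L);
            env.getCheckpointConfig()
                .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // ... define the streaming job here, then:
            // env.execute("my-job");
        }
    }

Savepoints, by contrast, are triggered explicitly, e.g. with "bin/flink savepoint <jobId>" from the CLI, and a job can be resumed from one with "bin/flink run -s <savepointPath> <jarFile>" (the placeholders stand for your job id, savepoint path, and job jar).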

Best,
Fabian

[1] http://data-artisans.com/turning-back-time-savepoints/
[2] http://data-artisans.com/savepoints-part-2-updating-applications/

2017-01-19 19:02 GMT+01:00 Raman Gupta <rocketraman@gmail.com>:

I was able to get it working well with the original approach you described. Thanks! Note that the documentation on how to do this with the Java API is... sparse, to say the least. I was able to look at the implementation of the Scala flatMapWithState function as a starting point.

Now I'm trying to understand all the operational concerns related to the stored state. My checkpoints are in RocksDB, configured via the job definition.

It seems that the checkpointed state of the streaming job is lost when I stop and restart Flink normally, or when Flink terminates abnormally and is restarted. I was able to take an explicit savepoint and then restart the job from it.

Is the correct approach as of now to take savepoints periodically via cron, and use those to re-run jobs in case of Flink failure or restart?

Regards,
Raman

On 19/01/17 05:43 AM, Fabian Hueske wrote:
Hi Raman,

I think you would need a sliding count window of size 2 with slide 1.
This is basically a GlobalWindow with a special trigger.

However, you would need to modify the custom trigger to be able to
- identify a terminal event (if there is such a thing) or to
- close the window after a certain period of inactivity to clean up the state.
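
For example, the window itself could be declared like this (a sketch; the key field and window function are illustrative):

    // size 2, slide 1: each firing sees the previous and the current event
    // for the key, so the time between them can be computed
    stream
        .keyBy("sourceId")
        .countWindow(2, 1)
        .apply(new ComputeTimeInState()); // your custom WindowFunction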

Best, Fabian

2017-01-19 1:43 GMT+01:00 Raman Gupta <rocketraman@gmail.com>:
Thank you for your reply.

If I were to use a keyed stream with a count-based window of 2, would
Flink keep the last state persistently until the next state is
received? Would this be another way of having Flink keep this
information persistently without having to implement it manually?

Thanks,
Raman

On 18/01/17 11:22 AM, Fabian Hueske wrote:
> Hi Raman,
>
> I would approach this issue as follows.
>
> You key the input stream on the sourceId and apply a stateful
> FlatMapFunction.
> The FlatMapFunction has key-partitioned state and stores for each key
> (sourceId) the latest event as state.
> When a new event arrives, you can compute the time spent in the last
> state by looking up the previous event from the state and comparing it
> with the latest received event.
> Then you put the new event in the state.
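>
> A rough sketch of that in the Java API (the Event type with its state
> and timestamp fields is made up for illustration):
>
>     import org.apache.flink.api.common.functions.RichFlatMapFunction;
>     import org.apache.flink.api.common.state.ValueState;
>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>     import org.apache.flink.api.java.tuple.Tuple2;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.util.Collector;
>
>     // emits (previous state, time spent in it) per key
>     public class TimeInStateFunction
>             extends RichFlatMapFunction<Event, Tuple2<String, Long>> {
>
>         private transient ValueState<Event> lastEvent;
>
>         @Override
>         public void open(Configuration parameters) {
>             lastEvent = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("lastEvent", Event.class));
>         }
>
>         @Override
>         public void flatMap(Event current, Collector<Tuple2<String, Long>> out)
>                 throws Exception {
>             Event previous = lastEvent.value();
>             if (previous != null) {
>                 out.collect(Tuple2.of(
>                     previous.state, current.timestamp - previous.timestamp));
>             }
>             lastEvent.update(current);
>         }
>     }
>
> applied as events.keyBy("sourceId").flatMap(new TimeInStateFunction()).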
>
> This solution works well if you have a finite number of sources, or if
> you have a terminal event that signals that no more events will
> arrive for a key.
> Otherwise, the number of events stored in the state will grow
> without bound and eventually become a problem.
>
> If the number of sources increases, you need to evict data at some
> point in time. A ProcessFunction can help here, because you can
> register a timer which
> you can use to evict old state.
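>
> Sketched roughly (the one-hour timeout and the Event type are
> illustrative):
>
>     import org.apache.flink.api.common.state.ValueState;
>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.streaming.api.functions.ProcessFunction;
>     import org.apache.flink.util.Collector;
>
>     // forwards events and clears state for keys that stay inactive
>     public class EvictingFunction extends ProcessFunction<Event, Event> {
>
>         private static final long TIMEOUT = 60 * 60 * 1000L; // one hour
>         private transient ValueState<Long> lastSeen;
>
>         @Override
>         public void open(Configuration parameters) {
>             lastSeen = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("lastSeen", Long.class));
>         }
>
>         @Override
>         public void processElement(Event e, Context ctx, Collector<Event> out)
>                 throws Exception {
>             long now = ctx.timerService().currentProcessingTime();
>             lastSeen.update(now);
>             ctx.timerService().registerProcessingTimeTimer(now + TIMEOUT);
>             out.collect(e);
>         }
>
>         @Override
>         public void onTimer(long timestamp, OnTimerContext ctx,
>                 Collector<Event> out) throws Exception {
>             Long seen = lastSeen.value();
>             // only evict if no newer event re-armed the key in the meantime
>             if (seen != null && timestamp >= seen + TIMEOUT) {
>                 lastSeen.clear();
>             }
>         }
>     }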
>
> Hope this helps,
> Fabian
>
> 2017-01-18 15:39 GMT+01:00 Raman Gupta <rocketraman@gmail.com>:
>
> I am investigating Flink. I am considering a relatively simple use
> case -- I want to ingest streams of events that are essentially
> timestamped state changes. These events may look something like:
>
> {
>   sourceId: 111,
>   state: OPEN,
>   timestamp: <date/time>
> }
>
> I want to apply various processing to these state change events, the
> output of which can be used for analytics. For example:
>
> 1. average time spent in state, by state
> 2. sources with longest (or shortest) time spent in OPEN state
>
> The time spent in each state may be days or even weeks.
>
> All the examples I have seen of similar logic involve windows on the
> order of 15 minutes. Since time spent in each state may far exceed
> these window sizes, I'm wondering what the best approach will be.
>
> One thought from reading the docs is to use `every` to operate on the
> entire stream. But it seems like this will take longer and longer to
> run as the event stream grows, so this is not an ideal solution. Or
> does Flink apply some clever optimizations to avoid the potential
> performance issue?
>
> Another thought was to split the event stream into multiple streams by
> source, each of which will have a small (and limited) amount of data.
> This will make processing each stream simpler, but since there can be
> thousands of sources, it will result in a lot of streams to handle and
> persist (probably in Kafka). This does not seem ideal either.
>
> It seems like this should be simple, but I'm struggling with
> understanding how to solve it elegantly.
>
> Regards,
> Raman
>
>


