From: Fabian Hueske
Date: Wed, 18 Jan 2017 17:22:14 +0100
Subject: Re: Window limitations on groupBy
To: user@flink.apache.org

Hi Raman,

I would approach this issue as follows.

You key the input stream on the sourceId and apply a stateful FlatMapFunction.
The FlatMapFunction has key-partitioned state and stores, for each key (sourceId), the latest event.
When a new event arrives, you can compute the time spent in the previous state by comparing the timestamp of the stored event with that of the newly received event.
Then you put the new event into the state.

This solution works well if you have a finite number of sources, or if a terminal event signals that no more events will arrive for a key.
Otherwise, the state (one stored event per key) will grow indefinitely and eventually become a problem.
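
A minimal sketch of that approach (the StateChangeEvent POJO, its field names, and the operator name below are illustrative assumptions based on your example event, not a fixed API):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Assumed event type; public fields make it a Flink POJO.
class StateChangeEvent {
    public int sourceId;
    public String state;
    public long timestamp;
}

// Emits (state, durationMillis) whenever a source transitions to a new state.
public class StateDurationFunction
        extends RichFlatMapFunction<StateChangeEvent, Tuple2<String, Long>> {

    private transient ValueState<StateChangeEvent> lastEvent;

    @Override
    public void open(Configuration parameters) {
        lastEvent = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastEvent", StateChangeEvent.class));
    }

    @Override
    public void flatMap(StateChangeEvent current, Collector<Tuple2<String, Long>> out)
            throws Exception {
        StateChangeEvent previous = lastEvent.value();
        if (previous != null) {
            // Time spent in the previous state is the gap between the two timestamps.
            out.collect(Tuple2.of(previous.state, current.timestamp - previous.timestamp));
        }
        lastEvent.update(current);
    }
}

You would wire it up with something like events.keyBy("sourceId").flatMap(new StateDurationFunction()), and the (state, duration) pairs can then be aggregated into your per-state averages.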

If the number of sources increases over time, you need to evict state at some point. A ProcessFunction can help here, because you can register a timer that you can use to evict old state.
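
A sketch of that eviction variant (same assumed event type as above; the 30-day retention period is an arbitrary placeholder, and timers are only available on keyed streams):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Same duration logic as the FlatMapFunction, plus eviction of state for
// sources that have been silent longer than an (assumed) retention period.
public class EvictingStateDurationFunction
        extends ProcessFunction<StateChangeEvent, Tuple2<String, Long>> {

    // Assumed retention period of 30 days; tune to your domain.
    private static final long RETENTION_MS = 30L * 24 * 60 * 60 * 1000;

    private transient ValueState<StateChangeEvent> lastEvent;
    private transient ValueState<Long> lastSeen; // processing time of last update

    @Override
    public void open(Configuration parameters) {
        lastEvent = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastEvent", StateChangeEvent.class));
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(StateChangeEvent current, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        StateChangeEvent previous = lastEvent.value();
        if (previous != null) {
            out.collect(Tuple2.of(previous.state, current.timestamp - previous.timestamp));
        }
        lastEvent.update(current);

        long now = ctx.timerService().currentProcessingTime();
        lastSeen.update(now);
        // One timer per update; stale timers fire, find fresher state, and do nothing.
        ctx.timerService().registerProcessingTimeTimer(now + RETENTION_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        Long seen = lastSeen.value();
        // Evict only if no event arrived since this timer was registered.
        if (seen != null && timestamp >= seen + RETENTION_MS) {
            lastEvent.clear();
            lastSeen.clear();
        }
    }
}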

Hope this helps,
Fabian

2017-01-18 15:39 GMT+01:00 Raman Gupta <rocketraman@gmail.com>:
> I am investigating Flink. I am considering a relatively simple use
> case -- I want to ingest streams of events that are essentially
> timestamped state changes. These events may look something like:
>
> {
>   sourceId: 111,
>   state: OPEN,
>   timestamp: <date/time>
> }
>
> I want to apply various processing to these state change events, the
> output of which can be used for analytics. For example:
>
> 1. average time spent in state, by state
> 2. sources with longest (or shortest) time spent in OPEN state
>
> The time spent in each state may be days or even weeks.
>
> All the examples I have seen of similar logic involve windows on the
> order of 15 minutes. Since time spent in each state may far exceed
> these window sizes, I'm wondering what the best approach will be.
>
> One thought from reading the docs is to use `every` to operate on the
> entire stream. But it seems like this will take longer and longer to
> run as the event stream grows, so this is not an ideal solution. Or
> does Flink apply some clever optimizations to avoid the potential
> performance issue?
>
> Another thought was to split the event stream into multiple streams by
> source, each of which will have a small (and limited) amount of data.
> This will make processing each stream simpler, but since there can be
> thousands of sources, it will result in a lot of streams to handle and
> persist (probably in Kafka). This does not seem ideal either.
>
> It seems like this should be simple, but I'm struggling with
> understanding how to solve it elegantly.
>
> Regards,
> Raman

