Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 34E01200C25 for ; Fri, 24 Feb 2017 18:38:14 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 31C34160B69; Fri, 24 Feb 2017 17:38:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0956E160B62 for ; Fri, 24 Feb 2017 18:38:12 +0100 (CET) Received: (qmail 58121 invoked by uid 500); 24 Feb 2017 17:38:12 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@flink.apache.org Delivered-To: mailing list user@flink.apache.org Received: (qmail 58111 invoked by uid 99); 24 Feb 2017 17:38:12 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Feb 2017 17:38:12 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 9F69EC0744 for ; Fri, 24 Feb 2017 17:38:11 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 3.133 X-Spam-Level: *** X-Spam-Status: No, score=3.133 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RCVD_IN_SORBS_SPAM=0.5, SPF_NEUTRAL=0.652, URIBL_BLOCKED=0.001] autolearn=disabled Authentication-Results: spamd4-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=kpibench-com.20150623.gappssmtp.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id OyYA6XURUneo for ; Fri, 24 Feb 2017 17:38:09 +0000 (UTC) Received: from mail-lf0-f47.google.com (mail-lf0-f47.google.com [209.85.215.47]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id 6DB335FE25 for ; Fri, 24 Feb 2017 17:38:08 +0000 (UTC) Received: by mail-lf0-f47.google.com with SMTP id l12so11770855lfe.0 for ; Fri, 24 Feb 2017 09:38:08 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=kpibench-com.20150623.gappssmtp.com; s=20150623; h=mime-version:in-reply-to:references:from:date:message-id:subject:to; bh=XJqOAAxw0eS5XIvQp6xsWH982MpPesMzSvGyBls9Dr8=; b=ZA7zFbh4MNYIX773uA0LAhCcueKsfrEivE3NPC3TJ+9kmajEal3GLEFf7hyd5W1QZK YuHVXjLGmHjSC1C4bywqPKTUmg6Gk9sJtJVGyu/em/vbxMtFF8tgV0Y4Zo/h8tmbYwS6 Kfaod4gmsFx9eydxcoQxkPOMqvg0poEf00jGC0BaBNWjmZvZfVEsjP4gFXjZKkP7ch5K E0mXEa2IAjbElqx1m6iK9tBzdQozLEhwYqiYz6tTh8XqKQSHW82cBPGsNAGyZ3zUnBCW E44qI8Y8bdTPI9DTBmmjyYej5pONJ/0iFOiqMBsYyqhqobNh/D6lKio/Yq7Wgab6EIiI fWow== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:in-reply-to:references:from:date :message-id:subject:to; bh=XJqOAAxw0eS5XIvQp6xsWH982MpPesMzSvGyBls9Dr8=; b=p6RMIvMFGzGlNAlC35594dl2GzBGc6CqGiaPEXVVOaqiqp0YcYWWnmXuYJBZj4YoXU IZAif1BGOHLZwWMeN+LU1EqZUBN7buwTerKHSDY40nU9Kc6f+7P/3UbMd17bRVr4aVQR i7OiWNDUB2yZLbNiFZ7jg/qjdRO21lgczEMpCwUb75ZkimaoUNWp5SGpSk/djLiHP6Kd vqImEysX5l6RqXbjL9PxtVoFZterGzTkNNhkksr7lVgXZy7jhG+vbxjIxPVoj75H1HxS oVS4/HHALrjdIqH6NXfYm/YdBYfuEQCXjHallLg78qGxhr8uBZge0gtXD2l+WtX9mXGT 7kxA== X-Gm-Message-State: AMke39nl32/0pzMOYTobH6bzm/BYL3TsTFyw9/eUyLhMTqu23XUwMmIctY5HHYT8xLmAnXaqzSCehOJVv6KWJQ== X-Received: by 10.25.21.214 with SMTP id 83mr1190871lfv.66.1487957879526; Fri, 24 Feb 2017 09:37:59 -0800 (PST) MIME-Version: 1.0 Received: by 10.25.150.68 with HTTP; Fri, 24 Feb 2017 09:37:59 -0800 (PST) X-Originating-IP: [81.10.146.108] In-Reply-To: References: From: Patrick Brunmayr Date: Fri, 24 Feb 2017 18:37:59 +0100 Message-ID: Subject: Re: Flink the right tool for the job ? Huge Data window lateness To: user@flink.apache.org Content-Type: multipart/alternative; boundary=001a114076b0d9ab9a05494a3035 archived-at: Fri, 24 Feb 2017 17:38:14 -0000 --001a114076b0d9ab9a05494a3035 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: quoted-printable Hi Yes it is and would be nice to handle this with Flink :) - *Size of data point* The size of a data point is basically just a simple case class with two fields in a 64bit OS case class MachineData(sensorId: String, eventTime:Long) *- Last write wins* We have cassandra as data warehouse but i was hoping i could solve that issue in the state level rather than in the db level. The reason beeing is one could send me the same events over and over again and this will cause that state to blow up until out of memory. Secondly by doing aggregations per sensor results will be wrong due multiple events with the same timestamp. thx 2017-02-24 17:47 GMT+01:00 Robert Metzger : > Hi, > sounds like a cool project. > > What's the size of one data point? > If one datapoint is 2 kb, you'll have 100 800 000 * 2048 bytes =3D 206 > gigabytes of state. That's something one or two machines (depending on th= e > disk throughput) should be able to handle. > > If possible, I would recommend you to do an experiment using a prototype > to see how many machines you need for your workload. > > On Fri, Feb 24, 2017 at 5:41 PM, Tzu-Li (Gordon) Tai > wrote: > >> Hi Patrick, >> >> Thanks a lot for feedback on your use case! At a first glance, I would >> say that Flink can definitely solve the issues you are evaluating. >> >> I=E2=80=99ll try to explain them, and point you to some docs / articles = that can >> further explain in detail: >> >> *- Lateness* >> >> The 7-day lateness shouldn=E2=80=99t be a problem. We definitely recomme= nd >> using RocksDB as the state backend for such a use case, as you >> mentioned correctly, the state would be kept for a long time. >> The heavy burst when your locally buffered data on machines are >> sent to Kafka once they come back online shouldn=E2=80=99t be a problem = either; >> since Flink is a pure data streaming engine, it handles backpressure >> naturally without any additional mechanisms (I would recommend >> taking a look at http://data-artisans.com/how-flink-handles-backpressure= / >> ). >> >> *- Out of Order* >> >> That=E2=80=99s exactly what event time processing is for :-) As long as = the event >> comes in before the allowed lateness for windows, the event will still >> fall >> into its corresponding event time window. So, even with the heavy burst = of >> the your late machine data, they will still be aggregated in the correct >> windows. >> You can look into event time in Flink with more detail in the event time >> docs: >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/ >> dev/event_time.html >> >> *- Last write wins* >> >> Your operators that does the aggregations simply need to be able to >> reprocess >> results if it sees an event with the same id come in. Now, if results ar= e >> sent out >> of Flink and stored in an external db, if you can design the db writes t= o >> be idempotent, >> then it=E2=80=99ll effectively be a =E2=80=9Clast write wins=E2=80=9D. I= t depends mostly on your >> pipeline and >> use case. >> >> *- Computations per minute* >> I think you can simply do this by having two separate window operators. >> One that works on your longer window, and another on a per-minute basis. >> >> Hope this helps! >> >> - Gordon >> >> On February 24, 2017 at 10:49:14 PM, Patrick Brunmayr (jay@kpibench.com) >> wrote: >> >> Hello >> >> I've done my first steps with Flink and i am very impressed of its >> capabilities. Thank you for that :) I want to use it for a project we ar= e >> currently working on. After reading some documentation >> i am not sure if it's the right tool for the job. We have an IoT >> application in which we are monitoring machines in production plants. Th= e >> machines have sensors attached and they are sending >> their data to a broker ( Kafka, Azure Iot Hub ) currently on a per minut= e >> basis. >> >> Following requirements must be fulfilled >> >> >> - Lateness >> >> We have to allow lateness for 7 days because machines can have down >> time due network issues, maintenance or something else. If thats the = case >> buffering of data happens localy on the machine and once they >> are online again all data will be sent to the broker. This can result >> in some relly heavy burst. >> >> >> - Out of order >> >> Events come out of order due this lateness issues >> >> >> - Last write wins >> >> Machines are not stateful and can not guarantee exactly once sending >> of their data. It can happen that sometimes events are sent twice. In= that >> case the last event wins and should override the previous one. >> Events are unique due a sensor_id and a timestamp >> >> - Computations per minute >> >> We can not wait until the windows ends and have to do computations on >> a per minute basis. For example aggregating data per sensor and writi= ng it >> to a db >> >> >> My biggest concern in that case is the huge lateness. Keeping data for 7 >> days would result in 10080 data points for just one sensor! Multiplying >> that by 10.000 sensors would result in 100800000 datapoints which Flink >> would have to handle in its state. The number of sensors are constantly >> growing so will the number of data points >> >> So my questions are >> >> >> - Is Flink the right tool for the Job ? >> >> - Is that lateness an issue ? >> >> - How can i implement the Last write wins ? >> >> - How to tune flink to handle that growing load of sensors and data >> points ? >> >> - Hardware requirements, storage and memory size ? >> >> >> >> I don't want to maintain two code base for batch and streaming because >> the operations are all equal. The only difference is the time range! Tha= ts >> the reason i wanted to do all this with Flink Streaming. >> >> Hope you can guide me in the right direction >> >> Thx >> >> >> >> >> >> >> >> > --001a114076b0d9ab9a05494a3035 Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
Hi

Yes it is and would be nice to handl= e this with Flink :)

- Size of data point

The size of a data point is basically just a simple case class with t= wo fields in a 64bit OS

case class MachineData(sens= orId: String, eventTime:Long)

- Last write wins

We have cassandra as= data warehouse but i was hoping i could solve that issue in the state leve= l rather than in the db level. The reason beeing is one could send me the s= ame events
over and over again and this will cause that state to blow up= until out of memory. Secondly by doing aggregations per sensor results wil= l be wrong due multiple events with the same
timestamp.

thx





2017-02-= 24 17:47 GMT+01:00 Robert Metzger <rmetzger@apache.org>:
Hi,
sounds like a coo= l project.

What's the size of one data point?<= /div>
If one datapoint is 2 kb, you'll have=C2=A0100 800 000 * 2048= bytes =3D 206 gigabytes of state. That's something one or two machines= (depending on the disk throughput) should be able to handle.

If possible, I would recommend you to do an experiment usin= g a prototype to see how many machines you need for your workload.

<= div class=3D"gmail_quote">On Fri, Feb 24, 2017 at 5:41 PM, Tzu-Li (Gordon) = Tai <tzulitai@apache.org> wrote:
Hi P= atrick,

= Thanks a lot for feedback on your use case! At a first glance, I would say = that Flink can definitely solve the issues you are evaluating.

I=E2=80=99ll try to = explain them, and point you to some docs / articles that can further explai= n in detail:

- Lateness

The 7-day = lateness shouldn=E2=80=99t be a problem. We definitely recommend
using = RocksDB as the state backend for such a use case, as you
mentione= d correctly, the state would be kept for a long time.
The heavy b= urst when your locally buffered data on machines are
sent to Kafk= a once they come back online shouldn=E2=80=99t be a problem either;
since Flink is a pure data streaming engine, it handles backpressure
naturally without any additional mechanisms (I would recommend
=

- Out of Order

That=E2=80=99s exactly what event time proce= ssing is for :-) As long as the event
comes in before the allowed= lateness for windows, the event will still fall
into its corresp= onding event time window. So, even with the heavy burst of
the yo= ur late machine data, they will still be aggregated in the correct windows.=
You can look into event time in Flink with more detail in the ev= ent time docs:

- Computati= ons per minute

Hope this helps!

- Gordon


On February 24,= 2017 at 10:49:14 PM, Patrick Brunmayr (jay@kpibench.com) wrote:

Hello

I've done my first steps with Flink and i am very impressed of its capabilities. Thank you for that :) I want to use it for a project we are currently working on. After reading some documentation
i am not sure if it's the right tool for the job. We have an IoT application in which we are monitoring machines in production plants. The machines have sensors attached and they are sending
their data to a broker ( Kafka, Azure Iot Hub ) currently on a per minute basis.

Following requirements must be fulfilled

  • Lateness

    We have to allow lateness for 7 days because machines can have down time due network issues, maintenance or something else. If thats the case buffering of data happens localy on the machine and once they
    are online again all data will be sent to the broker. This can result in some relly heavy burst.


  • Out of order

    Events come out of order due this lateness issues


  • Last write wins

    Machines are not stateful and can not guarantee exactly once sending of their data. It can happen that sometimes events are sent twice. In that case the last event wins and should override the previous one.
    Events are unique due a sensor_id and a timestamp

  • Computations per minute

    We can not wait until the windows ends and have to do computations on a per minute basis. For example aggregating data per sensor and writing it to a db

My biggest concern in that case is the huge lateness. Keeping data for 7 days would result in=C2=A010080 data points for just one sensor! Multiplying that by 10.000 sensors would result in=C2=A0100800000 datapoints which Flink
would have to handle in its state. The number of sensors are constantly growing so will the number of data points

So my questions are

  • Is Flink the right tool for the Job ?

  • Is that lateness an issue ?

  • How can i implement the Last write wins ?

  • How to tune flink to handle that growing load of sensors and data points ?

  • Hardware requirements, storage and memory size ?


I don't want to maintain two code base for batch and streaming because the operations are all equal. The only difference is the time range! Thats the reason i wanted to do all this with Flink Streaming.

Hope you can guide me in the right direction

Thx







=

--001a114076b0d9ab9a05494a3035--