From: Fabian Hueske <fhueske@gmail.com>
Date: Thu, 1 Mar 2018 10:23:18 +0100
Subject: Re: Reading csv-files
To: Esa Heikkinen
Cc: user@flink.apache.org

Hi Esa,

IMO, the easiest approach would be to implement a custom source function that reads the CSV files line-wise (in the correct timestamp order) and extracts timestamps. At the end of each file, you can emit a watermark. The order of files can either be hardcoded or determined from the file name.

This approach is similar to the source function in the RideCleansing exercise [1] (without the alignment of timestamps with the actual time).

Once you have a DataStream with correctly assigned timestamps and watermarks, you should be able to use the CEP library.

Best, Fabian

[1] https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/sources/CheckpointedTaxiRideSource.java
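For illustration, here is a minimal sketch of such a source function. The epoch-millis timestamp in the first CSV column, the pre-sorted file list, and the class name are assumptions for the example, not part of the original mail:

import java.io.BufferedReader;
import java.io.FileReader;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

// Reads CSV files in the given (timestamp-sorted) order, attaches the
// event timestamp to every record, and emits a watermark at the end of
// each file, since no later file may contain smaller timestamps.
public class OrderedCsvSource implements SourceFunction<String> {

    private final String[] orderedPaths; // sorted by timestamp, e.g. by file name
    private volatile boolean running = true;

    public OrderedCsvSource(String[] orderedPaths) {
        this.orderedPaths = orderedPaths;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long maxTimestamp = Long.MIN_VALUE;
        for (String path : orderedPaths) {
            try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
                String line;
                while (running && (line = reader.readLine()) != null) {
                    // assumption: epoch-millis timestamp in the first column
                    long ts = Long.parseLong(line.split(",")[0]);
                    maxTimestamp = Math.max(maxTimestamp, ts);
                    ctx.collectWithTimestamp(line, ts);
                }
            }
            // end of file: signal that event time has advanced this far
            ctx.emitWatermark(new Watermark(maxTimestamp));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

With event time enabled via env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime), env.addSource(new OrderedCsvSource(paths)) then yields a DataStream with timestamps and watermarks that the CEP library can consume directly.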
2018-02-28 10:47 GMT+01:00 Esa Heikkinen:

> Because I have no time to learn all the features of Flink, and because there can be some issues in my case, I am very interested in implementing an external "logs replayer" or some batch-to-stream data converter.
>
> Do you have any ideas or suggestions on how to build this kind of logs replayer? Or can it even be found ready-made?
>
> Could Kafka do something like this?
>
> I think I could also write this logs replayer in Python.
>
> What kind of parallel streams would be best and easiest for Flink?
>
> By the way, I am writing a conference paper comparing Flink and my LOGDIG log file analyzer, which is described in my old paper (LOGDIG Log File Analyzer for Mining Expected Behavior from Log Files):
> https://www.researchgate.net/profile/Timo_Haemaelaeinen/publication/283264599_LOGDIG_Log_File_Analyzer_for_Mining_Expected_Behavior_from_Log_Files/links/562f7ea208ae4742240ae977.pdf
>
> LOGDIG is a very simple and slow analyzer, and it runs only on a local computer (at the moment), but it is capable of analyzing very complex cases from many parallel log files. LOGDIG's analysis is close to CEP. I have written it in Python.
>
> I don't know whether Flink is the best benchmarking target, but I do not know a better one. I also tried Spark, but it had its own problems; for example, CEP support is not as good in Spark as in Flink.
>
> Best, Esa
>
> From: Fabian Hueske [mailto:fhueske@gmail.com]
> Sent: Tuesday, February 27, 2018 11:27 PM
> To: Esa Heikkinen
> Cc: user@flink.apache.org
> Subject: Re: Reading csv-files
>
> Yes, that is mostly correct.
> You can of course read files in parallel, assign watermarks, and obtain a DataStream with correct timestamps and watermarks.
> If you do that, you should ensure that each parallel source task reads the files in order of increasing timestamps.
>
> As I said before, you can do that by providing a custom InputSplitAssigner that hands out the splits in order of their timestamps. The timestamp order would need to be encoded in the file name because the assigner cannot look into the file.
>
> Reading unsplit files in a single task makes the problem a bit easier to handle, but parallel reads are also possible.
>
> The RideCleansing example that you are referring to does not have these problems because the source reads the data in a single thread from a single file. This is done in order to avoid all the issues described before.
>
> Best, Fabian
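For illustration, a rough sketch of such an assigner, handing out one-split-per-file splits in ascending timestamp order. The file-name scheme events-<epochMillis>.csv and the class name are hypothetical, the code is written against the single-method InputSplitAssigner interface of the Flink 1.4/1.5 era, and the wiring into an unsplittable CsvInputFormat subclass via getInputSplitAssigner() is omitted because the exact override signature varies across Flink versions:

import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

// Hands out splits in ascending order of the timestamp encoded in the
// file name; the assigner cannot look into the files themselves.
public class TimestampOrderedSplitAssigner implements InputSplitAssigner {

    private final ConcurrentLinkedQueue<FileInputSplit> pending;

    public TimestampOrderedSplitAssigner(FileInputSplit[] splits) {
        FileInputSplit[] sorted = splits.clone();
        Arrays.sort(sorted, Comparator.comparingLong(
                (FileInputSplit s) -> timestampFromName(s.getPath())));
        this.pending = new ConcurrentLinkedQueue<>(Arrays.asList(sorted));
    }

    // assumption: file names look like "events-<epochMillis>.csv"
    private static long timestampFromName(Path path) {
        String name = path.getName();
        return Long.parseLong(
                name.substring(name.indexOf('-') + 1, name.lastIndexOf('.')));
    }

    @Override
    public InputSplit getNextInputSplit(String host, int taskId) {
        return pending.poll(); // null tells the caller there are no splits left
    }
}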
> 2018-02-27 22:14 GMT+01:00 Esa Heikkinen:
>
> Hi
>
> Thanks for the answer. All csv-files are already present, and they will not change during the processing.
>
> Because Flink can read many streams in parallel, I think it is also possible to read many csv-files in parallel.
>
> From what I have understood, it is possible to convert csv-files to streams internally in Flink? But the problem may be how to synchronize the parallel reading of the csv-files based on timestamps?
>
> Maybe I should develop an external "replayer" of csv-files, which generates parallel streams of events (based on timestamps) for Flink?
>
> But I think the "replayer" could also be implemented in Flink, and it could then even be run at an accelerated speed?
>
> The RideCleansing example does something like that, but I don't know whether it is otherwise appropriate for my purpose.
>
> Best, Esa
>
> Fabian Hueske wrote on 27.2.2018 at 22:32:
>
> Hi Esa,
>
> Reading records from files with timestamps that need watermarks can be tricky.
>
> If you are aware of Flink's watermark mechanism, you know that records should be ingested in (roughly) increasing timestamp order. This means that files usually cannot be split (i.e., each needs to be read by a single task from start to end) and also need to be read in the right order (files with smaller timestamps first). Also, each file should contain records of a certain time interval that does not overlap (too much) with the time intervals of the other files.
>
> Unfortunately, Flink does not provide good built-in support for reading files in a specific order. If all the files that you want to process are already present, you can implement a custom InputFormat by extending CsvInputFormat, set unsplittable to true, and override getInputSplitAssigner() to return an assigner that returns the splits in the correct order.
>
> If you want to process files as they appear, things might be a bit easier, given that the timestamps in each new file are larger than the timestamps of the previous files. In this case, you can use StreamExecutionEnvironment.readFile() with the interval and FileProcessingMode parameters. With a correctly configured watermark assigner, it should be possible to get valid watermarks (see the sketch after this thread).
>
> In any case, reading timestamped data from files is much trickier than ingesting data from an event log, which provides the events in the same order in which they were written.
>
> Best, Fabian
>
> 2018-02-27 20:13 GMT+01:00 Esa Heikkinen:
>
> I'd like to read csv-files which contain time series data, where one column is a timestamp.
>
> Is it better to use addSource() (like in the data Artisans RideCleansing exercise) or CsvTableSource() ?
>
> I am not sure whether CsvTableSource() can understand timestamps? I have not found good examples of that.
>
> Is it maybe a little more work to write a csv-parser in the addSource() case?
>
> Best, Esa
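As a companion to the readFile() suggestion in Fabian's reply above, here is a minimal sketch of the continuous-monitoring variant. The directory path, the 10-second scan interval, the 5-second out-of-orderness bound, and the first-column epoch-millis timestamp are all assumptions for the example:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ReadCsvFilesAsTheyAppear {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // re-scan the directory every 10 seconds; new files are assumed to
        // carry larger timestamps than all earlier files
        String dir = "/data/csv";
        DataStream<String> lines = env.readFile(
                new TextInputFormat(new Path(dir)), dir,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

        // periodic watermarks that trail the largest seen timestamp by 5s
        DataStream<String> stamped = lines.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(String csvLine) {
                        // assumption: epoch-millis timestamp in the first column
                        return Long.parseLong(csvLine.split(",")[0]);
                    }
                });

        stamped.print();
        env.execute("csv-file-replayer");
    }
}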