From: Felix Neutatz
Date: Mon, 7 Nov 2016 19:18:47 +0100
Subject: Re: Csv to windows?
To: user@flink.apache.org

Hi Till,

the mapper solution makes sense :)

Unfortunately, in my case it was not a typo in the path. I checked and saw that the records are read. You can find the whole program here:
https://github.com/FelixNeutatz/CluStream/blob/master/flink-java-project/src/main/java/org/apache/flink/clustream/StreamingJobIndex.java

I am happy about any ideas.

Best regards,
Felix

2016-11-07 16:15 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:

> Hi Felix,
>
> I'm not sure whether grouping/keyBy by processing time makes semantic
> sense. The result can be anything, depending on the execution order.
> Therefore, there is no built-in mechanism to group by processing time. But
> you can always write a mapper which assigns the current processing time to
> the stream record and use this field for grouping.
>
> Concerning your second problem, could you check the path of the file? At
> the moment Flink fails silently if the path is not valid. It might be that
> you have a simple typo in the path. I've opened an issue to fix this [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-5027
>
> Cheers,
> Till
>
> On Sun, Nov 6, 2016 at 12:36 PM, Felix Neutatz <neutatz@googlemail.com> wrote:
>
>> Hi everybody,
>>
>> I finally reached streaming territory. For a student project I want to
>> implement CluStream for Flink. I guess this is especially interesting for
>> trying queryable state :)
>>
>> But I have problems with the first steps. My input data is a CSV file of
>> records. To start, I just want to window this CSV. I don't want to use
>> AllWindows because it is not parallelizable.
>>
>> So my first question is: can I window by processing time, like this?
>>
>> connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))
>>
>> I didn't find a way, so I added an index column to the CSV and tried to
>> use a countWindow:
>>
>> DataStreamSource<String> source = env.readTextFile(file.getPath());
>>
>> DataStream<Tuple2<Long, Vector>> connectionRecords = source.map(new MapToVector()).setParallelism(4);
>>
>> connectionRecords.keyBy(0).countWindow(10).apply(
>>     new WindowFunction<Tuple2<Long, Vector>, Tuple1<Integer>, Tuple, GlobalWindow>() {
>>         public void apply(Tuple tuple,
>>                           GlobalWindow window,
>>                           Iterable<Tuple2<Long, Vector>> values,
>>                           Collector<Tuple1<Integer>> out) throws Exception {
>>             int sum = 0;
>>             Iterator<Tuple2<Long, Vector>> iterator = values.iterator();
>>             while (iterator.hasNext()) {
>>                 iterator.next();
>>                 sum += 1;
>>             }
>>             out.collect(new Tuple1<Integer>(sum));
>>         }
>>     }).writeAsCsv("text");
>>
>> To check whether everything works I just count the elements per window
>> and write them into a CSV file.
>>
>> Flink generates the files but all are empty. Can you tell me what I did
>> wrong?
>>
>> Best regards,
>>
>> Felix
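[Editor's note: Till's suggestion above — "write a mapper which assigns the current processing time to the stream record and use this field for grouping" — can be sketched in plain Java. This is an illustrative, deterministic sketch of the idea, not Flink code; in Flink it would be a `MapFunction` emitting a tuple with an extra timestamp field, followed by `keyBy` on that field. The class and method names here are invented for the example, and the clock is injectable only so the sketch can be tested.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongSupplier;

public class AssignProcessingTime {
    // Tag each incoming value with the processing time at which it was
    // seen. The resulting {timestamp, value} pairs can then be grouped
    // on the timestamp field (in Flink: keyBy on that tuple position).
    static List<long[]> withProcessingTime(List<Long> values, LongSupplier clock) {
        List<long[]> out = new ArrayList<>();
        for (long v : values) {
            out.add(new long[]{clock.getAsLong(), v});
        }
        return out;
    }

    public static void main(String[] args) {
        // In real use the clock would be System::currentTimeMillis.
        List<long[]> tagged = withProcessingTime(Arrays.asList(1L, 2L, 3L),
                System::currentTimeMillis);
        System.out.println(tagged.size()); // 3 records, each timestamped
    }
}
```

Note Till's caveat still applies: the assigned timestamps depend on execution order, so such a grouping is nondeterministic by construction.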
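[Editor's note: one behavior worth knowing when reading the snippet above is that a count window on a keyed stream fires per key, only once that key has accumulated `size` elements. The following plain-Java sketch (invented names, not Flink code) illustrates the consequence: if the key field is a unique index column, no key ever reaches the count, so no window fires.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountWindowSketch {
    // Simulate keyed count-window triggering: count elements per key and
    // emit (then reset) whenever a key's count reaches the window size.
    static List<Integer> keyedCountWindow(List<long[]> records, int size) {
        Map<Long, Integer> perKey = new HashMap<>();
        List<Integer> fired = new ArrayList<>();
        for (long[] r : records) {
            long key = r[0]; // field 0 plays the role of keyBy(0)
            int n = perKey.merge(key, 1, Integer::sum);
            if (n == size) {        // window full for this key: fire and reset
                fired.add(size);
                perKey.put(key, 0);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // 20 records with unique keys: no key ever reaches 10, nothing fires.
        List<long[]> unique = new ArrayList<>();
        for (long i = 0; i < 20; i++) unique.add(new long[]{i, i});
        System.out.println(keyedCountWindow(unique, 10)); // []

        // 20 records sharing one key: two windows of 10 fire.
        List<long[]> shared = new ArrayList<>();
        for (long i = 0; i < 20; i++) shared.add(new long[]{0, i});
        System.out.println(keyedCountWindow(shared, 10)); // [10, 10]
    }
}
```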