flink-user mailing list archives

From Felix Neutatz <neut...@googlemail.com>
Subject Re: Csv to windows?
Date Tue, 08 Nov 2016 09:17:51 GMT
Hi Yassine,

thanks that explains it :)

Best regards,
Felix

On Nov 7, 2016 21:28, "Yassine MARZOUGUI" <y.marzougui@mindlytix.com> wrote:

> Hi Felix,
>
> As I see in kddcup.newtestdata_small_unlabeled_index
> <https://github.com/FelixNeutatz/CluStream/blob/master/flink-java-project/src/main/resources/data/kddcup.newtestdata_small_unlabeled_index>,
> the first field of connectionRecords (splits[0]) is unique for each
> record. Therefore, when you apply keyBy(0), it logically partitions your
> stream by that field, and each partition contains only one element. So
> the countWindow(2) never fires because no key ever reaches 2
> elements. That's why your files stay empty.
>
> Could you please go into more detail about what the expected output is? Then
> we might be able to figure out the proper way to achieve it.
>
> Best,
> Yassine
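
To illustrate the explanation above: the following is a minimal plain-Java sketch, not Flink code, that imitates what keyBy(...).countWindow(n) does by keeping one counter per key. The class KeyedCountWindowSketch, the method countWindows, and the modulo-4 key are hypothetical names and choices made up for this illustration. It shows that with a unique key per record no counter ever reaches n, while a key shared by several records does fill windows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeyedCountWindowSketch {

    /**
     * Plain-Java stand-in for keyBy(keySelector).countWindow(size):
     * counts elements per key and "fires" a window each time a key
     * has collected `size` elements. Returns the size of every fired window.
     */
    static <T, K> List<Integer> countWindows(List<T> records,
                                             Function<T, K> keySelector,
                                             int size) {
        Map<K, Integer> buffered = new HashMap<>();
        List<Integer> fired = new ArrayList<>();
        for (T record : records) {
            K key = keySelector.apply(record);
            int count = buffered.merge(key, 1, Integer::sum);
            if (count == size) {
                fired.add(count);      // window is full: emit its element count
                buffered.remove(key);  // start a fresh window for this key
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Long> indices = new ArrayList<>();
        for (long i = 0; i < 8; i++) {
            indices.add(i);
        }

        // Key = unique index: every key sees exactly one element.
        List<Integer> uniqueKey = countWindows(indices, i -> i, 2);
        // Hypothetical alternative key: index modulo 4, shared by two records each.
        List<Integer> moduloKey = countWindows(indices, i -> i % 4, 2);

        System.out.println("unique key windows fired: " + uniqueKey.size());
        System.out.println("modulo key windows fired: " + moduloKey.size());
    }
}
```

With the eight indices 0..7 and a window size of 2, the unique key fires 0 windows and the modulo-4 key fires 4.
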
>
> 2016-11-07 19:18 GMT+01:00 Felix Neutatz <neutatz@googlemail.com>:
>
>> Hi Till,
>>
>> the mapper solution makes sense :)
>>
>> Unfortunately, in my case it was not a typo in the path. I checked and
>> saw that the records are read.
>>
>> You can find the whole program here:
>> https://github.com/FelixNeutatz/CluStream/blob/master/flink-java-project/src/main/java/org/apache/flink/clustream/StreamingJobIndex.java
>>
>> I would be happy to hear any ideas.
>>
>> Best regards,
>> Felix
>>
>> 2016-11-07 16:15 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
>>
>>> Hi Felix,
>>>
>>> I'm not sure whether grouping/keyBy by processing time makes any
>>> sense semantically. The value can be anything, depending on the execution
>>> order. Therefore, there is no built-in mechanism to group by processing
>>> time. But you can always write a mapper which assigns the current
>>> processing time to the stream record and use this field for grouping.
>>>
>>> Concerning your second problem, could you check the path of the file? At
>>> the moment Flink fails silently if the path is not valid. It might be that
>>> you have a simple typo in the path. I've opened an issue to fix this
>>> [1].
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-5027
>>>
>>> Cheers,
>>> Till
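
The mapper suggestion above could look roughly like the following plain-Java sketch (no Flink dependency). ProcessingTimeTagger, tag, and the 1000 ms bucket width are assumptions made up for illustration; in a Flink job the same logic would sit inside a map function and the resulting first field would be the one passed to keyBy. The clock is injected so that the behaviour can be checked deterministically.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.LongSupplier;

public class ProcessingTimeTagger {

    private final LongSupplier clock;   // source of "now" in milliseconds
    private final long bucketMillis;    // width of one processing-time bucket

    ProcessingTimeTagger(LongSupplier clock, long bucketMillis) {
        this.clock = clock;
        this.bucketMillis = bucketMillis;
    }

    /**
     * Pairs the record with the start of the processing-time bucket
     * in which it was seen. The bucket start can then serve as a grouping field.
     */
    <T> Map.Entry<Long, T> tag(T record) {
        long now = clock.getAsLong();
        long bucketStart = now - (now % bucketMillis);
        return new SimpleEntry<>(bucketStart, record);
    }

    public static void main(String[] args) {
        // Wall-clock time, 1000 ms buckets (mirrors Time.milliseconds(1000L) in the question).
        ProcessingTimeTagger tagger =
            new ProcessingTimeTagger(System::currentTimeMillis, 1000L);
        System.out.println(tagger.tag("record-a"));
    }
}
```

Note that all records tagged within the same interval share one key, so this grouping by itself does not spread the records of a single interval over several parallel instances.
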
>>>
>>> On Sun, Nov 6, 2016 at 12:36 PM, Felix Neutatz <neutatz@googlemail.com>
>>> wrote:
>>>
>>>> Hi everybody,
>>>>
>>>> I finally reached streaming territory. For a student project I want to
>>>> implement CluStream for Flink. I guess this is especially interesting to
>>>> try queryable state :)
>>>>
>>>> But I have problems with the first steps. My input data is a csv file of
>>>> records. To start, I just want to window this csv. I don't want to use
>>>> AllWindows because it's not parallelizable.
>>>>
>>>> So my first question is: Can I window by processing time, like this:
>>>>
>>>> connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))
>>>>
>>>> I didn't find a way, so I added an index column to the csv and tried to use
>>>> a countWindow:
>>>>
>>>> DataStreamSource<String> source = env.readTextFile(file.getPath());
>>>>
>>>> DataStream<Tuple2<Long,Vector>> connectionRecords =
>>>>    source.map(new MapToVector()).setParallelism(4);
>>>>
>>>> connectionRecords.keyBy(0).countWindow(10).apply(
>>>>    new WindowFunction<Tuple2<Long,Vector>, Tuple1<Integer>, Tuple, GlobalWindow>() {
>>>>       public void apply(Tuple tuple,
>>>>                      GlobalWindow window,
>>>>                      Iterable<Tuple2<Long, Vector>> values,
>>>>                      Collector<Tuple1<Integer>> out) throws Exception {
>>>>          // Count the elements in this window.
>>>>          int sum = 0;
>>>>          for (Tuple2<Long, Vector> t : values) {
>>>>             sum += 1;
>>>>          }
>>>>          out.collect(new Tuple1<Integer>(sum));
>>>>       }
>>>> }).writeAsCsv("text");
>>>>
>>>> To check whether everything works I just count the elements per window and
>>>> write them into a csv file.
>>>>
>>>> Flink generates the files but all are empty. Can you tell me what I did
>>>> wrong?
>>>>
>>>> Best regards,
>>>>
>>>> Felix
>>>>
>>>>
>>>
>>
>
