flink-user mailing list archives

From Yassine MARZOUGUI <y.marzou...@mindlytix.com>
Subject Re: Csv to windows?
Date Mon, 07 Nov 2016 20:28:24 GMT
Hi Felix,

As I see in kddcup.newtestdata_small_unlabeled_index
<https://github.com/FelixNeutatz/CluStream/blob/master/flink-java-project/src/main/resources/data/kddcup.newtestdata_small_unlabeled_index>,
the first field of connectionRecords (splits[0]) is unique for each
record. Therefore, when you apply keyBy(0), it logically partitions your
stream by that field, and each partition contains only one element. So
the countWindow(2) never fires, because no key ever reaches 2
elements. That's why your files stay empty.
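
To make the effect concrete, here is a minimal plain-Java sketch that only simulates the per-key counting behind keyBy(0).countWindow(n). It does not use the Flink API; the class KeyedCountWindowDemo, the method firedWindows, and the "index % 4" bucketing are hypothetical names and choices made up for illustration, and the sketch ignores parallelism and the leftover partial windows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of keyed count windows; NOT Flink API (illustrative assumption).
class KeyedCountWindowDemo {

    // Returns the element count of every window that fired, in firing order.
    // A window fires only when a single key has collected windowSize elements.
    static List<Integer> firedWindows(long[] keys, int windowSize) {
        Map<Long, Integer> pendingPerKey = new HashMap<>();
        List<Integer> fired = new ArrayList<>();
        for (long key : keys) {
            // Increment the number of buffered elements for this key.
            int count = pendingPerKey.merge(key, 1, Integer::sum);
            if (count == windowSize) {
                fired.add(count);
                pendingPerKey.remove(key); // start a fresh window for this key
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Unique index per record, as in the input file: no key ever reaches 2.
        long[] uniqueIndex = {0, 1, 2, 3, 4, 5, 6, 7};
        System.out.println(firedWindows(uniqueIndex, 2)); // prints []

        // Hypothetical alternative: key by (index % 4) so that keys repeat.
        long[] bucketed = new long[uniqueIndex.length];
        for (int i = 0; i < uniqueIndex.length; i++) {
            bucketed[i] = uniqueIndex[i] % 4;
        }
        System.out.println(firedWindows(bucketed, 2)); // prints [2, 2, 2, 2]
    }
}
```

With unique keys nothing is ever emitted, which matches the empty files. Whether keying by a repeating bucket like the one above is appropriate depends on what the windows are supposed to contain.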

Could you please go into more detail about what the expected output is? Then
we might be able to figure out the proper way to achieve it.

Best,
Yassine

2016-11-07 19:18 GMT+01:00 Felix Neutatz <neutatz@googlemail.com>:

> Hi Till,
>
> the mapper solution makes sense :)
>
> Unfortunately, in my case it was not a typo in the path. I checked and saw
> that the records are read.
>
> You can find the whole program here:
> https://github.com/FelixNeutatz/CluStream/blob/master/flink-java-project/src/main/java/org/apache/flink/clustream/StreamingJobIndex.java
>
> I would be grateful for any ideas.
>
> Best regards,
> Felix
>
> 2016-11-07 16:15 GMT+01:00 Till Rohrmann <trohrmann@apache.org>:
>
>> Hi Felix,
>>
>> I'm not sure whether grouping/keyBy by processing time makes any sense
>> semantically. The value can be anything, depending on the execution order.
>> Therefore, there is no built-in mechanism to group by processing time. But
>> you can always write a mapper which assigns the current processing time to
>> the stream record and use this field for grouping.
>>
>> Concerning your second problem, could you check the path of the file? At
>> the moment Flink fails silently if the path is not valid. It might be that
>> you have a simple typo in the path. I've opened an issue to fix this
>> [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-5027
>>
>> Cheers,
>> Till
>>
>> On Sun, Nov 6, 2016 at 12:36 PM, Felix Neutatz <neutatz@googlemail.com>
>> wrote:
>>
>>> Hi everybody,
>>>
>>> I finally reached streaming territory. For a student project I want to
>>> implement CluStream for Flink. I guess this is especially interesting to
>>> try queryable state :)
>>>
>>> But I have problems with the first steps. My input data is a csv file of
>>> records. To start, I just want to window this csv. I don't want to use AllWindows
>>> because it's not parallelizable.
>>>
>>> So my first question is: Can I window by processing time, like this:
>>>
>>> connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))
>>>
>>> I didn't find a way, so I added an index column to the csv and tried to use a
>>> countWindow:
>>>
>>> DataStreamSource<String> source = env.readTextFile(file.getPath());
>>>
>>> DataStream<Tuple2<Long,Vector>> connectionRecords = source.map(new MapToVector()).setParallelism(4);
>>>
>>> connectionRecords.keyBy(0).countWindow(10).apply (
>>>    new WindowFunction<Tuple2<Long,Vector>, Tuple1<Integer>, Tuple, GlobalWindow>() {
>>>       public void apply (Tuple tuple,
>>>                      GlobalWindow window,
>>>                      Iterable<Tuple2<Long, Vector>> values,
>>>                      Collector<Tuple1<Integer>> out) throws Exception {
>>>          int sum = 0;
>>>          Iterator iterator = values.iterator();
>>>          while (iterator.hasNext () ) {
>>>             Tuple2<Long,Vector> t = (Tuple2<Long,Vector>)iterator.next();
>>>             sum += 1;
>>>          }
>>>          out.collect (new Tuple1<Integer>(new Integer(sum)));
>>>       }
>>> }).writeAsCsv("text");
>>>
>>> To check whether everything works I just count the elements per window and write
>>> them into a csv file.
>>>
>>> Flink generates the files, but they are all empty. Can you tell me what I did wrong?
>>>
>>> Best regards,
>>>
>>> Felix
>>>
>>>
>>
>
