flink-user mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: Windowing isn't applied per key
Date Mon, 02 Oct 2017 09:41:21 GMT
Hi Marcus,

at first glance your pipeline looks correct. It should not be executed
with a parallelism of one unless that is specified explicitly. Which
time semantics are you using? If it is event time, I would check your
timestamp and watermark assignment. You can also check in the web
frontend which operator is executed with which parallelism. By the way,
according to the JavaDocs of reduce(): "Sliding time windows will
aggregate on the granularity of the slide interval", so it is called
multiple times.
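The slide-interval behavior can be illustrated with plain Scala (a hedged sketch of the window arithmetic only, not Flink code; the `windowStarts` helper is hypothetical). With a 30-minute window sliding by 5 minutes, every element falls into 30 / 5 = 6 overlapping panes, so the reducer's result is emitted once per slide:

```scala
// Sketch: the start times of all sliding windows covering a timestamp,
// mirroring the semantics of timeWindow(size, slide) with zero offset.
def windowStarts(ts: Long, size: Long, slide: Long): Seq[Long] = {
  val lastStart = ts - ts % slide              // latest window that contains ts
  (lastStart until (ts - size) by -slide).toSeq
}

// A timestamp at minute 17 with a 30-minute window sliding by 5 minutes
// belongs to 6 windows, one per slide step.
windowStarts(17, 30, 5)  // Seq(15, 10, 5, 0, -5, -10)
```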

Regards,
Timo


On 9/29/17 8:56 PM, Marcus Clendenin wrote:
>
>
> I have a job that is performing an aggregation over a time window. 
> This windowing is supposed to be happening by key, but the output I am 
> seeing is creating an overall window on everything coming in. Is this 
> happening because I am doing a map of the data before I am running the 
> keyBy command? This is a representation of what I am running
>
> val stream = env
>   .addSource(kafkaConsumer)
>
> // filter out bad json
> val jsonDeserializer = new JSONDeserializationSchema()
> val filteredStream = stream.filter(text => {
>   try {
>     jsonDeserializer.deserialize(text.getBytes)
>     true
>   } catch {
>     case e: Exception => false
>   }
> })
>
> val kafkaStream = filteredStream.map(text =>
>   jsonDeserializer.deserialize(text.getBytes))
>
> //method used to filter json not meeting the expected requests
> val filteredJsonStream = filterIncorrectJson(kafkaStream)
>
> //method used to map Json to input object
>
> val mappedStream = mapJsonToObject(filteredJsonStream)
>
> // pull key out of object
>
> val keyedStream = mappedStream.keyBy(_.key)
>
> // add window
>
> val windowedStream = keyedStream.timeWindow(windowSize, windowSlide)
>
> // reduce to aggregates
>
> val reducedStream = windowedStream.reduce(aggregateData())
>

>
>
> I am pulling in data from Kafka as a String, mapping it to my data 
> model and then pulling out the key, applying the time window with a 30 
> minute window, 5 minute slide and doing an aggregation. I am 
> expecting
> that the aggregation is happening on a time window that is separate 
> for each iteration of the key but it is happening every 5 minutes for 
> all keys.
>
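The expected per-key behavior can be sketched outside Flink. This is a hedged, plain-Scala simulation (the `Event` type and both helpers are hypothetical, not from the thread) showing that once elements are grouped by key, each (key, window) pane is reduced independently, so two keys never share an aggregate:

```scala
// Hypothetical event type standing in for the mapped data model.
case class Event(key: String, ts: Long, value: Int)

// Start times of all sliding windows covering a timestamp (zero offset).
def windowStarts(ts: Long, size: Long, slide: Long): Seq[Long] =
  ((ts - ts % slide) until (ts - size) by -slide).toSeq

// Simulate keyBy(_.key) + sliding timeWindow + reduce over values:
// assign every element to its panes, then sum per (key, windowStart).
def keyedWindowSums(events: Seq[Event], size: Long, slide: Long): Map[(String, Long), Int] =
  events
    .flatMap(e => windowStarts(e.ts, size, slide).map(w => ((e.key, w), e.value)))
    .groupBy(_._1)
    .map { case (pane, vs) => pane -> vs.map(_._2).sum }

val sums = keyedWindowSums(
  Seq(Event("a", 3, 1), Event("a", 7, 2), Event("b", 3, 10)), size = 30, slide = 5)
// Keys "a" and "b" aggregate in separate panes, e.g. at window start 0:
// sums(("a", 0L)) == 3 and sums(("b", 0L)) == 10
```

If the real job still aggregates across keys, it is worth checking that the key extractor returns stable, deterministic values, since a key that varies between elements that should match would merge or split panes unexpectedly.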

