flink-user mailing list archives

From Marcus Clendenin <marcusc...@gmail.com>
Subject Windowing isn't applied per key
Date Fri, 29 Sep 2017 18:56:10 GMT
I have a job that performs an aggregation over a time window. The windowing
is supposed to happen per key, but the output I am seeing looks as if a single
window is applied to everything coming in. Is this happening because I map
the data before calling keyBy?
This is a representation of what I am running:

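import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema

val stream = env // Kafka source of String records; consumer setup omitted

// filter out bad JSON
val jsonDeserializer = new JSONDeserializationSchema()
val filteredStream = stream.filter(text => {
  try {
    // parse attempt on `text` goes here (omitted in the original post)
    true
  }
  catch {
    case e: Exception => false
  }
})

val kafkaStream = filteredStream.map(text =>
  ??? // map body was cut off in the original post
)

// method used to filter JSON not meeting the expected requests
val filteredJsonStream = filterIncorrectJson(kafkaStream)

// method used to map JSON to the input object
val mappedStream = mapJsonToObject(filteredJsonStream)

// pull the key out of the object
val keyedStream = mappedStream.keyBy(_.key)

// add a sliding window (windowSize = 30 minutes, windowSlide = 5 minutes)
val windowedStream = keyedStream.timeWindow(windowSize, windowSlide)

// reduce to aggregates
val reducedStream = windowedStream.reduce(aggregateData())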


I am reading data from Kafka as Strings, mapping it to my data model,
pulling out the key, applying a sliding time window (30-minute window,
5-minute slide), and then doing an aggregation. I expected the aggregation to
run on a separate time window for each key, but instead it fires every 5
minutes for all keys at once.
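The behavior described above may be expected rather than a bug. As far as I understand Flink's sliding time windows (with the default zero offset), window boundaries are aligned to the epoch rather than to when each key first shows up. The aggregates are still kept separately per key, but every key's windows open and close on the same 5-minute boundaries. The plain-Scala sketch below (no Flink dependency, Scala 2.13+) simulates that assumption. `Event`, `Window`, `KeyedWindowSketch` and the helper names are hypothetical and are not Flink API, and the alignment formula is my reading of how the sliding assigner behaves, not a copy of Flink's code.

```scala
// Plain-Scala simulation (no Flink) of keyed sliding-window assignment.
// ASSUMPTION: windows are aligned to multiples of `slide` since the epoch,
// which is how I understand Flink's sliding time windows with zero offset.
// All names here are hypothetical, chosen for illustration.
object KeyedWindowSketch {
  final case class Event(key: String, timestampMs: Long, value: Long)
  final case class Window(start: Long, end: Long) // half-open [start, end)

  // Latest window start at or before `timestamp`, aligned to `slide`
  // (shifted by `offset`). Note that the key plays no part in this.
  def lastWindowStart(timestamp: Long, offset: Long, slide: Long): Long =
    timestamp - (timestamp - offset + slide) % slide

  // Every sliding window of length `size` that contains `timestamp`;
  // with size = 30 min and slide = 5 min that is size / slide = 6 windows.
  def assignSlidingWindows(timestamp: Long, size: Long, slide: Long, offset: Long = 0L): List[Window] =
    Iterator
      .iterate(lastWindowStart(timestamp, offset, slide))(_ - slide)
      .takeWhile(_ > timestamp - size)
      .map(start => Window(start, start + size))
      .toList

  // Keyed aggregation: one running sum per (key, window) pair, so values of
  // different keys are never combined, even though their windows line up.
  def keyedSlidingSum(events: Seq[Event], size: Long, slide: Long): Map[(String, Window), Long] =
    events
      .flatMap(e => assignSlidingWindows(e.timestampMs, size, slide).map(w => ((e.key, w), e.value)))
      .groupMapReduce(_._1)(_._2)(_ + _)
}
```

With a 30-minute size and 5-minute slide, the events ("a", 1000000 ms, 1), ("b", 1100000 ms, 2) and ("a", 1200000 ms, 3) give separate sums in the window [900000, 2700000): 4 for key `a` and 2 for key `b`. Both keys still share exactly the same window boundaries, so every key emits at the same 5-minute marks, which can look like one global window.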
