flink-user mailing list archives

From Marcus Clendenin <marcusc...@gmail.com>
Subject Windowing isn't applied per key
Date Fri, 29 Sep 2017 18:56:10 GMT
I have a job that performs an aggregation over a time window. The windowing
is supposed to happen per key, but the output I am seeing looks like a single
overall window over everything coming in. Is this happening because I map
the data before I call keyBy?
This is a representation of what I am running:



// Scala DataStream API, including the implicit TypeInformation it needs
import org.apache.flink.streaming.api.scala._

val stream = env
  .addSource(kafkaConsumer)

// filter out bad JSON
val jsonDeserializer = new JSONDeserializationSchema()
val filteredStream = stream.filter(text => {
  try {
    jsonDeserializer.deserialize(text.getBytes)
    true
  } catch {
    case _: Exception => false
  }
})
val kafkaStream = filteredStream.map(text =>
  jsonDeserializer.deserialize(text.getBytes))

// method used to filter JSON not meeting the expected requests
val filteredJsonStream = filterIncorrectJson(kafkaStream)

// method used to map JSON to input object
val mappedStream = mapJsonToObject(filteredJsonStream)

// pull key out of object
val keyedStream = mappedStream.keyBy(_.key)

// add window
val windowedStream = keyedStream.timeWindow(windowSize, windowSlide)

// reduce to aggregates
val reducedStream = windowedStream.reduce(aggregateData())
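
For comparison, here is a plain-Scala simulation (no Flink dependency) of what keyBy(_.key) followed by a sliding timeWindow(windowSize, windowSlide) and reduce computes. It is a sketch under stated assumptions, not Flink code: the Event class and its timestamp/value fields are hypothetical stand-ins for the mapped input object, timestamps are plain minutes, and window boundaries assume epoch alignment with a zero offset. What it illustrates is that results are grouped by (key, window start), so each key gets its own aggregate, while every key shares the same window boundaries.

```scala
// Plain-Scala simulation (no Flink dependency) of:
//   keyBy(_.key).timeWindow(windowSize, windowSlide).reduce(...)
object KeyedWindowSketch {
  // Hypothetical stand-in for the mapped input object; timestamps are in minutes.
  final case class Event(key: String, timestamp: Long, value: Long)

  // Starts of every sliding window [start, start + size) that contains ts,
  // assuming epoch-aligned windows with zero offset and ts >= 0.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - ((ts + slide) % slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // One reduced value per (key, windowStart) pair: different keys never share a result.
  def keyedWindowReduce(events: Seq[Event], size: Long, slide: Long)(
      combine: (Long, Long) => Long): Map[(String, Long), Long] =
    events
      .flatMap(e => windowStarts(e.timestamp, size, slide).map(s => ((e.key, s), e.value)))
      .groupBy(_._1)
      .map { case (keyAndWindow, pairs) => keyAndWindow -> pairs.map(_._2).reduce(combine) }

  def main(args: Array[String]): Unit = {
    val size = 30L  // 30-minute window
    val slide = 5L  // 5-minute slide
    val events = Seq(Event("a", 62, 1), Event("b", 62, 10), Event("a", 63, 2))
    keyedWindowReduce(events, size, slide)(_ + _).toSeq.sorted.foreach {
      case ((key, start), total) =>
        println(s"key=$key window=[$start, ${start + size}) total=$total")
    }
  }
}
```

In this simulation, each key ends up in six windows and each window holds only that key's total, yet the windows for "a" and "b" open and close at exactly the same minutes.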





I am pulling in data from Kafka as a String, mapping it to my data model,
then pulling out the key, applying a sliding time window (30-minute window,
5-minute slide), and doing an aggregation. I expected the aggregation to run
on a separate time window for each key, but instead it happens every 5
minutes for all keys.
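
To see where the "every 5 minutes for all keys" timing can come from, it helps to work out which windows a single record falls into. The sketch below assumes epoch-aligned sliding windows with a zero offset, which is the arithmetic Flink's sliding time window assigners use by default as far as I know: the latest window starts at the timestamp rounded down to a multiple of the slide, and earlier windows step back by one slide until they no longer cover the timestamp. The object and method names are hypothetical. Note that the key never enters the calculation.

```scala
// Sketch of epoch-aligned sliding-window assignment (assumes offset = 0, timestamp >= 0).
object SlidingWindowAssignment {
  final case class Window(start: Long, end: Long)

  // Start of the latest slide-aligned window that contains `timestamp`.
  def alignedStart(timestamp: Long, slide: Long): Long =
    timestamp - ((timestamp + slide) % slide)

  // All sliding windows [start, start + size) that contain `timestamp`.
  // Only the timestamp, size and slide matter; the record's key plays no part.
  def assignWindows(timestamp: Long, size: Long, slide: Long): Seq[Window] =
    Iterator
      .iterate(alignedStart(timestamp, slide))(_ - slide)
      .takeWhile(_ > timestamp - size)
      .map(start => Window(start, start + size))
      .toList

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    val windowSize = 30 * minute
    val windowSlide = 5 * minute
    val ts = 62 * minute // an event 62 minutes after the epoch, for any key
    assignWindows(ts, windowSize, windowSlide).foreach { w =>
      println(s"[${w.start / minute}, ${w.end / minute}) min")
    }
  }
}
```

With a 30-minute size and a 5-minute slide, the event at minute 62 lands in six windows, from [60, 90) down to [35, 65). Because the boundaries are identical for every key, every key's windows close on the same 5-minute ticks, even if each key's aggregate is computed separately.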
