From "Ciar, David B." <dcia...@ceh.ac.uk>
Subject countWindow custom WindowFunction
Date Wed, 13 Jul 2016 11:07:12 GMT
Hello everyone,

I'm relatively new to Apache Flink and Scala, and am just getting to grips with some
of the basic functionality both provide. However, I've hit a wall trying to implement a custom
WindowFunction over a keyed countWindow, and hoped someone might have a pointer. The full code is
in a Gist (https://gist.github.com/dbciar/37df92d321c180f5e96e5e3f17806c91). I am using
Flink 1.0.3 with Scala 2.11.

So my workflow is: read string values from a Kafka topic, parse them into a DataStream
of RawObservation using a custom map, and then create a keyed countWindow stream.
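For readers without access to the Gist, here is a minimal sketch of what the `RawObservation` type and the `RawTupleToObservation` map might look like. Only the `observation: Double` field is implied by the code further down. The field names `site`, `sensor` and `timestamp`, and the comma-separated message format, are hypothetical.

```scala
import org.apache.flink.api.common.functions.MapFunction

// Hypothetical record type: only `observation` appears in the original code;
// `site`, `sensor` and `timestamp` are assumed names for the compound key and event time.
case class RawObservation(site: String, sensor: String, timestamp: Long, observation: Double)

// Hypothetical parser, assuming Kafka messages of the form "site,sensor,timestamp,value".
class RawTupleToObservation extends MapFunction[String, RawObservation] {
  override def map(value: String): RawObservation = {
    // Split on commas and trim surrounding whitespace from each field
    val fields = value.split(",").map(_.trim)
    RawObservation(fields(0), fields(1), fields(2).toLong, fields(3).toDouble)
  }
}
```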

The problem is that when I try to use a custom WindowFunction, the IDE flags the ".apply" call
with "Cannot resolve symbol apply". I suspect that my WindowFunction is not implemented correctly
and does not match the signature that apply expects. I say this because when I remove the
'[String]' type argument from apply ('.apply[String]'), I get the following errors:


Unspecified value parameters: foldFunction: (NotInferedR, RawObservation) => NotInferedR, windowFunction: (Tuple, GlobalWindow, Iterable[NotInferedR], Collector[NotInferedR]) =>

Unspecified value parameters: foldFunction: FoldFunction[RawObservation, NotInferedR], function: WindowFunction[NotInferedR, NotInferedR, Tuple, GlobalWindow]

Unspecified value parameters: function: WindowFunction[RawObservation, NotInferedR, Tuple,

Unspecified value parameters: windowFunction: (Tuple, GlobalWindow, Iterable[RawObservation], Collector[NotInferedR]) => Unit

Type mismatch, expected: (Tuple, GlobalWindow, Iterable[RawObservation], Collector[NotInferedR]) => Unit, actual: SequentialDeltaCheck

Type mismatch, expected: WindowFunction[RawObservation, NotInferedR, Tuple, GlobalWindow], actual: SequentialDeltaCheck
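The expected types in these messages point at the mismatch. Keying a Scala DataStream by field names (e.g. `keyBy("site", "sensor")`) yields a key of type `org.apache.flink.api.java.tuple.Tuple`, and `countWindow` yields `GlobalWindow`s rather than `TimeWindow`s. A function declared as `WindowFunction[RawObservation, String, String, TimeWindow]` therefore matches none of the `apply` overloads.

Below is a minimal sketch of a signature that should line up. It assumes the stream is keyed by the hypothetical fields `site` and `sensor`, and it uses `countWindow(2, 1)` as an example size and slide.

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Hypothetical record type; only `observation` is taken from the original post.
case class RawObservation(site: String, sensor: String, timestamp: Long, observation: Double)

// Key type is Tuple (from keyBy with field names); window type is GlobalWindow (from countWindow).
class SequentialDeltaCheck extends WindowFunction[RawObservation, String, Tuple, GlobalWindow] {
  override def apply(key: Tuple, window: GlobalWindow,
                     input: Iterable[RawObservation], out: Collector[String]): Unit = {
    // With countWindow(2, 1) the first firing for a key may hold a single element,
    // so a delta is only emitted once two observations are available.
    if (input.size >= 2) {
      val previous = input.head.observation
      val current = input.last.observation
      // Tuple.getField is generic; both key fields are assumed to be Strings here.
      val site: String = key.getField[String](0)
      val sensor: String = key.getField[String](1)
      out.collect(s"TEST-DELTA: $site/$sensor, ${current - previous}")
    }
  }
}

object DeltaPipeline {
  // Wiring sketch: the field names "site" and "sensor" are assumptions.
  def deltas(observations: DataStream[RawObservation]): DataStream[String] =
    observations
      .keyBy("site", "sensor")
      .countWindow(2, 1)
      .apply(new SequentialDeltaCheck())
}
```

With matching type parameters, the result type `String` is inferred from the function, so the explicit `[String]` on `apply` is no longer needed.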


As an aside, when defining the WindowFunction I wasn't sure whether setting the key type
to Tuple2 was correct, since it is a compound key.
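On the compound key: with field-name keys, the key reaches the window function as a generic `Tuple`, whatever its arity. An alternative, sketched here under the same hypothetical `site`/`sensor` field names, is to key with a selector function that returns a Scala tuple. The key type is then `(String, String)` and can be written directly in the WindowFunction signature.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Hypothetical record type; field names other than `observation` are assumptions.
case class RawObservation(site: String, sensor: String, timestamp: Long, observation: Double)

// The key is a Scala tuple, so no getField calls are needed to read its parts.
class KeyedDeltaCheck extends WindowFunction[RawObservation, String, (String, String), GlobalWindow] {
  override def apply(key: (String, String), window: GlobalWindow,
                     input: Iterable[RawObservation], out: Collector[String]): Unit = {
    // Skip firings that do not yet contain two observations
    if (input.size >= 2) {
      val (site, sensor) = key
      val delta = input.last.observation - input.head.observation
      out.collect(s"TEST-DELTA: $site/$sensor, $delta")
    }
  }
}

object KeyedDeltaPipeline {
  def deltas(observations: DataStream[RawObservation]): DataStream[String] =
    observations
      // Key selector: gives a KeyedStream[RawObservation, (String, String)]
      .keyBy((o: RawObservation) => (o.site, o.sensor))
      .countWindow(2, 1)
      .apply(new KeyedDeltaCheck())
}
```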

Any help, or pointers to something I may have missed in the docs, would be great; I've had
a look through but nothing jumped out at me. I also think I could probably do this using
the fold transformation, but I wanted to try window functions first.



The workflow:

val stream: DataStream[RawObservation] = env
  .addSource(new FlinkKafkaConsumer09[String]("sensor_raw", new SimpleStringSchema(), properties))
  .map(new RawTupleToObservation())

/*
 * Take the stream of RawObservation objects, parse out the event time and add watermarks,
 * key by the site and sensor values, then create a sliding countWindow for subsequent observations
 */
val timedObservations: DataStream[RawObservation] = stream
  .assignTimestampsAndWatermarks(new ObservationTimestamp())

val windowedObservations = timedObservations

val deltaStream: DataStream[String] = windowedObservations
  .apply[String](new SequentialDeltaCheck())

The WindowFunction:

class SequentialDeltaCheck extends WindowFunction[RawObservation, String, String, TimeWindow]{

  def apply(key: String, window: TimeWindow, input: Iterable[RawObservation], out: Collector[String]): Unit = {
    val previous: Double = input.head.observation
    val current: Double = input.last.observation

    val delta: Double = current - previous
    out.collect(s"TEST-DELTA: $window, $delta")
  }
}


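To see what `input.head` and `input.last` refer to, here is a small simulation that does not use Flink. It assumes the behaviour of `countWindow(size, slide)` described in the Flink documentation: per key, it fires every `slide` elements and passes the most recent `size` elements, in arrival order, to the window function. The `firings` and `deltas` helpers are hypothetical and only model that behaviour for a single key.

```scala
object CountWindowSimulation {
  // Models countWindow(size, slide) for a single key: after every `slide`-th element,
  // the window function receives at most the last `size` elements seen so far.
  def firings[A](elements: Seq[A], size: Int, slide: Int): Seq[Seq[A]] =
    (1 to elements.length)
      .filter(_ % slide == 0)
      .map(n => elements.take(n).takeRight(size))

  // Same rule as SequentialDeltaCheck above: last minus first element of each firing.
  def deltas(observations: Seq[Double]): Seq[Double] =
    firings(observations, size = 2, slide = 1).map(w => w.last - w.head)

  def main(args: Array[String]): Unit = {
    val readings = Seq(10.0, 12.5, 11.0)
    println(firings(readings, size = 2, slide = 1))
    println(deltas(readings))
  }
}
```

Under these assumptions, the first firing contains only one element, so `head` and `last` are the same observation and the delta is 0.0. This is why the typed sketches earlier guard on `input.size >= 2`.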