Hi David,

countWindow(size,slide) creates a GlobalWindow, not a TimeWindow. Also you have to use Tuple instead of Tuple2.

class SequentialDeltaCheck extends WindowFunction[RawObservation, String, Tuple, GlobalWindow]{

  def apply(key: Tuple, window: GlobalWindow, input: Iterable[RawObservation], out: Collector[String]): Unit = {
    val previous: Double = input.head.observation
    val current: Double = input.last.observation

    val delta: Double = current - previous
    out.collect(s"TEST-DELTA: $window, $delta")
  }
}

Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com

On Wed, Jul 13, 2016 at 7:07 AM, Ciar, David B. <dciar86@ceh.ac.uk> wrote:

Hello everyone,


I'm relatively new to using Apache Flink and Scala, and am just getting to grips with some of the basic functionality both provide.  I've hit a wall trying to implement a custom WindowFunction over a keyed countWindow however, and hoped someone may have a pointer.  The full code is in a Gist (https://gist.github.com/dbciar/37df92d321c180f5e96e5e3f17806c91), and I am using version Flink 1.0.3, Scala 2.11. 


So my workflow is that I read string values from a Kafka queue, parse these into a DataStream of RawObservation type using a custom map, and then create a keyed countWindow stream.


The problem is that when I try to implement a custom WindowFunction the IDE gives an error on the ".apply" function "Cannot resolve symbol apply".  I have a feeling that this might be caused by my WindowFunction not being implemented correctly and not matching the signature of the apply function.  I think this as when I remove the '[String]' return type from apply ('.apply[String]') I get the following errors:


--------------------------------------------------------------------------------------------------------

Unspecified value parameters: foldFunction: (NotInferedR, RawObservation) => NotInferedR, windowFunction: (Tuple, GlobalWindow, Iterable[NotInferedR], Collector[NotInferedR]) => Unit

Unspecified value parameters: foldFunction: FoldFunction[RawObservation, NotInferedR], function: WindowFunction[NotInferedR, NotInferedR, Tuple, GlobalWindow]


Unspecified value parameters: function: WindowFunction[RawObservation, NotInferedR, Tuple, GlobalWindow]

Unspecified value parameters: windowFunction: (Tuple, GlobalWindow, Iterable[RawObservation], Collector[NotInferedR]) => Unit

Type mismatch, expected: (Tuple, GlobalWindow, Iterable[RawObservation], Collector[NotInferedR]) => Unit, actual: SequentialDeltaCheck

Type mismatch, expected: WindowFunction[RawObservation, NotInferedR, Tuple, GlobalWindow], actual: SequentialDeltaCheck

--------------------------------------------------------------------------------------------------------



As an aside to this, when defining the WindowFunction, I wasn't sure if I was correctly setting the key type to Tuple2, as it is a compound key.


Any help or pointers to something I may have missed in the docs would be great, I've a had a look through but nothing jumped out at me.  I also think I could probably do this using the fold transform, but I wanted to try using window functions first.


Thanks,

David


The workflow:


val stream: DataStream[RawObservation] = env
.addSource(new FlinkKafkaConsumer09[String]("sensor_raw", new SimpleStringSchema(), properties))
.map(new RawTupleToObservation())

/**
* Take the stream of RawObservation objects, parse out the Event Time and add watermarks,
* key by the site and sensor values, then create a sliding countWindow for subsequent observations
*/
val timedObservations: DataStream[RawObservation] = stream
.assignTimestampsAndWatermarks(new ObservationTimestamp())

val windowedObservations = timedObservations
.keyBy("site")
.countWindow(2,1)


val deltaStream: DataStream[String] = windowedObservations
.apply[String](new SequentialDeltaCheck())


The WindowFunction:


class SequentialDeltaCheck extends WindowFunction[RawObservation, String, String, TimeWindow]{

def apply(key: String, window: TimeWindow, input: Iterable[RawObservation], out: Collector[String]): Unit = {
val previous: Double = input.head.observation
val current: Double = input.last.observation

val delta: Double = current - previous
out.collect(s"TEST-DELTA: $window, $delta")
}
}



This message (and any attachments) is for the recipient only. NERC is subject to the Freedom of Information Act 2000 and the contents of this email and any reply you make may be disclosed by NERC unless it is exempt from release under the Act. Any material supplied to NERC may be stored in an electronic records management system.

--
Thanks and Regards,
Vishnu Viswanath,