flink-user mailing list archives

From "Ciar, David B." <dcia...@ceh.ac.uk>
Subject countWindow custom WindowFunction
Date Wed, 13 Jul 2016 11:07:12 GMT
Hello everyone,


I'm relatively new to Apache Flink and Scala, and am just getting to grips with some
of the basic functionality both provide.  However, I've hit a wall trying to implement a
custom WindowFunction over a keyed countWindow, and hoped someone might have a pointer.  The
full code is in a Gist (https://gist.github.com/dbciar/37df92d321c180f5e96e5e3f17806c91), and
I am using Flink 1.0.3 with Scala 2.11.


So my workflow is that I read string values from a Kafka queue, parse these into a DataStream
of RawObservation type using a custom map, and then create a keyed countWindow stream.


The problem is that when I try to apply a custom WindowFunction, the IDE reports the error
"Cannot resolve symbol apply" on the ".apply" call.  I suspect this is caused by my
WindowFunction not being implemented correctly, so that it does not match the signature
apply expects.  I think this because, when I remove the '[String]' type parameter from apply
('.apply[String]'), I get the following errors:


--------------------------------------------------------------------------------------------------------

Unspecified value parameters: foldFunction: (NotInferedR, RawObservation) => NotInferedR,
windowFunction: (Tuple, GlobalWindow, Iterable[NotInferedR], Collector[NotInferedR]) =>
Unit

Unspecified value parameters: foldFunction: FoldFunction[RawObservation, NotInferedR], function:
WindowFunction[NotInferedR, NotInferedR, Tuple, GlobalWindow]


Unspecified value parameters: function: WindowFunction[RawObservation, NotInferedR, Tuple,
GlobalWindow]

Unspecified value parameters: windowFunction: (Tuple, GlobalWindow, Iterable[RawObservation],
Collector[NotInferedR]) => Unit

Type mismatch, expected: (Tuple, GlobalWindow, Iterable[RawObservation], Collector[NotInferedR])
=> Unit, actual: SequentialDeltaCheck

Type mismatch, expected: WindowFunction[RawObservation, NotInferedR, Tuple, GlobalWindow],
actual: SequentialDeltaCheck

--------------------------------------------------------------------------------------------------------



As an aside to this, when defining the WindowFunction, I wasn't sure if I was correctly setting
the key type to Tuple2, as it is a compound key.


Any help or pointers to something I may have missed in the docs would be great.  I've had
a look through them, but nothing jumped out at me.  I also think I could probably do this
using the fold transform, but I wanted to try using window functions first.


Thanks,

David


The workflow:


val stream: DataStream[RawObservation] = env
  .addSource(new FlinkKafkaConsumer09[String]("sensor_raw", new SimpleStringSchema(), properties))
  .map(new RawTupleToObservation())

/**
  * Take the stream of RawObservation objects, parse out the Event Time and add watermarks,
  * key by the site and sensor values, then create a sliding countWindow for subsequent observations
  */
val timedObservations: DataStream[RawObservation] = stream
  .assignTimestampsAndWatermarks(new ObservationTimestamp())

val windowedObservations = timedObservations
  .keyBy("site")
  .countWindow(2,1)


val deltaStream: DataStream[String] = windowedObservations
  .apply[String](new SequentialDeltaCheck())
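
As a side note on what `countWindow(2,1)` hands to the window function: as far as I understand it, the window fires after every element and keeps at most the last two, so the first firing for a key contains a single observation. The following is only a plain-Scala model of that behaviour, not Flink code; `CountWindowModel`, `countWindows` and the sample values are hypothetical names and data chosen for illustration.

```scala
object CountWindowModel {
  // Model of countWindow(size, slide): fire after every `slide` elements,
  // each time emitting at most the last `size` elements seen so far.
  def countWindows[A](xs: Seq[A], size: Int, slide: Int): Seq[Seq[A]] =
    (slide to xs.length by slide).map(n => xs.slice(math.max(0, n - size), n))

  // Hypothetical observation values for a single key.
  val observations: Seq[Double] = Seq(1.0, 1.5, 3.0)

  // Same delta as in SequentialDeltaCheck: last element minus first element.
  val deltas: Seq[Double] =
    countWindows(observations, size = 2, slide = 1).map(w => w.last - w.head)
  // deltas == Seq(0.0, 0.5, 1.5); the leading 0.0 comes from the
  // one-element first window, where head and last are the same observation.
}
```

If that first zero delta is unwanted, the window function could skip windows with fewer than two elements.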


The WindowFunction:


class SequentialDeltaCheck extends WindowFunction[RawObservation, String, String, TimeWindow]{

  def apply(key: String, window: TimeWindow, input: Iterable[RawObservation], out: Collector[String]): Unit = {
    val previous: Double = input.head.observation
    val current: Double = input.last.observation

    val delta: Double = current - previous
    out.collect(s"TEST-DELTA: $window, $delta")
  }
}
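
For comparison, the error messages above expect a `WindowFunction[RawObservation, NotInferedR, Tuple, GlobalWindow]`, which suggests that `keyBy("site")` yields a key of type `Tuple` and that `countWindow` uses `GlobalWindow` rather than `TimeWindow`. Below is a minimal sketch of a window function with that shape. It assumes the Scala `WindowFunction` trait from `org.apache.flink.streaming.api.scala.function` is available in the Flink version in use, and the `RawObservation` case class here is a hypothetical stand-in for the one in the Gist.

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Hypothetical stand-in for the RawObservation type defined in the Gist.
case class RawObservation(site: String, observation: Double)

// Key type Tuple and window type GlobalWindow are taken from the
// "expected" types in the error messages, not from the original class.
class SequentialDeltaCheck
    extends WindowFunction[RawObservation, String, Tuple, GlobalWindow] {

  override def apply(key: Tuple, window: GlobalWindow,
                     input: Iterable[RawObservation],
                     out: Collector[String]): Unit = {
    // keyBy("site") wraps the single key field in a Tuple; read field 0.
    val site: String = key.getField[String](0)
    val delta: Double = input.last.observation - input.head.observation
    out.collect(s"TEST-DELTA: $site, $delta")
  }
}
```

An alternative that may be worth trying is keying with a function, for example `keyBy(_.site)`, in which case the key type would be `String` instead of `Tuple`.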


