flink-user mailing list archives

From Vishnu Viswanath <vishnu.viswanat...@gmail.com>
Subject Re: countWindow custom WindowFunction
Date Wed, 13 Jul 2016 11:35:44 GMT
Hi David,

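countWindow(size, slide) creates a GlobalWindow, not a TimeWindow, so the
window type parameter must be GlobalWindow. Also, because you key by a field
name (keyBy("site")), the key is passed to the function as a Tuple, so you
have to use Tuple instead of Tuple2 (or String).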

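import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// keyBy("site") delivers the key as a Tuple; countWindow(size, slide) uses GlobalWindow
class SequentialDeltaCheck extends WindowFunction[RawObservation, String, Tuple, GlobalWindow] {

  def apply(key: Tuple, window: GlobalWindow, input: Iterable[RawObservation],
            out: Collector[String]): Unit = {
    // Oldest and newest observations currently held in the count window
    val previous: Double = input.head.observation
    val current: Double = input.last.observation

    val delta: Double = current - previous
    out.collect(s"TEST-DELTA: $window, $delta")
  }
}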
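To make the window semantics a bit more concrete, here is a plain-Scala simulation (no Flink dependency, and not Flink's actual implementation) of what, as I understand it, countWindow(2, 1) hands to the function for each key: after every element, the last two elements seen for that key. RawObservation below is a hypothetical stand-in with only the site and observation fields, and slidingCountWindows is a made-up helper, not a Flink API. In the simulation the first firing per key holds a single element, so head and last coincide and the delta is 0.0; I believe Flink behaves the same way because the slide-1 count trigger fires on every element.

```scala
// Hypothetical stand-in for RawObservation: only the fields used here.
final case class RawObservation(site: String, observation: Double)

object CountWindowSketch {
  // Simulates, per key, a sliding count window with slide = 1:
  // after each element, emit (key, last `size` elements seen for that key).
  def slidingCountWindows(events: Seq[RawObservation],
                          size: Int): Seq[(String, Vector[RawObservation])] = {
    val buffers = scala.collection.mutable.Map.empty[String, Vector[RawObservation]]
    events.map { e =>
      // Append the new element and keep only the most recent `size` ones
      val updated = (buffers.getOrElse(e.site, Vector.empty) :+ e).takeRight(size)
      buffers(e.site) = updated
      (e.site, updated)
    }
  }

  // Same logic as SequentialDeltaCheck.apply: newest minus oldest observation.
  def delta(input: Iterable[RawObservation]): Double =
    input.last.observation - input.head.observation

  def main(args: Array[String]): Unit = {
    val events = Seq(
      RawObservation("a", 1.0),
      RawObservation("a", 4.0),
      RawObservation("b", 10.0),
      RawObservation("a", 2.5)
    )
    slidingCountWindows(events, size = 2).foreach { case (key, window) =>
      println(s"$key -> ${delta(window)}")
    }
  }
}
```

Running main prints a -> 0.0, a -> 3.0, b -> 0.0 and a -> -1.5, one line per firing; the last "a" window holds only the 4.0 and 2.5 observations because the older 1.0 has been evicted.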

Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com

On Wed, Jul 13, 2016 at 7:07 AM, Ciar, David B. <dciar86@ceh.ac.uk> wrote:

> Hello everyone,
>
>
> I'm relatively new to using Apache Flink and Scala, and am just getting to
> grips with some of the basic functionality both provide.  I've hit a wall
> trying to implement a custom WindowFunction over a keyed countWindow
> however, and hoped someone may have a pointer.  The full code is in a Gist (
> https://gist.github.com/dbciar/37df92d321c180f5e96e5e3f17806c91), and I
> am using Flink 1.0.3 with Scala 2.11.
>
>
> So my workflow is that I read string values from a Kafka queue, parse
> these into a DataStream of RawObservation type using a custom map, and then
> create a keyed countWindow stream.
>
>
> The problem is that when I try to use a custom WindowFunction, the
> IDE reports "Cannot resolve symbol apply" on the ".apply" call.
> I have a feeling that this might be caused by my WindowFunction not being
> implemented correctly and not matching the signature of the apply
> function.  I think this because when I remove the '[String]' type parameter
> from apply ('.apply[String]') I get the following errors:
>
>
> ------------------------------------------------------------------------
>
> Unspecified value parameters: foldFunction: (NotInferedR, RawObservation)
> => NotInferedR, windowFunction: (Tuple, GlobalWindow,
> Iterable[NotInferedR], Collector[NotInferedR]) => Unit
>
> Unspecified value parameters: foldFunction: FoldFunction[RawObservation,
> NotInferedR], function: WindowFunction[NotInferedR, NotInferedR, Tuple,
> GlobalWindow]
>
>
> Unspecified value parameters: function: WindowFunction[RawObservation,
> NotInferedR, Tuple, GlobalWindow]
>
> Unspecified value parameters: windowFunction: (Tuple, GlobalWindow,
> Iterable[RawObservation], Collector[NotInferedR]) => Unit
>
> Type mismatch, expected: (Tuple, GlobalWindow, Iterable[RawObservation],
> Collector[NotInferedR]) => Unit, actual: SequentialDeltaCheck
>
> Type mismatch, expected: WindowFunction[RawObservation, NotInferedR,
> Tuple, GlobalWindow], actual: SequentialDeltaCheck
>
> ------------------------------------------------------------------------
>
>
> As an aside to this, when defining the WindowFunction, I wasn't sure if I
> was correctly setting the key type to Tuple2, as it is a compound key.
>
>
> Any help or pointers to something I may have missed in the docs would be
> great; I've had a look through, but nothing jumped out at me.  I also
> think I could probably do this using the fold transform, but I wanted to
> try using window functions first.
>
>
> Thanks,
>
> David
>
>
> The workflow:
>
>
> val stream: DataStream[RawObservation] = env
>   .addSource(new FlinkKafkaConsumer09[String]("sensor_raw", new SimpleStringSchema(), properties))
>   .map(new RawTupleToObservation())
>
> /**
>   * Take the stream of RawObservation objects, parse out the Event Time and add watermarks,
>   * key by the site and sensor values, then create a sliding countWindow for subsequent
>   * observations
>   */
> val timedObservations: DataStream[RawObservation] = stream
>   .assignTimestampsAndWatermarks(new ObservationTimestamp())
>
> val windowedObservations = timedObservations
>   .keyBy("site")
>   .countWindow(2,1)
>
>
> val deltaStream: DataStream[String] = windowedObservations
>   .apply[String](new SequentialDeltaCheck())
>
>
> The WindowFunction:
>
>
> class SequentialDeltaCheck extends WindowFunction[RawObservation, String, String, TimeWindow]{
>
>   def apply(key: String, window: TimeWindow, input: Iterable[RawObservation], out: Collector[String]): Unit = {
>     val previous: Double = input.head.observation
>     val current: Double = input.last.observation
>
>     val delta: Double = current - previous
>     out.collect(s"TEST-DELTA: $window, $delta")
>   }
> }
>
