Hi,
you would indeed use apply(), or better fold(<initial_value>, <fold_function>, <window_function>) to map the result of folding your window to some other data type. If you will, a WindowFunction allows "mapping" the result of your windowing to a different type.

Best,
Aljoscha

On Wed, 15 Feb 2017 at 06:14 nsengupta <sengupta.nirmalya@gmail.com> wrote:
I have gone through this  post
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WindowedStream-operation-questions-td6006.html>
, where Aljoscha explains that /mapping/ on WindowedStream is /not/ allowed.

So, I think I haven't asked the question properly. Here is (hopefully) a
better and easier version:

1.    I begin with records of type RawMITSIMTuple.
2.    When I group them using a Window, I get an
AllWindowedStream[RawMITSIMTuple].
3.    I /fold/ the tuples obtained in the Window, which gives me a
DataStream[Vector[RawMITSIMTuple].
4.    What I need is a DataStream[PositionReport]. So, I need to flatMap the
output of previous step, where I first get hold of each of the
RawMITSIMTuple and map that to PositionReport.

val positionReportStream = this
      .readRawMITSIMTuplesInjected(envDefault,args(0))
      .assignAscendingTimestamps(e => e.timeOfReport)
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
      .fold(Vector[RawMITSIMTuple]())((collectorBin,rawRecord) => {
          collectorBin :+ rawRecord)
        })
      .flatMap(r => r.map(e => this.preparePositionReport(e)))

This gives me what I want, but I feel this is verbose and inefficient. Am I
thinking correctly? If so, what is a better idiom to use in such cases?

-- Nirmalya




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clarification-use-of-AllWindowedStream-apply-function-tp11627p11630.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.