flink-user mailing list archives

From Sonex <alfredjens...@gmail.com>
Subject Re: Windows emit results at the end of the stream
Date Tue, 28 Mar 2017 10:09:19 GMT
I have prepared a small dummy dataset (data.txt) in which each line has the form word|timestamp, with the timestamp in milliseconds:

Hello|5
Hi|15
WordsWithoutMeaning|25
AnotherWord|34
HelloWorld|46
HelloPlanet|67
HelloFlinkUsers|89
HelloProgrammers|98
DummyPhrase|105
AnotherDummy|123

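As a quick sanity check on the input, the plain-Scala snippet below applies the same split("[|]") parsing as the job and checks that the timestamps never decrease in file order. It has no Flink dependency, and the object name ParseSample is made up for illustration. The ordering matters because assignAscendingTimestamps assumes that timestamps ascend within each parallel instance.

```scala
// Illustration only (hypothetical object, not part of the job): reproduces
// the job's "word|timestamp" parsing and checks the timestamp order.
object ParseSample {
  val sample: Seq[String] = Seq(
    "Hello|5", "Hi|15", "WordsWithoutMeaning|25", "AnotherWord|34",
    "HelloWorld|46", "HelloPlanet|67", "HelloFlinkUsers|89",
    "HelloProgrammers|98", "DummyPhrase|105", "AnotherDummy|123")

  // Same logic as the job's .map: (word, event time in ms).
  def parse(line: String): (String, Long) = {
    val parts = line.split("[|]")
    (parts.head, parts.last.toLong)
  }

  // True if the timestamps are non-decreasing in the given order.
  def ascending(records: Seq[(String, Long)]): Boolean =
    records.map(_._2).sliding(2).forall {
      case Seq(a, b) => a <= b
      case _         => true // fewer than two records
    }

  def main(args: Array[String]): Unit = {
    val records = sample.map(parse)
    records.foreach(println)
    println(s"ascending: ${ascending(records)}")
  }
}
```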
And below is the code:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// wrapper object so the snippet compiles as a standalone program
object WindowJob {
  val parallelism = 8
  // sliding step (ms)
  val slidingStep = 30
  // window size (ms)
  val windowSize = 30

  def main(args: Array[String]): Unit = {
    // start the streaming environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()
    // set the degree of parallelism
    env.setParallelism(parallelism)
    // use event time, taken from the second field of each record
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputFormat = new TextInputFormat(new Path("data.txt"))
    // re-scan data.txt every 10 s
    env.readFile(inputFormat, "data.txt",
        FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)
      // parse "word|timestamp" into (word, timestamp)
      .map { element =>
        val partsOfElement = element.split("[|]")
        (partsOfElement.head, partsOfElement.last.toLong)
      }
      .assignAscendingTimestamps(_._2)
      .keyBy(_._1)
      .timeWindow(Time.milliseconds(windowSize), Time.milliseconds(slidingStep))
      .apply(new Test)

    env.execute()
  }
}

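Because slidingStep equals windowSize (30 ms each), the sliding windows do not overlap and behave like tumbling windows. The sketch below is a plain-Scala re-implementation, assuming an offset of 0, of the way a sliding event-time assigner enumerates the windows that contain a timestamp. SlidingAssign.windowsFor is a hypothetical helper, not part of Flink's API.

```scala
// Hypothetical sketch of sliding event-time window assignment (offset 0),
// written in plain Scala for illustration; it is not Flink's API.
object SlidingAssign {
  // All windows [start, start + size) that contain `ts`, latest first.
  def windowsFor(ts: Long, size: Long, slide: Long): List[(Long, Long)] = {
    // Start of the latest slide-aligned window containing ts.
    val lastStart = ts - Math.floorMod(ts, slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > ts - size)
      .map(start => (start, start + size))
      .toList
  }

  def main(args: Array[String]): Unit = {
    println(windowsFor(46L, 30L, 30L)) // List((30,60)): slide == size, one window
    println(windowsFor(46L, 30L, 10L)) // List((40,70), (30,60), (20,50)): overlapping
  }
}
```

With the 30 ms slide used in the job, the record HelloWorld|46 lands in exactly one window, [30, 60); with a hypothetical 10 ms slide it would land in three overlapping windows.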

And the test class is the following:

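class Test extends WindowFunction[(String, Long), String, String, TimeWindow] {
  // called once per key and window when the window fires
  override def apply(key: String, window: TimeWindow,
                     input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    // log the key and the (exclusive) end timestamp of the window
    println(s"$key -- ${window.getEnd}")
    // emit the word of the first element in the window
    out.collect(input.head._1)
  }
}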

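For reference, the following plain-Scala simulation lists what Test would print if every window fired. SimulateTest is a hypothetical name and no Flink code is involved. It assumes 30 ms windows with offset 0 and sorts the results by window end and key, whereas a real parallel run prints them in no fixed order. Since every word in data.txt is distinct, each key gets exactly one window, so ten lines are expected, starting with "Hello -- 30" and ending with "AnotherDummy -- 150".

```scala
// Hypothetical simulation (no Flink): what Test would print and emit if
// every 30 ms window (offset 0) for the sample data fired.
object SimulateTest {
  val records: Seq[(String, Long)] = Seq(
    ("Hello", 5L), ("Hi", 15L), ("WordsWithoutMeaning", 25L),
    ("AnotherWord", 34L), ("HelloWorld", 46L), ("HelloPlanet", 67L),
    ("HelloFlinkUsers", 89L), ("HelloProgrammers", 98L),
    ("DummyPhrase", 105L), ("AnotherDummy", 123L))

  val size = 30L

  // Mirrors Test.apply: returns (printed line, emitted value).
  def applyTest(key: String, windowEnd: Long, input: Seq[(String, Long)]): (String, String) =
    (s"$key -- $windowEnd", input.head._1)

  // Group by (key, window end), like keyBy(_._1) followed by a 30 ms window.
  def fireAll(): Seq[(String, String)] =
    records
      .groupBy { case (word, ts) => (word, ts - ts % size + size) }
      .toSeq
      .sortBy { case ((word, windowEnd), _) => (windowEnd, word) }
      .map { case ((word, windowEnd), input) => applyTest(word, windowEnd, input) }

  def main(args: Array[String]): Unit =
    fireAll().foreach { case (printed, _) => println(printed) }
}
```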

The window function emits the first element of each window's input and, when a window is evaluated, prints the key together with the window's end time. With the parallelism set to 8, as above, the job produces no output at all. With the parallelism reduced to 4, it emits results only for the first window. You can run the code above to reproduce this.

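One plausible contributor to this behavior, offered as an assumption rather than a confirmed diagnosis: an event-time window fires only once the operator's watermark reaches the window's end, and that watermark is the minimum over all of the operator's input channels. If only one of the eight source instances reads the small file, the other seven never advance their watermarks. In addition, PROCESS_CONTINUOUSLY keeps the source running, so no final end-of-input watermark is sent. The simplified model below (plain Scala, hypothetical names) illustrates only the minimum rule; it does not model how Flink splits the file or schedules readers.

```scala
// Simplified model (not Flink code): a downstream operator's event-time
// clock is the minimum of the latest watermark seen on each input channel,
// and a window [start, end) fires once that clock reaches end - 1.
object WatermarkModel {
  val NoWatermark: Long = Long.MinValue // channel that has seen no data

  def combined(channelWatermarks: Seq[Long]): Long = channelWatermarks.min

  def fires(windowEnd: Long, channelWatermarks: Seq[Long]): Boolean =
    combined(channelWatermarks) >= windowEnd - 1

  def main(args: Array[String]): Unit = {
    // Assumed scenario: one instance read all ten records (watermark 122,
    // i.e. last timestamp 123 minus 1); the other seven received nothing.
    val eightChannels = 122L +: Seq.fill(7)(NoWatermark)
    println(fires(30L, eightChannels))       // false: idle channels hold the clock back
    println(fires(30L, Seq(122L)))           // true: single active channel
    println(fires(150L, Seq(Long.MaxValue))) // true: end-of-input watermark
  }
}
```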


--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-emit-results-at-the-end-of-the-stream-tp12337p12433.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
