flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rahul Raj <rahulrajms...@gmail.com>
Subject Windows getting created only on first execution
Date Wed, 11 Oct 2017 04:19:34 GMT
Hi ,

I have written a program which reads data from Kafka, parses the json and
does some reduce operation. The problem I am facing is, the program
executes perfectly for the first time on a day. But when I kill the program
and execute it again, an empty file is created. Even after compiling again
and running, an empty file is created.

var kafkaConsumer = new FlinkKafkaConsumer08(

      params.getRequired("input-topic"),

      new SimpleStringSchema,

      params.getProperties)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    var messageStream =
env.addSource(kafkaConsumer).filter(t=>t.contains(pattern))


    var mts = messageStream.assignTimestampsAndWatermarks(new
AssignerWithPeriodicWatermarks[String] {

      var ts = Long.MinValue

      override def extractTimestamp(element: String,
previousElementTimestamp: Long): Long = {

        var timestamp = json_decode(element).toLong

        ts = Math.max(timestamp,previousElementTimestamp)

        timestamp

      }


      override def getCurrentWatermark(): Watermark = {

        new Watermark(ts)

      }

    })

    var output = mts

      .keyBy(t=>json_decode(t))

      .window(EventTimeSessionWindows.withGap(Time.seconds(60)))

      .allowedLateness(Time.seconds(5))

      .reduce((v1,v2)=>v1+"----"+v2)


output.writeAsText(path).setParallelism(1)


I am using FileSystem as statebackend. I am assuming this problem is
related to memory cleaning, but I don't understand what's happening.

Any help?


Rahul Raj

Mime
View raw message