flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rahul Raj <rahulrajms...@gmail.com>
Subject Windows getting created only on first execution
Date Wed, 11 Oct 2017 04:19:34 GMT
Hi ,

I have written a program which reads data from Kafka, parses the json and
does some reduce operation. The problem I am facing is, the program
executes perfectly for the first time on a day. But when I kill the program
and execute it again, an empty file is created. Even after compiling again
and running, an empty file is created.

var kafkaConsumer = new FlinkKafkaConsumer08(


      new SimpleStringSchema,



    var messageStream =

    var mts = messageStream.assignTimestampsAndWatermarks(new
AssignerWithPeriodicWatermarks[String] {

      var ts = Long.MinValue

      override def extractTimestamp(element: String,
previousElementTimestamp: Long): Long = {

        var timestamp = json_decode(element).toLong

        ts = Math.max(timestamp,previousElementTimestamp)



      override def getCurrentWatermark(): Watermark = {

        new Watermark(ts)



    var output = mts






I am using FileSystem as statebackend. I am assuming this problem is
related to memory cleaning, but I don't understand what's happening.

Any help?

Rahul Raj

View raw message