flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Valuestate is not saving the state
Date Wed, 23 Mar 2016 11:02:17 GMT
Hi,
what is the input for each of those outputs? Could you maybe print this:

    System.out.println(in + “, current booking count "+value)

Also, what is the key that you specify for your KeyedStream?

Cheers,
Aljoscha
> On 23 Mar 2016, at 11:53, Balaji Rajagopalan <balaji.rajagopalan@olacabs.com> wrote:
> 
> I wrote the below code which will increment a counter for the data in the datastream,
and when I print the counter each time it seems the value is reinitialised to 0, and it is
not incrementing, any thoughts. 
> 
> class BookingCntFlatMapFunction extends RichFlatMapFunction[(Booking,Long,Long),(Booking,Long,Long)]
> {
> 
> 
>   @transient var bookingCnt:ValueState[Int] = null
> 
>   override def flatMap(in: (Booking, Long, Long), out: Collector[(Booking, Long, Long)]):
Unit = {
>     var value = bookingCnt.value()
>     value += 1
>     System.out.println("current booking count "+value)
>     
>     bookingCnt.update(value)
>      out.collect(in)
>   }
> 
>   override def open( config:Configuration): Unit = {
>     val descriptor: ValueStateDescriptor[Int] = new ValueStateDescriptor[Int]("bookingcnt",
>       TypeInformation.of(new TypeHint[Int]() {}),0)
>     bookingCnt = getRuntimeContext().getState(descriptor);
> 
>   }
> 
> 
> }
> Output of the program:
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
> current booking count 1
> 


Mime
View raw message