flink-user mailing list archives

From Balaji Rajagopalan <balaji.rajagopa...@olacabs.com>
Subject Re: Valuestate is not saving the state
Date Wed, 23 Mar 2016 11:07:15 GMT
(Booking(te7uc4,compact,jek@gmail.com,Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1ym,compact,ere65@gmail.com,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(t9zvqw,compact,yasere@gmail.com,Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1e8,compact,k.ere@gmail.com,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdntcj,compact,erer@gmail.com,Mon Feb 29 19:19:40 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1wv,compact,eree@gmail.com,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1
(Booking(tdr1wv,compact,ereve@yahoo.in,Mon Feb 29 18:41:07 IST 2016),1458730980000,1458731040000)current booking count 1

The key is the email id from the Booking object.
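
A minimal sketch of why a count of 1 per record would be expected here, assuming the stream is keyed by email id as stated: Flink scopes keyed state to the current key, so each distinct email gets its own counter that starts from the default value 0. The block below is plain Scala rather than Flink code; it simulates that scoping with a mutable map, and `KeyedCounter`, `countFor`, and `KeyedCounterDemo` are hypothetical names used only for illustration.

```scala
import scala.collection.mutable

// Plain-Scala stand-in for keyed ValueState[Int]: one counter per key,
// starting from the default value 0. Not Flink code; names are hypothetical.
final class KeyedCounter {
  private val counts = mutable.Map.empty[String, Int].withDefaultValue(0)

  // Mirrors the flatMap body: read the state for this key, add 1, write back.
  def countFor(key: String): Int = {
    val value = counts(key) + 1
    counts(key) = value
    value
  }
}

object KeyedCounterDemo {
  // Email ids taken from the output quoted in this message.
  val emails: Seq[String] = Seq(
    "jek@gmail.com", "ere65@gmail.com", "yasere@gmail.com", "k.ere@gmail.com",
    "erer@gmail.com", "eree@gmail.com", "ereve@yahoo.in")

  def main(args: Array[String]): Unit = {
    val counter = new KeyedCounter
    // All seven keys are distinct, so every record sees a fresh counter.
    emails.foreach(e => println(s"$e, current booking count ${counter.countFor(e)}"))
    // A repeated key continues from its own previous value.
    println(s"jek@gmail.com, current booking count ${counter.countFor("jek@gmail.com")}")
  }
}
```

In this simulation the seven distinct emails each report a count of 1, and only the repeated `jek@gmail.com` record reports 2. A counter over all bookings would need a key that those records share.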


On Wed, Mar 23, 2016 at 4:32 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> what is the input for each of those outputs? Could you maybe print this:
>
>     System.out.println(in + ", current booking count " + value)
>
> Also, what is the key that you specify for your KeyedStream?
>
> Cheers,
> Aljoscha
> > On 23 Mar 2016, at 11:53, Balaji Rajagopalan <balaji.rajagopalan@olacabs.com> wrote:
> >
> > I wrote the code below, which increments a counter for each element in
> > the datastream. When I print the counter, the value seems to be
> > reinitialised to 0 each time instead of incrementing. Any thoughts?
> >
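> > import org.apache.flink.api.common.functions.RichFlatMapFunction
> > import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> > import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
> > import org.apache.flink.configuration.Configuration
> > import org.apache.flink.util.Collector
> >
> > class BookingCntFlatMapFunction
> >     extends RichFlatMapFunction[(Booking, Long, Long), (Booking, Long, Long)] {
> >
> >   // Keyed state handle, scoped to the current key of the KeyedStream.
> >   @transient var bookingCnt: ValueState[Int] = null
> >
> >   override def flatMap(in: (Booking, Long, Long),
> >                        out: Collector[(Booking, Long, Long)]): Unit = {
> >     // Read the counter for the current key, increment it, and write it back.
> >     var value = bookingCnt.value()
> >     value += 1
> >     System.out.println("current booking count " + value)
> >
> >     bookingCnt.update(value)
> >     out.collect(in)
> >   }
> >
> >   override def open(config: Configuration): Unit = {
> >     // Register the state under the name "bookingcnt" with default value 0.
> >     val descriptor: ValueStateDescriptor[Int] = new ValueStateDescriptor[Int](
> >       "bookingcnt", TypeInformation.of(new TypeHint[Int]() {}), 0)
> >     bookingCnt = getRuntimeContext().getState(descriptor)
> >   }
> > }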
> > Output of the program:
> > current booking count 1
> > current booking count 1
> > current booking count 1
> > current booking count 1
> > current booking count 1
> > current booking count 1
> > current booking count 1
> >
>
>
