flink-user mailing list archives

From Nico Kruber <n...@data-artisans.com>
Subject Re: Queryable State
Date Mon, 16 Jan 2017 12:25:26 GMT
Hi Dawid,
regarding the original code, I couldn't reproduce this with the Java code I 
wrote and my guess is that the second parameter of the ListStateDescriptor is 
wrong:

      .asQueryableState(
        "type-time-series-count",
        new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
          "type-time-series-count",
          classOf[KeyedDataPoint[java.lang.Integer]]))

this should rather be 

TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})

as in the query itself. It sounds strange to me that you don't get any
ClassCastException or a compile-time error due to the type being wrong but I
lack some Scala knowledge to get to the bottom of this.
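
As a minimal sketch of what I mean (not tested against your job): the
KeyedDataPoint class below is only a hypothetical stand-in for the one in your
repository, and the object name DescriptorSketch is made up for illustration.
It builds the descriptor from a TypeInformation that carries the full generic
type instead of from the class object:

```scala
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}

// Hypothetical stand-in for the KeyedDataPoint class of the job;
// the real class in the linked repository may look different.
class KeyedDataPoint[T](var key: String, var timestamp: Long, var value: T) {
  // No-argument constructor, as Flink's POJO analysis expects one.
  def this() = this(null, 0L, null.asInstanceOf[T])
}

object DescriptorSketch {
  // The anonymous TypeHint subclass keeps the type parameter
  // (java.lang.Integer) visible to Flink's type extraction.
  val typeInfo: TypeInformation[KeyedDataPoint[java.lang.Integer]] =
    TypeInformation.of(new TypeHint[KeyedDataPoint[java.lang.Integer]]() {})

  // Same state name as in the job, but the second parameter is the
  // TypeInformation rather than classOf[...].
  val descriptor: ListStateDescriptor[KeyedDataPoint[java.lang.Integer]] =
    new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
      "type-time-series-count", typeInfo)
}
```

DescriptorSketch.descriptor would then be passed as the second argument of
.asQueryableState("type-time-series-count", ...), and the client should derive
its value serializer from the same TypeInformation.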


Regarding the removal of the queryable list state "sink", I created a JIRA 
issue for it and will open a PR:
https://issues.apache.org/jira/browse/FLINK-5507


Nico

On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> Hi Nico,
> 
> Recently I've tried the queryable state a bit differently, by using
> ValueState with a value of a util.ArrayList and a ValueSerializer for
> util.ArrayList and it works as expected.
> 
> The non-working example you can browse here:
> https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
> The working example here:
> https://github.com/dawidwys/flink-intro/tree/master
> (The QueryableJob is in module flink-queryable-job and the QueryClient in
> flink-state-server)
> 
> Sure, I am aware of the downside of the ListState. I need it just for
> presentation purposes, but you may be right that there might not be any
> production use for this state and it should be removed.
> Maybe the problem is just with the ListState and removing it would also
> resolve my problem :)
> 
> Regards
> Dawid Wysakowicz
> 
> 2017-01-13 18:50 GMT+01:00 Nico Kruber <nico@data-artisans.com>:
> > Hi Dawid,
> > I'll try to reproduce the error in the next couple of days. Can you also
> > share the value deserializer you use? Also, have you tried even smaller
> > examples in the meantime? Did they work?
> > 
> > As a side-note in general regarding the queryable state "sink" using
> > ListState (".asQueryableState(<name>, ListStateDescriptor)"): everything
> > that enters this operator will be stored forever and never cleaned.
> > Eventually, it will pile up too much memory and is thus of limited use.
> > Maybe it should even be removed from the API.
> > 
> > 
> > Nico
> > 
> > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > Hey Ufuk.
> > > Did you maybe have a moment to look at that problem?
> > > 
> > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <uce@apache.org>:
> > > > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > > > the course of the day. From a first impression, this seems like a bug
> > > > to me.
> > > > 
> > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > > <wysakowicz.dawid@gmail.com> wrote:
> > > > > Hi, I was experimenting with the Queryable State feature and I have
> > > > > some problems querying the state.
> > > > > 
> > > > > The code which I use to produce the queryable state is:
> > > > >     env.addSource(kafkaConsumer).map(
> > > > >       e => e match {
> > > > >         case LoginClickEvent(_, t) => ("login", 1, t)
> > > > >         case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > >         case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > > >       }).keyBy(0).timeWindow(Time.seconds(1))
> > > > >       .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> > > > >       .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > > > >       .keyBy("key")
> > > > >       .asQueryableState(
> > > > >         "type-time-series-count",
> > > > >         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > > >           "type-time-series-count",
> > > > >           classOf[KeyedDataPoint[java.lang.Integer]]))
> > > > > 
> > > > > As you see it is a rather simple job, in which I try to count events
> > > > > of different types in windows and then query by event type.
> > > > > 
> > > > > In client code I do:
> > > > >     // Query Flink state
> > > > >     val future = client.getKvState(jobId, "type-time-series-count",
> > > > >       key.hashCode, seralizedKey)
> > > > > 
> > > > >     // Await async result
> > > > >     val serializedResult: Array[Byte] = Await.result(
> > > > >       future, new FiniteDuration(10, duration.SECONDS))
> > > > > 
> > > > >     // Deserialize response
> > > > >     val results = deserializeResponse(serializedResult)
> > > > > 
> > > > >     results
> > > > >   }
> > > > > 
> > > > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > > > >       util.List[KeyedDataPoint[lang.Integer]] = {
> > > > >     KvStateRequestSerializer.deserializeList(serializedResult,
> > > > >       getValueSerializer())
> > > > >   }
> > > > > 
> > > > > As I was trying to debug the issue I saw that the first element in the
> > > > > list gets deserialized correctly, but it fails on the second one. It
> > > > > seems like the serialized result is broken. Do you have any idea if I
> > > > > am doing something wrong or there is some bug?
> > > > > 
> > > > > 
> > > > > The exception I get is:
> > > > > java.io.EOFException: null
> > > > > at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > > > > at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > > > > at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > > > > at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > > > > at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> > > > > 
> > > > 
> > > > > You can browse the exact code at:
> > > > > https://github.com/dawidwys/flink-intro
> > > > > 
> > > > > I would be grateful for any advice.
> > > > > 
> > > > > Regards
> > > > > Dawid Wysakowicz