flink-user mailing list archives

From Dawid Wysakowicz <wysakowicz.da...@gmail.com>
Subject Re: Queryable State
Date Mon, 16 Jan 2017 13:47:59 GMT
Hi Nico, Ufuk,

Thanks for diving into this issue.

@Nico

I don't think that's the problem. The code can be reproduced exactly in
Java. I am using a different ListStateDescriptor constructor than you did:

You used:

> public ListStateDescriptor(String name, TypeInformation<T> typeInfo)

While I used:

> public ListStateDescriptor(String name, Class<T> typeClass)


I think the problem is with the way I deserialized the value on the
QueryClient side as I tried to use:

> KvStateRequestSerializer.deserializeList(serializedResult, {
>       TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
>         .createSerializer(new ExecutionConfig)
>     })


I have not checked it, but now I suspect this code would work:

> KvStateRequestSerializer.deserializeValue(serializedResult, {
>       TypeInformation.of(new TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
>         .createSerializer(new ExecutionConfig)
>     })
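
To make the suspicion concrete, here is a minimal, purely illustrative Java sketch of how reading a byte array with a reader that expects a different list layout can end in an EOFException partway through. Everything in it is an assumption made for the example: the class ListLayoutMismatch, its methods and both layouts are hypothetical and use only java.io. They are not Flink's actual wire formats, and I have not verified that this is what happens inside KvStateRequestSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: these layouts are NOT Flink's wire formats.
class ListLayoutMismatch {

    // Layout A: an int element count, then each element written with writeUTF.
    static byte[] writeCountPrefixed(List<String> items) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(items.size());
        for (String item : items) {
            out.writeUTF(item);
        }
        out.flush();
        return bos.toByteArray();
    }

    // Matching reader for layout A: read the count, then that many elements.
    static List<String> readCountPrefixed(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int count = in.readInt();
        List<String> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            result.add(in.readUTF());
        }
        return result;
    }

    // Reader for a different layout B: elements back to back with one
    // separator byte between them, read until the input is exhausted.
    static List<String> readSeparated(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        List<String> result = new ArrayList<>();
        while (in.available() > 0) {
            result.add(in.readUTF());
            if (in.available() > 0) {
                in.readByte(); // skip the separator
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = writeCountPrefixed(Arrays.asList("login", "logout"));
        // The matching reader round-trips the list.
        System.out.println(readCountPrefixed(bytes));
        try {
            // The mismatched reader misreads the count bytes as string lengths.
            readSeparated(bytes);
        } catch (EOFException e) {
            System.out.println("EOFException from the mismatched reader");
        }
    }
}
```

In this sketch the mismatched reader consumes part of the count as a (bogus) first element and then interprets the following bytes as a much larger length, so readUTF runs off the end of the array on the second element. The point is only that a layout mismatch between writer and reader is enough to produce this kind of failure.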


Regarding removing the queryable list state, I agree: using it seems
pointless. Moreover, while removing it I would take a second look at these
functions:

> KvStateRequestSerializer.deserializeList
> KvStateRequestSerializer.serializeList

as I think they are not used anywhere, even now. Thanks for your time.

Regards
Dawid Wysakowicz

2017-01-16 13:25 GMT+01:00 Nico Kruber <nico@data-artisans.com>:

> Hi Dawid,
> regarding the original code, I couldn't reproduce this with the Java code I
> wrote and my guess is that the second parameter of the ListStateDescriptor
> is wrong:
>
>       .asQueryableState(
>         "type-time-series-count",
>         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
>           "type-time-series-count",
>           classOf[KeyedDataPoint[java.lang.Integer]]))
>
> this should rather be
>
> TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
>
> as in the query itself. It sounds strange to me that you don't get any
> ClassCastException or a compile-time error due to the type being wrong, but
> I lack some Scala knowledge to get to the bottom of this.
>
>
> Regarding the removal of the queryable list state "sink", I created a JIRA
> issue for it and will open a PR:
> https://issues.apache.org/jira/browse/FLINK-5507
>
>
> Nico
>
> On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > Hi Nico,
> >
> > Recently I've tried the queryable state a bit differently, by using
> > ValueState with a value of a util.ArrayList and a ValueSerializer for
> > util.ArrayList and it works as expected.
> >
> > The non-working example you can browse here:
> > https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
> > The working example here:
> > https://github.com/dawidwys/flink-intro/tree/master
> > (The QueryableJob is in module flink-queryable-job and the QueryClient in
> > flink-state-server)
> >
> > Sure, I am aware of the downside of the ListState. I need it just for
> > presentation purposes, but you may be right that there might not be any
> > production use for this state and it should be removed.
> > Maybe the problem is just with the ListState and removing it would also
> > resolve my problem :)
> >
> > Regards
> > Dawid Wysakowicz
> >
> > 2017-01-13 18:50 GMT+01:00 Nico Kruber <nico@data-artisans.com>:
> > > Hi Dawid,
> > > I'll try to reproduce the error in the next couple of days. Can you also
> > > share the value deserializer you use? Also, have you tried even smaller
> > > examples in the meantime? Did they work?
> > >
> > > As a side-note in general regarding the queryable state "sink" using
> > > ListState (".asQueryableState(<name>, ListStateDescriptor)"): everything
> > > that enters this operator will be stored forever and never cleaned.
> > > Eventually, it will pile up too much memory and is thus of limited use.
> > > Maybe it should even be removed from the API.
> > >
> > >
> > > Nico
> > >
> > > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > > Hey Ufuk.
> > > > Did you maybe have a moment to take a look at that problem?
> > > >
> > > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <uce@apache.org>:
> > > > > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > > > > the course of the day. From a first impression, this seems like a bug
> > > > > to me.
> > > > >
> > > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > > > <wysakowicz.dawid@gmail.com> wrote:
> > > > > > Hi, I was experimenting with the Queryable State feature and I have
> > > > > > some problems querying the state.
> > > > > >
> > > > > > The code which I use to produce the queryable state is:
> > > > > >     env.addSource(kafkaConsumer).map(
> > > > > >       e => e match {
> > > > > >         case LoginClickEvent(_, t) => ("login", 1, t)
> > > > > >         case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > > >         case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > > > >       }).keyBy(0).timeWindow(Time.seconds(1))
> > > > > >       .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> > > > > >       .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > > > > >       .keyBy("key")
> > > > > >       .asQueryableState(
> > > > > >         "type-time-series-count",
> > > > > >         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > > > >           "type-time-series-count",
> > > > > >           classOf[KeyedDataPoint[java.lang.Integer]]))
> > > > > >
> > > > > > As you see it is a rather simple job, in which I try to count events
> > > > > > of different types in windows and then query by event type.
> > > > > >
> > > > > > In client code I do:
> > > > > >     // Query Flink state
> > > > > >     val future = client.getKvState(jobId, "type-time-series-count",
> > > > > >       key.hashCode, seralizedKey)
> > > > > >
> > > > > >     // Await async result
> > > > > >     val serializedResult: Array[Byte] = Await.result(
> > > > > >       future, new FiniteDuration(10, duration.SECONDS))
> > > > > >
> > > > > >     // Deserialize response
> > > > > >     val results = deserializeResponse(serializedResult)
> > > > > >
> > > > > >     results
> > > > > >   }
> > > > > >
> > > > > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > > > > >       util.List[KeyedDataPoint[lang.Integer]] = {
> > > > > >     KvStateRequestSerializer.deserializeList(serializedResult,
> > > > > >       getValueSerializer())
> > > > > >   }
> > > > > >
> > > > > > As I was trying to debug the issue I see the first element in the list
> > > > > > gets deserialized correctly, but it fails on the second one. It seems
> > > > > > like the serialized result is broken. Do you have any idea if I am
> > > > > > doing something wrong or there is some bug?
> > > > > >
> > > > > >
> > > > > > The exception I get is:
> > > > > > java.io.EOFException: null
> > > > > > at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > > > > > at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > > > > > at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > > > > > at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > > > > > at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> > > > > >
> > > > >
> > > > > > You can browse the exact code at:
> > > > > > https://github.com/dawidwys/flink-intro
> > > > >
> > > > > > I would be grateful for any advice.
> > > > > >
> > > > > > Regards
> > > > > > Dawid Wysakowicz
>
