flink-user mailing list archives

From Dawid Wysakowicz <wysakowicz.da...@gmail.com>
Subject Re: Queryable State
Date Fri, 27 Jan 2017 16:22:25 GMT
Hi Nico,

No problem at all, I've already presented my showcase with
ValueStateDescriptor.

Anyway, if I can help with queryable state in any way, let me know. I
would be happy to contribute some code.

2017-01-25 14:47 GMT+01:00 Nico Kruber <nico@data-artisans.com>:

> Hi Dawid,
> sorry for the late reply. I was fixing some issues in queryable state and
> may now have gotten to the bottom of your error: you may be seeing a race
> condition with the MemoryStateBackend (the default state backend), as
> described here:
> https://issues.apache.org/jira/browse/FLINK-5642
> I'm currently working on a fix.
>
> KvStateRequestSerializer#deserializeList(), however, is the right function
> to deserialise list state; KvStateRequestSerializer#deserializeValue() will
> not work!
>
> Thanks for the tip regarding KvStateRequestSerializer#serializeList: it was
> indeed not used, since the list state backends had their own serialisation
> function. We removed KvStateRequestSerializer#serializeList as well as the
> queryable list state sink for 1.2 and up.
>
>
> Nico
>
> On Monday, 16 January 2017 14:47:59 CET Dawid Wysakowicz wrote:
> > Hi Nico, Ufuk,
> >
> > Thanks for diving into this issue.
> >
> > @Nico
> >
> > I don't think that's the problem. The code can be reproduced exactly in
> > Java. I am using a different ListStateDescriptor constructor than you did:
> >
> > You used:
> > > public ListStateDescriptor(String name, TypeInformation<T> typeInfo)
> >
> > While I used:
> > >  public ListStateDescriptor(String name, Class<T> typeClass)
> >
> > I think the problem is with the way I deserialized the value on the
> > QueryClient side, as I tried to use:
> >
> >   KvStateRequestSerializer.deserializeList(serializedResult, {
> >     TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
> >       .createSerializer(new ExecutionConfig)
> >   })
> >
> > I have not checked it, but now I suspect this code would work:
> > > KvStateRequestSerializer.deserializeValue(serializedResult, {
> > >   TypeInformation.of(
> > >     new TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
> > >     .createSerializer(new ExecutionConfig)
> > > })
> >
> > Regarding removing the queryable list state, I agree; using it seems
> > pointless. Moreover, while removing it, I would take a second look at
> > these functions:
> >
> >   KvStateRequestSerializer#deserializeList
> >   KvStateRequestSerializer#serializeList
> >
> > as I think they are not used at all even now. Thanks for your time.
> >
> > Regards
> > Dawid Wysakowicz
> >
> > 2017-01-16 13:25 GMT+01:00 Nico Kruber <nico@data-artisans.com>:
> > > Hi Dawid,
> > > regarding the original code, I couldn't reproduce this with the Java code
> > > I wrote, and my guess is that the second parameter of the
> > > ListStateDescriptor is wrong:
> > >
> > >       .asQueryableState(
> > >         "type-time-series-count",
> > >         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > >           "type-time-series-count",
> > >           classOf[KeyedDataPoint[java.lang.Integer]]))
> > >
> > > This should rather be
> > >
> > > TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
> > >
> > > as in the query itself. It seems strange to me that you don't get any
> > > ClassCastException or a compile-time error due to the type being wrong,
> > > but I lack the Scala knowledge to get to the bottom of this.
> > >
> > >
> > > Regarding the removal of the queryable list state "sink", I created a
> > > JIRA issue for it and will open a PR:
> > > https://issues.apache.org/jira/browse/FLINK-5507
> > >
> > >
> > > Nico
> > >
> > > On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > > > Hi Nico,
> > > >
> > > > Recently I've tried queryable state a bit differently, using a
> > > > ValueState holding a util.ArrayList together with a ValueSerializer for
> > > > util.ArrayList, and it works as expected.
> > > >
> > > > The non-working example can be browsed here:
> > > > https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
> > > > The working example is here:
> > > > https://github.com/dawidwys/flink-intro/tree/master
> > > > (The QueryableJob is in the module flink-queryable-job and the
> > > > QueryClient in flink-state-server.)
> > > >
> > > > Sure, I am aware of the downside of ListState. I need it just for
> > > > presentation purposes, but you may be right that there might not be any
> > > > production use for this state and that it should be removed.
> > > > Maybe the problem is just with ListState, and removing it would also
> > > > resolve my problem :)
> > > >
> > > > Regards
> > > > Dawid Wysakowicz
> > > >
> > > > 2017-01-13 18:50 GMT+01:00 Nico Kruber <nico@data-artisans.com>:
> > > > > Hi Dawid,
> > > > > I'll try to reproduce the error in the next couple of days. Can you
> > > > > also share the value deserializer you use? Also, have you tried even
> > > > > smaller examples in the meantime? Did they work?
> > > > >
> > > > > As a general side-note regarding the queryable state "sink" using
> > > > > ListState (".asQueryableState(<name>, ListStateDescriptor)"):
> > > > > everything that enters this operator will be stored forever and never
> > > > > cleaned up. Eventually, it will pile up too much memory and is thus of
> > > > > limited use. Maybe it should even be removed from the API.
> > > > >
> > > > >
> > > > > Nico
> > > > >
> > > > > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > > > > Hey Ufuk,
> > > > > > did you perhaps have a moment to look at that problem?
> > > > > >
> > > > > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <uce@apache.org>:
> > > > > > > Hey Dawid! Thanks for reporting this. I will try to have a look
> > > > > > > over the course of the day. From a first impression, this seems
> > > > > > > like a bug to me.
> > > > > > >
> > > > > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > > > > > <wysakowicz.dawid@gmail.com> wrote:
> > > > > > > > Hi, I was experimenting with the queryable state feature and I
> > > > > > > > have some problems querying the state.
> > > > > > > >
> > > > > > > > The code which I use to produce the queryable state is:
> > > > > > > >
> > > > > > > >     env.addSource(kafkaConsumer).map(
> > > > > > > >       e => e match {
> > > > > > > >         case LoginClickEvent(_, t) => ("login", 1, t)
> > > > > > > >         case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > > > > >         case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > > > > > >       }).keyBy(0).timeWindow(Time.seconds(1))
> > > > > > > >       .reduce((e1, e2) =>
> > > > > > > >         (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> > > > > > > >       .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > > > > > > >       .keyBy("key")
> > > > > > > >       .asQueryableState(
> > > > > > > >         "type-time-series-count",
> > > > > > > >         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > > > > > >           "type-time-series-count",
> > > > > > > >           classOf[KeyedDataPoint[java.lang.Integer]]))
> > > > > > > >
> > > > > > > > As you can see, it is a rather simple job in which I try to count
> > > > > > > > events of different types in windows and then query by event
> > > > > > > > type.
> > > > > > > >
> > > > > > > > In client code I do:
> > > > > > > >
> > > > > > > >     // Query Flink state
> > > > > > > >     val future = client.getKvState(jobId, "type-time-series-count",
> > > > > > > >       key.hashCode, seralizedKey)
> > > > > > > >
> > > > > > > >     // Await async result
> > > > > > > >     val serializedResult: Array[Byte] = Await.result(
> > > > > > > >       future, new FiniteDuration(10, duration.SECONDS))
> > > > > > > >
> > > > > > > >     // Deserialize response
> > > > > > > >     val results = deserializeResponse(serializedResult)
> > > > > > > >
> > > > > > > >     results
> > > > > > > >   }
> > > > > > > >
> > > > > > > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > > > > > > >       util.List[KeyedDataPoint[lang.Integer]] = {
> > > > > > > >     KvStateRequestSerializer.deserializeList(serializedResult,
> > > > > > > >       getValueSerializer())
> > > > > > > >   }
> > > > > > > >
> > > > > > > > While trying to debug the issue, I saw that the first element in
> > > > > > > > the list gets deserialized correctly, but deserialization fails on
> > > > > > > > the second one. It seems the serialized result is broken. Do you
> > > > > > > > have any idea whether I am doing something wrong or whether there
> > > > > > > > is a bug?
> > > > > > > >
> > > > > > > >
> > > > > > > > The exception I get is:
> > > > > > > >
> > > > > > > > java.io.EOFException: null
> > > > > > > >     at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > > > > > > >     at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > > > > > > >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > > > > > > >     at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > > > > > > >     at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> > > > > > >
> > > > > > > > You can browse the exact code at: https://github.com/dawidwys/flink-intro
> > > > > > > >
> > > > > > > > I would be grateful for any advice.
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > Dawid Wysakowicz
>
>
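To make concrete what the job in the original question computes, here is a plain-Scala sketch with no Flink dependency. It reproduces the job's map-and-reduce logic: tag each event with its type and a count of 1, then, per type and per one-second window, sum the counts and keep the latest timestamp. The event case classes and their first fields (`userId`, `buttonId`) are hypothetical stand-ins for the ones in the linked repository, and `WindowedCountSketch` is an illustrative name. The grouping assumes tumbling windows aligned to the epoch and assigned by each event's own timestamp; with Flink's default processing-time characteristic, `timeWindow(Time.seconds(1))` would instead assign records by arrival time.

```scala
// Hypothetical event types mirroring the job's pattern match; the
// non-timestamp fields are assumptions, only the timestamp is used.
sealed trait ClickEvent { def timestamp: Long }
final case class LoginClickEvent(userId: String, timestamp: Long) extends ClickEvent
final case class LogoutClickEvent(userId: String, timestamp: Long) extends ClickEvent
final case class ButtonClickEvent(userId: String, buttonId: String, timestamp: Long) extends ClickEvent

object WindowedCountSketch {
  // One-second tumbling windows, expressed in milliseconds.
  val WindowMillis: Long = 1000L

  // Same mapping as the job: (eventType, 1, timestamp).
  def toTuple(e: ClickEvent): (String, Int, Long) = e match {
    case LoginClickEvent(_, t)     => ("login", 1, t)
    case LogoutClickEvent(_, t)    => ("logout", 1, t)
    case ButtonClickEvent(_, _, t) => ("button", 1, t)
  }

  // Group by (event type, window start) and apply the job's reduce:
  // keep the type, sum the counts, keep the latest timestamp.
  def countPerWindow(events: Seq[ClickEvent]): Map[(String, Long), (String, Int, Long)] =
    events
      .map(toTuple)
      .groupBy { case (tpe, _, t) => (tpe, t - Math.floorMod(t, WindowMillis)) }
      .map { case (window, tuples) =>
        window -> tuples.reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
      }
}
```

Because the reduce only sums counts and takes a maximum, the result does not depend on the order in which events within a window are combined, which is also why it is safe as an incremental window reduce.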

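Nico's point that `deserializeList()`, and not `deserializeValue()`, is the right call, and Dawid's observation that the first element decodes while the second fails with an `EOFException`, both concern how the queried list bytes are laid out. The plain-Scala sketch below has no Flink dependency and rests on an assumption: that list state is returned as the serialized elements back to back with a single separator byte between consecutive elements, which is how we understand Flink 1.2's `KvStateRequestSerializer.deserializeList()`. A hand-written codec for a hypothetical `DataPoint` class stands in for the POJO serializer, and `ListStateWireSketch` and `Separator` are illustrative names. Under that layout the bytes must be decoded element by element; a serializer for a whole `List` would presumably expect its own framing, which is why switching to `deserializeValue()` would not help. Cutting the buffer off partway through the second element reproduces the reported symptom, but that is only one way to get it; Nico's reply attributes the actual cause to the race condition tracked in FLINK-5642.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for KeyedDataPoint[Integer]; field order is an assumption.
final case class DataPoint(key: String, timestamp: Long, value: Int)

object ListStateWireSketch {
  // Assumed single separator byte written between consecutive elements.
  val Separator: Int = ','.toInt

  // Hand-written element codec standing in for Flink's PojoSerializer.
  def writeElement(p: DataPoint, out: DataOutputStream): Unit = {
    out.writeUTF(p.key)
    out.writeLong(p.timestamp)
    out.writeInt(p.value)
  }

  def readElement(in: DataInputStream): DataPoint =
    DataPoint(in.readUTF(), in.readLong(), in.readInt())

  // Layout: element (separator element)*
  def serializeList(points: Seq[DataPoint]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    points.zipWithIndex.foreach { case (p, i) =>
      if (i > 0) out.writeByte(Separator)
      writeElement(p, out)
    }
    out.flush()
    bytes.toByteArray
  }

  // Decode one element at a time, consuming the separator that follows
  // each element if more bytes remain.
  def deserializeList(bytes: Array[Byte]): List[DataPoint] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val result = ListBuffer.empty[DataPoint]
    while (in.available() > 0) {
      result += readElement(in)
      if (in.available() > 0) in.readByte()
    }
    result.toList
  }
}
```

For example, serializing `DataPoint("login", 1000L, 3)` and `DataPoint("logout", 2000L, 1)` yields 40 bytes. Keeping only the first 24 leaves the second element's two-byte string length header but not its characters, so the first element decodes and `deserializeList` then throws `java.io.EOFException` from `readUTF`, matching the shape of the stack trace in the original report.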