flink-user mailing list archives

From Jayant Ameta <wittyam...@gmail.com>
Subject Re: Queryable state when key is UUID - getting Kyro Exception
Date Fri, 26 Oct 2018 03:52:58 GMT
MapStateDescriptor<UUID, String> descriptor =
    new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);

Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <bupt_ljy@163.com> wrote:

> Hi,
>
>    Can you show us the descriptor used in the code below?
>
>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>
>
> Jiayi Liao, Best
>
>
>  Original Message
> *Sender:* Jayant Ameta<wittyameta@gmail.com>
> *Recipient:* bupt_ljy<bupt_ljy@163.com>
> *Cc:* Tzu-Li (Gordon) Tai<tzulitai@apache.org>; user<user@flink.apache.org>
> *Date:* Friday, Oct 26, 2018 02:26
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Also, I haven't provided any custom serializer in my Flink job. Shouldn't
> the same configuration work for the queryable state client?
>
> Jayant Ameta
>
>
> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <wittyameta@gmail.com> wrote:
>
>> Hi Gordon,
>> Following is the stack trace that I'm getting:
>>
>> Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
>> Caused by: java.lang.RuntimeException: Failed request 0.
>> Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
>> Serialization trace:
>> $outer (scala.collection.convert.Wrappers$SeqWrapper)
>>   at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>   at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>   at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>   at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>   at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>> I am not using any custom serializer like the one Jiayi mentioned.
>>
>> Jayant Ameta
>>
>>
>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <bupt_ljy@163.com> wrote:
>>
>>> Hi Jayant,
>>>
>>>   There should be a Serializer parameter in the constructor of the
>>> StateDescriptor. You can create a new serializer like this:
>>>
>>>
>>>    new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>
>>>
>>>  By the way, can you show us the Kryo exception, as Gordon asked?
>>>
>>>
>>> Jiayi Liao, Best
>>>
>>>
>>>
>>>  Original Message
>>> *Sender:* Tzu-Li (Gordon) Tai<tzulitai@apache.org>
>>> *Recipient:* Jayant Ameta<wittyameta@gmail.com>; bupt_ljy<bupt_ljy@163.com>
>>> *Cc:* user<user@flink.apache.org>
>>> *Date:* Thursday, Oct 25, 2018 17:18
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Hi Jayant,
>>>
>>> What is the Kryo exception message that you are getting?
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyameta@gmail.com)
>>> wrote:
>>>
>>> Hi,
>>> I haven't configured any serializer in the descriptor (neither in the
>>> Flink job nor in the state query client).
>>> Which serializer should I use?
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bupt_ljy@163.com> wrote:
>>>
>>>> Hi,
>>>>
>>>>    Your code looks right. Are you sure that you’re using the same
>>>> Serializer as the Flink program does? Could you show the serializer
>>>> in the descriptor?
>>>>
>>>>
>>>>
>>>> Jiayi Liao, Best
>>>>
>>>>  Original Message
>>>> *Sender:* Jayant Ameta<wittyameta@gmail.com>
>>>> *Recipient:* user<user@flink.apache.org>
>>>> *Date:* Thursday, Oct 25, 2018 14:17
>>>> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>>>>
>>>> I get a Kryo exception when querying the state.
>>>>
>>>> Key: UUID
>>>> MapState<UUID, String>
>>>>
>>>> Client code snippet:
>>>>
>>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>>>
>>>>
>>>> Is there a better way to query it?
>>>>
>>>>
>>>> Jayant Ameta
>>>>
>>>
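
The question raised in the thread, whether the client uses the same serializer as the Flink job, can be illustrated without Flink. The sketch below is a toy model that uses only the JDK. It is not Kryo or Flink code, and `ToyCodec`, `register`, and `roundTrips` are hypothetical names introduced for illustration. It assumes a codec that prefixes each value with a class ID assigned in registration order, and shows that a reader whose registrations differ from the writer's rejects the bytes. Whether this is what happened in the job above is not established by the thread.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

final class SerializerMismatchDemo {

    /** Toy codec (hypothetical): class IDs are assigned in registration order. */
    static final class ToyCodec {
        private final Map<Integer, String> idToClass = new HashMap<>();
        private final Map<String, Integer> classToId = new HashMap<>();
        private int nextId = 0;

        ToyCodec register(Class<?> type) {
            idToClass.put(nextId, type.getName());
            classToId.put(type.getName(), nextId);
            nextId++;
            return this;
        }

        /** Writes a 4-byte class ID followed by the two 8-byte halves of the UUID. */
        byte[] writeUuid(UUID value) {
            Integer id = classToId.get(UUID.class.getName());
            if (id == null) {
                throw new IllegalStateException("UUID is not registered");
            }
            return ByteBuffer.allocate(20)
                    .putInt(id)
                    .putLong(value.getMostSignificantBits())
                    .putLong(value.getLeastSignificantBits())
                    .array();
        }

        /** Reads the class ID first and fails if it does not map to UUID in this registry. */
        UUID readUuid(byte[] bytes) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            int id = buf.getInt();
            if (!UUID.class.getName().equals(idToClass.get(id))) {
                throw new IllegalStateException("Unexpected class ID: " + id);
            }
            return new UUID(buf.getLong(), buf.getLong());
        }
    }

    /** Returns true only if the reader recovers exactly the key the writer encoded. */
    static boolean roundTrips(ToyCodec writer, ToyCodec reader, UUID key) {
        try {
            return key.equals(reader.readUuid(writer.writeUuid(key)));
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        UUID key = UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d");

        // The "job" side has one extra registration, so UUID gets a different ID there.
        ToyCodec jobSide = new ToyCodec().register(ArrayList.class).register(UUID.class);
        ToyCodec clientSide = new ToyCodec().register(UUID.class);

        System.out.println("matching config:   " + roundTrips(jobSide, jobSide, key));
        System.out.println("mismatched config: " + roundTrips(clientSide, jobSide, key));
    }
}
```

In this model the key bytes written by the client are only readable by the job when both sides were configured identically, which is why the descriptor and serializer configuration on both ends are worth comparing.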
