flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: KryoException with joda Datetime
Date Fri, 15 May 2015 12:56:20 GMT
I think distinct() is failing on null values because it's using a reduce
operation internally.
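
To illustrate the point, here is a minimal plain-Java sketch. It is not Flink's code, and the class, method, and exception names are made up for the example. It deduplicates by sorting the keys and comparing neighbours, which is roughly what a grouped reduce has to do, and it rejects null keys because they cannot be compared. Filtering the nulls out first is one possible workaround.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class NullKeyDistinctSketch {

    /** Hypothetical stand-in for Flink's NullKeyFieldException. */
    public static class NullKeyException extends RuntimeException {
        public NullKeyException(String message) {
            super(message);
        }
    }

    /** Sort-based distinct: sort the keys, then keep the first key of each run of equal keys. */
    public static List<String> distinct(List<String> keys) {
        for (String key : keys) {
            if (key == null) {
                // A null key has no position in the sort order, so give up early.
                throw new NullKeyException("null key cannot be compared");
            }
        }
        List<String> sorted = new ArrayList<>(keys);
        sorted.sort(Comparator.naturalOrder());

        List<String> result = new ArrayList<>();
        String previous = null;
        for (String key : sorted) {
            if (!key.equals(previous)) {
                result.add(key);
            }
            previous = key;
        }
        return result;
    }

    /** Workaround: drop null keys before deduplicating. */
    public static List<String> distinctIgnoringNulls(List<String> keys) {
        List<String> nonNull = keys.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        return distinct(nonNull);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("b", null, "a", "b");
        try {
            distinct(keys);
        } catch (NullKeyException e) {
            System.out.println("failed: " + e.getMessage());
        }
        System.out.println(distinctIgnoringNulls(keys)); // prints [a, b]
    }
}
```

In a real job the equivalent would be to filter out (or replace with a sentinel value) the records whose key field is null before calling distinct().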

On Fri, May 15, 2015 at 2:46 PM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> I found that the problem is caused by a distinct() that I perform before
> printing. Are null values not handled by distinct()?
>
> On Fri, May 15, 2015 at 2:38 PM, Robert Metzger <rmetzger@apache.org>
> wrote:
>
>> Hi,
>>
>> the error means that you are grouping on a field which contains null
>> values. We cannot compare elements against null, which is why we throw
>> the exception.
>> Are you sure that there are no null elements inside the DataSet
>> you're comparing against?
>>
>>
>> I'm not 100% sure that my fix is correct. Maybe the tests will uncover
>> something I've overlooked (they are still running).
>>
>>
>>
>> On Fri, May 15, 2015 at 2:26 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>
>>> Hi Robert,
>>> I applied your fix but I still get an error (at least not at the same
>>> point).
>>> Basically what I do is:
>>>
>>> DataSet<Tuple2<String, DateTime>> someDates; // this is empty in my test
>>> DataSet<Tuple3<String, String, DateTime>> someEvents;
>>> DataSet<Tuple4<String, String, DateTime, DateTime>> res =
>>> someEvents.coGroup(someDates).where(0).equalTo(0).with(
>>>  new myCoGroupFunction<Tuple3<String, String, DateTime>, Tuple2<String,
>>> DateTime>, Tuple4<String, String, DateTime, DateTime>> (...));
>>> res.print();
>>>
>>> in myCoGroupFunction I declare a Tuple4<String, String, DateTime,
>>> DateTime> reuse = new Tuple4<>() and I collect reuse tuples with
>>> t.f3 = null.
>>>
>>> Then I get this stackTrace:
>>>
>>> Caused by: org.apache.flink.types.NullKeyFieldException
>>> at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:76)
>>> at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:30)
>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115)
>>> at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233)
>>> at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194)
>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504)
>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> On Fri, May 15, 2015 at 1:57 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> Thanks a lot Robert! Don't mention it ;)
>>>>
>>>>
>>>> On Fri, May 15, 2015 at 1:54 PM, Robert Metzger <rmetzger@apache.org>
>>>> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> the patch is in my branch "flink2019".
>>>>> It's really good that you've found the bug. We were using the wrong
>>>>> Kryo instance to create copies of generic types.
>>>>>
>>>>> Once Travis validates that everything is good, I'll push it to master.
>>>>>
>>>>> On Fri, May 15, 2015 at 12:41 PM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>>> So do you think you could release a patch soon? I need it to continue
>>>>>> my work. Otherwise, if it's very simple, you could send me the snippet
>>>>>> of code to change in my local Flink version ;)
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Fri, May 15, 2015 at 11:22 AM, Robert Metzger <rmetzger@apache.org> wrote:
>>>>>>
>>>>>>> Yes ;)
>>>>>>>
>>>>>>> On Fri, May 15, 2015 at 11:10 AM, Flavio Pompermaier <
>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>>> Do you think it's something easy to fix?
>>>>>>>>
>>>>>>>> On Fri, May 15, 2015 at 10:51 AM, Robert Metzger <
>>>>>>>> rmetzger@apache.org> wrote:
>>>>>>>>
>>>>>>>>> No problem ;)
>>>>>>>>>
>>>>>>>>> I was able to reproduce the issue and filed a JIRA for it:
>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-2019
>>>>>>>>>
>>>>>>>>> On Fri, May 15, 2015 at 10:36 AM, Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Unfortunately it's really difficult for me to extract the
>>>>>>>>>> code. I'm using the Joda-Time version shipped with Flink
>>>>>>>>>> 0.9-SNAPSHOT (i.e. 2.5) and before today I've never seen this
>>>>>>>>>> error, also because DateTime is Serializable :)
>>>>>>>>>>
>>>>>>>>>> On Fri, May 15, 2015 at 10:25 AM, Fabian Hueske <
>>>>>>>>>> fhueske@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Is there a chance that the version of JodaTime changed?
>>>>>>>>>>>
>>>>>>>>>>> 2015-05-15 10:22 GMT+02:00 Robert Metzger <rmetzger@apache.org>:
>>>>>>>>>>>
>>>>>>>>>>>> Can you share the Flink program?
>>>>>>>>>>>> Or at least the definition of the Tuple?
>>>>>>>>>>>>
>>>>>>>>>>>> I'll look into this issue in a few minutes.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 15, 2015 at 10:13 AM, Flavio Pompermaier <
>>>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I'm using Flink 0.9-SNAPSHOT and I've never seen this error
>>>>>>>>>>>>> before today (the job hasn't changed).
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, May 15, 2015 at 10:09 AM, Robert Metzger <
>>>>>>>>>>>>> rmetzger@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Flavio,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> which version of Flink are you using?
>>>>>>>>>>>>>> If you are using 0.9 something, then this should actually
>>>>>>>>>>>>>> work ;)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, May 15, 2015 at 10:06 AM, Flavio Pompermaier <
>>>>>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> this morning I ran my Flink job and I got the following
>>>>>>>>>>>>>>> exception while serializing a DateTime Tuple. Could you help
>>>>>>>>>>>>>>> me understand what's happening here?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): org.joda.time.chrono.ISOChronology
>>>>>>>>>>>>>>> Serialization trace:
>>>>>>>>>>>>>>> iChronology (org.joda.time.DateTime)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.ObjectField.copy(ObjectField.java:140)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:634)
>>>>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>>>>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:77)
>>>>>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator.setReference(GenericTypeComparator.java:1)
>>>>>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:72)
>>>>>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.TupleComparator.setReference(TupleComparator.java:1)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.nextKey(NonReusingKeyGroupedIterator.java:115)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.sortAndCombine(SynchronousChainedCombineDriver.java:233)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.close(SynchronousChainedCombineDriver.java:194)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:504)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)

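The KryoException at the start of this thread complains about a missing no-arg constructor on org.joda.time.chrono.ISOChronology. As a rough illustration of that class of failure, the plain-Java sketch below uses reflection to create an empty instance of a type, which is what a generic field-by-field copy needs before it can fill in the fields. It is not Kryo's implementation, and all class and method names are made up for the example.

```java
import java.lang.reflect.Constructor;

public class NoArgConstructorSketch {

    /** Has an implicit public no-arg constructor. */
    public static class WithDefault {
        public int value = 7;
    }

    /** Can only be built with an argument, like many immutable value types. */
    public static class WithoutDefault {
        public final int value;

        public WithoutDefault(int value) {
            this.value = value;
        }
    }

    /** Returns true if an instance of the type can be created through a no-arg constructor. */
    public static boolean canInstantiate(Class<?> type) {
        try {
            Constructor<?> constructor = type.getDeclaredConstructor();
            constructor.setAccessible(true);
            constructor.newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // NoSuchMethodException lands here when no no-arg constructor exists.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(WithDefault.class));    // prints true
        System.out.println(canInstantiate(WithoutDefault.class)); // prints false
    }
}
```

A serializer that creates and copies such a type itself, instead of relying on a no-arg constructor, avoids the problem.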