flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Another serialization error
Date Tue, 17 May 2016 14:42:16 GMT
I found that on the cluster I was using a release version of a dependency
that has since changed, so now I get the error on the cluster as well :)

This is caused by the addition of the setParent() method to TreeNode:

    public void setParent(TreeNode<K, V> parent) {
        this.parent = parent;
    }

Without that method Flink at least doesn't complain, though I'm not sure
whether things then work correctly...
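
One plausible reading of this behaviour: once setParent() exists, the private parent field has both a getter and a setter, so it satisfies the documented Flink POJO rule that every non-public field needs both accessors, and the type analysis then descends into TreeNode again. The sketch below is only a simplified, reflection-based model of that rule and is not Flink's TypeExtractor code. The names PojoRuleSketch, NodeWithoutSetter, NodeWithSetter, isAccessibleField and wouldRecurse are hypothetical and exist only for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Simplified model of the "public, or getter plus setter" POJO field rule.
// This is NOT Flink's implementation; all names here are hypothetical.
final class PojoRuleSketch {

    // Getter only, like TreeNode before setParent() was added.
    static class NodeWithoutSetter {
        private NodeWithoutSetter parent;
        public NodeWithoutSetter getParent() { return parent; }
    }

    // Getter and setter, like TreeNode after setParent() was added.
    static class NodeWithSetter {
        private NodeWithSetter parent;
        public NodeWithSetter getParent() { return parent; }
        public void setParent(NodeWithSetter parent) { this.parent = parent; }
    }

    // True if the field is public or has a public getX()/setX(value) pair.
    static boolean isAccessibleField(Class<?> clazz, Field field) {
        if (Modifier.isPublic(field.getModifiers())) {
            return true;
        }
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean hasGetter = false;
        boolean hasSetter = false;
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("get" + suffix) && m.getParameterCount() == 0) {
                hasGetter = true;
            }
            if (m.getName().equals("set" + suffix) && m.getParameterCount() == 1) {
                hasSetter = true;
            }
        }
        return hasGetter && hasSetter;
    }

    // True if a field that passes the rule has the declaring class as its type,
    // i.e. a naive field-by-field analysis would visit the same class again.
    static boolean wouldRecurse(Class<?> clazz) {
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue; // static and transient fields are not part of the rule
            }
            if (isAccessibleField(clazz, field) && field.getType() == clazz) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("without setter: " + wouldRecurse(NodeWithoutSetter.class));
        System.out.println("with setter:    " + wouldRecurse(NodeWithSetter.class));
    }
}
```

In this model the class without the setter is never analysed field by field, which would match the observation that Flink "doesn't complain" in that case. Whether the type is then handled by a generic fallback serializer is an assumption that this sketch does not check.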


On Tue, May 17, 2016 at 3:53 PM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> Yes I am
>
>
> On Tue, May 17, 2016 at 3:45 PM, Robert Metzger <rmetzger@apache.org>
> wrote:
>
>> Are you using 1.0.2 on the cluster as well?
>>
>> On Tue, May 17, 2016 at 3:40 PM, Flavio Pompermaier <pompermaier@okkam.it>
>> wrote:
>>
>>> I tried to debug my application from Eclipse and I got an infinite
>>> recursive call in the TypeExtractor during the analysis of TreeNode (I'm
>>> using Flink 1.0.2):
>>>
>>> Exception in thread "main" java.lang.StackOverflowError
>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1482)
>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1464)
>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:736)
>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>>>
>>> Why doesn't this happen on the cluster?
>>>
>>>
>>>
>>>
>>> On Tue, May 17, 2016 at 2:23 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> Don't worry Robert,
>>>> I know how hard it is to debug such errors :)
>>>> I hope that maybe the combination of these 3 errors is somehow
>>>> related...However these are the answers:
>>>>
>>>>
>>>>    - The job (composed of 16 sub-jobs) fails randomly but, usually,
>>>>    the first sub-job after a restart runs successfully
>>>>    - In this job I saw all of A, B and C (but after changing parallelism)
>>>>    - Yes, the error behaves differently depending on the input data
>>>>    (actually on the default parallelism and the number of slots in the cluster)
>>>>
>>>> One more interesting thing I fixed in my code that could be (maybe?)
>>>> the cause of B and C (but not A, because that happened after this
>>>> problem):
>>>> I'm reading and writing data from some Parquet-thrift directories (using
>>>> the Hadoop IF/OF ParquetThriftOutputFormat and ParquetThriftInputFormat).
>>>> In one of the 3 jobs I output a dataset to a Parquet-thrift directory
>>>> using ds.output(), after decreasing the parallelism and number of slots
>>>> from M to N.
>>>> The first N Parquet files were overwritten (as expected) but the last
>>>> M-N were not removed (I was expecting the Parquet-thrift directory to be
>>>> managed as a single dir).
>>>> Then, when I read from that directory in the next job, I discovered
>>>> that the job was actually reading all the files in that folder (I was
>>>> convinced that, even though the M-N files were left in that dir, there
>>>> was some index file, e.g. _metadata, acting as the entry point for the
>>>> files in that folder).
>>>> I don't know whether this could be a cause of such errors, but I
>>>> reported it anyway for the sake of completeness, hoping that
>>>> this real-life debugging story could be helpful to someone else using
>>>> Parquet on Flink :)
>>>>
>>>> Thanks for the support,
>>>>
>>>> Flavio
>>>>
>>>> On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <rmetzger@apache.org>
>>>> wrote:
>>>>
>>>>> The last one is C or A?
>>>>>
>>>>> How often is it failing (every nth run?) Is it always failing at the
>>>>> same execute() call, or at different ones?
>>>>> Is it always the exact same exception or is it different ones?
>>>>> Does the error behave differently depending on the input data?
>>>>>
>>>>> Sorry for asking so many questions, but these errors can have many
>>>>> causes and just searching the code for potential issues can take a lot
>>>>> of time ;)
>>>>>
>>>>> On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>>> Ah sorry, I forgot to mention that I don't use any custom kryo
>>>>>> serializers..
>>>>>>
>>>>>> On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>>> I got those exceptions running 3 different types of jobs. I could
>>>>>>> have tracked which job produced which error... my bad!
>>>>>>> However, the most problematic job is the last one, where I run a
>>>>>>> series of jobs one after the other (calling env.execute() in a for
>>>>>>> loop).
>>>>>>> If you want I can share my code with you (in private for the moment,
>>>>>>> because it's not public yet) or the dashboard screen via Skype while
>>>>>>> the jobs are running.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <
>>>>>>> rmetzger@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Flavio,
>>>>>>>> thank you for providing additional details.
>>>>>>>> I don't think that missing hashCode() / equals() implementations
>>>>>>>> cause such an error. They can cause wrong sorting or partitioning of
>>>>>>>> the data, but the serialization should still work properly.
>>>>>>>> I suspect the issue is somewhere in the serialization stack.
>>>>>>>>
>>>>>>>> Are you registering any custom kryo serializers?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> From your past emails, you saw the following different exceptions:
>>>>>>>>
>>>>>>>> A)  Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>>>> B)  Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>>>>>> C)  Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>>>>>
>>>>>>>> Were they all caused by the same job, or different ones?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <
>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Hi Robert,
>>>>>>>>> in this specific case the classes involved are:
>>>>>>>>>
>>>>>>>>>    - Tuple3<String, String, IndexAttributeToExpand>
>>>>>>>>>    (IndexAttributeToExpand is a POJO extending another class, and
>>>>>>>>>    neither of them implements equals and hashCode)
>>>>>>>>>    - Tuple3<String, String, TreeNode<String, Map<String,
>>>>>>>>>    Set<String>>>> (TreeNode is a POJO containing other TreeNodes, and
>>>>>>>>>    it doesn't implement equals and hashCode)
>>>>>>>>>    - Now I've added hashCode and equals to both classes in order
>>>>>>>>>    to be aligned with POJO policies; however, the job finished
>>>>>>>>>    correctly after stopping and restarting the cluster... usually when
>>>>>>>>>    I have a strange serialization exception I stop and restart the
>>>>>>>>>    cluster and everything works.
>>>>>>>>>
>>>>>>>>> The TreeNode class (the recursive one) is actually the following:
>>>>>>>>>
>>>>>>>>> import java.io.Serializable;
>>>>>>>>> import java.util.ArrayList;
>>>>>>>>> import java.util.HashMap;
>>>>>>>>> import java.util.Iterator;
>>>>>>>>> import java.util.LinkedList;
>>>>>>>>> import java.util.List;
>>>>>>>>> import java.util.UUID;
>>>>>>>>>
>>>>>>>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>>>>>>>
>>>>>>>>> public class TreeNode<K,V> implements Serializable {
>>>>>>>>>
>>>>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>>>>     private int level = 0;
>>>>>>>>>
>>>>>>>>>     private String uuid;
>>>>>>>>>     private K key;
>>>>>>>>>     private V value;
>>>>>>>>>     @JsonIgnore
>>>>>>>>>     private TreeNode<K,V> parent;
>>>>>>>>>     private List<TreeNode<K,V>> children;
>>>>>>>>>     @JsonIgnore
>>>>>>>>>     private HashMap<K, List<TreeNode<K,V>>> lookup;
>>>>>>>>>
>>>>>>>>>     public TreeNode(K key, V value) {
>>>>>>>>>         this.level = 0;
>>>>>>>>>         this.key = key;
>>>>>>>>>         this.uuid = UUID.randomUUID().toString();
>>>>>>>>>         this.value = value;
>>>>>>>>>         this.children = new LinkedList<TreeNode<K,V>>();
>>>>>>>>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
>>>>>>>>>         thisAsList.add(this);
>>>>>>>>>         this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
>>>>>>>>>         this.lookup.put(key, thisAsList);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public TreeNode<K,V> addChild(K key, V value) {
>>>>>>>>>         TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
>>>>>>>>>         childNode.level = level +1;
>>>>>>>>>         childNode.parent = this;
>>>>>>>>>         childNode.lookup = lookup;
>>>>>>>>>         childNode.uuid = UUID.randomUUID().toString();
>>>>>>>>>         this.children.add(childNode);
>>>>>>>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>>>>>>>         if(l==null){
>>>>>>>>>             l = new ArrayList<TreeNode<K,V>>();
>>>>>>>>>             lookup.put(childNode.key, l);
>>>>>>>>>         }
>>>>>>>>>         l.add(childNode);
>>>>>>>>>         return childNode;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public boolean isLeaf() {
>>>>>>>>>         return children.isEmpty() ;
>>>>>>>>>     }
>>>>>>>>>     public int getLevel() {
>>>>>>>>>         return level;
>>>>>>>>>     }
>>>>>>>>>     public TreeNode<K,V> getParent() {
>>>>>>>>>         return parent;
>>>>>>>>>     }
>>>>>>>>>     public V getValue() {
>>>>>>>>>         return value;
>>>>>>>>>     }
>>>>>>>>>     public String getUuid() {
>>>>>>>>>         return uuid;
>>>>>>>>>     }
>>>>>>>>>     public void setUuid(String uuid) {
>>>>>>>>>         this.uuid = uuid;
>>>>>>>>>     }
>>>>>>>>>     public List<TreeNode<K,V>> getChildren() {
>>>>>>>>>         return children;
>>>>>>>>>     }
>>>>>>>>>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>>>>>>>>>         return lookup.get(key);
>>>>>>>>>     }
>>>>>>>>>     public K getKey() {
>>>>>>>>>         return key;
>>>>>>>>>     }
>>>>>>>>>     public List<TreeNode<K,V>> getLeafs() {
>>>>>>>>>         List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
>>>>>>>>>         getLeafs(ret);
>>>>>>>>>         return ret;
>>>>>>>>>     }
>>>>>>>>>     private void getLeafs(List<TreeNode<K, V>> ret) {
>>>>>>>>>         if(children.isEmpty())
>>>>>>>>>             ret.add(this);
>>>>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>>>>             child.getLeafs(ret);
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public String toString() {
>>>>>>>>>         return toString(true);
>>>>>>>>>     }
>>>>>>>>>     public String toString(boolean withChildren) {
>>>>>>>>>         if(key==null)
>>>>>>>>>             return super.toString();
>>>>>>>>>         StringBuffer ret = new StringBuffer();
>>>>>>>>>         for (int i = 0; i < level; i++) {
>>>>>>>>>             ret.append(" >");
>>>>>>>>>         }
>>>>>>>>>         ret.append(" " +key.toString());
>>>>>>>>>         if(withChildren){
>>>>>>>>>             for (TreeNode<K, V> child : children) {
>>>>>>>>>                 ret.append("\n").append(child.toString());
>>>>>>>>>             }
>>>>>>>>>         }
>>>>>>>>>         return ret.toString();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setValue(V value) {
>>>>>>>>>         this.value = value;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void remove(List<TreeNode<K, V>> nodes) {
>>>>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>>>>             removeChildren(n);
>>>>>>>>>         }
>>>>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>>>>             TreeNode<K, V> parent = n.getParent();
>>>>>>>>>             if(parent==null)
>>>>>>>>>                 continue;
>>>>>>>>>             parent.children.remove(n);
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     private void removeChildren(TreeNode<K, V> node) {
>>>>>>>>>         lookup.remove(node.getUuid());
>>>>>>>>>         if(node.children.isEmpty())
>>>>>>>>>             return;
>>>>>>>>>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>>>>>>>>>         while (it.hasNext()) {
>>>>>>>>>             TreeNode<K, V> child = (TreeNode<K, V>) it.next();
>>>>>>>>>             removeChildren(child);
>>>>>>>>>             it.remove();
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void clear() {
>>>>>>>>>         this.key = null;
>>>>>>>>>         this.value = null;
>>>>>>>>>         this.uuid = null;
>>>>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>>>>             child.clear();
>>>>>>>>>         }
>>>>>>>>>         this.children.clear();
>>>>>>>>>         this.lookup.clear();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>>>>>>>>>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>>>>>>>>         for (TreeNode<K, V> treeNode : nodes) {
>>>>>>>>>             if(uuid.equals(treeNode.getUuid()))
>>>>>>>>>                 return treeNode;
>>>>>>>>>         }
>>>>>>>>>         return null;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>>>>>>>>         return lookup;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>>>>>>>>         this.lookup = lookup;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setLevel(int level) {
>>>>>>>>>         this.level = level;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setKey(K key) {
>>>>>>>>>         this.key = key;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setParent(TreeNode<K, V> parent) {
>>>>>>>>>         this.parent = parent;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public void setChildren(List<TreeNode<K, V>> children) {
>>>>>>>>>         this.children = children;
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <
>>>>>>>>> rmetzger@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Flavio,
>>>>>>>>>>
>>>>>>>>>> which datatype are you using?
>>>>>>>>>>
>>>>>>>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <
>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi to all,
>>>>>>>>>>> over the last few days we've run a lot of Flink jobs, and from time to
>>>>>>>>>>> time (apparently randomly) a different exception arises during their
>>>>>>>>>>> execution...
>>>>>>>>>>> I hope one of them can help in finding the source of the
>>>>>>>>>>> problem. This time the exception is:
>>>>>>>>>>>
>>>>>>>>>>> An error occurred while reading the next record.
>>>>>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>>>>>>>      at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>>>>>>>      at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>>>>>>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>>>>>      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>>>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>>      at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>>>>>>>      at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>>>>>>>      at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>>>>>>>      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>>>>>      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>>>>      at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>>>>>>>      at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>>>>>>>      at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130
>>>>>>>>>>>
>>>>>>>>>>> Could this error be caused by a missing implementation of
>>>>>>>>>>> hashCode() and equals()?
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance,
>>>>>>>>>>> Flavio
>>>>>>>>>>>
