flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Another serialization error
Date Tue, 17 May 2016 13:40:37 GMT
I tried to debug my application from Eclipse and I got infinite
recursion in the TypeExtractor during the analysis of TreeNode (I'm
using Flink 1.0.2):

Exception in thread "main" java.lang.StackOverflowError
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1482)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1464)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:736)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)

Why doesn't this happen on the cluster?
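A minimal sketch of why a self-referential type such as TreeNode needs a cycle guard when a type analyzer walks its fields recursively. This is not Flink's TypeExtractor code: the class names RecursiveTypeWalk and Node are hypothetical, JDK types are simply treated as leaves, and generic type arguments are not followed (they are erased at runtime). Without the `visited` check, walking Node -> parent (Node) -> parent ... would recurse until a StackOverflowError, which is the shape of the trace above.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RecursiveTypeWalk {

    // Hypothetical stand-in for TreeNode: a class that refers to its own type.
    static class Node {
        String key;
        Node parent;
        List<Node> children; // erased to List at runtime, so only 'parent' recurses here
    }

    // Returns every non-JDK class reachable through declared instance fields.
    static Set<Class<?>> reachableTypes(Class<?> root) {
        Set<Class<?>> visited = new LinkedHashSet<>();
        walk(root, visited);
        return visited;
    }

    private static void walk(Class<?> type, Set<Class<?>> visited) {
        if (type.isArray()) {
            walk(type.getComponentType(), visited);
            return;
        }
        if (type.isPrimitive() || type.getName().startsWith("java.")) {
            return; // JDK types are treated as leaves in this sketch
        }
        // The cycle guard: a class that was already visited is not analyzed again.
        // Remove this check and Node -> parent -> parent -> ... ends in StackOverflowError.
        if (!visited.add(type)) {
            return;
        }
        for (Field field : type.getDeclaredFields()) {
            if (!Modifier.isStatic(field.getModifiers())) {
                walk(field.getType(), visited);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(reachableTypes(Node.class)); // prints a set containing only the Node class
    }
}
```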



On Tue, May 17, 2016 at 2:23 PM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> Don't worry Robert,
> I know how hard it is to debug such errors :)
> I hope the combination of these 3 errors is somehow related... Anyway,
> here are the answers:
>
>
>    - The job (composed of 16 sub-jobs) fails randomly but, usually, the
>    first sub-job after a cluster restart runs successfully
>    - In this job I saw A, B and C (but after changing the parallelism)
>    - Yes, the error behaves differently depending on the input data
>    (actually, on the default parallelism and the number of slots in the cluster)
>
> One more interesting thing I fixed in my code, which could (maybe?) be the
> cause of B and C (but not of A, because A happened after this fix):
> I'm reading and writing data from/to some Parquet-Thrift directories (using the
> Hadoop input/output formats ParquetThriftInputFormat and ParquetThriftOutputFormat).
> In one of the 3 jobs I wrote a dataset to a Parquet-Thrift directory
> after decreasing the parallelism and number of slots from M to N, using
> ds.output().
> The first N Parquet files were overwritten (as expected), but the last M-N
> were not removed (I was expecting the Parquet-Thrift directory to be managed
> as a single unit).
> Then, when I read from that directory in the next job, I discovered
> that the job was actually reading all the files in that folder (I was convinced
> that, even though the M-N files were left in that dir, some index file,
> e.g. _metadata, would act as the entry point for the files in that folder).
> I don't know whether this could be a cause of such errors, but I
> reported it anyway for the sake of completeness, hoping that
> this real-life debugging story could be helpful to someone else using
> Parquet on Flink :)
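Since stale part files left over from an earlier, higher-parallelism run were picked up by the next job, one defensive option is to delete the whole output directory before writing again. A minimal sketch for a local filesystem using java.nio.file; the class name and part-file names are hypothetical. On HDFS the analogous call would be Hadoop's FileSystem#delete(Path, boolean) with the recursive flag set to true.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class OutputDirCleaner {

    // Deletes 'dir' and everything below it; does nothing if it does not exist.
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse lexicographic order visits children before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("parquet-out");
        // Simulate an earlier run with parallelism M = 4 (hypothetical part-file names) ...
        for (int i = 1; i <= 4; i++) {
            Files.createFile(out.resolve("part-" + i + ".parquet"));
        }
        // ... then clear the directory before a run with parallelism N = 2.
        deleteRecursively(out);
        Files.createDirectories(out);
        for (int i = 1; i <= 2; i++) {
            Files.createFile(out.resolve("part-" + i + ".parquet"));
        }
        try (Stream<Path> files = Files.list(out)) {
            System.out.println(files.count()); // 2: no stale files from the M = 4 run
        }
    }
}
```

Deleting up front trades a little safety (the old output is gone before the new one is complete) for a directory that contains only the files of the latest run.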
>
> Thanks for the support,
>
> Flavio
>
> On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <rmetzger@apache.org>
> wrote:
>
>> Is the last one C or A?
>>
>> How often does it fail (every nth run)? Does it always fail at the same
>> execute() call, or at different ones?
>> Is it always the exact same exception, or different ones?
>> Does the error behave differently depending on the input data?
>>
>> Sorry for asking so many questions, but these errors can have many causes
>> and just searching the code for potential issues can take a lot of time ;)
>>
>> On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>>> Ah sorry, I forgot to mention that I don't use any custom Kryo
>>> serializers.
>>>
>>> On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> I got those exceptions running 3 different types of jobs... I should have
>>>> tracked which job produced which error... my bad!
>>>> However, the most problematic job is the last one, where I run a series
>>>> of jobs one after the other (calling env.execute() in a for loop).
>>>> If you want, I can share my code with you (privately for the moment,
>>>> because it's not public yet) or share the dashboard screen via Skype while
>>>> the jobs are running.
>>>>
>>>>
>>>> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetzger@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Flavio,
>>>>> thank you for providing additional details.
>>>>> I don't think that missing hashCode / equals() implementations cause
>>>>> such an error. They can cause wrong sorting or partitioning of the data,
>>>>> but the serialization should still work properly.
>>>>> I suspect the issue is somewhere in the serialization stack.
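To illustrate Robert's point with plain Java, here is a minimal sketch with two hypothetical classes, using Java's built-in serialization as a stand-in (Flink's own POJO and Kryo serializers are not involved here). Without equals()/hashCode(), two logically equal keys are treated as distinct by hash-based grouping, yet a serialization round trip still restores the field correctly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class EqualsHashCodeDemo {

    // Hypothetical POJO without equals()/hashCode(): Object's identity semantics apply.
    static class NoEquals implements Serializable {
        private static final long serialVersionUID = 1L;
        String id;
        NoEquals(String id) { this.id = id; }
    }

    // The same POJO with value-based equals()/hashCode().
    static class WithEquals implements Serializable {
        private static final long serialVersionUID = 1L;
        String id;
        WithEquals(String id) { this.id = id; }

        @Override
        public boolean equals(Object o) {
            return o instanceof WithEquals && Objects.equals(id, ((WithEquals) o).id);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(id);
        }
    }

    // Writes and re-reads an object with Java serialization (a stand-in for Flink's serializers).
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hash-based grouping: equal-looking keys stay separate without equals()/hashCode().
        Set<NoEquals> withoutOverrides = new HashSet<>(Arrays.asList(new NoEquals("x"), new NoEquals("x")));
        Set<WithEquals> withOverrides = new HashSet<>(Arrays.asList(new WithEquals("x"), new WithEquals("x")));
        System.out.println(withoutOverrides.size() + " " + withOverrides.size()); // 2 1

        // Serialization does not depend on equals()/hashCode(): the field survives the round trip.
        System.out.println(roundTrip(new NoEquals("x")).id); // x
    }
}
```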
>>>>>
>>>>> Are you registering any custom Kryo serializers?
>>>>>
>>>>>
>>>>>
>>>>> From your past emails, you saw the following different exceptions:
>>>>>
>>>>> A)  Caused by: java.io.UTFDataFormatException: malformed input around
>>>>> byte 42
>>>>> B)  Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>>> C)  Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>>
>>>>> Were they all caused by the same job, or different ones?
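As a hedged illustration (plain JDK serialization standing in for Flink's serializers, and deliberately corrupted bytes standing in for whatever actually went wrong), a single damaged byte is enough to reproduce the shape of both A and B: a bad continuation byte in a modified-UTF-8 string gives a UTFDataFormatException, and a damaged class name in a Java-serialized HashSet gives a ClassNotFoundException for a non-existent class such as java.ttil.HashSet. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UTFDataFormatException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;

public class CorruptedBytesDemo {

    // A) Corrupt one byte of a string written with writeUTF() and read it back.
    static Exception corruptUtf() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF("h\u00e9llo");
        out.flush();
        byte[] data = bytes.toByteArray(); // 2-byte length, 'h', 0xC3, 0xA9, 'l', 'l', 'o'
        data[4] = 0x29;                    // 0xA9 -> 0x29: no longer a 10xxxxxx continuation byte
        try {
            new DataInputStream(new ByteArrayInputStream(data)).readUTF();
            return null;
        } catch (UTFDataFormatException e) {
            return e;
        }
    }

    // B) Corrupt one letter of the class name inside a Java-serialized HashSet.
    static Exception corruptClassName() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new HashSet<String>());
        }
        byte[] data = bytes.toByteArray();
        int at = indexOf(data, "java.util.HashSet".getBytes(StandardCharsets.US_ASCII));
        if (at < 0) {
            throw new IllegalStateException("class name not found in stream");
        }
        data[at + 5] = (byte) 't';         // "java.util.HashSet" -> "java.ttil.HashSet"
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            in.readObject();
            return null;
        } catch (ClassNotFoundException e) {
            return e;
        }
    }

    // Naive byte-array search; fine for a stream of a few hundred bytes.
    private static int indexOf(byte[] haystack, byte[] needle) {
        outer:
        for (int i = 0; i + needle.length <= haystack.length; i++) {
            for (int j = 0; j < needle.length; j++) {
                if (haystack[i + j] != needle[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(corruptUtf());       // a java.io.UTFDataFormatException
        System.out.println(corruptClassName()); // a java.lang.ClassNotFoundException naming java.ttil.HashSet
    }
}
```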
>>>>>
>>>>>
>>>>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>>> Hi Robert,
>>>>>> in this specific case the classes involved are:
>>>>>>
>>>>>>    - Tuple3<String, String, IndexAttributeToExpand>
>>>>>>    (IndexAttributeToExpand is a POJO extending another class, and neither
>>>>>>    of them implements equals and hashCode)
>>>>>>    - Tuple3<String,String, TreeNode<String, Map<String, Set<String>>>>
>>>>>>    (TreeNode is a POJO containing other TreeNodes, and it doesn't
>>>>>>    implement equals and hashCode)
>>>>>>    - I've now added hashCode and equals to both classes to be
>>>>>>    aligned with the POJO policies; however, the job finished correctly after
>>>>>>    stopping and restarting the cluster... usually when I get strange
>>>>>>    serialization exceptions I stop and restart the cluster and everything
>>>>>>    works.
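Since IndexAttributeToExpand extends another class, one common pattern for adding equals()/hashCode() across a hierarchy is to compare runtime classes with getClass() and let the subclass delegate to super for the inherited fields. A minimal sketch; BaseAttribute, AttributeToExpand and their fields are hypothetical, because the real classes are not shown in the thread.

```java
import java.util.Objects;

public class PojoEqualsSketch {

    // Hypothetical base class (the real parent of IndexAttributeToExpand is not shown).
    public static class BaseAttribute {
        private String name;

        public BaseAttribute() {} // POJO-style public no-arg constructor
        public BaseAttribute(String name) { this.name = name; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            // getClass() (not instanceof) keeps equals symmetric across the hierarchy
            if (o == null || getClass() != o.getClass()) return false;
            return Objects.equals(name, ((BaseAttribute) o).name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name);
        }
    }

    // Hypothetical subclass standing in for IndexAttributeToExpand.
    public static class AttributeToExpand extends BaseAttribute {
        private int depth;

        public AttributeToExpand() {}
        public AttributeToExpand(String name, int depth) { super(name); this.depth = depth; }

        public int getDepth() { return depth; }
        public void setDepth(int depth) { this.depth = depth; }

        @Override
        public boolean equals(Object o) {
            // super.equals already checked the runtime class and the inherited fields
            return super.equals(o) && depth == ((AttributeToExpand) o).depth;
        }

        @Override
        public int hashCode() {
            return Objects.hash(super.hashCode(), depth);
        }
    }

    public static void main(String[] args) {
        BaseAttribute base = new BaseAttribute("a");
        AttributeToExpand sub = new AttributeToExpand("a", 1);
        System.out.println(base.equals(sub) + " " + sub.equals(base)); // false false
        System.out.println(sub.equals(new AttributeToExpand("a", 1)));  // true
    }
}
```

The getClass() check means a base instance never equals a subclass instance; an instanceof check would allow mixed comparisons but makes symmetry harder to keep once subclasses add fields.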
>>>>>>
>>>>>> The TreeNode class (the recursive one) is actually the following:
>>>>>>
>>>>>> import java.io.Serializable;
>>>>>> import java.util.ArrayList;
>>>>>> import java.util.HashMap;
>>>>>> import java.util.Iterator;
>>>>>> import java.util.LinkedList;
>>>>>> import java.util.List;
>>>>>> import java.util.UUID;
>>>>>>
>>>>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>>>>
>>>>>> public class TreeNode<K,V> implements Serializable {
>>>>>>
>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>     private int level = 0;
>>>>>>
>>>>>>     private String uuid;
>>>>>>     private K key;
>>>>>>     private V value;
>>>>>>     @JsonIgnore
>>>>>>     private TreeNode<K,V> parent;
>>>>>>     private List<TreeNode<K,V>> children;
>>>>>>     @JsonIgnore
>>>>>>     private HashMap<K, List<TreeNode<K,V>>> lookup;
>>>>>>
>>>>>>     public TreeNode(K key, V value) {
>>>>>>         this.level = 0;
>>>>>>         this.key = key;
>>>>>>         this.uuid = UUID.randomUUID().toString();
>>>>>>         this.value = value;
>>>>>>         this.children = new LinkedList<TreeNode<K,V>>();
>>>>>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
>>>>>>         thisAsList.add(this);
>>>>>>         this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
>>>>>>         this.lookup.put(key, thisAsList);
>>>>>>     }
>>>>>>
>>>>>>     public TreeNode<K,V> addChild(K key, V value) {
>>>>>>         TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
>>>>>>         childNode.level = level + 1;
>>>>>>         childNode.parent = this;
>>>>>>         childNode.lookup = lookup;
>>>>>>         childNode.uuid = UUID.randomUUID().toString();
>>>>>>         this.children.add(childNode);
>>>>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>>>>         if (l == null) {
>>>>>>             l = new ArrayList<TreeNode<K,V>>();
>>>>>>             lookup.put(childNode.key, l);
>>>>>>         }
>>>>>>         l.add(childNode);
>>>>>>         return childNode;
>>>>>>     }
>>>>>>
>>>>>>     public boolean isLeaf() {
>>>>>>         return children.isEmpty();
>>>>>>     }
>>>>>>     public int getLevel() {
>>>>>>         return level;
>>>>>>     }
>>>>>>     public TreeNode<K,V> getParent() {
>>>>>>         return parent;
>>>>>>     }
>>>>>>     public V getValue() {
>>>>>>         return value;
>>>>>>     }
>>>>>>     public String getUuid() {
>>>>>>         return uuid;
>>>>>>     }
>>>>>>     public void setUuid(String uuid) {
>>>>>>         this.uuid = uuid;
>>>>>>     }
>>>>>>     public List<TreeNode<K,V>> getChildren() {
>>>>>>         return children;
>>>>>>     }
>>>>>>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>>>>>>         return lookup.get(key);
>>>>>>     }
>>>>>>     public K getKey() {
>>>>>>         return key;
>>>>>>     }
>>>>>>     public List<TreeNode<K,V>> getLeafs() {
>>>>>>         List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
>>>>>>         getLeafs(ret);
>>>>>>         return ret;
>>>>>>     }
>>>>>>     private void getLeafs(List<TreeNode<K, V>> ret) {
>>>>>>         if (children.isEmpty())
>>>>>>             ret.add(this);
>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>             child.getLeafs(ret);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public String toString() {
>>>>>>         return toString(true);
>>>>>>     }
>>>>>>     public String toString(boolean withChildren) {
>>>>>>         if (key == null)
>>>>>>             return super.toString();
>>>>>>         StringBuilder ret = new StringBuilder();
>>>>>>         for (int i = 0; i < level; i++) {
>>>>>>             ret.append(" >");
>>>>>>         }
>>>>>>         ret.append(" ").append(key.toString());
>>>>>>         if (withChildren) {
>>>>>>             for (TreeNode<K, V> child : children) {
>>>>>>                 ret.append("\n").append(child.toString());
>>>>>>             }
>>>>>>         }
>>>>>>         return ret.toString();
>>>>>>     }
>>>>>>
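>>>>>>     public void setValue(V value) {
>>>>>>         this.value = value;
>>>>>>     }
>>>>>>
>>>>>>     public void remove(List<TreeNode<K, V>> nodes) {
>>>>>>         // copy first: 'nodes' may be a list held by 'lookup', which removeChildren() modifies
>>>>>>         List<TreeNode<K, V>> toRemove = new ArrayList<TreeNode<K, V>>(nodes);
>>>>>>         for (TreeNode<K, V> n : toRemove) {
>>>>>>             removeChildren(n);
>>>>>>         }
>>>>>>         for (TreeNode<K, V> n : toRemove) {
>>>>>>             TreeNode<K, V> parent = n.getParent();
>>>>>>             if (parent == null)
>>>>>>                 continue;
>>>>>>             parent.children.remove(n);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     // Unregisters 'node' and all of its descendants from the shared lookup map.
>>>>>>     private void removeChildren(TreeNode<K, V> node) {
>>>>>>         // lookup is keyed by K, not by uuid: drop the node from its key's list
>>>>>>         List<TreeNode<K, V>> sameKey = lookup.get(node.getKey());
>>>>>>         if (sameKey != null) {
>>>>>>             sameKey.remove(node);
>>>>>>             if (sameKey.isEmpty())
>>>>>>                 lookup.remove(node.getKey());
>>>>>>         }
>>>>>>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>>>>>>         while (it.hasNext()) {
>>>>>>             TreeNode<K, V> child = it.next();
>>>>>>             removeChildren(child);
>>>>>>             it.remove();
>>>>>>         }
>>>>>>     }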
>>>>>>
>>>>>>     public void clear() {
>>>>>>         this.key = null;
>>>>>>         this.value = null;
>>>>>>         this.uuid = null;
>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>             child.clear();
>>>>>>         }
>>>>>>         this.children.clear();
>>>>>>         this.lookup.clear();
>>>>>>     }
>>>>>>
>>>>>>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>>>>>>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>>>>>         if (nodes == null)
>>>>>>             return null; // no node is registered under this key
>>>>>>         for (TreeNode<K, V> treeNode : nodes) {
>>>>>>             if (uuid.equals(treeNode.getUuid()))
>>>>>>                 return treeNode;
>>>>>>         }
>>>>>>         return null;
>>>>>>     }
>>>>>>
>>>>>>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>>>>>         return lookup;
>>>>>>     }
>>>>>>
>>>>>>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>>>>>         this.lookup = lookup;
>>>>>>     }
>>>>>>
>>>>>>     public void setLevel(int level) {
>>>>>>         this.level = level;
>>>>>>     }
>>>>>>
>>>>>>     public void setKey(K key) {
>>>>>>         this.key = key;
>>>>>>     }
>>>>>>
>>>>>>     public void setParent(TreeNode<K, V> parent) {
>>>>>>         this.parent = parent;
>>>>>>     }
>>>>>>
>>>>>>     public void setChildren(List<TreeNode<K, V>> children) {
>>>>>>         this.children = children;
>>>>>>     }
>>>>>> }
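TreeNode's object graph is cyclic: every child points back to its parent, and all nodes share one lookup map that references them again. A serializer therefore has to track object identity to write such a graph once and restore the back-references. Java's built-in serialization (which TreeNode opts into via Serializable) does this; the minimal sketch below uses a simplified, hypothetical Node with the same parent/children cycle. It does not show how Flink's own serializers treat TreeNode.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class CyclicGraphRoundTrip {

    // Simplified, hypothetical node with the same parent/children cycle as TreeNode.
    static class Node implements Serializable {
        private static final long serialVersionUID = 1L;
        String key;
        Node parent;
        List<Node> children = new ArrayList<>();

        Node(String key) { this.key = key; }

        Node addChild(String childKey) {
            Node child = new Node(childKey);
            child.parent = this; // back-reference closes the cycle
            children.add(child);
            return child;
        }
    }

    static Node roundTrip(Node root) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(root); // already-written objects are emitted as back-references
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Node) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Node root = new Node("root");
        root.addChild("a");
        Node copy = roundTrip(root);
        Node child = copy.children.get(0);
        System.out.println(child.key + " " + (child.parent == copy)); // a true
    }
}
```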
>>>>>>
>>>>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetzger@apache.org
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> which data type are you using?
>>>>>>>
>>>>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <
>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>> over the past few days we've run a lot of Flink jobs and, from time to
>>>>>>>> time (apparently at random), a different exception arises during their
>>>>>>>> execution...
>>>>>>>> I hope one of them can help in finding the source of the
>>>>>>>> problem. This time the exception is:
>>>>>>>>
>>>>>>>> An error occurred while reading the next record.
>>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>>>>      at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>>>>      at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>>>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>>      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>      at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>>>>      at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>>>>      at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>>>>      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>>      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>      at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>>>>      at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>>>>      at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130
>>>>>>>>
>>>>>>>> Could this error be caused by a missing implementation of hashCode()
>>>>>>>> and equals()?
>>>>>>>>
>>>>>>>> Thanks in advance,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>
