flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Another serialization error
Date Tue, 17 May 2016 13:53:14 GMT
Yes, I am.

On Tue, May 17, 2016 at 3:45 PM, Robert Metzger <rmetzger@apache.org> wrote:

> Are you using 1.0.2 on the cluster as well?
>
> On Tue, May 17, 2016 at 3:40 PM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
>> I tried to debug my application from Eclipse and I got an infinite
>> recursive call in the TypeExtractor during the analysis of TreeNode (I'm
>> using Flink 1.0.2):
>>
>> Exception in thread "main" java.lang.StackOverflowError
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1482)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1464)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:736)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>>
>> Why doesn't this happen on the cluster?
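A rough way to picture the overflow above: an analyser that walks a POJO's fields and recurses into every field type keeps re-entering TreeNode through its `parent` field. The sketch below is a hypothetical, simplified analyser written with `java.lang.reflect` only; it is not Flink's TypeExtractor. `Node` stands in for TreeNode, and the cycle check (a set of classes currently on the analysis path) is one assumed way to stop the recursion, not a description of what Flink 1.0.2 does.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RecursiveTypeSketch {

    // Hypothetical stand-in for TreeNode: it refers to its own type.
    static class Node {
        String key;
        Node parent;
        List<Node> children;
    }

    // Field types this sketch descends into: user classes, not JDK classes or primitives.
    static boolean isUserType(Class<?> t) {
        return !t.isPrimitive() && !t.isArray() && !t.getName().startsWith("java.");
    }

    // Naive analysis with no memory of the classes already being analysed:
    // Node -> parent: Node -> parent: Node ... recurses until the stack overflows.
    static int naiveDepth(Class<?> type) {
        int deepest = 0;
        for (Field f : type.getDeclaredFields()) {
            if (!Modifier.isStatic(f.getModifiers()) && isUserType(f.getType())) {
                deepest = Math.max(deepest, 1 + naiveDepth(f.getType()));
            }
        }
        return deepest;
    }

    // Cycle-aware analysis: a class met again on the current path is recorded
    // as recursive instead of being analysed a second time.
    static void analyze(Class<?> type, Set<Class<?>> onPath, Set<String> recursive) {
        if (!onPath.add(type)) {
            recursive.add(type.getSimpleName());
            return;
        }
        for (Field f : type.getDeclaredFields()) {
            if (!Modifier.isStatic(f.getModifiers()) && isUserType(f.getType())) {
                analyze(f.getType(), onPath, recursive);
            }
        }
        onPath.remove(type);
    }

    public static void main(String[] args) {
        try {
            naiveDepth(Node.class);
        } catch (StackOverflowError e) { // caught here only to keep the demo running
            System.out.println("naive analysis: StackOverflowError");
        }
        Set<String> recursive = new HashSet<>();
        analyze(Node.class, new HashSet<>(), recursive);
        System.out.println("recursive types: " + recursive);
    }
}
```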
>>
>>
>>
>>
>> On Tue, May 17, 2016 at 2:23 PM, Flavio Pompermaier <pompermaier@okkam.it>
>> wrote:
>>
>>> Don't worry, Robert,
>>> I know how hard it is to debug such errors :)
>>> I hope that the combination of these 3 errors is somehow
>>> related. Anyway, here are the answers:
>>>
>>>
>>>    - The job (composed of 16 sub-jobs) fails randomly but, usually, the
>>>    first sub-job after a cluster restart runs successfully
>>>    - In this job I saw A, B and C (but after changing the parallelism)
>>>    - Yes, the error behaves differently depending on the input data
>>>    (actually on the default parallelism and the number of slots in the cluster)
>>>
>>> One more interesting thing I fixed in my code that could (maybe?) be the
>>> cause of B and C (but not of A, because A happened after I fixed this):
>>> I'm reading and writing data from/to some Parquet-thrift directories (using
>>> the Hadoop input/output formats ParquetThriftInputFormat and ParquetThriftOutputFormat).
>>> In one of the 3 jobs I output a dataset to a Parquet-thrift directory
>>> using ds.output(), after decreasing the parallelism and number of slots
>>> from M to N.
>>> The first N Parquet files were overwritten (as expected), but the last M-N
>>> were not removed (I was expecting the Parquet-thrift directory to be managed
>>> as a single unit).
>>> Then, when I read from that directory (in the next job), I discovered
>>> that the job was actually reading all the files in that folder (I was convinced
>>> that, even though the M-N files were left in that directory, there was some
>>> index file, e.g. _metadata, acting as the entry point for the files in that folder).
>>> I don't know whether this could be a cause of such errors, but I
>>> reported it anyway for the sake of completeness, hoping that
>>> this real-life debugging story could be helpful to someone else using
>>> Parquet on Flink :)
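The leftover-file effect described above can be reproduced with plain files. The sketch below uses only `java.nio.file`, not the Hadoop or Parquet formats; `part-<i>` is a hypothetical per-task file name. Rewriting with a lower parallelism overwrites only the first N files, and a reader that lists the directory also picks up the stale M-N files. Clearing the directory before writing is one assumed workaround, not something the Parquet-thrift formats are claimed to do.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StalePartFilesSketch {

    // One output file per parallel task; files with the same name are overwritten.
    static void writeParts(Path dir, int parallelism, String content) throws IOException {
        Files.createDirectories(dir);
        for (int i = 1; i <= parallelism; i++) {
            Files.write(dir.resolve("part-" + i), content.getBytes(StandardCharsets.UTF_8));
        }
    }

    // Assumed workaround: delete what a previous run left behind, then write.
    static void writePartsClean(Path dir, int parallelism, String content) throws IOException {
        if (Files.isDirectory(dir)) {
            try (Stream<Path> old = Files.list(dir)) {
                for (Path p : old.collect(Collectors.toList())) {
                    Files.delete(p); // assumes a flat directory containing only files
                }
            }
        }
        writeParts(dir, parallelism, content);
    }

    // Directory-based reader: consumes every file it finds, stale or not.
    static List<String> readAll(Path dir) throws IOException {
        List<String> records = new ArrayList<>();
        try (Stream<Path> files = Files.list(dir)) {
            for (Path p : files.sorted().collect(Collectors.toList())) {
                records.add(new String(Files.readAllBytes(p), StandardCharsets.UTF_8));
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("parts");
        writeParts(dir, 4, "run1");       // M = 4
        writeParts(dir, 2, "run2");       // N = 2: part-3 and part-4 survive
        System.out.println(readAll(dir)); // contains run2 twice and run1 twice
        writePartsClean(dir, 2, "run2");
        System.out.println(readAll(dir)); // only the two run2 files
    }
}
```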
>>>
>>> Thanks for the support,
>>>
>>> Flavio
>>>
>>> On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <rmetzger@apache.org>
>>> wrote:
>>>
>>>> Is the last one C or A?
>>>>
>>>> How often is it failing (every nth run?) Is it always failing at the
>>>> same execute() call, or at different ones?
>>>> Is it always the exact same exception or is it different ones?
>>>> Does the error behave differently depending on the input data?
>>>>
>>>> Sorry for asking so many questions, but these errors can have many
>>>> causes and just searching the code for potential issues can take a lot of
>>>> time ;)
>>>>
>>>> On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> Ah sorry, I forgot to mention that I don't use any custom Kryo
>>>>> serializers.
>>>>>
>>>>> On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>>> I got those exceptions running 3 different types of jobs. I should
>>>>>> have kept track of which job produced which error... my bad!
>>>>>> However, the most problematic job is the last one, where I run a
>>>>>> series of jobs one after the other (calling env.execute() in a for loop).
>>>>>> If you want, I can share my code with you (privately for the moment,
>>>>>> because it's not public yet) or share the dashboard screen via Skype
>>>>>> while the jobs are running.
>>>>>>
>>>>>>
>>>>>> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetzger@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>> thank you for providing additional details.
>>>>>>> I don't think that missing hashCode() / equals() implementations cause
>>>>>>> such an error. They can cause wrong sorting or partitioning of the data,
>>>>>>> but the serialization should still work properly.
>>>>>>> I suspect the issue is somewhere in the serialization stack.
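The distinction drawn above can be illustrated in plain Java. This is a hypothetical sketch, not Flink's partitioner or comparators: without equals()/hashCode(), two keys with the same content end up in different groups of any hash-based grouping that relies on the key's own methods, while a field-by-field round trip through `DataOutputStream`/`DataInputStream` works either way because it never calls them. `PlainKey` and `ValueKey` are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EqualsHashCodeSketch {

    // No equals()/hashCode(): inherits identity semantics from Object.
    static class PlainKey {
        final String id;
        PlainKey(String id) { this.id = id; }
    }

    // Same field, with value-based equals()/hashCode().
    static final class ValueKey {
        final String id;
        ValueKey(String id) { this.id = id; }

        @Override
        public boolean equals(Object o) {
            return o instanceof ValueKey && ((ValueKey) o).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    // Hash-based grouping: returns how many distinct groups the keys form.
    static <K> int countGroups(List<K> keys) {
        Map<K, Integer> groups = new HashMap<>();
        for (K k : keys) {
            groups.merge(k, 1, Integer::sum);
        }
        return groups.size();
    }

    // Field-by-field serialization round trip; equals()/hashCode() are never called.
    static PlainKey roundTrip(PlainKey key) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(key.id);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return new PlainKey(in.readUTF());
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(countGroups(Arrays.asList(new PlainKey("a"), new PlainKey("a")))); // 2
        System.out.println(countGroups(Arrays.asList(new ValueKey("a"), new ValueKey("a")))); // 1
        System.out.println(roundTrip(new PlainKey("a")).id);                                 // a
    }
}
```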
>>>>>>>
>>>>>>> Are you registering any custom kryo serializers?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> From your past emails, you saw the following different exceptions:
>>>>>>>
>>>>>>> A)  Caused by: java.io.UTFDataFormatException: malformed input
>>>>>>> around byte 42
>>>>>>> B)  Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>>>>> C)  Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>>>>
>>>>>>> Were they all caused by the same job, or different ones?
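One generic way to end up with an exception like A, offered as an illustration rather than a diagnosis: if the reader's idea of the record layout differs from the writer's, it decodes unrelated bytes as a UTF length prefix and payload. The sketch uses `java.io.DataInputStream.readUTF()` (the modified UTF-8 format specified by `java.io.DataInput`); the record layout (an int followed by a String) and the value 163905 are made up so that the misread bytes are invalid.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;

class MisalignedReadSketch {

    // Writer's layout for a hypothetical record: an int field, then a String field.
    static byte[] write(int count, String name) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(count);
            out.writeUTF(name);
        }
        return bytes.toByteArray();
    }

    // Reader that agrees with the writer's layout.
    static String readName(byte[] record) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            in.readInt();
            return in.readUTF();
        }
    }

    // Reader that wrongly expects the String first: the int's bytes
    // 0x00 0x02 0x80 0x41 are taken as length 2 plus payload 0x80 0x41,
    // and 0x80 cannot start a modified-UTF-8 character.
    static String readNameMisaligned(byte[] record) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            return in.readUTF();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] record = write(163905, "abc"); // 163905 == 0x00028041
        System.out.println(readName(record));
        try {
            readNameMisaligned(record);
        } catch (UTFDataFormatException e) {
            System.out.println("misaligned read: " + e.getMessage());
        }
    }
}
```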
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <
>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>>> Hi Robert,
>>>>>>>> in this specific case the classes involved are:
>>>>>>>>
>>>>>>>>    - Tuple3<String, String, IndexAttributeToExpand>
>>>>>>>>    (IndexAttributeToExpand is a POJO extending another class, and neither
>>>>>>>>    of them implements equals and hashCode)
>>>>>>>>    - Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>>
>>>>>>>>    (TreeNode is a POJO containing other TreeNodes, and it doesn't
>>>>>>>>    implement equals and hashCode)
>>>>>>>>    - I have now added hashCode and equals to both classes, in order
>>>>>>>>    to be aligned with the POJO policies; however, the job finished
>>>>>>>>    correctly after stopping and restarting the cluster... usually when I
>>>>>>>>    get strange serialization exceptions I stop and restart the cluster
>>>>>>>>    and everything works.
>>>>>>>>
>>>>>>>> The TreeNode class (the recursive one) is actually the following:
>>>>>>>>
>>>>>>>> import java.io.Serializable;
>>>>>>>> import java.util.ArrayList;
>>>>>>>> import java.util.HashMap;
>>>>>>>> import java.util.Iterator;
>>>>>>>> import java.util.LinkedList;
>>>>>>>> import java.util.List;
>>>>>>>> import java.util.UUID;
>>>>>>>>
>>>>>>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>>>>>>
>>>>>>>> public class TreeNode<K, V> implements Serializable {
>>>>>>>>
>>>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>>>     private int level = 0;
>>>>>>>>
>>>>>>>>     private String uuid;
>>>>>>>>     private K key;
>>>>>>>>     private V value;
>>>>>>>>     @JsonIgnore
>>>>>>>>     private TreeNode<K, V> parent;
>>>>>>>>     private List<TreeNode<K, V>> children;
>>>>>>>>     @JsonIgnore
>>>>>>>>     private HashMap<K, List<TreeNode<K, V>>> lookup;
>>>>>>>>
>>>>>>>>     public TreeNode(K key, V value) {
>>>>>>>>         this.level = 0;
>>>>>>>>         this.key = key;
>>>>>>>>         this.uuid = UUID.randomUUID().toString();
>>>>>>>>         this.value = value;
>>>>>>>>         this.children = new LinkedList<TreeNode<K, V>>();
>>>>>>>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K, V>>();
>>>>>>>>         thisAsList.add(this);
>>>>>>>>         this.lookup = new HashMap<K, List<TreeNode<K, V>>>();
>>>>>>>>         this.lookup.put(key, thisAsList);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public TreeNode<K, V> addChild(K key, V value) {
>>>>>>>>         TreeNode<K, V> childNode = new TreeNode<K, V>(key, value);
>>>>>>>>         childNode.level = level + 1;
>>>>>>>>         childNode.parent = this;
>>>>>>>>         // all nodes of a tree share the root's lookup map
>>>>>>>>         childNode.lookup = lookup;
>>>>>>>>         childNode.uuid = UUID.randomUUID().toString();
>>>>>>>>         this.children.add(childNode);
>>>>>>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>>>>>>         if (l == null) {
>>>>>>>>             l = new ArrayList<TreeNode<K, V>>();
>>>>>>>>             lookup.put(childNode.key, l);
>>>>>>>>         }
>>>>>>>>         l.add(childNode);
>>>>>>>>         return childNode;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public boolean isLeaf() {
>>>>>>>>         return children.isEmpty();
>>>>>>>>     }
>>>>>>>>     public int getLevel() {
>>>>>>>>         return level;
>>>>>>>>     }
>>>>>>>>     public TreeNode<K, V> getParent() {
>>>>>>>>         return parent;
>>>>>>>>     }
>>>>>>>>     public V getValue() {
>>>>>>>>         return value;
>>>>>>>>     }
>>>>>>>>     public String getUuid() {
>>>>>>>>         return uuid;
>>>>>>>>     }
>>>>>>>>     public void setUuid(String uuid) {
>>>>>>>>         this.uuid = uuid;
>>>>>>>>     }
>>>>>>>>     public List<TreeNode<K, V>> getChildren() {
>>>>>>>>         return children;
>>>>>>>>     }
>>>>>>>>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>>>>>>>>         return lookup.get(key);
>>>>>>>>     }
>>>>>>>>     public K getKey() {
>>>>>>>>         return key;
>>>>>>>>     }
>>>>>>>>     public List<TreeNode<K, V>> getLeafs() {
>>>>>>>>         List<TreeNode<K, V>> ret = new ArrayList<TreeNode<K, V>>();
>>>>>>>>         getLeafs(ret);
>>>>>>>>         return ret;
>>>>>>>>     }
>>>>>>>>     private void getLeafs(List<TreeNode<K, V>> ret) {
>>>>>>>>         if (children.isEmpty())
>>>>>>>>             ret.add(this);
>>>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>>>             child.getLeafs(ret);
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public String toString() {
>>>>>>>>         return toString(true);
>>>>>>>>     }
>>>>>>>>     public String toString(boolean withChildren) {
>>>>>>>>         if (key == null)
>>>>>>>>             return super.toString();
>>>>>>>>         StringBuilder ret = new StringBuilder();
>>>>>>>>         for (int i = 0; i < level; i++) {
>>>>>>>>             ret.append(" >");
>>>>>>>>         }
>>>>>>>>         ret.append(" ").append(key.toString());
>>>>>>>>         if (withChildren) {
>>>>>>>>             for (TreeNode<K, V> child : children) {
>>>>>>>>                 ret.append("\n").append(child.toString());
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>         return ret.toString();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setValue(V value) {
>>>>>>>>         this.value = value;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void remove(List<TreeNode<K, V>> nodes) {
>>>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>>>             removeChildren(n);
>>>>>>>>         }
>>>>>>>>         for (TreeNode<K, V> n : nodes) {
>>>>>>>>             TreeNode<K, V> parent = n.getParent();
>>>>>>>>             if (parent == null)
>>>>>>>>                 continue;
>>>>>>>>             parent.children.remove(n);
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private void removeChildren(TreeNode<K, V> node) {
>>>>>>>>         // lookup is keyed by K, not by uuid: drop the node from its key's list
>>>>>>>>         List<TreeNode<K, V>> sameKey = lookup.get(node.getKey());
>>>>>>>>         if (sameKey != null)
>>>>>>>>             sameKey.remove(node);
>>>>>>>>         if (node.children.isEmpty())
>>>>>>>>             return;
>>>>>>>>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>>>>>>>>         while (it.hasNext()) {
>>>>>>>>             TreeNode<K, V> child = it.next();
>>>>>>>>             removeChildren(child);
>>>>>>>>             it.remove();
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void clear() {
>>>>>>>>         this.key = null;
>>>>>>>>         this.value = null;
>>>>>>>>         this.uuid = null;
>>>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>>>             child.clear();
>>>>>>>>         }
>>>>>>>>         this.children.clear();
>>>>>>>>         this.lookup.clear();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>>>>>>>>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>>>>>>>         if (nodes == null)
>>>>>>>>             return null; // no node with this key
>>>>>>>>         for (TreeNode<K, V> treeNode : nodes) {
>>>>>>>>             if (uuid.equals(treeNode.getUuid()))
>>>>>>>>                 return treeNode;
>>>>>>>>         }
>>>>>>>>         return null;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>>>>>>>         return lookup;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>>>>>>>         this.lookup = lookup;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setLevel(int level) {
>>>>>>>>         this.level = level;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setKey(K key) {
>>>>>>>>         this.key = key;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setParent(TreeNode<K, V> parent) {
>>>>>>>>         this.parent = parent;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public void setChildren(List<TreeNode<K, V>> children) {
>>>>>>>>         this.children = children;
>>>>>>>>     }
>>>>>>>> }
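Besides being recursive as a type, TreeNode is cyclic as an object graph: each child's `parent` points back up, and all nodes share one `lookup` map that contains the nodes themselves. The hypothetical sketch below uses a cut-down `Node` with the same wiring (String keys, no values) and counts reachable nodes with an identity-based visited set. A traversal without that set, or a hashCode() computed over `parent`, `children` or `lookup`, would keep going round these cycles.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

class TreeGraphSketch {

    // Cut-down TreeNode: same parent / children / shared lookup wiring.
    static class Node {
        final String key;
        Node parent;
        final List<Node> children = new LinkedList<>();
        Map<String, List<Node>> lookup = new HashMap<>();

        Node(String key) {
            this.key = key;
            lookup.computeIfAbsent(key, k -> new ArrayList<>()).add(this);
        }

        Node addChild(String childKey) {
            Node child = new Node(childKey);
            child.parent = this;
            child.lookup = lookup; // every node shares the root's map
            children.add(child);
            lookup.computeIfAbsent(childKey, k -> new ArrayList<>()).add(child);
            return child;
        }
    }

    // Counts distinct nodes reachable via parent, children and lookup.
    // The identity-based 'seen' set is what makes the walk terminate.
    static int countReachable(Node start) {
        Set<Node> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Deque<Node> todo = new ArrayDeque<>();
        todo.push(start);
        while (!todo.isEmpty()) {
            Node n = todo.pop();
            if (!seen.add(n)) {
                continue; // already visited: this is where a cycle is cut
            }
            if (n.parent != null) {
                todo.push(n.parent);
            }
            todo.addAll(n.children);
            for (List<Node> sameKey : n.lookup.values()) {
                todo.addAll(sameKey);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        Node root = new Node("a");
        Node child = root.addChild("b");
        child.addChild("c");
        System.out.println(root.lookup.get("a").get(0) == root); // true: root is in its own map
        System.out.println(countReachable(child));               // 3
    }
}
```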
>>>>>>>>
>>>>>>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <
>>>>>>>> rmetzger@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Flavio,
>>>>>>>>>
>>>>>>>>> which datatype are you using?
>>>>>>>>>
>>>>>>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Hi to all,
>>>>>>>>>> these days we've run a lot of Flink jobs, and from time to time
>>>>>>>>>> (apparently randomly) a different exception arises during their
>>>>>>>>>> execution...
>>>>>>>>>> I hope one of them can help in finding the source of the
>>>>>>>>>> problem. This time the exception is:
>>>>>>>>>>
>>>>>>>>>> An error occurred while reading the next record.
>>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>>>>>>     at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>>>>>>     at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>>>     at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>>>>>>     at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>>>>>>     at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>>>>>>     at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130
>>>>>>>>>>
>>>>>>>>>> Could this error be caused by a missing implementation of
>>>>>>>>>> hashCode() and equals()?
>>>>>>>>>>
>>>>>>>>>> Thanks in advance,
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>>
>
