flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: Another serialization error
Date Tue, 17 May 2016 13:45:16 GMT
Are you using 1.0.2 on the cluster as well?

On Tue, May 17, 2016 at 3:40 PM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> I tried to debug my application from Eclipse and I got an infinite
> recursive call in the TypeExtractor during the analysis of TreeNode (I'm
> using Flink 1.0.2):
>
> Exception in thread "main" java.lang.StackOverflowError
>     at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1482)
>     at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1464)
>     at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:736)
>     at
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>     at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>     at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>     at
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>     at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>     at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>
> Why doesn't this happen on the cluster?
>
>
>
>
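The trace above keeps cycling through analyzePojo, createTypeInfoWithTypeHierarchy and privateGetForClass. That is the shape one would expect when a field-by-field type analysis keeps re-entering a self-referencing type (TreeNode has a parent field that is itself a TreeNode). The following is a minimal sketch of that pattern in plain Java reflection, not Flink's TypeExtractor. The class Node and the methods naiveWalk and guardedWalk are hypothetical names, and whether this is what actually happens inside Flink 1.0.2 is not established in this thread.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Set;

public class RecursiveTypeWalk {

    // Hypothetical stand-in for a self-referencing POJO such as TreeNode.
    public static class Node {
        String key;
        Node parent;
    }

    // Treat primitives, static fields and JDK types as leaves of the analysis.
    private static boolean isLeaf(Field f) {
        return Modifier.isStatic(f.getModifiers())
                || f.getType().isPrimitive()
                || f.getType().getName().startsWith("java.");
    }

    // Naive walk: descends into every field type without remembering which
    // types are already being analyzed, so Node -> parent:Node never terminates.
    public static int naiveWalk(Class<?> type) {
        int count = 1;
        for (Field f : type.getDeclaredFields()) {
            if (!isLeaf(f)) {
                count += naiveWalk(f.getType());
            }
        }
        return count;
    }

    // Guarded walk: a set of types under analysis cuts the cycle.
    public static int guardedWalk(Class<?> type, Set<Class<?>> inProgress) {
        if (!inProgress.add(type)) {
            return 0; // already being analyzed, do not descend again
        }
        int count = 1;
        for (Field f : type.getDeclaredFields()) {
            if (!isLeaf(f)) {
                count += guardedWalk(f.getType(), inProgress);
            }
        }
        return count;
    }

    public static void main(String[] args) {
        try {
            naiveWalk(Node.class);
        } catch (StackOverflowError e) {
            System.out.println("naive walk overflowed the stack");
        }
        System.out.println("guarded walk visited "
                + guardedWalk(Node.class, new HashSet<>()) + " type(s)");
    }
}
```

The guard is only one way to break such a cycle. Another is to stop analyzing the recursive type field by field and hand it to a generic serializer instead.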
> On Tue, May 17, 2016 at 2:23 PM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
>> Don't worry Robert,
>> I know how hard it is to debug such errors :)
>> I hope that maybe the combination of these 3 errors is somehow
>> related... Anyway, these are the answers:
>>
>>
>>    - The job (composed of 16 sub-jobs) fails randomly but, usually, the
>>    first sub-job after a restart runs successfully
>>    - In this job I saw all of A, B and C (but after changing parallelism)
>>    - Yes, the error behaves differently depending on the input data
>>    (actually on the default parallelism and the number of slots in the cluster)
>>
>> One more interesting thing I fixed in my code that could be (maybe?) the
>> cause of B and C (but not A, because that happened after this problem):
>> I'm reading and writing data from a Parquet-thrift directory (using
>> the Hadoop IF/OF ParquetThriftOutputFormat and ParquetThriftInputFormat).
>> In one of the 3 jobs I output a dataset to a Parquet-thrift directory
>> after decreasing the parallelism and number of slots from M to N, using
>> ds.output().
>> The first N Parquet files were overwritten (as expected) but the last M-N
>> were not removed (I was expecting the Parquet-thrift directory to be managed
>> as a single dir).
>> Then, when I read from that directory in the next job, I discovered
>> that the job was actually reading all the files in that folder (I was convinced
>> that, even though the M-N files were left in that dir, there was some index
>> file, e.g. _metadata, acting as the entry point for the files in that folder).
>> I don't know whether this could be a cause of such errors, but I
>> reported it anyway for the sake of completeness, hoping that
>> this real-life debugging story could be helpful to someone else using
>> Parquet on Flink :)
>>
>> Thanks for the support,
>>
>> Flavio
>>
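The stale-file effect described above (M files written, then N < M files rewritten, then all M read back) can be reproduced without Flink or Parquet. The sketch below uses only java.nio.file on a local temporary directory. The part-<i> naming scheme and the helpers writeParts, clearDirectory and listParts are assumptions made for illustration, not what the Parquet output format does internally.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StaleOutputFiles {

    // Writes one file per parallel task; files with the same name are overwritten.
    public static void writeParts(Path dir, int parallelism) throws IOException {
        Files.createDirectories(dir);
        for (int i = 0; i < parallelism; i++) {
            byte[] content = ("written with parallelism " + parallelism)
                    .getBytes(StandardCharsets.UTF_8);
            Files.write(dir.resolve("part-" + i), content);
        }
    }

    // Deletes the regular files directly inside dir (assumes a flat directory).
    public static void clearDirectory(Path dir) throws IOException {
        if (!Files.isDirectory(dir)) {
            return;
        }
        List<Path> entries;
        try (Stream<Path> stream = Files.list(dir)) {
            entries = stream.collect(Collectors.toList());
        }
        for (Path entry : entries) {
            Files.delete(entry);
        }
    }

    // Returns the sorted file names currently present in dir.
    public static List<String> listParts(Path dir) throws IOException {
        try (Stream<Path> stream = Files.list(dir)) {
            return stream.map(p -> p.getFileName().toString())
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("parts");
        writeParts(dir, 4);              // first run, M = 4
        writeParts(dir, 2);              // second run, N = 2
        System.out.println(listParts(dir)); // part-2 and part-3 are still there

        clearDirectory(dir);             // explicit cleanup before rewriting
        writeParts(dir, 2);
        System.out.println(listParts(dir)); // only part-0 and part-1 remain
    }
}
```

A reader that simply scans the directory will pick up the leftover files from the first run, so clearing the directory (or writing to a fresh one) before lowering the parallelism avoids mixing two runs.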
>> On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <rmetzger@apache.org>
>> wrote:
>>
>>> The last one is C or A?
>>>
>>> How often is it failing (every nth run?) Is it always failing at the
>>> same execute() call, or at different ones?
>>> Is it always the exact same exception or is it different ones?
>>> Does the error behave differently depending on the input data?
>>>
>>> Sorry for asking so many questions, but these errors can have many
>>> causes and just searching the code for potential issues can take a lot of
>>> time ;)
>>>
>>> On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> Ah sorry, I forgot to mention that I don't use any custom kryo
>>>> serializers..
>>>>
>>>> On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> I got those exceptions running 3 different types of jobs..I could have
>>>>> tracked the job and the error...my bad!
>>>>> However, the most problematic job is the last one, where I run a
>>>>> series of jobs one after the other (calling env.execute() in a for loop)..
>>>>> If you want I can share my code with you (in private for the moment
>>>>> because it's not public yet) or the dashboard screen via Skype while the
>>>>> jobs are running..
>>>>>
>>>>>
>>>>> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetzger@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>> thank you for providing additional details.
>>>>>> I don't think that missing hashCode() / equals() implementations cause
>>>>>> such an error. They can cause wrong sorting or partitioning of the data,
>>>>>> but the serialization should still work properly.
>>>>>> I suspect the issue is somewhere in the serialization stack.
>>>>>>
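The point above, that a missing hashCode() / equals() affects grouping and partitioning rather than serialization, can be illustrated with a toy hash partitioner. This is a sketch in plain Java, not Flink's partitioning code. PlainKey, ValueKey and partition are hypothetical names.

```java
import java.util.Objects;

public class PartitionByHash {

    // Key type that inherits identity-based equals/hashCode from Object.
    public static class PlainKey {
        final String id;
        public PlainKey(String id) { this.id = id; }
    }

    // Same data, but equality and hash are derived from the field value.
    public static class ValueKey {
        final String id;
        public ValueKey(String id) { this.id = id; }

        @Override
        public boolean equals(Object o) {
            return o instanceof ValueKey && Objects.equals(id, ((ValueKey) o).id);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(id);
        }
    }

    // Toy hash partitioner: maps a key to one of numPartitions buckets.
    public static int partition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Two value-based keys with the same id always share a partition.
        System.out.println(partition(new ValueKey("a"), 8) == partition(new ValueKey("a"), 8));

        // Two identity-based keys with the same id are not even equal, so
        // nothing guarantees that they are grouped or partitioned together.
        System.out.println(new PlainKey("a").equals(new PlainKey("a")));
    }
}
```

Either way the bytes written for the key are the same, which is why such a mistake would show up as wrong groups rather than as a deserialization failure.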
>>>>>> Are you registering any custom kryo serializers?
>>>>>>
>>>>>>
>>>>>>
>>>>>> From your past emails, you saw the following different exceptions:
>>>>>>
>>>>>> A)  Caused by: java.io.UTFDataFormatException: malformed input
>>>>>> around byte 42
>>>>>> B)  Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>>>> C)  Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>>>
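Exception A is typically a symptom of a reader decoding bytes that are not what the writer produced (corrupted, or read from the wrong offset). The sketch below shows the same class of failure with the JDK's DataOutputStream / DataInputStream, used here as a stand-in for Flink's own readUTF implementation, which is an assumption. The class MalformedUtfDemo and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;

public class MalformedUtfDemo {

    // Encodes a string in the modified UTF-8 format used by writeUTF.
    public static byte[] encode(String s) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(s);
        }
        return buffer.toByteArray();
    }

    // Returns the decoded string, or null if the bytes are not valid modified UTF-8.
    public static String decodeOrNull(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readUTF();
        } catch (UTFDataFormatException e) {
            System.out.println("decoding failed: " + e.getMessage());
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Layout: 2 length bytes, 'h', then a two-byte sequence for U+00E9, then "llo".
        byte[] good = encode("h\u00e9llo");
        System.out.println(decodeOrNull(good) != null); // intact bytes decode fine

        // Overwrite the continuation byte of the two-byte character (index 4).
        byte[] bad = good.clone();
        bad[4] = (byte) 'A';
        System.out.println(decodeOrNull(bad) == null);  // corrupted bytes are rejected
    }
}
```

A single wrong byte is enough to trigger the failure, which fits the suspicion that the problem lies in how records are written or read back rather than in the user classes themselves.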
>>>>>> Were they all caused by the same job, or different ones?
>>>>>>
>>>>>>
>>>>>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>>> Hi Robert,
>>>>>>> in this specific case the classes involved are:
>>>>>>>
>>>>>>>    - Tuple3<String, String, IndexAttributeToExpand>
>>>>>>>    (IndexAttributeToExpand is a POJO extending another class and
>>>>>>>    neither of them implements equals and hashCode)
>>>>>>>    - Tuple3<String, String, TreeNode<String, Map<String,
>>>>>>>    Set<String>>>> (TreeNode is a POJO containing other TreeNodes and
>>>>>>>    it doesn't implement equals and hashCode)
>>>>>>>    - Now I've added hashCode and equals to both classes in order to
>>>>>>>    be aligned with POJO policies; however, the job finished correctly
>>>>>>>    after stopping and restarting the cluster... usually when I get a
>>>>>>>    strange serialization exception I stop and restart the cluster and
>>>>>>>    everything works.
>>>>>>> The TreeNode class (the recursive one) is actually the following:
>>>>>>>
>>>>>>> import java.io.Serializable;
>>>>>>> import java.util.ArrayList;
>>>>>>> import java.util.HashMap;
>>>>>>> import java.util.Iterator;
>>>>>>> import java.util.LinkedList;
>>>>>>> import java.util.List;
>>>>>>> import java.util.UUID;
>>>>>>>
>>>>>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>>>>>
>>>>>>> public class TreeNode<K,V> implements Serializable {
>>>>>>>
>>>>>>> private static final long serialVersionUID = 1L;
>>>>>>> private int level = 0;
>>>>>>>
>>>>>>> private String uuid;
>>>>>>> private K key;
>>>>>>> private V value;
>>>>>>> @JsonIgnore
>>>>>>> private TreeNode<K,V> parent;
>>>>>>> private List<TreeNode<K,V>> children;
>>>>>>> @JsonIgnore
>>>>>>> private HashMap<K, List<TreeNode<K,V>>> lookup;
>>>>>>>
>>>>>>>     public TreeNode(K key, V value) {
>>>>>>>     this.level = 0;
>>>>>>>     this.key = key;
>>>>>>>     this.uuid = UUID.randomUUID().toString();
>>>>>>>         this.value = value;
>>>>>>>         this.children = new LinkedList<TreeNode<K,V>>();
>>>>>>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K,V>>();
>>>>>>>         thisAsList.add(this);
>>>>>>>         this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
>>>>>>>         this.lookup.put(key, thisAsList);
>>>>>>>     }
>>>>>>>
>>>>>>>     public TreeNode<K,V> addChild(K key, V value) {
>>>>>>>     TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
>>>>>>>     childNode.level = level +1;
>>>>>>>         childNode.parent = this;
>>>>>>>         childNode.lookup = lookup;
>>>>>>>         childNode.uuid = UUID.randomUUID().toString();
>>>>>>>         this.children.add(childNode);
>>>>>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>>>>>         if(l==null){
>>>>>>>         l = new ArrayList<TreeNode<K,V>>();
>>>>>>>         lookup.put(childNode.key, l);
>>>>>>>         }
>>>>>>>         l.add(childNode);
>>>>>>>         return childNode;
>>>>>>>     }
>>>>>>>
>>>>>>>     public boolean isLeaf() {
>>>>>>> return children.isEmpty() ;
>>>>>>> }
>>>>>>> public int getLevel() {
>>>>>>> return level;
>>>>>>> }
>>>>>>> public TreeNode<K,V> getParent() {
>>>>>>> return parent;
>>>>>>> }
>>>>>>> public V getValue() {
>>>>>>> return value;
>>>>>>> }
>>>>>>> public String getUuid() {
>>>>>>> return uuid;
>>>>>>> }
>>>>>>> public void setUuid(String uuid) {
>>>>>>> this.uuid = uuid;
>>>>>>> }
>>>>>>> public List<TreeNode<K,V>> getChildren() {
>>>>>>> return children;
>>>>>>> }
>>>>>>> public List<TreeNode<K, V>> getNodesByKey(K key) {
>>>>>>> return lookup.get(key);
>>>>>>> }
>>>>>>> public K getKey() {
>>>>>>> return key;
>>>>>>> }
>>>>>>> public List<TreeNode<K,V>> getLeafs() {
>>>>>>> List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
>>>>>>> getLeafs(ret);
>>>>>>> return ret;
>>>>>>> }
>>>>>>> private void getLeafs(List<TreeNode<K, V>> ret) {
>>>>>>> if(children.isEmpty())
>>>>>>> ret.add(this);
>>>>>>> for (TreeNode<K, V> child : children) {
>>>>>>> child.getLeafs(ret);
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public String toString() {
>>>>>>> return toString(true);
>>>>>>> }
>>>>>>> public String toString(boolean withChildren) {
>>>>>>> if(key==null)
>>>>>>> return super.toString();
>>>>>>> StringBuffer ret = new StringBuffer();
>>>>>>> for (int i = 0; i < level; i++) {
>>>>>>> ret.append(" >");
>>>>>>> }
>>>>>>> ret.append(" " +key.toString());
>>>>>>> if(withChildren){
>>>>>>> for (TreeNode<K, V> child : children) {
>>>>>>> ret.append("\n").append(child.toString());
>>>>>>> }
>>>>>>> }
>>>>>>> return ret.toString();
>>>>>>> }
>>>>>>>
>>>>>>> public void setValue(V value) {
>>>>>>> this.value = value;
>>>>>>> }
>>>>>>>
>>>>>>> public void remove(List<TreeNode<K, V>> nodes) {
>>>>>>> for (TreeNode<K, V> n : nodes) {
>>>>>>> removeChildren(n);
>>>>>>> }
>>>>>>> for (TreeNode<K, V> n : nodes) {
>>>>>>> TreeNode<K, V> parent = n.getParent();
>>>>>>> if(parent==null)
>>>>>>> continue;
>>>>>>> parent.children.remove(n);
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> private void removeChildren(TreeNode<K, V> node) {
>>>>>>> // lookup is keyed by K (not by uuid), so remove the node from its key's list
>>>>>>> List<TreeNode<K, V>> sameKey = lookup.get(node.getKey());
>>>>>>> if(sameKey!=null)
>>>>>>> sameKey.remove(node);
>>>>>>> if(node.children.isEmpty())
>>>>>>> return;
>>>>>>> Iterator<TreeNode<K, V>> it = node.children.iterator();
>>>>>>> while (it.hasNext()) {
>>>>>>> TreeNode<K, V> child = it.next();
>>>>>>> removeChildren(child);
>>>>>>> it.remove();
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> public void clear() {
>>>>>>>     this.key = null;
>>>>>>>         this.value = null;
>>>>>>>         this.uuid = null;
>>>>>>>         for (TreeNode<K, V> child : children) {
>>>>>>>         child.clear();
>>>>>>> }
>>>>>>>         this.children.clear();
>>>>>>>         this.lookup.clear();
>>>>>>> }
>>>>>>>
>>>>>>> public TreeNode<K, V> getNodeById(K key, String uuid) {
>>>>>>> List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>>>>>> for (TreeNode<K, V> treeNode : nodes) {
>>>>>>> if(uuid.equals(treeNode.getUuid()))
>>>>>>> return treeNode;
>>>>>>> }
>>>>>>> return null;
>>>>>>> }
>>>>>>>
>>>>>>> public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>>>>>> return lookup;
>>>>>>> }
>>>>>>>
>>>>>>> public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>>>>>> this.lookup = lookup;
>>>>>>> }
>>>>>>>
>>>>>>> public void setLevel(int level) {
>>>>>>> this.level = level;
>>>>>>> }
>>>>>>>
>>>>>>> public void setKey(K key) {
>>>>>>> this.key = key;
>>>>>>> }
>>>>>>>
>>>>>>> public void setParent(TreeNode<K, V> parent) {
>>>>>>> this.parent = parent;
>>>>>>> }
>>>>>>>
>>>>>>> public void setChildren(List<TreeNode<K, V>> children) {
>>>>>>> this.children = children;
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <
>>>>>>> rmetzger@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Flavio,
>>>>>>>>
>>>>>>>> which datatype are you using?
>>>>>>>>
>>>>>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <
>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>> over the last few days we've run a lot of Flink jobs and from time to
>>>>>>>>> time (apparently randomly) a different exception arises during their
>>>>>>>>> execution...
>>>>>>>>> I hope one of them helps in finding the source of the
>>>>>>>>> problem. This time the exception is:
>>>>>>>>>
>>>>>>>>> An error occurred while reading the next record.
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>>>>>      at
>>>>>>>>> org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>      at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.io.UTFDataFormatException: malformed input around
>>>>>>>>> byte 42
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>>>>>      at
>>>>>>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
>>>>>>>>>
>>>>>>>>> Could this error be caused by a missing implementation of
>>>>>>>>> hashCode() and equals()?
>>>>>>>>>
>>>>>>>>> Thanks in advance,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
>
