flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Another serialization error
Date Tue, 17 May 2016 12:23:37 GMT
Don't worry Robert,
I know how hard it is to debug such errors :)
I hope that maybe the combination of these 3 errors is somehow
related... However, here are the answers:


   - The job (composed of 16 sub-jobs) fails randomly but, usually, the
   first sub-job after a restart runs successfully
   - In this job I saw all of A, B and C (but after changing the parallelism)
   - Yes, the error behaves differently depending on the input data
   (actually, on the default parallelism and the number of slots in the cluster)

One more interesting thing I fixed in my code that could be (maybe?) the
cause of B and C (but not of A, because that happened after this problem):
I'm reading and writing data from some Parquet-thrift directories (using the
Hadoop IF/OF ParquetThriftInputFormat and ParquetThriftOutputFormat).
In one of the 3 jobs I write a dataset to a Parquet-thrift directory
using ds.output(), after decreasing the parallelism and the number of slots
from M to N.
The first N Parquet files were overwritten (as expected) but the last M-N
were not removed (I was expecting the Parquet-thrift directory to be managed
as a single unit).
Then, when I read from that directory in the next job, I discovered
that the job was actually reading all the files in that folder (I was convinced
that, even though the M-N stale files were left in that dir, there was some
index file, e.g. _metadata, acting as the entry point for the files in that
folder).
I don't know whether this could be a cause of such errors, but I'm reporting
it anyway for the sake of completeness, hoping that
this real-life debugging story could be helpful to someone else using
Parquet on Flink :)
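
For anyone hitting the same stale-file problem, here is a minimal sketch of
one possible workaround: empty the output directory before the job that
writes with the lower parallelism N runs, so that no part files from the
earlier M-way run survive. The class name OutputDirCleaner and the method
deleteRecursively are hypothetical helpers for illustration, not part of
Flink or Parquet, and the sketch uses java.nio on a local file system only.
On HDFS the same idea would presumably go through Hadoop's
FileSystem.delete(path, true) instead.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper (not a Flink or Parquet API): empties an output directory. */
final class OutputDirCleaner {

    private OutputDirCleaner() {}

    /**
     * Recursively deletes outputDir if it exists, so that a later run with
     * N writers cannot leave behind part files from a run with M > N writers.
     * Returns the number of deleted entries (files and directories,
     * including outputDir itself).
     */
    static int deleteRecursively(Path outputDir) throws IOException {
        if (!Files.exists(outputDir)) {
            return 0; // nothing to clean up
        }
        List<Path> entries;
        try (Stream<Path> walk = Files.walk(outputDir)) {
            // Reverse order puts children before their parent directory,
            // so every directory is already empty when it is deleted.
            entries = walk.sorted(Comparator.reverseOrder())
                          .collect(Collectors.toList());
        }
        for (Path entry : entries) {
            Files.delete(entry);
        }
        return entries.size();
    }
}
```

The call would go right before the ds.output() of the job that rewrites the
directory. Deleting the whole directory is the bluntest option: it assumes
nothing else in that folder needs to be kept.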

Thanks for the support,

Flavio

On Tue, May 17, 2016 at 1:50 PM, Robert Metzger <rmetzger@apache.org> wrote:

> The last one is C or A?
>
> How often is it failing (every nth run?) Is it always failing at the same
> execute() call, or at different ones?
> Is it always the exact same exception or is it different ones?
> Does the error behave differently depending on the input data?
>
> Sorry for asking so many questions, but these errors can have many causes
> and just searching the code for potential issues can take a lot of time ;)
>
> On Tue, May 17, 2016 at 12:47 PM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
>> Ah sorry, I forgot to mention that I don't use any custom kryo
>> serializers..
>>
>> On Tue, May 17, 2016 at 12:39 PM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>>> I got those exceptions running 3 different types of jobs..I could have
>>> tracked the job and the error...my bad!
>>> However, the most problematic job is the last one, where I run a series
>>> of jobs one after the other (calling env.execute() in a for loop)..
>>> If you want I can share with you my code (in private for the moment
>>> because it's not public yet) or the dashboard screen via skype while the
>>> jobs are running..
>>>
>>>
>>> On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetzger@apache.org>
>>> wrote:
>>>
>>>> Hi Flavio,
>>>> thank you for providing additional details.
>>>> I don't think that missing hashCode() / equals() implementations cause
>>>> such an error. They can cause wrong sorting or partitioning of the data,
>>>> but the serialization should still work properly.
>>>> I suspect the issue is somewhere in the serialization stack.
>>>>
>>>> Are you registering any custom kryo serializers?
>>>>
>>>>
>>>>
>>>> From your past emails, you saw the following different exceptions:
>>>>
>>>> A)  Caused by: java.io.UTFDataFormatException: malformed input around
>>>> byte 42
>>>> B)  Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>> C)  Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>
>>>> Were they all caused by the same job, or different ones?
>>>>
>>>>
>>>> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> Hi Robert,
>>>>> in this specific case the classes involved are:
>>>>>
>>>>>    - Tuple3<String, String, IndexAttributeToExpand>
>>>>>    (IndexAttributeToExpand is a POJO extending another class, and
>>>>>    neither of them implements equals and hashCode)
>>>>>    - Tuple3<String, String, TreeNode<String, Map<String,
>>>>>    Set<String>>>> (TreeNode is a POJO containing other TreeNodes, and
>>>>>    it doesn't implement equals and hashCode)
>>>>>    - Now I've added hashCode and equals to both classes in order to
>>>>>    be aligned with the POJO policies; however, the job finished correctly
>>>>>    after stopping and restarting the cluster... usually, when I get a
>>>>>    strange serialization exception, I stop and restart the cluster and
>>>>>    everything works.
>>>>>
>>>>> The TreeNode class (the recursive one) is actually the following:
>>>>>
>>>>> import java.io.Serializable;
>>>>> import java.util.ArrayList;
>>>>> import java.util.HashMap;
>>>>> import java.util.Iterator;
>>>>> import java.util.LinkedList;
>>>>> import java.util.List;
>>>>> import java.util.UUID;
>>>>>
>>>>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>>>>
>>>>> public class TreeNode<K,V> implements Serializable {
>>>>>
>>>>> private static final long serialVersionUID = 1L;
>>>>> private int level = 0;
>>>>>
>>>>> private String uuid;
>>>>> private K key;
>>>>> private V value;
>>>>> @JsonIgnore
>>>>> private TreeNode<K,V> parent;
>>>>> private List<TreeNode<K,V>> children;
>>>>> @JsonIgnore
>>>>> private HashMap<K, List<TreeNode<K,V>>> lookup;
>>>>>
>>>>>     public TreeNode(K key, V value) {
>>>>>     this.level = 0;
>>>>>     this.key = key;
>>>>>     this.uuid = UUID.randomUUID().toString();
>>>>>         this.value = value;
>>>>>         this.children = new LinkedList<TreeNode<K,V>>();
>>>>>         List<TreeNode<K, V>> thisAsList = new
>>>>> ArrayList<TreeNode<K,V>>();
>>>>>         thisAsList.add(this);
>>>>>         this.lookup = new HashMap<K, List<TreeNode<K,V>>>();
>>>>>         this.lookup.put(key, thisAsList);
>>>>>     }
>>>>>
>>>>>     public TreeNode<K,V> addChild(K key, V value) {
>>>>>     TreeNode<K,V> childNode = new TreeNode<K,V>(key, value);
>>>>>     childNode.level = level +1;
>>>>>         childNode.parent = this;
>>>>>         childNode.lookup = lookup;
>>>>>         childNode.uuid = UUID.randomUUID().toString();
>>>>>         this.children.add(childNode);
>>>>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>>>>         if(l==null){
>>>>>         l = new ArrayList<TreeNode<K,V>>();
>>>>>         lookup.put(childNode.key, l);
>>>>>         }
>>>>>         l.add(childNode);
>>>>>         return childNode;
>>>>>     }
>>>>>
>>>>>     public boolean isLeaf() {
>>>>> return children.isEmpty() ;
>>>>> }
>>>>> public int getLevel() {
>>>>> return level;
>>>>> }
>>>>> public TreeNode<K,V> getParent() {
>>>>> return parent;
>>>>> }
>>>>> public V getValue() {
>>>>> return value;
>>>>> }
>>>>> public String getUuid() {
>>>>> return uuid;
>>>>> }
>>>>> public void setUuid(String uuid) {
>>>>> this.uuid = uuid;
>>>>> }
>>>>> public List<TreeNode<K,V>> getChildren() {
>>>>> return children;
>>>>> }
>>>>> public List<TreeNode<K, V>> getNodesByKey(K key) {
>>>>> return lookup.get(key);
>>>>> }
>>>>> public K getKey() {
>>>>> return key;
>>>>> }
>>>>> public List<TreeNode<K,V>> getLeafs() {
>>>>> List<TreeNode<K,V>> ret = new ArrayList<TreeNode<K,V>>();
>>>>> getLeafs(ret);
>>>>> return ret;
>>>>> }
>>>>> private void getLeafs(List<TreeNode<K, V>> ret) {
>>>>> if(children.isEmpty())
>>>>> ret.add(this);
>>>>> for (TreeNode<K, V> child : children) {
>>>>> child.getLeafs(ret);
>>>>> }
>>>>> }
>>>>>
>>>>> @Override
>>>>> public String toString() {
>>>>> return toString(true);
>>>>> }
>>>>> public String toString(boolean withChildren) {
>>>>> if(key==null)
>>>>> return super.toString();
>>>>> StringBuffer ret = new StringBuffer();
>>>>> for (int i = 0; i < level; i++) {
>>>>> ret.append(" >");
>>>>> }
>>>>> ret.append(" " +key.toString());
>>>>> if(withChildren){
>>>>> for (TreeNode<K, V> child : children) {
>>>>> ret.append("\n").append(child.toString());
>>>>> }
>>>>> }
>>>>> return ret.toString();
>>>>> }
>>>>>
>>>>> public void setValue(V value) {
>>>>> this.value = value;
>>>>> }
>>>>>
>>>>> public void remove(List<TreeNode<K, V>> nodes) {
>>>>> for (TreeNode<K, V> n : nodes) {
>>>>> removeChildren(n);
>>>>> }
>>>>> for (TreeNode<K, V> n : nodes) {
>>>>> TreeNode<K, V> parent = n.getParent();
>>>>> if(parent==null)
>>>>> continue;
>>>>> parent.children.remove(n);
>>>>> }
>>>>> }
>>>>>
>>>>> private void removeChildren(TreeNode<K, V> node) {
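>>>>> // lookup is keyed by K, not by uuid: drop the node from its key's list
>>>>> List<TreeNode<K, V>> sameKey = lookup.get(node.getKey());
>>>>> if(sameKey!=null)
>>>>> sameKey.remove(node);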
>>>>> if(node.children.isEmpty())
>>>>> return;
>>>>> Iterator<TreeNode<K, V>> it = node.children.iterator();
>>>>> while (it.hasNext()) {
>>>>> TreeNode<K, V> child = (TreeNode<K, V>) it.next();
>>>>> removeChildren(child);
>>>>> it.remove();
>>>>> }
>>>>> }
>>>>>
>>>>> public void clear() {
>>>>>     this.key = null;
>>>>>         this.value = null;
>>>>>         this.uuid = null;
>>>>>         for (TreeNode<K, V> child : children) {
>>>>>         child.clear();
>>>>> }
>>>>>         this.children.clear();
>>>>>         this.lookup.clear();
>>>>> }
>>>>>
>>>>> public TreeNode<K, V> getNodeById(K key, String uuid) {
>>>>> List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>>>> for (TreeNode<K, V> treeNode : nodes) {
>>>>> if(uuid.equals(treeNode.getUuid()))
>>>>> return treeNode;
>>>>> }
>>>>> return null;
>>>>> }
>>>>>
>>>>> public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>>>> return lookup;
>>>>> }
>>>>>
>>>>> public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>>>> this.lookup = lookup;
>>>>> }
>>>>>
>>>>> public void setLevel(int level) {
>>>>> this.level = level;
>>>>> }
>>>>>
>>>>> public void setKey(K key) {
>>>>> this.key = key;
>>>>> }
>>>>>
>>>>> public void setParent(TreeNode<K, V> parent) {
>>>>> this.parent = parent;
>>>>> }
>>>>>
>>>>> public void setChildren(List<TreeNode<K, V>> children) {
>>>>> this.children = children;
>>>>> }
>>>>> }
>>>>>
>>>>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetzger@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> which datatype are you using?
>>>>>>
>>>>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>>> Hi to all,
>>>>>>> over the last few days we've run a lot of Flink jobs and from time to
>>>>>>> time (apparently at random) a different exception arises during their
>>>>>>> execution...
>>>>>>> I hope one of them can help in finding the source of the
>>>>>>> problem. This time the exception is:
>>>>>>>
>>>>>>> An error occurred while reading the next record.
>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>>>>      at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>>>>      at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>>>>      at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>      at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>      at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte 42
>>>>>>>      at org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>>>>      at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>>>>      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>      at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>>>>      at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>>>>      at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>>>>      at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130
>>>>>>>
>>>>>>> Could this error be caused by a missing implementation of hashCode()
>>>>>>> and equals()?
>>>>>>>
>>>>>>> Thanks in advance,
>>>>>>> Flavio
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
