flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Another serialization error
Date Tue, 17 May 2016 10:39:47 GMT
I got those exceptions running 3 different types of jobs... I should have
kept track of which job produced which error... my bad!
However, the most problematic job is the last one, where I run a series of
jobs one after the other (calling env.execute() in a for loop).
If you want, I can share my code with you (privately for the moment, because
it's not public yet) or show you the dashboard via Skype while the jobs are
running.
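
For context, a minimal sketch of what running a series of jobs by calling
env.execute() in a for loop can look like (the input paths, the map function,
and the job names below are made up purely for illustration, they are not from
Flavio's job):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SequentialJobsSketch {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // hypothetical input paths, only to show the shape of the loop
        String[] inputs = {"hdfs:///data/in-1", "hdfs:///data/in-2", "hdfs:///data/in-3"};
        for (String input : inputs) {
            env.readTextFile(input)
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String line) {
                        return line.toLowerCase();
                    }
                })
                .writeAsText(input + "-out");
            // each execute() call submits one batch job and blocks until it
            // finishes, then the next iteration builds the next job on the
            // same environment
            env.execute("job for " + input);
        }
    }
}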

On Tue, May 17, 2016 at 12:31 PM, Robert Metzger <rmetzger@apache.org>
wrote:

> Hi Flavio,
> thank you for providing additional details.
> I don't think that missing hashCode / equals() implementations cause such
> an error. They can cause wrong sorting or partitioning of the data, but the
> serialization should still work properly.
> I suspect the issue is somewhere in the serialization stack.
>
> Are you registering any custom Kryo serializers?
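
For reference, this is the kind of registration Robert is asking about: a
custom Kryo serializer is typically registered on the ExecutionConfig before
the job is built. A minimal sketch follows; the MyPojo / MyPojoSerializer
names are placeholders invented for this illustration, not classes from this
thread:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.java.ExecutionEnvironment;

public class KryoRegistrationSketch {

    // hypothetical data type, used only for this illustration
    public static class MyPojo {
        public String name;
    }

    // hypothetical custom Kryo serializer for MyPojo
    public static class MyPojoSerializer extends Serializer<MyPojo> {
        @Override
        public void write(Kryo kryo, Output output, MyPojo object) {
            output.writeString(object.name);
        }

        @Override
        public MyPojo read(Kryo kryo, Input input, Class<MyPojo> type) {
            MyPojo pojo = new MyPojo();
            pojo.name = input.readString();
            return pojo;
        }
    }

    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // registers MyPojoSerializer for every occurrence of MyPojo that
        // falls back to Kryo serialization
        env.getConfig().registerTypeWithKryoSerializer(MyPojo.class, MyPojoSerializer.class);
    }
}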
>
>
>
> From your past emails, you saw the following different exceptions:
>
> A)  Caused by: java.io.UTFDataFormatException: malformed input around
> byte 42
> B)  Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
> C)  Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>
> Were they all caused by the same job, or different ones?
>
>
> On Tue, May 17, 2016 at 12:12 PM, Flavio Pompermaier <pompermaier@okkam.it
> > wrote:
>
>> Hi Robert,
>> in this specific case the interested classes are:
>>
>>    - Tuple3<String, String, IndexAttributeToExpand>
>>    (IndexAttributeToExpand is a POJO extending another class, and neither
>>    of them implements equals() and hashCode())
>>    - Tuple3<String, String, TreeNode<String, Map<String, Set<String>>>>
>>    (TreeNode is a POJO containing other TreeNodes, and it doesn't
>>    implement equals() and hashCode())
>>    - Now I've added hashCode() and equals() to both classes in order to be
>>    aligned with the POJO policies; however, the job finished correctly after
>>    stopping and restarting the cluster... usually, when I get a strange
>>    serialization exception, I stop and restart the cluster and everything works.
>>
>> The TreeNode class (the recursive one) is actually the following:
>>
>> import java.io.Serializable;
>> import java.util.ArrayList;
>> import java.util.HashMap;
>> import java.util.Iterator;
>> import java.util.LinkedList;
>> import java.util.List;
>> import java.util.UUID;
>>
>> import com.fasterxml.jackson.annotation.JsonIgnore;
>>
>> public class TreeNode<K, V> implements Serializable {
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     private int level = 0;
>>     private String uuid;
>>     private K key;
>>     private V value;
>>     @JsonIgnore
>>     private TreeNode<K, V> parent;
>>     private List<TreeNode<K, V>> children;
>>     @JsonIgnore
>>     private HashMap<K, List<TreeNode<K, V>>> lookup;
>>
>>     public TreeNode(K key, V value) {
>>         this.level = 0;
>>         this.key = key;
>>         this.uuid = UUID.randomUUID().toString();
>>         this.value = value;
>>         this.children = new LinkedList<TreeNode<K, V>>();
>>         List<TreeNode<K, V>> thisAsList = new ArrayList<TreeNode<K, V>>();
>>         thisAsList.add(this);
>>         this.lookup = new HashMap<K, List<TreeNode<K, V>>>();
>>         this.lookup.put(key, thisAsList);
>>     }
>>
>>     public TreeNode<K, V> addChild(K key, V value) {
>>         TreeNode<K, V> childNode = new TreeNode<K, V>(key, value);
>>         childNode.level = level + 1;
>>         childNode.parent = this;
>>         childNode.lookup = lookup;
>>         childNode.uuid = UUID.randomUUID().toString();
>>         this.children.add(childNode);
>>         List<TreeNode<K, V>> l = lookup.get(childNode.key);
>>         if (l == null) {
>>             l = new ArrayList<TreeNode<K, V>>();
>>             lookup.put(childNode.key, l);
>>         }
>>         l.add(childNode);
>>         return childNode;
>>     }
>>
>>     public boolean isLeaf() {
>>         return children.isEmpty();
>>     }
>>
>>     public int getLevel() {
>>         return level;
>>     }
>>
>>     public TreeNode<K, V> getParent() {
>>         return parent;
>>     }
>>
>>     public V getValue() {
>>         return value;
>>     }
>>
>>     public String getUuid() {
>>         return uuid;
>>     }
>>
>>     public void setUuid(String uuid) {
>>         this.uuid = uuid;
>>     }
>>
>>     public List<TreeNode<K, V>> getChildren() {
>>         return children;
>>     }
>>
>>     public List<TreeNode<K, V>> getNodesByKey(K key) {
>>         return lookup.get(key);
>>     }
>>
>>     public K getKey() {
>>         return key;
>>     }
>>
>>     public List<TreeNode<K, V>> getLeafs() {
>>         List<TreeNode<K, V>> ret = new ArrayList<TreeNode<K, V>>();
>>         getLeafs(ret);
>>         return ret;
>>     }
>>
>>     private void getLeafs(List<TreeNode<K, V>> ret) {
>>         if (children.isEmpty())
>>             ret.add(this);
>>         for (TreeNode<K, V> child : children) {
>>             child.getLeafs(ret);
>>         }
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return toString(true);
>>     }
>>
>>     public String toString(boolean withChildren) {
>>         if (key == null)
>>             return super.toString();
>>         StringBuffer ret = new StringBuffer();
>>         for (int i = 0; i < level; i++) {
>>             ret.append(" >");
>>         }
>>         ret.append(" " + key.toString());
>>         if (withChildren) {
>>             for (TreeNode<K, V> child : children) {
>>                 ret.append("\n").append(child.toString());
>>             }
>>         }
>>         return ret.toString();
>>     }
>>
>>     public void setValue(V value) {
>>         this.value = value;
>>     }
>>
>>     public void remove(List<TreeNode<K, V>> nodes) {
>>         for (TreeNode<K, V> n : nodes) {
>>             removeChildren(n);
>>         }
>>         for (TreeNode<K, V> n : nodes) {
>>             TreeNode<K, V> parent = n.getParent();
>>             if (parent == null)
>>                 continue;
>>             parent.children.remove(n);
>>         }
>>     }
>>
>>     private void removeChildren(TreeNode<K, V> node) {
>>         lookup.remove(node.getUuid());
>>         if (node.children.isEmpty())
>>             return;
>>         Iterator<TreeNode<K, V>> it = node.children.iterator();
>>         while (it.hasNext()) {
>>             TreeNode<K, V> child = (TreeNode<K, V>) it.next();
>>             removeChildren(child);
>>             it.remove();
>>         }
>>     }
>>
>>     public void clear() {
>>         this.key = null;
>>         this.value = null;
>>         this.uuid = null;
>>         for (TreeNode<K, V> child : children) {
>>             child.clear();
>>         }
>>         this.children.clear();
>>         this.lookup.clear();
>>     }
>>
>>     public TreeNode<K, V> getNodeById(K key, String uuid) {
>>         List<TreeNode<K, V>> nodes = getNodesByKey(key);
>>         for (TreeNode<K, V> treeNode : nodes) {
>>             if (uuid.equals(treeNode.getUuid()))
>>                 return treeNode;
>>         }
>>         return null;
>>     }
>>
>>     public HashMap<K, List<TreeNode<K, V>>> getLookup() {
>>         return lookup;
>>     }
>>
>>     public void setLookup(HashMap<K, List<TreeNode<K, V>>> lookup) {
>>         this.lookup = lookup;
>>     }
>>
>>     public void setLevel(int level) {
>>         this.level = level;
>>     }
>>
>>     public void setKey(K key) {
>>         this.key = key;
>>     }
>>
>>     public void setParent(TreeNode<K, V> parent) {
>>         this.parent = parent;
>>     }
>>
>>     public void setChildren(List<TreeNode<K, V>> children) {
>>         this.children = children;
>>     }
>> }
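
For what it's worth, the hashCode()/equals() pair that Flavio mentions adding
could look like the sketch below, to be placed inside TreeNode. Basing
equality on the uuid field is an assumption made here only for illustration;
the thread does not say which fields were actually used.

    @Override
    public int hashCode() {
        // assumption: the randomly generated uuid identifies a node
        return uuid == null ? 0 : uuid.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof TreeNode)) {
            return false;
        }
        TreeNode<?, ?> other = (TreeNode<?, ?>) obj;
        return uuid == null ? other.uuid == null : uuid.equals(other.uuid);
    }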
>>
>> On Tue, May 17, 2016 at 12:00 PM, Robert Metzger <rmetzger@apache.org>
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> which datatype are you using?
>>>
>>> On Tue, May 17, 2016 at 11:42 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>> during these days we've run a lot of Flink jobs, and from time to time
>>>> (apparently randomly) a different exception arises during their execution...
>>>> I hope one of them can help in finding the source of the
>>>> problem... This time the exception is:
>>>>
>>>> An error occurred while reading the next record.
>>>>      at
>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:148)
>>>>      at
>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
>>>>      at
>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
>>>>      at
>>>> org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:61)
>>>>      at
>>>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:125)
>>>>      at
>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>      at
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>      at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.io.UTFDataFormatException: malformed input around byte
>>>> 42
>>>>      at
>>>> org.apache.flink.runtime.memory.AbstractPagedInputView.readUTF(AbstractPagedInputView.java:488)
>>>>      at
>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:403)
>>>>      at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>      at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>      at
>>>> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
>>>>      at
>>>> org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
>>>>      at
>>>> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
>>>>      at
>>>> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130
>>>>
>>>> Could this error be caused by a missing implementation of hashCode() and
>>>> equals()?
>>>>
>>>> Thanks in advance,
>>>> Flavio
>>>>
>>>
>>
>
