flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: ConcurrentModificationException when using histogram accumulators
Date Fri, 14 Oct 2016 09:56:40 GMT
Hi Yukun,

I think you've found a bug in the code. The accumulators don't appear to be
thread safe. I've opened a JIRA issue to track the fix [1]. Thanks for
reporting the problem :-)

[1] https://issues.apache.org/jira/browse/FLINK-4829

Cheers,
Till

On Fri, Oct 14, 2016 at 8:32 AM, Yukun Guo <gyk.net@gmail.com> wrote:

> This happens when the TaskManager is serializing an
> org.apache.flink.api.common.accumulators.Histogram by iterating through
> the underlying TreeMap while a MapFunction for updating the accumulator
> attempts to modify the TreeMap concurrently. How could I fix it?
>
>
> The call stack:
>
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task.
> java.util.ConcurrentModificationException
>         at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
>         at java.util.TreeMap.writeObject(TreeMap.java:2436)
>         at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>         at java.util.HashMap.writeObject(HashMap.java:1362)
>         at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
>         at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
>         at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
>         at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
>         ...
>
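
The race described in the quoted message (one thread iterating a TreeMap for
serialization while another thread updates it) can be sketched in plain Java
without any Flink dependency. This is only an illustration under stated
assumptions, not the fix tracked in FLINK-4829: HistogramSketch, SafeHistogram,
serialize and unsafeIterationFails are hypothetical names, and SafeHistogram
is a stand-in for, not a copy of, Flink's Histogram accumulator. The idea is
to guard updates with a lock and serialize a copy taken under that same lock.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ConcurrentModificationException;
import java.util.TreeMap;

class HistogramSketch {

    /** Hypothetical stand-in for a histogram accumulator; not Flink's Histogram class. */
    static final class SafeHistogram {
        private final TreeMap<Integer, Integer> counts = new TreeMap<>();

        /** Called by the task thread for every record. */
        synchronized void add(int value) {
            counts.merge(value, 1, Integer::sum);
        }

        /** Called by the reporting thread; copies under the same lock as add(). */
        synchronized TreeMap<Integer, Integer> snapshot() {
            return new TreeMap<>(counts);
        }
    }

    /** Serializes a private copy, so no other thread can modify it mid-iteration. */
    static byte[] serialize(SafeHistogram histogram) throws IOException {
        TreeMap<Integer, Integer> copy = histogram.snapshot();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(copy);
        }
        return bytes.toByteArray();
    }

    /** Single-threaded analogue of the failure: modify a TreeMap while iterating it. */
    static boolean unsafeIterationFails() {
        TreeMap<Integer, Integer> map = new TreeMap<>();
        map.put(1, 1);
        map.put(2, 1);
        try {
            for (Integer key : map.keySet()) {
                map.put(key + 10, 1); // structural modification during iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        SafeHistogram histogram = new SafeHistogram();
        Thread updater = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                histogram.add(i % 1000);
            }
        });
        updater.start();
        // Serialize repeatedly while the updater runs, as a periodic heartbeat would.
        while (updater.isAlive()) {
            serialize(histogram);
        }
        updater.join();
        System.out.println("distinct keys: " + histogram.snapshot().size());
        System.out.println("unsafe iteration fails: " + unsafeIterationFails());
    }
}
```

Copying under the lock keeps the critical section short: the task thread waits
only for the copy, not for the whole serialization. Whether the same idea can
be applied from user code depends on how the runtime obtains and serializes
the accumulator object, which this thread does not settle.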
