flink-issues mailing list archives

From "Maximilian Michels (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3880) Use ConcurrentHashMap for Accumulators
Date Fri, 06 May 2016 16:21:12 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274298#comment-15274298 ]

Maximilian Michels commented on FLINK-3880:
-------------------------------------------

You're right, the synchronized map is a bottleneck. Actually, it doesn't even need to be synchronized: in a regular Flink job it can only be accessed by one task at a time. Only if the user spawned additional threads could it be modified concurrently, and in that case the user would have to take care of the synchronization themselves (and, if they didn't, they would get a ConcurrentModificationException). So we can simply make it a normal map.
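
For illustration, a minimal sketch of what that change could look like, using a hypothetical TaskAccumulatorRegistry class in place of the map held by AbstractRuntimeUDFContext (Accumulator and LongCounter are the real flink-core types; everything else here is simplified and not the actual Flink code):

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.LongCounter;

// Hypothetical, simplified stand-in for the per-task accumulator registry.
// A plain HashMap is sufficient because each instance is only touched by the
// single task thread that owns it; users who spawn their own threads have to
// synchronize externally.
public class TaskAccumulatorRegistry {

    // Previously something like Collections.synchronizedMap(new HashMap<>()):
    // every get/put then went through the map's monitor, which is what the
    // "poor man's profiler" traces below show in SynchronizedMap.get.
    private final Map<String, Accumulator<?, ?>> accumulators = new HashMap<>();

    @SuppressWarnings("unchecked")
    public <V, R extends java.io.Serializable> Accumulator<V, R> getAccumulator(String name) {
        return (Accumulator<V, R>) accumulators.get(name);
    }

    public LongCounter getLongCounter(String name) {
        LongCounter counter = (LongCounter) accumulators.get(name);
        if (counter == null) {
            counter = new LongCounter();
            accumulators.put(name, counter);
        }
        return counter;
    }
}
{code}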

> Use ConcurrentHashMap for Accumulators
> --------------------------------------
>
>                 Key: FLINK-3880
>                 URL: https://issues.apache.org/jira/browse/FLINK-3880
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.1.0
>            Reporter: Ken Krugler
>            Priority: Minor
>
> I was looking at improving DataSet performance - this is for a job created using the Cascading-Flink planner for Cascading 3.1.
> While doing a quick "poor man's profiler" session with one of the TaskManager processes, I noticed that many (most?) of the threads that were actually running were in this state:
> {code:java}
> "DataSource (/working1/terms) (8/20)" daemon prio=10 tid=0x00007f55673e0800 nid=0x666a
runnable [0x00007f556abcf000]
>    java.lang.Thread.State: RUNNABLE
>     at java.util.Collections$SynchronizedMap.get(Collections.java:2037)
>     - locked <0x00000006e73fe718> (a java.util.Collections$SynchronizedMap)
>     at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.getAccumulator(AbstractRuntimeUDFContext.java:162)
>     at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.getLongCounter(AbstractRuntimeUDFContext.java:113)
>     at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.getOrInitCounter(FlinkFlowProcess.java:245)
>     at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.increment(FlinkFlowProcess.java:128)
>     at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.increment(FlinkFlowProcess.java:122)
>     at cascading.tap.hadoop.util.MeasuredRecordReader.next(MeasuredRecordReader.java:65)
>     at cascading.scheme.hadoop.SequenceFile.source(SequenceFile.java:97)
>     at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:166)
>     at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:139)
>     at com.dataartisans.flink.cascading.runtime.source.TapSourceStage.readNextRecord(TapSourceStage.java:70)
>     at com.dataartisans.flink.cascading.runtime.source.TapInputFormat.reachedEnd(TapInputFormat.java:175)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> It looks like Cascading is asking Flink to increment a counter with each Tuple read, and that in turn is often blocked on getting access to the Accumulator object in a map. It looks like this is a SynchronizedMap, but using a ConcurrentHashMap (for example) would reduce this contention.
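
For reference, a minimal sketch of the ConcurrentHashMap variant suggested above could look like the following (the class name is hypothetical and the code is simplified, not actual Flink code; Accumulator and LongCounter are the real flink-core types):

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.LongCounter;

// Hypothetical, simplified registry backed by a ConcurrentHashMap: lookups avoid
// the per-call monitor acquisition that Collections.synchronizedMap adds, which
// is what showed up in the stack traces above.
public class ConcurrentAccumulatorRegistry {

    private final ConcurrentMap<String, Accumulator<?, ?>> accumulators =
            new ConcurrentHashMap<>();

    public LongCounter getLongCounter(String name) {
        // Atomically creates the counter on first use; subsequent calls are plain reads.
        return (LongCounter) accumulators.computeIfAbsent(name, k -> new LongCounter());
    }
}
{code}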



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
