flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Xingcan Cui (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5891) ConnectedComponents is broken when object reuse enabled
Date Fri, 24 Feb 2017 05:45:45 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882017#comment-15882017
] 

Xingcan Cui commented on FLINK-5891:
------------------------------------

Thanks for your explanation, [~greghogan]. I'm afraid my PR on https://issues.apache.org/jira/browse/FLINK-1526
gets the same problem as I also store values with non-primitive types (anyhow the primitive
types will not be affected, right?) from the received messages. I saw the following code in
Flink's ML lib. To avoid the reference problem, it makes a deep copy of each {{StreamRecord<IN>
element}}.
{code:title=AbstractCEPBasePatternOperator.java | borderStyle=solid}
...
// we have to buffer the elements until we receive the proper watermark
if (getExecutionConfig().isObjectReuseEnabled()) {
    // copy the StreamRecord so that it cannot be changed
    priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()),
    element.getTimestamp()));
} else {
    priorityQueue.offer(element);
}
updatePriorityQueue(priorityQueue);
...
{code}
So, what's your suggestions on fixing this? I'd like to work on it (and surely also the PR
of Flink-1526).

> ConnectedComponents is broken when object reuse enabled
> -------------------------------------------------------
>
>                 Key: FLINK-5891
>                 URL: https://issues.apache.org/jira/browse/FLINK-5891
>             Project: Flink
>          Issue Type: Bug
>          Components: Gelly
>    Affects Versions: 1.3.0
>            Reporter: Greg Hogan
>
> {{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}} is storing
a value from its iterator.
> {{GSAConnectedComponents}} does not have this limitation.
> {code}
> 	public static final class CCUpdater<K, VV extends Comparable<VV>>
> 		extends GatherFunction<K, VV, VV> {
> 		@Override
> 		public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages)
throws Exception {
> 			VV current = vertex.getValue();
> 			VV min = current;
> 			for (VV msg : messages) {
> 				if (msg.compareTo(min) < 0) {
> 					min = msg;
> 				}
> 			}
> 			if (!min.equals(current)) {
> 				setNewVertexValue(min);
> 			}
> 		}
> 	}
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message