flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
Date Thu, 09 Mar 2017 19:00:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903627#comment-15903627 ]

ASF GitHub Bot commented on FLINK-5874:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3501#discussion_r105240820
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
---
    @@ -114,9 +121,53 @@ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, Ty
     				dataStream.getTransformation(),
     				new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
     		this.keySelector = keySelector;
    -		this.keyType = keyType;
    +		this.keyType = validateKeyType(keyType);
     	}
    -	
    +
    +	private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
    +		Stack<TypeInformation<?>> stack = new Stack<>();
    +		stack.push(keyType);
    +
    +		while (!stack.isEmpty()) {
    +			TypeInformation<?> typeInfo = stack.pop();
    +
    +			if (!validateKeyTypeIsHashable(typeInfo)) {
    +				throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key.");
    +			}
    +			
    +			if (typeInfo instanceof TupleTypeInfoBase) {
    +				for (int i = 0; i < typeInfo.getArity(); i++) {
    +					stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i));	
    +				}
    +			}
    +		}
    +		return keyType;
    +	}
    +
    +	/**
    +	 * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
    +	 * used as a key in the {@code DataStream.keyBy()} operation.
    +	 *
    +	 * @return {@code false} if:
    --- End diff --
    
    I would shorten this to read ```returns true if the type overrides the hashcode implementation```. The details can be contained in the general javadoc of the method.
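
For illustration only, the condition named in the suggested wording ("the type overrides the hashcode implementation") can be checked with plain reflection. The sketch below is not the code from the pull request; the class name `HashCodeOverrideCheck`, the method `overridesHashCode`, and the two nested sample classes are hypothetical names introduced here.

```java
public class HashCodeOverrideCheck {

    /** Hypothetical sample type that inherits Object.hashCode(). */
    static class PlainKey {
    }

    /** Hypothetical sample type that overrides hashCode(). */
    static class HashedKey {
        @Override
        public int hashCode() {
            return 42;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof HashedKey;
        }
    }

    /**
     * Returns true if the type overrides the hashCode implementation.
     * Array classes and classes without their own hashCode() resolve the
     * method to Object, so they yield false.
     */
    static boolean overridesHashCode(Class<?> type) {
        try {
            return type.getMethod("hashCode").getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            // e.g. primitive types, which expose no methods at all
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("String:    " + overridesHashCode(String.class));
        System.out.println("int[]:     " + overridesHashCode(int[].class));
        System.out.println("PlainKey:  " + overridesHashCode(PlainKey.class));
        System.out.println("HashedKey: " + overridesHashCode(HashedKey.class));
    }
}
```

A check like this only answers whether `hashCode()` is overridden somewhere in the class hierarchy; it says nothing about whether the override is consistent with `equals()`.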


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> ---------------------------------------------------------------------
>
>                 Key: FLINK-5874
>                 URL: https://issues.apache.org/jira/browse/FLINK-5874
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.4
>            Reporter: Robert Metzger
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute the hash when shuffling data. Java's default hashCode() implementation doesn't take the array's contents into account, but the memory address.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and in Flink 1.2 the key-group code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} class, which has a type-specific hashing function. But introducing this change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.
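
The hashing problem described in the issue can be reproduced without Flink. The following is a minimal sketch in plain Java; the class name `ArrayKeyHashDemo` and the helper `contentHash` are hypothetical, and the "receiver" array merely stands in for a deserialized copy of the key.

```java
import java.util.Arrays;

public class ArrayKeyHashDemo {

    /** Hypothetical helper: a hash computed from the array's elements. */
    static int contentHash(int[] key) {
        return Arrays.hashCode(key);
    }

    public static void main(String[] args) {
        int[] senderKey = {1, 2, 3};
        // Stand-in for the same key after serialization and deserialization:
        // equal contents, but a different object.
        int[] receiverKey = {1, 2, 3};

        // Arrays do not override hashCode(), so this is the identity hash.
        // It is per object and normally differs between the two arrays.
        System.out.println("identity hashes equal: "
                + (senderKey.hashCode() == receiverKey.hashCode()));

        // A content-based hash is the same for equal contents.
        System.out.println("content hashes equal:  "
                + (contentHash(senderKey) == contentHash(receiverKey)));
    }
}
```

Because the identity hash belongs to the object rather than to its contents, two JVMs (or two copies in one JVM) cannot be expected to agree on it, which is why a hash-based partitioner cannot route array keys consistently.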



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
