flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5874) Reject arrays as keys in DataStream API to avoid inconsistent hashing
Date Thu, 09 Mar 2017 19:00:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903635#comment-15903635
] 

ASF GitHub Bot commented on FLINK-5874:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3501#discussion_r105241450
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
---
    @@ -906,6 +919,256 @@ public void testChannelSelectors() {
     	}
     
     	/////////////////////////////////////////////////////////////
    +	// KeyBy testing
    +	/////////////////////////////////////////////////////////////
    +
    +	@Rule
    +	public ExpectedException expectedException = ExpectedException.none();
    +
    +	@Test
    +	public void testPrimitiveArrayKeyRejection() {
    +
    +		KeySelector<Tuple2<Integer[], String>, int[]> keySelector =
    +				new KeySelector<Tuple2<Integer[], String>, int[]>() {
    +
    +			@Override
    +			public int[] getKey(Tuple2<Integer[], String> value) throws Exception {
    +				int[] ks = new int[value.f0.length];
    +				for (int i = 0; i < ks.length; i++) {
    +					ks[i] = value.f0[i];
    +				}
    +				return ks;
    +			}
    +		};
    +
    +		testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
    +	}
    +
    +	@Test
    +	public void testBasicArrayKeyRejection() {
    +
    +		KeySelector<Tuple2<Integer[], String>, Integer[]> keySelector =
    +				new KeySelector<Tuple2<Integer[], String>, Integer[]>() {
    +
    +			@Override
    +			public Integer[] getKey(Tuple2<Integer[], String> value) throws Exception {
    +				return value.f0;
    +			}
    +		};
    +
    +		testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
    +	}
    +
    +	@Test
    +	public void testObjectArrayKeyRejection() {
    +
    +		KeySelector<Tuple2<Integer[], String>, TestClass[]> keySelector =
    +				new KeySelector<Tuple2<Integer[], String>, TestClass[]>() {
    +
    +					@Override
    +					public TestClass[] getKey(Tuple2<Integer[], String> value) throws Exception
{
    +						TestClass[] ks = new TestClass[value.f0.length];
    +						for (int i = 0; i < ks.length; i++) {
    +							ks[i] = new TestClass(value.f0[i]);
    +						}
    +						return ks;
    +					}
    +				};
    +
    +		ObjectArrayTypeInfo<TestClass[], TestClass> keyTypeInfo = ObjectArrayTypeInfo.getInfoFor(
    +				TestClass[].class, new GenericTypeInfo<>(TestClass.class));
    +
    +		testKeyRejection(keySelector, keyTypeInfo);
    +	}
    +
    +	private <K> void testKeyRejection(KeySelector<Tuple2<Integer[], String>,
K> keySelector, TypeInformation<K> expectedKeyType) {
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Tuple2<Integer[], String>> input = env.fromElements(
    +				new Tuple2<>(new Integer[] {1, 2}, "barfoo")
    +		);
    +
    +		Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector,
input.getType()));
    +
    +		// adjust the rule
    +		expectedException.expect(InvalidProgramException.class);
    +		expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used
as key.");
    +
    +		input.keyBy(keySelector);
    +	}
    +
    +	// composite key tests : POJOs
    +
    +	@Test
    +	public void testPOJONestedArrayKeyRejection() {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<POJOwithHashCode> input = env.fromElements(
    +				new POJOwithHashCode(new int[] {1, 2}));
    +
    +		TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple1<int[]>>(
    +				PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
    +
    +		// adjust the rule
    +		expectedException.expect(InvalidProgramException.class);
    +		expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used
as key.");
    +
    +		input.keyBy("id");
    +	}
    +
    +	@Test
    +	public void testNestedArrayWorkArround() {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<POJOwithHashCode> input = env.fromElements(
    +				new POJOwithHashCode(new int[] {1, 2}));
    +
    +		input.keyBy(new KeySelector<POJOwithHashCode, POJOwithHashCode>() {
    +			@Override
    +			public POJOwithHashCode getKey(POJOwithHashCode value) throws Exception {
    +				return value;
    +			}
    +		}).addSink(new SinkFunction<POJOwithHashCode>() {
    +			@Override
    +			public void invoke(POJOwithHashCode value) throws Exception {
    +				Assert.assertEquals(value.getId(), new int[]{1, 2});
    +			}
    +		});
    +	}
    +
    +	@Test
    +	public void testPOJOnoHashCodeKeyRejection() {
    +
    +		KeySelector<POJOnoHashCode, POJOnoHashCode> keySelector =
    +				new KeySelector<POJOnoHashCode, POJOnoHashCode>() {
    +					@Override
    +					public POJOnoHashCode getKey(POJOnoHashCode value) throws Exception {
    +						return value;
    +					}
    +				};
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<POJOnoHashCode> input = env.fromElements(
    +				new POJOnoHashCode(new int[] {1, 2}));
    +
    +		// adjust the rule
    +		expectedException.expect(InvalidProgramException.class);
    +
    +		input.keyBy(keySelector);
    +	}
    +
    +	// composite key tests : Tuples
    +
    +	@Test
    +	public void testTupleNestedArrayKeyRejection() {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Tuple2<Integer[], String>> input = env.fromElements(
    +				new Tuple2<>(new Integer[] {1, 2}, "test-test"));
    +
    +		TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple2<Integer[],
String>>(
    +				BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    +
    +		// adjust the rule
    +		expectedException.expect(InvalidProgramException.class);
    +		expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used
as key.");
    +
    +		input.keyBy(new KeySelector<Tuple2<Integer[],String>, Tuple2<Integer[],String>>()
{
    +			@Override
    +			public Tuple2<Integer[], String> getKey(Tuple2<Integer[], String> value)
throws Exception {
    +				return value;
    +			}
    +		});
    +	}
    +
    +	@Test
    +	public void testPrimitiveKeyRejection() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setMaxParallelism(1);
    +
    +		DataStream<Integer> input = env.fromElements(new Integer(10000));
    +
    +		TypeInformation<?> expectedTypeInfo = IntegerTypeInfo.INT_TYPE_INFO;
    +
    +		KeyedStream<Integer, Integer> keyedStream = input.keyBy(new KeySelector<Integer,
Integer>() {
    +			@Override
    +			public Integer getKey(Integer value) throws Exception {
    +				return value;
    +			}
    +		});
    +
    +		Assert.assertEquals(expectedTypeInfo, keyedStream.getKeyType());
    +
    +		keyedStream.addSink(new SinkFunction<Integer>() {
    +			@Override
    +			public void invoke(Integer value) throws Exception {
    +				Assert.assertEquals(10000L, (long) value);
    +			}
    +		});
    +
    +		env.execute();
    +	}
    +
    +	private static class TestClass {
    +
    +		private final int id;
    +
    +		TestClass(int id) {
    +			this.id = id;
    +		}
    +	}
    +
    +	public static class POJOnoHashCode {
    --- End diff --
    
    The naming is also inconsistent with the other class (POJOwithHashCode): this class name doesn't contain "with".


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> ---------------------------------------------------------------------
>
>                 Key: FLINK-5874
>                 URL: https://issues.apache.org/jira/browse/FLINK-5874
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.4
>            Reporter: Robert Metzger
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: we use only Key[].hashCode() to compute the hash when shuffling data.
> Java's default hashCode() implementation for arrays doesn't take the array's contents into account;
> it is based on the array's identity (effectively its memory address).
> This leads to different hash codes on the sender and receiver sides.
> In Flink 1.1 this means that the data is shuffled randomly instead of being keyed, and in Flink 1.2
> the key-group code detects a violation of the hashing.
> The proper fix for the problem would be to rely on Flink's {{TypeComparator}} class, which
> has a type-specific hashing function. But introducing this change would break compatibility
> with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message