Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 26A62200C36 for ; Fri, 10 Mar 2017 23:18:51 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 25222160B79; Fri, 10 Mar 2017 22:18:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F1419160B67 for ; Fri, 10 Mar 2017 23:18:49 +0100 (CET) Received: (qmail 76528 invoked by uid 500); 10 Mar 2017 22:18:49 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 76514 invoked by uid 99); 10 Mar 2017 22:18:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Mar 2017 22:18:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E74B8DFDC8; Fri, 10 Mar 2017 22:18:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kkloudas@apache.org To: commits@flink.apache.org Message-Id: <7894c749ccfa438cafd681e831f1a2f2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-5874] Restrict key types in the DataStream API. Date: Fri, 10 Mar 2017 22:18:48 +0000 (UTC) archived-at: Fri, 10 Mar 2017 22:18:51 -0000 Repository: flink Updated Branches: refs/heads/master 70e78a620 -> f15a7d2d9 [FLINK-5874] Restrict key types in the DataStream API. Reject a type from being a key in keyBy() if it is: 1. it is a POJO type but does not override the hashCode() and relies on the Object.hashCode() implementation. 2. 
it is an array of any type. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f15a7d2d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f15a7d2d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f15a7d2d Branch: refs/heads/master Commit: f15a7d2d9c9aae72bb3ac3eb2478b3ec4759401b Parents: 70e78a6 Author: kl0u Authored: Wed Mar 8 12:11:07 2017 +0100 Committer: kl0u Committed: Fri Mar 10 17:58:00 2017 +0100 ---------------------------------------------------------------------- docs/dev/datastream_api.md | 9 + .../streaming/api/datastream/DataStream.java | 4 +- .../streaming/api/datastream/KeyedStream.java | 77 +++++- .../api/graph/StreamGraphGenerator.java | 6 +- .../flink/streaming/api/DataStreamTest.java | 238 +++++++++++++++++++ 5 files changed, 327 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/docs/dev/datastream_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md index df13295..728c945 100644 --- a/docs/dev/datastream_api.md +++ b/docs/dev/datastream_api.md @@ -216,6 +216,15 @@ dataStream.filter(new FilterFunction() { dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple {% endhighlight %} +

+<span class="label label-danger">Attention</span>
+A type cannot be a key if:
+
+<ol>
+   <li> it is a POJO type but does not override the hashCode() method and
+   relies on the Object.hashCode() implementation.</li>
+   <li> it is an array of any type.</li>
+</ol>
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 8fcaf6b..71ef048 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -282,9 +282,9 @@ public class DataStream { } /** - * Partitions the operator state of a {@link DataStream}using field expressions. + * Partitions the operator state of a {@link DataStream} using field expressions. * A field expression is either the name of a public field or a getter method with parentheses - * of the {@link DataStream}S underlying type. A dot can be used to drill + * of the {@link DataStream}'s underlying type. A dot can be used to drill * down into objects, as in {@code "field1.getInnerField2()" }. 
* * @param fields http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 7c9f5bc..860aac6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -17,18 +17,25 @@ package org.apache.flink.streaming.api.datastream; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.TimeCharacteristic; import 
org.apache.flink.streaming.api.functions.ProcessFunction; @@ -61,6 +68,9 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; import java.util.UUID; /** @@ -114,9 +124,72 @@ public class KeyedStream extends DataStream { dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM))); this.keySelector = keySelector; - this.keyType = keyType; + this.keyType = validateKeyType(keyType); } - + + /** + * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be + * used as a key in the {@code DataStream.keyBy()} operation. This is done by searching depth-first the + * key type and checking if each of the composite types satisfies the required conditions + * (see {@link #validateKeyTypeIsHashable(TypeInformation)}). + * + * @param keyType The {@link TypeInformation} of the key. + */ + private TypeInformation validateKeyType(TypeInformation keyType) { + Stack> stack = new Stack<>(); + stack.push(keyType); + + List> unsupportedTypes = new ArrayList<>(); + + while (!stack.isEmpty()) { + TypeInformation typeInfo = stack.pop(); + + if (!validateKeyTypeIsHashable(typeInfo)) { + unsupportedTypes.add(typeInfo); + } + + if (typeInfo instanceof TupleTypeInfoBase) { + for (int i = 0; i < typeInfo.getArity(); i++) { + stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i)); + } + } + } + + if (!unsupportedTypes.isEmpty()) { + throw new InvalidProgramException("Type " + keyType + " cannot be used as key. Contained " + + "UNSUPPORTED key types: " + StringUtils.join(unsupportedTypes, ", ") + ". 
Look " + + "at the keyBy() documentation for the conditions a type has to satisfy in order to be " + + "eligible for a key."); + } + + return keyType; + } + + /** + * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be + * used as a key in the {@code DataStream.keyBy()} operation. + * + * @param type The {@link TypeInformation} of the type to check. + * @return {@code false} if: + *
+ * <ul>
+ *     <li>it is a POJO type but does not override the {@link #hashCode()} method and relies on
+ *     the {@link Object#hashCode()} implementation.</li>
+ *     <li>it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo},
+ *     {@link ObjectArrayTypeInfo}).</li>
+ * </ul>
, + * {@code true} otherwise. + */ + private boolean validateKeyTypeIsHashable(TypeInformation type) { + try { + return (type instanceof PojoTypeInfo) + ? !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class) + : !(type instanceof PrimitiveArrayTypeInfo || type instanceof BasicArrayTypeInfo || type instanceof ObjectArrayTypeInfo); + } catch (NoSuchMethodException ignored) { + // this should never happen as we are just searching for the hashCode() method. + } + return false; + } + // ------------------------------------------------------------------------ // properties // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index bd018c3..de87a66 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -163,7 +163,7 @@ public class StreamGraphGenerator { Collection transformedIds; if (transform instanceof OneInputTransformation) { - transformedIds = transformOnInputTransform((OneInputTransformation) transform); + transformedIds = transformOneInputTransform((OneInputTransformation) transform); } else if (transform instanceof TwoInputTransformation) { transformedIds = transformTwoInputTransform((TwoInputTransformation) transform); } else if (transform instanceof SourceTransformation) { @@ -496,10 +496,10 @@ public class StreamGraphGenerator { * Transforms a {@code OneInputTransformation}. * *

- * This recusively transforms the inputs, creates a new {@code StreamNode} in the graph and + * This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and * wired the inputs to this new node. */ - private Collection transformOnInputTransform(OneInputTransformation transform) { + private Collection transformOneInputTransform(OneInputTransformation transform) { Collection inputIds = transform(transform.getInput()); http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index a619338..b4d2421 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -19,14 +19,23 @@ package org.apache.flink.streaming.api; import java.util.List; +import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; +import 
org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.ConnectedStreams; @@ -63,7 +72,11 @@ import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.util.Collector; +import org.hamcrest.core.StringStartsWith; +import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import static org.junit.Assert.*; @@ -906,6 +919,231 @@ public class DataStreamTest { } ///////////////////////////////////////////////////////////// + // KeyBy testing + ///////////////////////////////////////////////////////////// + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, Object[]> keySelector = + new KeySelector, Object[]>() { + + @Override + public Object[] getKey(Tuple2 value) throws Exception { + Object[] 
ks = new Object[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new Object(); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + Object[].class, new GenericTypeInfo<>(Object.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage(new StringStartsWith("Type " + expectedKeyType + " cannot be used as key.")); + + input.keyBy(keySelector); + } + + //////////////// Composite Key Tests : POJOs //////////////// + + @Test + public void testPOJOWithNestedArrayNoHashCodeKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOWithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage(new StringStartsWith("Type " + expectedTypeInfo + " cannot be used as key.")); + + input.keyBy("id"); + } + + @Test + public void testPOJOWithNestedArrayAndHashCodeWorkAround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOWithHashCode(new int[] {1, 2})); + + input.keyBy(new KeySelector() { + @Override + public POJOWithHashCode getKey(POJOWithHashCode value) throws Exception { + return value; + } + }).addSink(new SinkFunction() { + 
@Override + public void invoke(POJOWithHashCode value) throws Exception { + Assert.assertEquals(value.getId(), new int[]{1, 2}); + } + }); + } + + @Test + public void testPOJOnoHashCodeKeyRejection() { + + KeySelector keySelector = + new KeySelector() { + @Override + public POJOWithoutHashCode getKey(POJOWithoutHashCode value) throws Exception { + return value; + } + }; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOWithoutHashCode(new int[] {1, 2})); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + + input.keyBy(keySelector); + } + + //////////////// Composite Key Tests : Tuples //////////////// + + @Test + public void testTupleNestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "test-test")); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage(new StringStartsWith("Type " + expectedTypeInfo + " cannot be used as key.")); + + input.keyBy(new KeySelector, Tuple2>() { + @Override + public Tuple2 getKey(Tuple2 value) throws Exception { + return value; + } + }); + } + + @Test + public void testPrimitiveKeyAcceptance() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setMaxParallelism(1); + + DataStream input = env.fromElements(new Integer(10000)); + + KeyedStream keyedStream = input.keyBy(new KeySelector() { + @Override + public Object getKey(Integer value) throws Exception { + return value; + } + }); + + keyedStream.addSink(new SinkFunction() { + @Override + public void invoke(Integer value) throws Exception { + 
Assert.assertEquals(10000L, (long) value); + } + }); + } + + public static class POJOWithoutHashCode { + + private int[] id; + + public POJOWithoutHashCode() {} + + public POJOWithoutHashCode(int[] id) { + this.id = id; + } + + public int[] getId() { + return id; + } + + public void setId(int[] id) { + this.id = id; + } + } + + public static class POJOWithHashCode extends POJOWithoutHashCode { + + public POJOWithHashCode() { + } + + public POJOWithHashCode(int[] id) { + super(id); + } + + @Override + public int hashCode() { + int hash = 31; + for (int i : getId()) { + hash = 37 * hash + i; + } + return hash; + } + } + + ///////////////////////////////////////////////////////////// // Utilities /////////////////////////////////////////////////////////////