Return-Path:
X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io
Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io
Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183])
by cust-asf2.ponee.io (Postfix) with ESMTP id 26A62200C36
for ; Fri, 10 Mar 2017 23:18:51 +0100 (CET)
Received: by cust-asf.ponee.io (Postfix)
id 25222160B79; Fri, 10 Mar 2017 22:18:51 +0000 (UTC)
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by cust-asf.ponee.io (Postfix) with SMTP id F1419160B67
for ; Fri, 10 Mar 2017 23:18:49 +0100 (CET)
Received: (qmail 76528 invoked by uid 500); 10 Mar 2017 22:18:49 -0000
Mailing-List: contact commits-help@flink.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@flink.apache.org
Delivered-To: mailing list commits@flink.apache.org
Received: (qmail 76514 invoked by uid 99); 10 Mar 2017 22:18:49 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23)
by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Mar 2017 22:18:49 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33)
id E74B8DFDC8; Fri, 10 Mar 2017 22:18:48 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: kkloudas@apache.org
To: commits@flink.apache.org
Message-Id: <7894c749ccfa438cafd681e831f1a2f2@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: flink git commit: [FLINK-5874] Restrict key types in the DataStream
API.
Date: Fri, 10 Mar 2017 22:18:48 +0000 (UTC)
archived-at: Fri, 10 Mar 2017 22:18:51 -0000
Repository: flink
Updated Branches:
refs/heads/master 70e78a620 -> f15a7d2d9
[FLINK-5874] Restrict key types in the DataStream API.
Reject a type as a key in keyBy() if:
1. it is a POJO type that does not override hashCode() and
relies on the Object.hashCode() implementation.
2. it is an array of any type.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f15a7d2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f15a7d2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f15a7d2d
Branch: refs/heads/master
Commit: f15a7d2d9c9aae72bb3ac3eb2478b3ec4759401b
Parents: 70e78a6
Author: kl0u
Authored: Wed Mar 8 12:11:07 2017 +0100
Committer: kl0u
Committed: Fri Mar 10 17:58:00 2017 +0100
----------------------------------------------------------------------
docs/dev/datastream_api.md | 9 +
.../streaming/api/datastream/DataStream.java | 4 +-
.../streaming/api/datastream/KeyedStream.java | 77 +++++-
.../api/graph/StreamGraphGenerator.java | 6 +-
.../flink/streaming/api/DataStreamTest.java | 238 +++++++++++++++++++
5 files changed, 327 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index df13295..728c945 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -216,6 +216,15 @@ dataStream.filter(new FilterFunction() {
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
{% endhighlight %}
+
+ Attention
+ A type cannot be a key if:
+ <ol>
+ <li>it is a POJO type but does not override the hashCode() method and
+ relies on the Object.hashCode() implementation.</li>
+ <li>it is an array of any type.</li>
+ </ol>
+
http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8fcaf6b..71ef048 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -282,9 +282,9 @@ public class DataStream {
}
/**
- * Partitions the operator state of a {@link DataStream}using field expressions.
+ * Partitions the operator state of a {@link DataStream} using field expressions.
* A field expression is either the name of a public field or a getter method with parentheses
- * of the {@link DataStream}S underlying type. A dot can be used to drill
+ * of the {@link DataStream}'s underlying type. A dot can be used to drill
* down into objects, as in {@code "field1.getInnerField2()" }.
*
* @param fields
http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 7c9f5bc..860aac6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -17,18 +17,25 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -61,6 +68,9 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
import java.util.UUID;
/**
@@ -114,9 +124,72 @@ public class KeyedStream extends DataStream {
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
- this.keyType = keyType;
+ this.keyType = validateKeyType(keyType);
}
-
+
+ /**
+ * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
+ * used as a key in the {@code DataStream.keyBy()} operation. This is done by searching depth-first the
+ * key type and checking if each of the composite types satisfies the required conditions
+ * (see {@link #validateKeyTypeIsHashable(TypeInformation)}).
+ *
+ * @param keyType The {@link TypeInformation} of the key.
+ */
+ private TypeInformation validateKeyType(TypeInformation keyType) {
+ Stack> stack = new Stack<>();
+ stack.push(keyType);
+
+ List> unsupportedTypes = new ArrayList<>();
+
+ while (!stack.isEmpty()) {
+ TypeInformation> typeInfo = stack.pop();
+
+ if (!validateKeyTypeIsHashable(typeInfo)) {
+ unsupportedTypes.add(typeInfo);
+ }
+
+ if (typeInfo instanceof TupleTypeInfoBase) {
+ for (int i = 0; i < typeInfo.getArity(); i++) {
+ stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i));
+ }
+ }
+ }
+
+ if (!unsupportedTypes.isEmpty()) {
+ throw new InvalidProgramException("Type " + keyType + " cannot be used as key. Contained " +
+ "UNSUPPORTED key types: " + StringUtils.join(unsupportedTypes, ", ") + ". Look " +
+ "at the keyBy() documentation for the conditions a type has to satisfy in order to be " +
+ "eligible for a key.");
+ }
+
+ return keyType;
+ }
+
+ /**
+ * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
+ * used as a key in the {@code DataStream.keyBy()} operation.
+ *
+ * @param type The {@link TypeInformation} of the type to check.
+ * @return {@code false} if:
+ * <ol>
+ * <li>it is a POJO type but does not override the {@link #hashCode()} method and relies on
+ * the {@link Object#hashCode()} implementation.</li>
+ * <li>it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo},
+ * {@link ObjectArrayTypeInfo}).</li>
+ * </ol>,
+ * {@code true} otherwise.
+ */
+ private boolean validateKeyTypeIsHashable(TypeInformation> type) {
+ try {
+ return (type instanceof PojoTypeInfo)
+ ? !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class)
+ : !(type instanceof PrimitiveArrayTypeInfo || type instanceof BasicArrayTypeInfo || type instanceof ObjectArrayTypeInfo);
+ } catch (NoSuchMethodException ignored) {
+ // this should never happen as we are just searching for the hashCode() method.
+ }
+ return false;
+ }
+
// ------------------------------------------------------------------------
// properties
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index bd018c3..de87a66 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -163,7 +163,7 @@ public class StreamGraphGenerator {
Collection transformedIds;
if (transform instanceof OneInputTransformation, ?>) {
- transformedIds = transformOnInputTransform((OneInputTransformation, ?>) transform);
+ transformedIds = transformOneInputTransform((OneInputTransformation, ?>) transform);
} else if (transform instanceof TwoInputTransformation, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation, ?, ?>) transform);
} else if (transform instanceof SourceTransformation>) {
@@ -496,10 +496,10 @@ public class StreamGraphGenerator {
* Transforms a {@code OneInputTransformation}.
*
*
- * This recusively transforms the inputs, creates a new {@code StreamNode} in the graph and
+ * This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
* wired the inputs to this new node.
*/
- private Collection transformOnInputTransform(OneInputTransformation transform) {
+ private Collection transformOneInputTransform(OneInputTransformation transform) {
Collection inputIds = transform(transform.getInput());
http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index a619338..b4d2421 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -19,14 +19,23 @@ package org.apache.flink.streaming.api;
import java.util.List;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
@@ -63,7 +72,11 @@ import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
+import org.hamcrest.core.StringStartsWith;
+import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import static org.junit.Assert.*;
@@ -906,6 +919,231 @@ public class DataStreamTest {
}
/////////////////////////////////////////////////////////////
+ // KeyBy testing
+ /////////////////////////////////////////////////////////////
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testPrimitiveArrayKeyRejection() {
+
+ KeySelector, int[]> keySelector =
+ new KeySelector, int[]>() {
+
+ @Override
+ public int[] getKey(Tuple2 value) throws Exception {
+ int[] ks = new int[value.f0.length];
+ for (int i = 0; i < ks.length; i++) {
+ ks[i] = value.f0[i];
+ }
+ return ks;
+ }
+ };
+
+ testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Test
+ public void testBasicArrayKeyRejection() {
+
+ KeySelector, Integer[]> keySelector =
+ new KeySelector, Integer[]>() {
+
+ @Override
+ public Integer[] getKey(Tuple2 value) throws Exception {
+ return value.f0;
+ }
+ };
+
+ testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+ }
+
+ @Test
+ public void testObjectArrayKeyRejection() {
+
+ KeySelector, Object[]> keySelector =
+ new KeySelector, Object[]>() {
+
+ @Override
+ public Object[] getKey(Tuple2 value) throws Exception {
+ Object[] ks = new Object[value.f0.length];
+ for (int i = 0; i < ks.length; i++) {
+ ks[i] = new Object();
+ }
+ return ks;
+ }
+ };
+
+ ObjectArrayTypeInfo