flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-3697] Properly access type information for nested POJO key selection
Date Fri, 08 Apr 2016 08:40:57 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 17909aae7 -> 43093e3b1


[FLINK-3697] Properly access type information for nested POJO key selection

This closes #1851


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43093e3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43093e3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43093e3b

Branch: refs/heads/release-1.0
Commit: 43093e3b11dda1c64d8a68186d8c708929bc5bb1
Parents: 17909aa
Author: Robert Metzger <rmetzger@apache.org>
Authored: Tue Apr 5 14:48:40 2016 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Fri Apr 8 10:40:25 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/api/common/operators/Keys.java |   6 +-
 .../flink/streaming/util/FieldAccessor.java     |   5 +-
 .../streaming/util/keys/KeySelectorUtil.java    |   5 +-
 .../flink/streaming/api/CoStreamITCase.java     |   2 +-
 .../streaming/api/DataStreamPojoITCase.java     | 206 +++++++++++++++++++
 5 files changed, 216 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/43093e3b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
index ad21c47..abe41af 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java
@@ -42,7 +42,7 @@ public abstract class Keys<T> {
 
 	public abstract int[] computeLogicalKeyPositions();
 
-	protected abstract TypeInformation<?>[] getKeyFieldTypes();
+	public abstract TypeInformation<?>[] getKeyFieldTypes();
 
 	public abstract <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo);
 
@@ -134,7 +134,7 @@ public abstract class Keys<T> {
 		}
 
 		@Override
-		protected TypeInformation<?>[] getKeyFieldTypes() {
+		public TypeInformation<?>[] getKeyFieldTypes() {
 			TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()];
 			for (int i = 0; i < keyFields.size(); i++) {
 				fieldTypes[i] = keyFields.get(i).getType();
@@ -337,7 +337,7 @@ public abstract class Keys<T> {
 		}
 
 		@Override
-		protected TypeInformation<?>[] getKeyFieldTypes() {
+		public TypeInformation<?>[] getKeyFieldTypes() {
 			TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()];
 			for (int i = 0; i < keyFields.size(); i++) {
 				fieldTypes[i] = keyFields.get(i).getType();

http://git-wip-us.apache.org/repos/asf/flink/blob/43093e3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
index b1d34e7..a23353b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
@@ -182,11 +182,14 @@ public abstract class FieldAccessor<R, F> implements Serializable {
 			@SuppressWarnings("unchecked")
 			CompositeType<R> cType = (CompositeType<R>) type;
 
+			if(field.contains(".")) {
+				throw new IllegalArgumentException("The Pojo field accessor currently doesn't support nested POJOs");
+			}
+
 			List<CompositeType.FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field);
 
 			int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
 			this.fieldType = fieldDescriptors.get(0).getType();
-			Class<?> keyClass = fieldType.getTypeClass();
 
 			if (cType instanceof PojoTypeInfo) {
 				comparator = (PojoComparator<R>) cType.createComparator(

http://git-wip-us.apache.org/repos/asf/flink/blob/43093e3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 6f65b05..b377d70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -53,13 +53,12 @@ public final class KeySelectorUtil {
 		
 		int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
 		int numKeyFields = logicalKeyPositions.length;
-		
+
+		TypeInformation<?>[] typeInfos = keys.getKeyFieldTypes();
 		// use ascending order here, the code paths for that are usually a slight bit faster
 		boolean[] orders = new boolean[numKeyFields];
-		TypeInformation<?>[] typeInfos = new TypeInformation<?>[numKeyFields];
 		for (int i = 0; i < numKeyFields; i++) {
 			orders[i] = true;
-			typeInfos[i] = compositeType.getTypeAt(logicalKeyPositions[i]);
 		}
 
 		TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);

http://git-wip-us.apache.org/repos/asf/flink/blob/43093e3b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java
index 4df09a3..600e807 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java
@@ -69,7 +69,7 @@ public class CoStreamITCase extends StreamingMultipleProgramsTestBase {
 
 					@Override
 					public Tuple2<Integer, Integer> map(Integer value) throws Exception {
-						return new Tuple2<Integer, Integer>(value, value + 1);
+						return new Tuple2<>(value, value + 1);
 					}
 				})
 				.rebalance()

http://git-wip-us.apache.org/repos/asf/flink/blob/43093e3b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamPojoITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamPojoITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamPojoITCase.java
new file mode 100644
index 0000000..523523b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamPojoITCase.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Integration test for streaming programs using POJOs and key selectors
+ *
+ * See FLINK-3697
+ */
+public class DataStreamPojoITCase extends StreamingMultipleProgramsTestBase {
+	static List<Data> elements = new ArrayList<>();
+	static {
+		elements.add(new Data(0,0,0));
+		elements.add(new Data(0,0,0));
+		elements.add(new Data(1,1,1));
+		elements.add(new Data(1,1,1));
+		elements.add(new Data(2,2,3));
+		elements.add(new Data(2,2,3));
+	}
+
+	/**
+	 * Test composite key on the Data POJO (with nested fields)
+	 */
+	@Test
+	public void testCompositeKeyOnNestedPojo() throws Exception {
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.getConfig().disableObjectReuse();
+		see.setParallelism(3);
+
+		DataStream<Data> dataStream = see.fromCollection(elements);
+
+		DataStream<Data> summedStream = dataStream
+				.keyBy("aaa", "abc", "wxyz")
+				.sum("sum")
+				.keyBy("aaa", "abc", "wxyz")
+				.flatMap(new FlatMapFunction<Data, Data>() {
+					Data[] first = new Data[3];
+					@Override
+					public void flatMap(Data value, Collector<Data> out) throws Exception {
+						if(first[value.aaa] == null) {
+							first[value.aaa] = value;
+							if(value.sum != 1) {
+								throw new RuntimeException("Expected the sum to be one");
+							}
+						} else {
+							if(value.sum != 2) {
+								throw new RuntimeException("Expected the sum to be two");
+							}
+							if(first[value.aaa].aaa != value.aaa) {
+								throw new RuntimeException("aaa key wrong");
+							}
+							if(first[value.aaa].abc != value.abc) {
+								throw new RuntimeException("abc key wrong");
+							}
+							if(first[value.aaa].wxyz != value.wxyz) {
+								throw new RuntimeException("wxyz key wrong");
+							}
+						}
+					}
+				});
+
+		summedStream.print();
+
+		see.execute();
+	}
+
+	/**
+	 * Test composite & nested key on the Data POJO
+	 */
+	@Test
+	public void testNestedKeyOnNestedPojo() throws Exception {
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.getConfig().disableObjectReuse();
+		see.setParallelism(4);
+
+		DataStream<Data> dataStream = see.fromCollection(elements);
+
+		DataStream<Data> summedStream = dataStream
+				.keyBy("aaa", "stats.count")
+				.sum("sum")
+				.keyBy("aaa", "stats.count")
+				.flatMap(new FlatMapFunction<Data, Data>() {
+					Data[] first = new Data[3];
+					@Override
+					public void flatMap(Data value, Collector<Data> out) throws Exception {
+						if(value.stats.count != 123) {
+							throw new RuntimeException("Wrong value for value.stats.count");
+						}
+						if(first[value.aaa] == null) {
+							first[value.aaa] = value;
+							if(value.sum != 1) {
+								throw new RuntimeException("Expected the sum to be one");
+							}
+						} else {
+							if(value.sum != 2) {
+								throw new RuntimeException("Expected the sum to be two");
+							}
+							if(first[value.aaa].aaa != value.aaa) {
+								throw new RuntimeException("aaa key wrong");
+							}
+							if(first[value.aaa].abc != value.abc) {
+								throw new RuntimeException("abc key wrong");
+							}
+							if(first[value.aaa].wxyz != value.wxyz) {
+								throw new RuntimeException("wxyz key wrong");
+							}
+						}
+					}
+				});
+
+		summedStream.print();
+
+		see.execute();
+	}
+
+
+	/**
+	 * As per FLINK-3702 Flink doesn't support nested pojo fields for sum()
+	 */
+	@Test(expected = IllegalArgumentException.class)
+	public void testFailOnNestedPojoFieldAccessor() throws Exception {
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Data> dataStream = see.fromCollection(elements);
+		dataStream.keyBy("aaa", "stats.count").sum("stats.count");
+	}
+
+	public static class Data {
+		public int sum; // sum
+		public int aaa; // keyBy
+		public int abc; //keyBy
+		public long wxyz; // keyBy
+		public int t1;
+		public int t2;
+		public Policy policy;
+		public Stats stats;
+
+		public Data() {
+		}
+		public Data(int aaa, int abc, int wxyz) {
+			this.sum = 1;
+			this.aaa = aaa;
+			this.abc = abc;
+			this.wxyz = wxyz;
+			this.stats = new Stats();
+			this.stats.count = 123L;
+		}
+
+		@Override
+		public String toString() {
+			return "Data{" +
+					"sum=" + sum +
+					", aaa=" + aaa +
+					", abc=" + abc +
+					", wxyz=" + wxyz +
+					'}';
+		}
+	}
+	public static class Policy {
+		public short a;
+		public short b;
+		public boolean c;
+		public boolean d;
+
+		public Policy() {}
+	}
+
+	public static class Stats {
+		public long count;
+		public float a;
+		public float b;
+		public float c;
+		public float d;
+		public float e;
+
+		public Stats() {}
+	}
+
+}
+


Mime
View raw message