Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 66246200C41 for ; Thu, 9 Mar 2017 20:00:01 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 64B12160B84; Thu, 9 Mar 2017 19:00:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 91365160B5F for ; Thu, 9 Mar 2017 20:00:00 +0100 (CET) Received: (qmail 23301 invoked by uid 500); 9 Mar 2017 18:59:59 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 23287 invoked by uid 99); 9 Mar 2017 18:59:59 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Mar 2017 18:59:59 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 44D2AC6688 for ; Thu, 9 Mar 2017 18:59:59 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.02 X-Spam-Level: X-Spam-Status: No, score=-4.02 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 2zqN8ttT14qX for ; Thu, 9 Mar 2017 18:59:58 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at 
mx1-lw-eu.apache.org) with SMTP id 1D7995FCED for ; Thu, 9 Mar 2017 18:59:55 +0000 (UTC) Received: (qmail 21823 invoked by uid 99); 9 Mar 2017 18:59:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Mar 2017 18:59:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2E45ADFF66; Thu, 9 Mar 2017 18:59:55 +0000 (UTC) From: zentol To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #3501: [FLINK-5874] Restrict key types in the DataStream ... Content-Type: text/plain Message-Id: <20170309185955.2E45ADFF66@git1-us-west.apache.org> Date: Thu, 9 Mar 2017 18:59:55 +0000 (UTC) archived-at: Thu, 09 Mar 2017 19:00:01 -0000 Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105241358 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } ///////////////////////////////////////////////////////////// + // KeyBy testing + ///////////////////////////////////////////////////////////// + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector, int[]> keySelector = + new KeySelector, int[]>() { + + @Override + public int[] getKey(Tuple2 value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector, Integer[]> keySelector = + new KeySelector, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2 value) throws Exception { + 
return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector, TestClass[]> keySelector = + new KeySelector, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2 value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private void testKeyRejection(KeySelector, K> keySelector, TypeInformation expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + 
DataStream input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + input.keyBy(new KeySelector() { + @Override + public POJOwithHashCode getKey(POJOwithHashCode value) throws Exception { + return value; + } + }).addSink(new SinkFunction() { + @Override + public void invoke(POJOwithHashCode value) throws Exception { + Assert.assertEquals(value.getId(), new int[]{1, 2}); + } + }); + } + + @Test + public void testPOJOnoHashCodeKeyRejection() { + + KeySelector keySelector = + new KeySelector() { + @Override + public POJOnoHashCode getKey(POJOnoHashCode value) throws Exception { + return value; + } + }; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new POJOnoHashCode(new int[] {1, 2})); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + + input.keyBy(keySelector); + } + + // composite key tests : Tuples + + @Test + public void testTupleNestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "test-test")); + + TypeInformation expectedTypeInfo = new TupleTypeInfo>( + BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy(new KeySelector, Tuple2>() { + @Override + public Tuple2 getKey(Tuple2 value) throws Exception { + return value; + } + }); + } + + @Test + public void testPrimitiveKeyRejection() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setMaxParallelism(1); + + DataStream input = env.fromElements(new Integer(10000)); + + TypeInformation expectedTypeInfo = IntegerTypeInfo.INT_TYPE_INFO; + + 
KeyedStream keyedStream = input.keyBy(new KeySelector() { + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }); + + Assert.assertEquals(expectedTypeInfo, keyedStream.getKeyType()); + + keyedStream.addSink(new SinkFunction() { + @Override + public void invoke(Integer value) throws Exception { + Assert.assertEquals(10000L, (long) value); + } + }); + + env.execute(); + } + + private static class TestClass { + + private final int id; + + TestClass(int id) { + this.id = id; + } + } + + public static class POJOnoHashCode { + + private int[] id; + + public POJOnoHashCode() {} + + public POJOnoHashCode(int[] id) { + this.id = id; + } + + public int[] getId() { + return id; + } + + public void setId(int[] id) { + this.id = id; + } + } + + public static class POJOwithHashCode { --- End diff -- You could extend POJOnoHashCode and only implement hashCode(). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---