Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7246618F9B for ; Wed, 29 Apr 2015 14:05:39 +0000 (UTC) Received: (qmail 38706 invoked by uid 500); 29 Apr 2015 14:05:39 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 38670 invoked by uid 500); 29 Apr 2015 14:05:39 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 38660 invoked by uid 99); 29 Apr 2015 14:05:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2015 14:05:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3BA88E083A; Wed, 29 Apr 2015 14:05:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbalassi@apache.org To: commits@flink.apache.org Date: Wed, 29 Apr 2015 14:05:41 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/3] flink git commit: [streaming] Cleanup for projection update [streaming] Cleanup for projection update Closes #630 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40be172e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40be172e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40be172e Branch: refs/heads/master Commit: 40be172e4b379959308860dae4ba36d796c34114 Parents: 8c46155 Author: mbalassi Authored: Wed Apr 29 15:16:25 2015 +0200 Committer: mbalassi Committed: Wed Apr 29 15:26:00 2015 +0200 ---------------------------------------------------------------------- .../api/datastream/StreamProjection.java | 2 - .../streaming/api/operators/ProjectTest.java | 61 +++++++++++++++++--- .../api/operators/ProjectWithoutClassTest.java | 22 ------- 3 files changed, 53 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/40be172e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java index c19dbb2..447b1fd 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java @@ -48,8 +48,6 @@ import org.apache.flink.api.java.tuple.Tuple9; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.streaming.api.operators.StreamProject; -import java.util.Arrays; - public class StreamProjection { private DataStream dataStream; http://git-wip-us.apache.org/repos/asf/flink/blob/40be172e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java index 0a712e0..035abe6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java @@ -18,39 +18,45 @@ package org.apache.flink.streaming.api.operators; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.datastream.StreamProjection; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.util.MockContext; +import org.apache.flink.streaming.util.TestStreamEnvironment; import org.junit.Test; public class ProjectTest implements Serializable { private static final long serialVersionUID = 1L; @Test - public void test() { + public void operatorTest() { TypeInformation> inType = TypeExtractor - .getForObject(new Tuple5(2, "a", 3, "b", - 4)); + .getForObject(new Tuple5(2, "a", 3, "b", 4)); - int[] fields = new int[] { 4, 4, 3 }; - // Class[] classes = new Class[] { Integer.class, Integer.class, String.class }; + int[] fields = new int[]{4, 4, 3}; @SuppressWarnings("unchecked") StreamProject, Tuple3> operator = new StreamProject, Tuple3>( - fields, - new TupleTypeInfo>(StreamProjection - .extractFieldTypes(fields, inType))); + fields, + new TupleTypeInfo>(StreamProjection + .extractFieldTypes(fields, inType))); List> input = new ArrayList>(); @@ -67,4 +73,43 @@ public class ProjectTest implements Serializable { assertEquals(expected, MockContext.createAndExecute(operator, input)); } + + + // tests using projection from the API without explicitly specifying the types + private static final long MEMORY_SIZE = 32; + private static HashSet> expected = new HashSet>(); + private static HashSet> actual = new HashSet>(); + + @Test + public void APIWithoutTypesTest() { + + for (Long i = 1L; i < 11L; i++) { + expected.add(new Tuple2(i, i.doubleValue())); + } + + StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE); + + env.generateSequence(1, 10).map(new MapFunction>() { + @Override + public Tuple3 map(Long value) throws Exception { + return new Tuple3(value, 'c', value.doubleValue()); + } + }) + .project(0, 2) + .addSink(new SinkFunction() { + @Override + @SuppressWarnings("unchecked") + public void invoke(Tuple value) throws Exception { + actual.add( (Tuple2) value); + } + }); + + try { + env.execute(); + } catch (Exception e) { + fail(e.getMessage()); + } + + assertEquals(expected, actual); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/40be172e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java deleted file mode 100644 index f924cfb..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.apache.flink.streaming.api.operators; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -public class ProjectWithoutClassTest { - - public static void main(String[] args) throws Exception { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - env.generateSequence(1, 100).map(new MapFunction>() { - - @Override - public Tuple3 map(Long value) throws Exception { - return new Tuple3(value, 'c', (double) value); - } - }).project(0,2).print(); - - env.execute("ProjectWithoutClassTest"); - } -}