flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [15/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests
Date Wed, 12 Jul 2017 23:44:15 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
deleted file mode 100644
index d62bcd4..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ExecutionEnvironmentITCase.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.util.Collector;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test ExecutionEnvironment from user perspective
- */
-@SuppressWarnings("serial")
-public class ExecutionEnvironmentITCase extends TestLogger {
-	
-	private static final int PARALLELISM = 5;
-
-	/**
-	 * Ensure that the user can pass a custom configuration object to the LocalEnvironment
-	 */
-	@Test
-	public void testLocalEnvironmentWithConfig() throws Exception {
-		Configuration conf = new Configuration();
-		conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
-		env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
-		env.getConfig().disableSysoutLogging();
-
-		DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
-				.rebalance()
-				.mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
-					@Override
-					public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
-						out.collect(getRuntimeContext().getIndexOfThisSubtask());
-					}
-				});
-		List<Integer> resultCollection = result.collect();
-		assertEquals(PARALLELISM, resultCollection.size());
-	}
-
-	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
-
-		private transient boolean emitted;
-
-		@Override
-		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
-			assertEquals(PARALLELISM, numSplits);
-			return super.createInputSplits(numSplits);
-		}
-
-		@Override
-		public boolean reachedEnd() {
-			return emitted;
-		}
-
-		@Override
-		public Integer nextRecord(Integer reuse) {
-			if (emitted) {
-				return null;
-			}
-			emitted = true;
-			return 1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
deleted file mode 100644
index 993b137..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-@RunWith(Parameterized.class)
-public class FilterITCase extends MultipleProgramsTestBase {
-	public FilterITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testAllRejectingFilter() throws Exception {
-		/*
-		 * Test all-rejecting filter.
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-				filter(new Filter1());
-
-		List<Tuple3<Integer, Long, String>> result = filterDs.collect();
-
-		String expected = "\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class Filter1 implements FilterFunction<Tuple3<Integer,Long,String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-			return false;
-		}
-	}
-
-	@Test
-	public void testAllPassingFilter() throws Exception {
-		/*
-		 * Test all-passing filter.
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-				filter(new Filter2());
-		List<Tuple3<Integer, Long, String>> result = filterDs.collect();
-
-		String expected = "1,1,Hi\n" +
-				"2,2,Hello\n" +
-				"3,2,Hello world\n" +
-				"4,3,Hello world, how are you?\n" +
-				"5,3,I am fine.\n" +
-				"6,3,Luke Skywalker\n" +
-				"7,4,Comment#1\n" +
-				"8,4,Comment#2\n" +
-				"9,4,Comment#3\n" +
-				"10,4,Comment#4\n" +
-				"11,5,Comment#5\n" +
-				"12,5,Comment#6\n" +
-				"13,5,Comment#7\n" +
-				"14,5,Comment#8\n" +
-				"15,5,Comment#9\n" +
-				"16,6,Comment#10\n" +
-				"17,6,Comment#11\n" +
-				"18,6,Comment#12\n" +
-				"19,6,Comment#13\n" +
-				"20,6,Comment#14\n" +
-				"21,6,Comment#15\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class Filter2 implements FilterFunction<Tuple3<Integer,Long,String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-			return true;
-		}
-	}
-
-	@Test
-	public void testFilterOnStringTupleField() throws Exception {
-		/*
-		 * Test filter on String tuple field.
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-				filter(new Filter3());
-		List<Tuple3<Integer, Long, String>> result = filterDs.collect();
-
-		String expected = "3,2,Hello world\n"
-				+
-				"4,3,Hello world, how are you?\n";
-
-		compareResultAsTuples(result, expected);
-
-	}
-
-	public static class Filter3 implements FilterFunction<Tuple3<Integer,Long,String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-			return value.f2.contains("world");
-		}
-	}
-
-	@Test
-	public void testFilterOnIntegerTupleField() throws Exception {
-		/*
-		 * Test filter on Integer tuple field.
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-				filter(new Filter4());
-		List<Tuple3<Integer, Long, String>> result = filterDs.collect();
-
-		String expected = "2,2,Hello\n" +
-				"4,3,Hello world, how are you?\n" +
-				"6,3,Luke Skywalker\n" +
-				"8,4,Comment#2\n" +
-				"10,4,Comment#4\n" +
-				"12,5,Comment#6\n" +
-				"14,5,Comment#8\n" +
-				"16,6,Comment#10\n" +
-				"18,6,Comment#12\n" +
-				"20,6,Comment#14\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class Filter4 implements FilterFunction<Tuple3<Integer,Long,String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-			return (value.f0 % 2) == 0;
-		}
-	}
-
-	@Test
-	public void testFilterBasicType() throws Exception {
-		/*
-		 * Test filter on basic type
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
-		DataSet<String> filterDs = ds.
-				filter(new Filter5());
-		List<String> result = filterDs.collect();
-
-		String expected = "Hi\n" +
-				"Hello\n" +
-				"Hello world\n" +
-				"Hello world, how are you?\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class Filter5 implements FilterFunction<String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(String value) throws Exception {
-			return value.startsWith("H");
-		}
-	}
-
-	@Test
-	public void testFilterOnCustomType() throws Exception {
-		/*
-		 * Test filter on custom type
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-		DataSet<CustomType> filterDs = ds.
-				filter(new Filter6());
-		List<CustomType> result = filterDs.collect();
-
-		String expected = "3,3,Hello world, how are you?\n"
-				+
-				"3,4,I am fine.\n" +
-				"3,5,Luke Skywalker\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class Filter6 implements FilterFunction<CustomType> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(CustomType value) throws Exception {
-			return value.myString.contains("a");
-		}
-	}
-
-	@Test
-	public void testRichFilterOnStringTupleField() throws Exception {
-		/*
-		 * Test filter on String tuple field.
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-				filter(new RichFilter1()).withBroadcastSet(ints, "ints");
-		List<Tuple3<Integer, Long, String>> result = filterDs.collect();
-
-		String expected = "1,1,Hi\n" +
-				"2,2,Hello\n" +
-				"3,2,Hello world\n" +
-				"4,3,Hello world, how are you?\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class RichFilter1 extends RichFilterFunction<Tuple3<Integer,Long,String>> {
-		private static final long serialVersionUID = 1L;
-
-		int literal = -1;
-
-		@Override
-		public void open(Configuration config) {
-			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
-			for(int i: ints) {
-				literal = literal < i ? i : literal;
-			}
-		}
-
-		@Override
-		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-			return value.f0 < literal;
-		}
-	}
-
-	@Test
-	public void testFilterWithBroadcastVariables() throws Exception {
-		/*
-		 * Test filter with broadcast variables
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-				filter(new RichFilter2()).withBroadcastSet(intDs, "ints");
-		List<Tuple3<Integer, Long, String>> result = filterDs.collect();
-
-		String expected = "11,5,Comment#5\n" +
-				"12,5,Comment#6\n" +
-				"13,5,Comment#7\n" +
-				"14,5,Comment#8\n" +
-				"15,5,Comment#9\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class RichFilter2 extends RichFilterFunction<Tuple3<Integer,Long,String>> {
-		private static final long serialVersionUID = 1L;
-		private  int broadcastSum = 0;
-
-		@Override
-		public void open(Configuration config) {
-			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
-			for(Integer i : ints) {
-				broadcastSum += i;
-			}
-		}
-
-		@Override
-		public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
-			return (value.f1 == (broadcastSum / 11));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
deleted file mode 100644
index 3eb870d..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class FirstNITCase extends MultipleProgramsTestBase {
-	public FirstNITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testFirstNOnUngroupedDS() throws Exception {
-		/*
-		 * First-n on ungrouped data set
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple1<Integer>> seven = ds.first(7).map(new OneMapper()).sum(0);
-
-		List<Tuple1<Integer>> result = seven.collect();
-
-		String expected = "(7)\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testFirstNOnGroupedDS() throws Exception {
-		/*
-		 * First-n on grouped data set
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).first(4)
-				.map(new OneMapper2()).groupBy(0).sum(1);
-
-		List<Tuple2<Long, Integer>> result = first.collect();
-
-		String expected = "(1,1)\n(2,2)\n(3,3)\n(4,4)\n(5,4)\n(6,4)\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testFirstNOnGroupedAndSortedDS() throws Exception {
-		/*
-		 * First-n on grouped and sorted data set
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
-				.project(1,0);
-
-		List<Tuple2<Long, Integer>> result = first.collect();
-
-		String expected = "(1,1)\n"
-				+ "(2,3)\n(2,2)\n"
-				+ "(3,6)\n(3,5)\n(3,4)\n"
-				+ "(4,10)\n(4,9)\n(4,8)\n"
-				+ "(5,15)\n(5,14)\n(5,13)\n"
-				+ "(6,21)\n(6,20)\n(6,19)\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	/**
-	 * Test for FLINK-2135
-	 */
-	@Test
-	public void testFaultyCast() throws Exception {
-		ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> b = ee.fromElements("a", "b");
-		GroupReduceOperator<String, String> a = b.groupBy(new KeySelector<String, Long>() {
-			@Override
-			public Long getKey(String value) throws Exception {
-				return 1L;
-			}
-		}).sortGroup(new KeySelector<String, Double>() {
-			@Override
-			public Double getKey(String value) throws Exception {
-				return 1.0;
-			}
-		}, Order.DESCENDING).first(1);
-
-		List<String> result = b.collect();
-
-		String expected = "a\nb";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> {
-		private static final long serialVersionUID = 1L;
-		private final Tuple1<Integer> one = new Tuple1<Integer>(1);
-		@Override
-		public Tuple1<Integer> map(Tuple3<Integer, Long, String> value) {
-			return one;
-		}
-	}
-
-	public static class OneMapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>> {
-		private static final long serialVersionUID = 1L;
-		private final Tuple2<Long, Integer> one = new Tuple2<Long, Integer>(0l,1);
-		@Override
-		public Tuple2<Long, Integer> map(Tuple3<Integer, Long, String> value) {
-			one.f0 = value.f1;
-			return one;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
deleted file mode 100644
index 4962da8..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-@RunWith(Parameterized.class)
-public class FlatMapITCase extends MultipleProgramsTestBase {
-	public FlatMapITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testNonPassingFlatMap() throws Exception {
-		/*
-		 * Test non-passing flatmap
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
-		DataSet<String> nonPassingFlatMapDs = ds.
-				flatMap(new FlatMapper1());
-
-		List<String> result = nonPassingFlatMapDs.collect();
-
-		String expected = "\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class FlatMapper1 implements FlatMapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<String> out) throws Exception {
-			if ( value.contains("bananas") ) {
-				out.collect(value);
-			}
-		}
-	}
-
-	@Test
-	public void testDataDuplicatingFlatMap() throws Exception {
-		/*
-		 * Test data duplicating flatmap
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
-		DataSet<String> duplicatingFlatMapDs = ds.
-				flatMap(new FlatMapper2());
-
-		List<String> result = duplicatingFlatMapDs.collect();
-
-		String expected = "Hi\n" + "HI\n" +
-				"Hello\n" + "HELLO\n" +
-				"Hello world\n" + "HELLO WORLD\n" +
-				"Hello world, how are you?\n" + "HELLO WORLD, HOW ARE YOU?\n" +
-				"I am fine.\n" + "I AM FINE.\n" +
-				"Luke Skywalker\n" + "LUKE SKYWALKER\n" +
-				"Random comment\n" + "RANDOM COMMENT\n" +
-				"LOL\n" + "LOL\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class FlatMapper2 implements FlatMapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<String> out) throws Exception {
-			out.collect(value);
-			out.collect(value.toUpperCase());
-		}
-	}
-
-	@Test
-	public void testFlatMapWithVaryingNumberOfEmittedTuples() throws Exception {
-		/*
-		 * Test flatmap with varying number of emitted tuples
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> varyingTuplesMapDs = ds.
-				flatMap(new FlatMapper3());
-
-		List<Tuple3<Integer, Long, String>> result = varyingTuplesMapDs.collect();
-
-		String expected = "1,1,Hi\n" +
-				"2,2,Hello\n" + "2,2,Hello\n" +
-				"4,3,Hello world, how are you?\n" +
-				"5,3,I am fine.\n" + "5,3,I am fine.\n" +
-				"7,4,Comment#1\n" +
-				"8,4,Comment#2\n" + "8,4,Comment#2\n" +
-				"10,4,Comment#4\n" +
-				"11,5,Comment#5\n" + "11,5,Comment#5\n" +
-				"13,5,Comment#7\n" +
-				"14,5,Comment#8\n" + "14,5,Comment#8\n" +
-				"16,6,Comment#10\n" +
-				"17,6,Comment#11\n" + "17,6,Comment#11\n" +
-				"19,6,Comment#13\n" +
-				"20,6,Comment#14\n" + "20,6,Comment#14\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class FlatMapper3 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Tuple3<Integer, Long, String> value,
-				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-			final int numTuples = value.f0 % 3;
-			for ( int i = 0; i < numTuples; i++ ) {
-				out.collect(value);
-			}
-		}
-	}
-
-	@Test
-	public void testTypeConversionFlatMapperCustomToTuple() throws Exception {
-		/*
-		 * Test type conversion flatmapper (Custom -> Tuple)
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> typeConversionFlatMapDs = ds.
-				flatMap(new FlatMapper4());
-
-		List<Tuple3<Integer, Long, String>> result = typeConversionFlatMapDs.collect();
-
-		String expected = "1,0,Hi\n" +
-				"2,1,Hello\n" +
-				"2,2,Hello world\n" +
-				"3,3,Hello world, how are you?\n" +
-				"3,4,I am fine.\n" +
-				"3,5,Luke Skywalker\n" +
-				"4,6,Comment#1\n" +
-				"4,7,Comment#2\n" +
-				"4,8,Comment#3\n" +
-				"4,9,Comment#4\n" +
-				"5,10,Comment#5\n" +
-				"5,11,Comment#6\n" +
-				"5,12,Comment#7\n" +
-				"5,13,Comment#8\n" +
-				"5,14,Comment#9\n" +
-				"6,15,Comment#10\n" +
-				"6,16,Comment#11\n" +
-				"6,17,Comment#12\n" +
-				"6,18,Comment#13\n" +
-				"6,19,Comment#14\n" +
-				"6,20,Comment#15\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class FlatMapper4 implements FlatMapFunction<CustomType, Tuple3<Integer, Long, String>> {
-		private static final long serialVersionUID = 1L;
-		private final Tuple3<Integer, Long, String> outTuple =
-				new Tuple3<Integer, Long, String>();
-
-		@Override
-		public void flatMap(CustomType value, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-			outTuple.setField(value.myInt, 0);
-			outTuple.setField(value.myLong, 1);
-			outTuple.setField(value.myString, 2);
-			out.collect(outTuple);
-		}
-	}
-
-	@Test
-	public void testTypeConversionFlatMapperTupleToBasic() throws Exception {
-		/*
-		 * Test type conversion flatmapper (Tuple -> Basic)
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<String> typeConversionFlatMapDs = ds.
-				flatMap(new FlatMapper5());
-
-		List<String> result = typeConversionFlatMapDs.collect();
-
-		String expected = "Hi\n" + "Hello\n" + "Hello world\n"
-				+
-				"Hello world, how are you?\n" +
-				"I am fine.\n" + "Luke Skywalker\n" +
-				"Comment#1\n" +	"Comment#2\n" +
-				"Comment#3\n" +	"Comment#4\n" +
-				"Comment#5\n" +	"Comment#6\n" +
-				"Comment#7\n" + "Comment#8\n" +
-				"Comment#9\n" +	"Comment#10\n" +
-				"Comment#11\n" + "Comment#12\n" +
-				"Comment#13\n" + "Comment#14\n" +
-				"Comment#15\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class FlatMapper5 implements FlatMapFunction<Tuple3<Integer, Long, String>,String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Tuple3<Integer, Long, String> value, Collector<String> out) throws Exception {
-			out.collect(value.f2);
-		}
-	}
-
-	@Test
-	public void testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt() throws Exception {
-		/*
-		 * Test flatmapper if UDF returns input object
-		 * multiple times and changes it in between
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> inputObjFlatMapDs = ds.
-				flatMap(new FlatMapper6());
-
-		List<Tuple3<Integer, Long, String>> result = inputObjFlatMapDs.collect();
-
-		String expected = "0,1,Hi\n" +
-				"0,2,Hello\n" + "1,2,Hello\n" +
-				"0,2,Hello world\n" + "1,2,Hello world\n" + "2,2,Hello world\n" +
-				"0,3,I am fine.\n" +
-				"0,3,Luke Skywalker\n" + "1,3,Luke Skywalker\n" +
-				"0,4,Comment#1\n" + "1,4,Comment#1\n" + "2,4,Comment#1\n" +
-				"0,4,Comment#3\n" +
-				"0,4,Comment#4\n" + "1,4,Comment#4\n" +
-				"0,5,Comment#5\n" + "1,5,Comment#5\n" + "2,5,Comment#5\n" +
-				"0,5,Comment#7\n" +
-				"0,5,Comment#8\n" + "1,5,Comment#8\n" +
-				"0,5,Comment#9\n" + "1,5,Comment#9\n" + "2,5,Comment#9\n" +
-				"0,6,Comment#11\n" +
-				"0,6,Comment#12\n" + "1,6,Comment#12\n" +
-				"0,6,Comment#13\n" + "1,6,Comment#13\n" + "2,6,Comment#13\n" +
-				"0,6,Comment#15\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class FlatMapper6 implements FlatMapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap( Tuple3<Integer, Long, String> value,
-				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-			final int numTuples = value.f0 % 4;
-			for ( int i = 0; i < numTuples; i++ ) {
-				value.setField(i, 0);
-				out.collect(value);
-			}
-		}
-	}
-
-	@Test
-	public void testFlatMapWithBroadcastSet() throws Exception {
-		/*
-		 * Test flatmap with broadcast set
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds.
-				flatMap(new RichFlatMapper1()).withBroadcastSet(ints, "ints");
-		List<Tuple3<Integer, Long, String>> result = bcFlatMapDs.collect();
-
-		String expected = "55,1,Hi\n" +
-				"55,2,Hello\n" +
-				"55,2,Hello world\n" +
-				"55,3,Hello world, how are you?\n" +
-				"55,3,I am fine.\n" +
-				"55,3,Luke Skywalker\n" +
-				"55,4,Comment#1\n" +
-				"55,4,Comment#2\n" +
-				"55,4,Comment#3\n" +
-				"55,4,Comment#4\n" +
-				"55,5,Comment#5\n" +
-				"55,5,Comment#6\n" +
-				"55,5,Comment#7\n" +
-				"55,5,Comment#8\n" +
-				"55,5,Comment#9\n" +
-				"55,6,Comment#10\n" +
-				"55,6,Comment#11\n" +
-				"55,6,Comment#12\n" +
-				"55,6,Comment#13\n" +
-				"55,6,Comment#14\n" +
-				"55,6,Comment#15\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class RichFlatMapper1 extends RichFlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>> {
-		private static final long serialVersionUID = 1L;
-		private final Tuple3<Integer, Long, String> outTuple =
-				new Tuple3<Integer, Long, String>();
-		private Integer f2Replace = 0;
-
-		@Override
-		public void open(Configuration config) {
-			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
-			int sum = 0;
-			for(Integer i : ints) {
-				sum += i;
-			}
-			f2Replace = sum;
-		}
-
-		@Override
-		public void flatMap(Tuple3<Integer, Long, String> value,
-				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-			outTuple.setFields(f2Replace, value.f1, value.f2);
-			out.collect(outTuple);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
deleted file mode 100644
index 9b56c63..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-/**
- * The GroupCombine operator is not easy to test because it is essentially just a combiner. The result can be
- * the result of a normal groupReduce at any stage its execution. The basic idea is to preserve the grouping key
- * in the partial result, so that we can do a reduceGroup afterwards to finalize the results for verification.
- * In addition, we can use hashPartition to partition the data and check if no shuffling (just combining) has
- * been performed.
- */
-public class GroupCombineITCase extends MultipleProgramsTestBase {
-
-	public GroupCombineITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	private static String identityResult = "1,1,Hi\n" +
-			"2,2,Hello\n" +
-			"3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n" +
-			"5,3,I am fine.\n" +
-			"6,3,Luke Skywalker\n" +
-			"7,4,Comment#1\n" +
-			"8,4,Comment#2\n" +
-			"9,4,Comment#3\n" +
-			"10,4,Comment#4\n" +
-			"11,5,Comment#5\n" +
-			"12,5,Comment#6\n" +
-			"13,5,Comment#7\n" +
-			"14,5,Comment#8\n" +
-			"15,5,Comment#9\n" +
-			"16,6,Comment#10\n" +
-			"17,6,Comment#11\n" +
-			"18,6,Comment#12\n" +
-			"19,6,Comment#13\n" +
-			"20,6,Comment#14\n" +
-			"21,6,Comment#15\n";
-
-	@Test
-	public void testAllGroupCombineIdentity() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
-				// combine
-				.combineGroup(new IdentityFunction())
-				// fully reduce
-				.reduceGroup(new IdentityFunction());
-
-		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
-
-		compareResultAsTuples(result, identityResult);
-	}
-
-	@Test
-	public void testIdentity() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
-				// combine
-				.combineGroup(new IdentityFunction())
-				// fully reduce
-				.reduceGroup(new IdentityFunction());
-
-		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
-
-		compareResultAsTuples(result, identityResult);
-	}
-
-	@Test
-	public void testIdentityWithGroupBy() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
-				.groupBy(1)
-				// combine
-				.combineGroup(new IdentityFunction())
-				// fully reduce
-				.reduceGroup(new IdentityFunction());
-
-		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
-
-		compareResultAsTuples(result, identityResult);
-	}
-
-	@Test
-	public void testIdentityWithGroupByAndSort() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> reduceDs = ds
-				.groupBy(1)
-				.sortGroup(1, Order.DESCENDING)
-				// reduce partially
-				.combineGroup(new IdentityFunction())
-				.groupBy(1)
-				.sortGroup(1, Order.DESCENDING)
-				// fully reduce
-				.reduceGroup(new IdentityFunction());
-
-		List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
-
-		compareResultAsTuples(result, identityResult);
-	}
-
-	@Test
-	public void testPartialReduceWithIdenticalInputOutputType() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// data
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		DataSet<Tuple2<Long, Tuple3<Integer, Long, String>>> dsWrapped = ds
-				// wrap values as Kv pairs with the grouping key as key
-				.map(new Tuple3KvWrapper());
-
-		List<Tuple3<Integer, Long, String>> result = dsWrapped
-				.groupBy(0)
-				// reduce partially
-				.combineGroup(new Tuple3toTuple3GroupReduce())
-				.groupBy(0)
-				// reduce fully to check result
-				.reduceGroup(new Tuple3toTuple3GroupReduce())
-				//unwrap
-				.map(new MapFunction<Tuple2<Long, Tuple3<Integer, Long, String>>, Tuple3<Integer, Long, String>>() {
-					@Override
-					public Tuple3<Integer, Long, String> map(Tuple2<Long, Tuple3<Integer, Long, String>> value) throws Exception {
-						return value.f1;
-					}
-				}).collect();
-
-		String expected = "1,1,combined\n" +
-				"5,4,combined\n" +
-				"15,9,combined\n" +
-				"34,16,combined\n" +
-				"65,25,combined\n" +
-				"111,36,combined\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testPartialReduceWithDifferentInputOutputType() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// data
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		DataSet<Tuple2<Long, Tuple3<Integer, Long, String>>> dsWrapped = ds
-				// wrap values as Kv pairs with the grouping key as key
-				.map(new Tuple3KvWrapper());
-
-		List<Tuple2<Integer, Long>> result = dsWrapped
-				.groupBy(0)
-				// reduce partially
-				.combineGroup(new Tuple3toTuple2GroupReduce())
-				.groupBy(0)
-				// reduce fully to check result
-				.reduceGroup(new Tuple2toTuple2GroupReduce())
-				//unwrap
-				.map(new MapFunction<Tuple2<Long,Tuple2<Integer,Long>>, Tuple2<Integer,Long>>() {
-					@Override
-					public Tuple2<Integer, Long> map(Tuple2<Long, Tuple2<Integer, Long>> value) throws Exception {
-						return value.f1;
-					}
-				}).collect();
-
-		String expected = "1,3\n" +
-				"5,20\n" +
-				"15,58\n" +
-				"34,52\n" +
-				"65,70\n" +
-				"111,96\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	// check if no shuffle is being executed
-	public void testCheckPartitionShuffleGroupBy() throws Exception {
-
-		org.junit.Assume.assumeTrue(mode != TestExecutionMode.COLLECTION);
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// data
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		// partition and group data
-		UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1);
-
-		List<Tuple2<Long, Integer>> result = partitionedDS
-				.combineGroup(
-						new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
-			@Override
-			public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
-				int count = 0;
-				long key = 0;
-				for (Tuple3<Integer, Long, String> value : values) {
-					key = value.f1;
-					count++;
-				}
-				out.collect(new Tuple2<>(key, count));
-			}
-		}).collect();
-
-		String[] localExpected = new String[] { "(6,6)", "(5,5)" + "(4,4)", "(3,3)", "(2,2)", "(1,1)" };
-
-		String[] resultAsStringArray = new String[result.size()];
-		for (int i = 0; i < resultAsStringArray.length; ++i) {
-			resultAsStringArray[i] = result.get(i).toString();
-		}
-		Arrays.sort(resultAsStringArray);
-
-		Assert.assertEquals("The two arrays were identical.", false, Arrays.equals(localExpected, resultAsStringArray));
-	}
-
-	@Test
-	// check if parallelism of 1 results in the same data like a shuffle
-	public void testCheckPartitionShuffleDOP1() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		env.setParallelism(1);
-
-		// data
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		// partition and group data
-		UnsortedGrouping<Tuple3<Integer, Long, String>> partitionedDS = ds.partitionByHash(0).groupBy(1);
-
-		List<Tuple2<Long, Integer>> result = partitionedDS
-				.combineGroup(
-				new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>() {
-					@Override
-					public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
-						int count = 0;
-						long key = 0;
-						for (Tuple3<Integer, Long, String> value : values) {
-							key = value.f1;
-							count++;
-						}
-						out.collect(new Tuple2<>(key, count));
-					}
-				}).collect();
-
-		String expected = "6,6\n" +
-				"5,5\n" +
-				"4,4\n" +
-				"3,3\n" +
-				"2,2\n" +
-				"1,1\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	// check if all API methods are callable
-	public void testAPI() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple1<String>> ds = CollectionDataSets.getStringDataSet(env).map(new MapFunction<String, Tuple1<String>>() {
-			@Override
-			public Tuple1<String> map(String value) throws Exception {
-				return new Tuple1<>(value);
-			}
-		});
-
-		// all methods on DataSet
-		ds.combineGroup(new GroupCombineFunctionExample())
-		.output(new DiscardingOutputFormat<Tuple1<String>>());
-
-		// all methods on UnsortedGrouping
-		ds.groupBy(0).combineGroup(new GroupCombineFunctionExample())
-		.output(new DiscardingOutputFormat<Tuple1<String>>());
-
-		// all methods on SortedGrouping
-		ds.groupBy(0).sortGroup(0, Order.ASCENDING).combineGroup(new GroupCombineFunctionExample())
-		.output(new DiscardingOutputFormat<Tuple1<String>>());
-
-		env.execute();
-	}
-
-	public static class GroupCombineFunctionExample implements GroupCombineFunction<Tuple1<String>, Tuple1<String>> {
-
-		@Override
-		public void combine(Iterable<Tuple1<String>> values, Collector<Tuple1<String>> out) throws Exception {
-			for (Tuple1<String> value : values) {
-				out.collect(value);
-			}
-		}
-	}
-
-	public static class ScalaGroupCombineFunctionExample implements GroupCombineFunction<scala.Tuple1<String>, scala.Tuple1<String>> {
-
-		@Override
-		public void combine(Iterable<scala.Tuple1<String>> values, Collector<scala.Tuple1<String>> out) throws Exception {
-			for (scala.Tuple1<String> value : values) {
-				out.collect(value);
-			}
-		}
-	}
-
-	public static class IdentityFunction implements GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>,
-	GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
-
-		@Override
-		public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-			for (Tuple3<Integer, Long, String> value : values) {
-				out.collect(new Tuple3<>(value.f0, value.f1, value.f2));
-			}
-		}
-
-		@Override
-		public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-			for (Tuple3<Integer, Long, String> value : values) {
-				out.collect(new Tuple3<>(value.f0, value.f1, value.f2));
-			}
-		}
-	}
-
-
-	public static class Tuple3toTuple3GroupReduce implements KvGroupReduce<Long, Tuple3<Integer, Long, String>,
-			Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
-
-		@Override
-		public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values, Collector<Tuple2<Long,
-				Tuple3<Integer, Long, String>>> out) throws Exception {
-			int i = 0;
-			long l = 0;
-			long key = 0;
-
-			// collapse groups
-			for (Tuple2<Long, Tuple3<Integer, Long, String>> value : values) {
-				key = value.f0;
-				Tuple3<Integer, Long, String> extracted = value.f1;
-				i += extracted.f0;
-				l += extracted.f1;
-			}
-
-			Tuple3<Integer, Long, String> result = new Tuple3<>(i, l, "combined");
-			out.collect(new Tuple2<>(key, result));
-		}
-
-		@Override
-		public void reduce(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values,
-											 Collector<Tuple2<Long, Tuple3<Integer, Long, String>>> out) throws Exception {
-			combine(values, out);
-		}
-	}
-
-	public static class Tuple3toTuple2GroupReduce implements KvGroupReduce<Long, Tuple3<Integer, Long, String>,
-			Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
-
-		@Override
-		public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values, Collector<Tuple2<Long,
-				Tuple2<Integer, Long>>> out) throws Exception {
-			int i = 0;
-			long l = 0;
-			long key = 0;
-
-			// collapse groups
-			for (Tuple2<Long, Tuple3<Integer, Long, String>> value : values) {
-				key = value.f0;
-				Tuple3<Integer, Long, String> extracted = value.f1;
-				i += extracted.f0;
-				l += extracted.f1 + extracted.f2.length();
-			}
-
-			Tuple2<Integer, Long> result = new Tuple2<>(i, l);
-			out.collect(new Tuple2<>(key, result));
-		}
-
-		@Override
-		public void reduce(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long,
-				Tuple2<Integer, Long>>> out) throws Exception {
-			new Tuple2toTuple2GroupReduce().reduce(values, out);
-		}
-	}
-
-	public static class Tuple2toTuple2GroupReduce implements KvGroupReduce<Long, Tuple2<Integer, Long>,
-			Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
-
-		@Override
-		public void combine(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long, Tuple2<Integer,
-				Long>>> out) throws Exception {
-			int i = 0;
-			long l = 0;
-			long key = 0;
-
-			// collapse groups
-			for (Tuple2<Long, Tuple2<Integer, Long>> value : values) {
-				key = value.f0;
-				Tuple2<Integer, Long> extracted = value.f1;
-				i += extracted.f0;
-				l += extracted.f1;
-			}
-
-			Tuple2<Integer, Long> result = new Tuple2<>(i, l);
-
-			out.collect(new Tuple2<>(key, result));
-		}
-
-		@Override
-		public void reduce(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long,
-				Tuple2<Integer, Long>>> out) throws Exception {
-			combine(values, out);
-		}
-	}
-
-	public class Tuple3KvWrapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long,
-			Tuple3<Integer, Long, String>>> {
-		@Override
-		public Tuple2<Long, Tuple3<Integer, Long, String>> map(Tuple3<Integer, Long, String> value) throws Exception {
-			return new Tuple2<>(value.f1, value);
-		}
-	}
-
-
-	public interface CombineAndReduceGroup <IN, INT, OUT> extends GroupCombineFunction<IN, INT>,
-			GroupReduceFunction<INT, OUT> {
-	}
-
-	public interface KvGroupReduce<K, V, INT, OUT> extends CombineAndReduceGroup<Tuple2<K, V>, Tuple2<K, INT>,
-			Tuple2<K, OUT>> {
-	}
-
-}


Mime
View raw message