flink-commits mailing list archives

From: se...@apache.org
Subject: [18/23] git commit: [FLINK-1110] Add iteration aggregators to collection based execution
Date: Fri, 03 Oct 2014 16:25:14 GMT
[FLINK-1110] Add iteration aggregators to collection based execution
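
With this change, programs that use iteration aggregators behave the same under
collection-based (single-JVM, Java-collections) execution as on the cluster
runtime. Previously, the collection executor's IterationRuntimeUDFContext
returned null for every aggregator lookup (see the deleted file below). A
minimal sketch of what now works; the aggregator name "updates" and the
surrounding program are illustrative only, not part of this commit:

    import org.apache.flink.api.common.aggregators.LongSumAggregator;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.IterativeDataSet;

    public class AggregatorSketch {
        public static void main(String[] args) throws Exception {
            // collection-based execution: the whole program runs in one JVM
            ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

            IterativeDataSet<Long> loop = env.generateSequence(1, 10).iterate(5);
            loop.registerAggregator("updates", new LongSumAggregator());

            DataSet<Long> step = loop.map(new RichMapFunction<Long, Long>() {
                @Override
                public Long map(Long value) {
                    // aggregator lookups work in collection execution as of this commit
                    LongSumAggregator agg = getIterationRuntimeContext().getIterationAggregator("updates");
                    agg.aggregate(1L);
                    return value + 1;
                }
            });

            loop.closeWith(step).print();
            env.execute();
        }
    }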


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/15f2544c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/15f2544c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/15f2544c

Branch: refs/heads/master
Commit: 15f2544c7677ccfff02f506ea7600d84ef40a171
Parents: 3cc9a28
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Oct 1 21:42:12 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Oct 3 16:22:34 2014 +0200

----------------------------------------------------------------------
 .../functions/util/CopyingListCollector.java    |   2 +-
 .../util/IterationRuntimeUDFContext.java        |  48 -----
 .../common/functions/util/ListCollector.java    |   2 +-
 .../common/operators/CollectionExecutor.java    |  79 ++++++++-
 .../operators/base/BulkIterationBase.java       |  31 ++--
 .../flink/test/util/JavaProgramTestBase.java    |   4 +
 .../test/accumulators/AccumulatorITCase.java    | 176 ++++++-------------
 .../test/iterative/DeltaPageRankITCase.java     |  28 +--
 ...nentsWithParametrizableAggregatorITCase.java |   2 +
 ...entsWithParametrizableConvergenceITCase.java |   2 +
 .../apache/flink/test/util/FailingTestBase.java |  83 +++++----
 11 files changed, 200 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f2544c/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
index 8620981..1573ef9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/CopyingListCollector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f2544c/flink-core/src/main/java/org/apache/flink/api/common/functions/util/IterationRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/IterationRuntimeUDFContext.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/IterationRuntimeUDFContext.java
deleted file mode 100644
index d4de199..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/IterationRuntimeUDFContext.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.functions.util;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.types.Value;
-
-public class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {
-
-	private final int superstep;
-
-	public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep) {
-		super(name, numParallelSubtasks, subtaskIndex);
-		this.superstep = superstep;
-	}
-
-	@Override
-	public int getSuperstepNumber() {
-		return superstep;
-	}
-
-	@Override
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return null;
-	}
-
-	@Override
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f2544c/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
index f7f7865..a3a369b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f2544c/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index e82d7cb..d204491 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -33,8 +33,11 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.aggregators.AggregatorWithName;
+import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.util.IterationRuntimeUDFContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
@@ -45,6 +48,7 @@ import org.apache.flink.api.common.operators.util.TypeComparable;
 import org.apache.flink.api.common.typeinfo.CompositeType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.types.Value;
 import org.apache.flink.util.Visitor;
 
 /**
@@ -58,6 +62,10 @@ public class CollectionExecutor {
 	
 	private final Map<String, Accumulator<?, ?>> accumulators;
 	
+	private final Map<String, Value> previousAggregates;
+	
+	private final Map<String, Aggregator<?>> aggregators;
+	
 	private final boolean mutableObjectSafeMode;
 	
 	// --------------------------------------------------------------------------------------------
@@ -68,8 +76,11 @@ public class CollectionExecutor {
 		
 	public CollectionExecutor(boolean mutableObjectSafeMode) {
 		this.mutableObjectSafeMode = mutableObjectSafeMode;
+		
 		this.intermediateResults = new HashMap<Operator<?>, List<?>>();
 		this.accumulators = new HashMap<String, Accumulator<?,?>>();
+		this.previousAggregates = new HashMap<String, Value>();
+		this.aggregators = new HashMap<String, Aggregator<?>>();
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -251,6 +262,14 @@ public class CollectionExecutor {
 			iteration.getTerminationCriterion().accept(dynCollector);
 		}
 		
+		// register the aggregators
+		for (AggregatorWithName<?> a : iteration.getAggregators().getAllRegisteredAggregators()) {
+			aggregators.put(a.getName(), a.getAggregator());
+		}
+		
+		String convCriterionAggName = iteration.getAggregators().getConvergenceCriterionAggregatorName();
+		ConvergenceCriterion<Value> convCriterion = (ConvergenceCriterion<Value>) iteration.getAggregators().getConvergenceCriterion();
+		
 		List<T> currentResult = inputData;
 		
 		final int maxIterations = iteration.getMaximumNumberOfIterations();
@@ -265,8 +284,13 @@ public class CollectionExecutor {
 
 			// evaluate the termination criterion
 			if (iteration.getTerminationCriterion() != null) {
-				List<?> term = execute(((SingleInputOperator<?, ?, ?>) iteration.getTerminationCriterion()).getInput(), superstep);
-				if (term.isEmpty()) {
+				execute(iteration.getTerminationCriterion(), superstep);
+			}
+			
+			// evaluate the aggregator convergence criterion
+			if (convCriterion != null && convCriterionAggName != null) {
+				Value v = aggregators.get(convCriterionAggName).getAggregate();
+				if (convCriterion.isConverged(superstep, v)) {
 					break;
 				}
 			}
@@ -275,8 +299,17 @@ public class CollectionExecutor {
 			for (Operator<?> o : dynamics) {
 				intermediateResults.remove(o);
 			}
+			
+			// set the previous iteration's aggregates and reset the aggregators
+			for (Map.Entry<String, Aggregator<?>> e : aggregators.entrySet()) {
+				previousAggregates.put(e.getKey(), e.getValue().getAggregate());
+				e.getValue().reset();
+			}
 		}
 		
+		previousAggregates.clear();
+		aggregators.clear();
+		
 		return currentResult;
 	}
 
@@ -323,6 +356,10 @@ public class CollectionExecutor {
 
 		List<?> currentWorkset = worksetInputData;
 
+		// register the aggregators
+		for (AggregatorWithName<?> a : iteration.getAggregators().getAllRegisteredAggregators()) {
+			aggregators.put(a.getName(), a.getAggregator());
+		}
 
 		final int maxIterations = iteration.getMaximumNumberOfIterations();
 
@@ -355,7 +392,16 @@ public class CollectionExecutor {
 			for (Operator<?> o : dynamics) {
 				intermediateResults.remove(o);
 			}
+			
+			// set the previous iteration's aggregates and reset the aggregators
+			for (Map.Entry<String, Aggregator<?>> e : aggregators.entrySet()) {
+				previousAggregates.put(e.getKey(), e.getValue().getAggregate());
+				e.getValue().reset();
+			}
 		}
+		
+		previousAggregates.clear();
+		aggregators.clear();
 
 		List<T> currentSolution = new ArrayList<T>(solutionMap.size());
 		currentSolution.addAll(solutionMap.values());
@@ -427,4 +473,31 @@ public class CollectionExecutor {
 			}
 		}
 	}
+	
+	private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {
+
+		private final int superstep;
+
+		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep) {
+			super(name, numParallelSubtasks, subtaskIndex);
+			this.superstep = superstep;
+		}
+
+		@Override
+		public int getSuperstepNumber() {
+			return superstep;
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+			return (T) aggregators.get(name);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public <T extends Value> T getPreviousIterationAggregate(String name) {
+			return (T) previousAggregates.get(name);
+		}
+	}
 }
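
The collection executor now mirrors the distributed runtime's aggregator
semantics: aggregators are registered when the iteration starts, the
convergence criterion (if one is registered) is evaluated against its
aggregator after every superstep, and at the end of each superstep every
aggregate is snapshotted into previousAggregates before the aggregator is
reset. The new inner IterationRuntimeUDFContext serves getIterationAggregator()
and getPreviousIterationAggregate() directly from those maps. A sketch of the
user-facing registration that the new isConverged() check exercises; the name
"updates" and the fragment are illustrative, assuming an ExecutionEnvironment
`env` as in the sketch above:

    import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
    import org.apache.flink.api.common.aggregators.LongSumAggregator;
    import org.apache.flink.api.java.IterativeDataSet;
    import org.apache.flink.types.LongValue;

    IterativeDataSet<Long> loop = env.generateSequence(1, 10).iterate(100);
    loop.registerAggregationConvergenceCriterion("updates", new LongSumAggregator(),
            new ConvergenceCriterion<LongValue>() {
                @Override
                public boolean isConverged(int iteration, LongValue value) {
                    // stop as soon as a superstep aggregates no updates
                    return value.getValue() == 0;
                }
            });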

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f2544c/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index 4fbf65e..31080cd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.base;
 
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +29,7 @@ import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.operators.IterationOperator;
 import org.apache.flink.api.common.operators.Operator;
@@ -40,17 +38,15 @@ import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Nothing;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Visitor;
 
 /**
  * 
  */
-@SuppressWarnings("deprecation")
 public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRichFunction> implements IterationOperator {
 	
 	private static String DEFAULT_NAME = "<Unnamed Bulk Iteration>";
@@ -122,10 +118,13 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRich
 	 * @param criterion
 	 */
 	public <X> void setTerminationCriterion(Operator<X> criterion) {
-		CollectorMapOperatorBase<X, Nothing, TerminationCriterionMapper<X>> mapper =
-				new CollectorMapOperatorBase<X, Nothing, TerminationCriterionMapper<X>>(
+		
+		TypeInformation<X> type = criterion.getOperatorInfo().getOutputType();
+		
+		FlatMapOperatorBase<X, X, TerminationCriterionMapper<X>> mapper =
+				new FlatMapOperatorBase<X, X, TerminationCriterionMapper<X>>(
 						new TerminationCriterionMapper<X>(),
-						new UnaryOperatorInformation<X, Nothing>(criterion.getOperatorInfo().getOutputType(), new NothingTypeInfo()),
+						new UnaryOperatorInformation<X, X>(type, type),
 						"Termination Criterion Aggregation Wrapper");
 		mapper.setInput(criterion);
 		
@@ -233,7 +232,7 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRich
 	/**
 	 * Special Mapper that is added before a termination criterion and is only a container for an special aggregator
 	 */
-	public static class TerminationCriterionMapper<X> extends AbstractRichFunction implements Serializable, GenericCollectorMap<X, Nothing> {
+	public static class TerminationCriterionMapper<X> extends AbstractRichFunction implements FlatMapFunction<X, X> {
 		private static final long serialVersionUID = 1L;
 		
 		private TerminationCriterionAggregator aggregator;
@@ -244,7 +243,7 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRich
 		}
 		
 		@Override
-		public void map(X in, Collector<Nothing> out) {
+		public void flatMap(X in, Collector<X> out) {
 			aggregator.aggregate(1L);
 		}
 	}
@@ -280,9 +279,10 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRich
 	/**
 	 * Convergence for the termination criterion is reached if no tuple is output at current iteration for the termination criterion branch
 	 */
-	@SuppressWarnings("serial")
 	public static class TerminationCriterionAggregationConvergence implements ConvergenceCriterion<LongValue> {
 
+		private static final long serialVersionUID = 1L;
+		
 		private static final Logger log = LoggerFactory.getLogger(TerminationCriterionAggregationConvergence.class);
 
 		@Override
@@ -293,12 +293,7 @@ public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRich
 				log.info("Termination criterion stats in iteration [" + iteration + "]: " + count);
 			}
 
-			if(count == 0) {
-				return true;
-			}
-			else {
-				return false;
-			}
+			return count == 0;
 		}
 	}
 
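
The termination criterion wrapper no longer depends on the deprecated
GenericCollectorMap/Nothing types: TerminationCriterionMapper is now a
FlatMapFunction<X, X> that emits no records and only counts its inputs into
the termination aggregator, and TerminationCriterionAggregationConvergence
ends the iteration once a superstep's count is zero. On the API side, the
criterion is the second data set passed to closeWith(); an illustrative
sketch, again assuming an ExecutionEnvironment `env`:

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.IterativeDataSet;

    IterativeDataSet<Long> loop = env.generateSequence(1, 10).iterate(100);

    DataSet<Long> halved = loop.map(new MapFunction<Long, Long>() {
        @Override
        public Long map(Long v) { return v / 2; }
    });

    // when this set is empty in a superstep, the counter aggregates 0 and the loop stops
    DataSet<Long> nonZero = halved.filter(new FilterFunction<Long>() {
        @Override
        public boolean filter(Long v) { return v != 0; }
    });

    DataSet<Long> result = loop.closeWith(halved, nonZero);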

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f2544c/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 8e7a58b..61e10d4 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -64,6 +64,10 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 		setTaskManagerNumSlots(degreeOfParallelism);
 	}
 	
+	public int getDegreeOfParallelism() {
+		return isCollectionExecution ? 1 : degreeOfParallelism;
+	}
+	
 	public JobExecutionResult getLatestExecutionResult() {
 		return this.latestExecutionResult;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f2544c/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index f26e8ff..e5c2108 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -16,49 +16,32 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.accumulators;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
 import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.util.SerializableHashSet;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.SimpleStringUtils;
 import org.junit.Assert;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -70,22 +53,17 @@ import com.google.common.collect.Sets;
  * TODO Test conflict when different UDFs write to accumulator with same name
  * but with different type. The conflict will occur in JobManager while merging.
  */
-@RunWith(Parameterized.class)
-public class AccumulatorITCase extends RecordAPITestBase {
+@SuppressWarnings("serial")
+public class AccumulatorITCase extends JavaProgramTestBase {
 
 	private static final String INPUT = "one\n" + "two two\n" + "three three three\n";
 	private static final String EXPECTED = "one 1\ntwo 2\nthree 3\n";
-	
-	private static final int DOP = 2;
 
-	protected String dataPath;
-	protected String resultPath;
-	
-	public AccumulatorITCase(Configuration config) {
-		super(config);
-		setTaskManagerNumSlots(DOP);
-	}
+	private String dataPath;
+	private String resultPath;
 
+	private JobExecutionResult result;
+	
 	@Override
 	protected void preSubmit() throws Exception {
 		dataPath = createTempFile("datapoints.txt", INPUT);
@@ -98,12 +76,12 @@ public class AccumulatorITCase extends RecordAPITestBase {
 		
 		// Test accumulator results
 		System.out.println("Accumulator results:");
-		JobExecutionResult res = getJobExecutionResult();
+		JobExecutionResult res = this.result;
 		System.out.println(AccumulatorHelper.getResultsFormated(res.getAllAccumulatorResults()));
 		
 		Assert.assertEquals(new Integer(3), (Integer) res.getAccumulatorResult("num-lines"));
 
-		Assert.assertEquals(new Double(DOP), (Double)res.getAccumulatorResult("open-close-counter"));
+		Assert.assertEquals(new Double(getDegreeOfParallelism()), (Double)res.getAccumulatorResult("open-close-counter"));
 		
 		// Test histogram (words per line distribution)
 		Map<Integer, Integer> dist = Maps.newHashMap();
@@ -111,65 +89,40 @@ public class AccumulatorITCase extends RecordAPITestBase {
 		Assert.assertEquals(dist, res.getAccumulatorResult("words-per-line"));
 		
 		// Test distinct words (custom accumulator)
-		Set<StringRecord> distinctWords = Sets.newHashSet();
-		distinctWords.add(new StringRecord("one"));
-		distinctWords.add(new StringRecord("two"));
-		distinctWords.add(new StringRecord("three"));
+		Set<StringValue> distinctWords = Sets.newHashSet();
+		distinctWords.add(new StringValue("one"));
+		distinctWords.add(new StringValue("two"));
+		distinctWords.add(new StringValue("three"));
 		Assert.assertEquals(distinctWords, res.getAccumulatorResult("distinct-words"));
 	}
 
 	@Override
-	protected Plan getTestJob() {
-		Plan plan = getTestPlanPlan(config.getInteger("IterationAllReducer#NoSubtasks", 1), dataPath, resultPath);
-		return plan;
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() {
-		Configuration config1 = new Configuration();
-		config1.setInteger("IterationAllReducer#NoSubtasks", DOP);
-		return toParameterList(config1);
-	}
-	
-	static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		
-		FileDataSource source = new FileDataSource(new TextInputFormat(), input, "Input Lines");
-		source.setParameter(TextInputFormat.CHARSET_NAME, "ASCII");
-		MapOperator mapper = MapOperator.builder(new TokenizeLine())
-			.input(source)
-			.name("Tokenize Lines")
-			.build();
-		ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
-			.input(mapper)
-			.name("Count Words")
-			.build();
-		@SuppressWarnings("unchecked")
-		FileDataSink out = new FileDataSink(new CsvOutputFormat("\n"," ", StringValue.class, IntValue.class), output, reducer, "Word Counts");
-
-		Plan plan = new Plan(out, "WordCount Example");
-		plan.setDefaultParallelism(numSubTasks);
+		DataSet<String> input = env.readTextFile(dataPath); 
 		
-		return plan;
+		input.flatMap(new TokenizeLine())
+			.groupBy(0)
+			.reduceGroup(new CountWords())
+			.writeAsCsv(resultPath, "\n", " ");
+		
+		this.result = env.execute();
 	}
 	
-	public static class TokenizeLine extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		private final Record outputRecord = new Record();
-		private StringValue word;
-		private final IntValue one = new IntValue(1);
-		private final SimpleStringUtils.WhitespaceTokenizer tokenizer = new SimpleStringUtils.WhitespaceTokenizer();
+	public static class TokenizeLine extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
 
 		// Needs to be instantiated later since the runtime context is not yet
 		// initialized at this place
-		IntCounter cntNumLines = null;
-		Histogram wordsPerLineDistribution = null;
+		private IntCounter cntNumLines;
+		private Histogram wordsPerLineDistribution;
 
 		// This counter will be added without convenience functions
-		DoubleCounter openCloseCounter = new DoubleCounter();
-		private SetAccumulator<StringRecord> distinctWords = null;
-    
+		private DoubleCounter openCloseCounter = new DoubleCounter();
+		private SetAccumulator<StringValue> distinctWords;
+
 		@Override
-		public void open(Configuration parameters) throws Exception {
+		public void open(Configuration parameters) {
 		  
 			// Add counters using convenience functions
 			this.cntNumLines = getRuntimeContext().getIntCounter("num-lines");
@@ -180,7 +133,7 @@ public class AccumulatorITCase extends RecordAPITestBase {
 
 			// Add custom counter. Didn't find a way to do this with
 			// getAccumulator()
-			this.distinctWords = new SetAccumulator<StringRecord>();
+			this.distinctWords = new SetAccumulator<StringValue>();
 			this.getRuntimeContext().addAccumulator("distinct-words", distinctWords);
 
 			// Create counter and test increment
@@ -208,23 +161,13 @@ public class AccumulatorITCase extends RecordAPITestBase {
 		}
 		
 		@Override
-		public void map(Record record, Collector<Record> collector) {
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
 			this.cntNumLines.add(1);
-			
-			StringValue line = record.getField(0, StringValue.class);
-			SimpleStringUtils.replaceNonWordChars(line, ' ');
-			SimpleStringUtils.toLowerCase(line);
-			this.tokenizer.setStringToTokenize(line);
 			int wordsPerLine = 0;
-			this.word = new StringValue();
-			while (tokenizer.next(this.word))
-			{
-				// Use custom counter
-				distinctWords.add(new StringRecord(this.word.getValue()));
-  
-				this.outputRecord.setField(0, this.word);
-				this.outputRecord.setField(1, this.one);
-				collector.collect(this.outputRecord);
+			
+			for (String token : value.toLowerCase().split("\\W+")) {
+				distinctWords.add(new StringValue(token));
+				out.collect(new Tuple2<String, Integer>(token, 1));
 				++ wordsPerLine;
 			}
 			wordsPerLineDistribution.add(wordsPerLine);
@@ -238,48 +181,39 @@ public class AccumulatorITCase extends RecordAPITestBase {
 		}
 	}
 
-	@Combinable
-	@ConstantFields(0)
-	public static class CountWords extends ReduceFunction implements Serializable {
-		
-		private static final long serialVersionUID = 1L;
-		
-		private final IntValue cnt = new IntValue();
+	
+	public static class CountWords extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
 		
-		private IntCounter reduceCalls = null;
-		private IntCounter combineCalls = null;
+		private IntCounter reduceCalls;
+		private IntCounter combineCalls;
 		
 		@Override
-		public void open(Configuration parameters) throws Exception {
+		public void open(Configuration parameters) {
 			this.reduceCalls = getRuntimeContext().getIntCounter("reduce-calls");
 			this.combineCalls = getRuntimeContext().getIntCounter("combine-calls");
 		}
 		
 		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
+		public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
 			reduceCalls.add(1);
-			reduceInternal(records, out);
+			reduceInternal(values, out);
 		}
 		
 		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
+		public void combine(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
 			combineCalls.add(1);
-			reduceInternal(records, out);
+			reduceInternal(values, out);
 		}
 		
-		private void reduceInternal(Iterator<Record> records, Collector<Record> out) {
-			Record element = null;
+		private void reduceInternal(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
 			int sum = 0;
+			String key = null;
 			
-			while (records.hasNext()) {
-				element = records.next();
-				IntValue i = element.getField(1, IntValue.class);
-				sum += i.getValue();
+			for (Tuple2<String, Integer> e : values) {
+				key = e.f0;
+				sum += e.f1;
 			}
-
-			this.cnt.setValue(sum);
-			element.setField(1, this.cnt);
-			out.collect(element);
+			out.collect(new Tuple2<String, Integer>(key, sum));
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f2544c/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
index 155315c..cf59a3f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DeltaPageRankITCase.java
@@ -16,20 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.iterative;
 
-import java.util.Collection;
-
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents;
 import org.apache.flink.test.util.RecordAPITestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
-@RunWith(Parameterized.class)
+
 public class DeltaPageRankITCase extends RecordAPITestBase {
 	
 	protected String verticesPath;
@@ -38,12 +31,6 @@ public class DeltaPageRankITCase extends RecordAPITestBase {
 	
 	protected String resultPath;
 	
-	
-	public DeltaPageRankITCase(Configuration config) {
-		super(config);
-		setTaskManagerNumSlots(DOP);
-	}
-	
 	@Override
 	protected void preSubmit() throws Exception {
 		verticesPath = createTempFile("vertices.txt", INITIAL_VERTICES_WITH_RANK);
@@ -55,10 +42,7 @@ public class DeltaPageRankITCase extends RecordAPITestBase {
 	
 	@Override
 	protected Plan getTestJob() {
-		int dop = config.getInteger("NumSubtasks", 1);
-		int maxIterations = config.getInteger("NumIterations", 1);
-		
-		String[] params = { String.valueOf(dop) , verticesPath, edgesPath, resultPath, String.valueOf(maxIterations) };
+		String[] params = { String.valueOf(DOP) , verticesPath, edgesPath, resultPath, "3" };
 		
 		WorksetConnectedComponents cc = new WorksetConnectedComponents();
 		return cc.getPlan(params);
@@ -68,14 +52,6 @@ public class DeltaPageRankITCase extends RecordAPITestBase {
 	protected void postSubmit() throws Exception {
 //		compareResultsByLinesInMemory(RESULT_RANKS, resultPath);
 	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() {
-		Configuration config1 = new Configuration();
-		config1.setInteger("NumSubtasks", DOP);
-		config1.setInteger("NumIterations", 3);
-		return toParameterList(config1);
-	}
 	
 	
 	private static final String INITIAL_VERTICES_WITH_RANK = "1 0.025\n" +

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f2544c/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
index ff76a7b..faaa541 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
@@ -51,6 +51,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
 	@Override
 	protected void preSubmit() throws Exception {
 		// vertices input
+		verticesInput.clear();
 		verticesInput.add(new Tuple2<Long, Long>(1l,1l));
 		verticesInput.add(new Tuple2<Long, Long>(2l,2l));
 		verticesInput.add(new Tuple2<Long, Long>(3l,3l));
@@ -62,6 +63,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
 		verticesInput.add(new Tuple2<Long, Long>(9l,9l));
 
 		// vertices input
+		edgesInput.clear();
 		edgesInput.add(new Tuple2<Long, Long>(1l,2l));
 		edgesInput.add(new Tuple2<Long, Long>(1l,3l));
 		edgesInput.add(new Tuple2<Long, Long>(2l,3l));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f2544c/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
index 5946275..4d890e9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
@@ -54,6 +54,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
 	@Override
 	protected void preSubmit() throws Exception {
 		// vertices input
+		verticesInput.clear();
 		verticesInput.add(new Tuple2<Long, Long>(1l,1l));
 		verticesInput.add(new Tuple2<Long, Long>(2l,2l));
 		verticesInput.add(new Tuple2<Long, Long>(3l,3l));
@@ -65,6 +66,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
 		verticesInput.add(new Tuple2<Long, Long>(9l,9l));
 
 		// vertices input
+		edgesInput.clear();
 		edgesInput.add(new Tuple2<Long, Long>(1l,2l));
 		edgesInput.add(new Tuple2<Long, Long>(1l,3l));
 		edgesInput.add(new Tuple2<Long, Long>(2l,3l));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f2544c/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
index ea19c27..2e70eb0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.util;
 
 import org.junit.Assert;
@@ -56,57 +55,63 @@ public abstract class FailingTestBase extends RecordAPITestBase {
 	
 	/**
 	 * Tests that both jobs, the failing and the working one, are handled correctly.
-	 * The first (failing) job must be canceled and the Nephele client must report the failure.
+	 * The first (failing) job must be canceled and the client must report the failure.
 	 * The second (working) job must finish successfully and compute the correct result.
-	 * A timeout waits for the successful return for the Nephele client. In case of a deadlock 
+	 * A timeout waits for the successful return for the client. In case of a deadlock 
 	 * (or too small value for timeout) the time runs out and this test fails. 
 	 * 
 	 */
 	@Override
 	public void testJob() throws Exception {
-		// pre-submit
-		try {
-			preSubmit();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Pre-submit work caused an error: " + e.getMessage());
-		}
-		
-		// init submission thread
-		SubmissionThread st = new SubmissionThread(Thread.currentThread(), this.executor, getFailingJobGraph(), getJobGraph());
-		// start submission thread
-		st.start();
-		
+		startCluster();
 		try {
-			// wait for timeout
-			Thread.sleep(getTimeout()*1000);
-			Assert.fail("Failing job and successful job did not fail.");
-		} catch(InterruptedException ie) {
-			// will have happened if all works fine
-		}
-		
-		Exception cte = st.error;
-		if (cte != null) {
-			cte.printStackTrace();
-			Assert.fail("Task Canceling failed: " + cte.getMessage());
-		}
-		
-		// post-submit
-		try {
-			postSubmit();
+			// pre-submit
+			try {
+				preSubmit();
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				Assert.fail("Pre-submit work caused an error: " + e.getMessage());
+			}
+			
+			// init submission thread
+			SubmissionThread st = new SubmissionThread(Thread.currentThread(), this.executor, getFailingJobGraph(), getJobGraph());
+			// start submission thread
+			st.start();
+			
+			try {
+				// wait for timeout
+				Thread.sleep(getTimeout()*1000);
+				Assert.fail("Failing job and successful job did not fail.");
+			} catch(InterruptedException ie) {
+				// will have happened if all works fine
+			}
+			
+			Exception cte = st.error;
+			if (cte != null) {
+				cte.printStackTrace();
+				Assert.fail("Task Canceling failed: " + cte.getMessage());
+			}
+			
+			// post-submit
+			try {
+				postSubmit();
+			}
+			catch (Exception e) {
+				System.err.println(e.getMessage());
+				e.printStackTrace();
+				Assert.fail("Post-submit work caused an error: " + e.getMessage());
+			}
 		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Post-submit work caused an error: " + e.getMessage());
+		finally {
+			stopCluster();
 		}
 	}
 	
 	/**
 	 * Thread for submitting both jobs sequentially to the test cluster.
-	 * First, the failing job is submitted. The working job is submitted after the Nephele client returns 
+	 * First, the failing job is submitted. The working job is submitted after the client returns 
 	 * from the call of its submitJobAndWait() method. 
 	 */
 	private class SubmissionThread extends Thread {

