flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] flink git commit: [FLINK-1532] [tests] Fix spurious failure in AggregatorsITCase (plus minor cleanups)
Date Fri, 13 Feb 2015 16:29:02 GMT
[FLINK-1532] [tests] Fix spurious failure in AggregatorsITCase (plus minor cleanups)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a22b71c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a22b71c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a22b71c

Branch: refs/heads/master
Commit: 0a22b71c887749d297e0f00f4bbdc4af58832a48
Parents: 1dafd81
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Feb 13 12:22:35 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Feb 13 17:11:13 2015 +0100

----------------------------------------------------------------------
 .../aggregators/AggregatorsITCase.java          | 25 ++++++++++----------
 1 file changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a22b71c/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 63cac17..9dcf6fc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -37,16 +37,15 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Test the functionality of aggregators in bulk and delta iterative cases.
- *
  */
 @RunWith(Parameterized.class)
 public class AggregatorsITCase extends MultipleProgramsTestBase {
@@ -287,7 +286,6 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 
 		@Override
 		public void open(Configuration conf) {
-
 			aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
 		}
 
@@ -319,10 +317,16 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	@SuppressWarnings("serial")
 	public static final class TupleMakerMap extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
 
+		private Random rnd;
+
 		@Override
-		public Tuple2<Integer, Integer> map(Integer value) throws Exception {
-			Random ran = new Random();
-			Integer nodeId = Integer.valueOf(ran.nextInt(100000));
+		public void open(Configuration parameters){
+			rnd = new Random(0xC0FFEBADBEEFDEADL + getRuntimeContext().getIndexOfThisSubtask());
+		}
+
+		@Override
+		public Tuple2<Integer, Integer> map(Integer value) {
+			Integer nodeId = Integer.valueOf(rnd.nextInt(100000));
 			return new Tuple2<Integer, Integer>(nodeId, value);
 		}
 
@@ -337,7 +341,6 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 
 		@Override
 		public void open(Configuration conf) {
-
 			aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
 			superstep = getIterationRuntimeContext().getSuperstepNumber();
 
@@ -366,15 +369,13 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 		private int superstep;
 
 		@Override
-		public void open(Configuration conf) { 
-
+		public void open(Configuration conf) {
 			superstep = getIterationRuntimeContext().getSuperstepNumber();
-
 		}
 
 		@Override
 		public void flatMap(Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> value,
-				Collector<Tuple2<Integer, Integer>> out) throws Exception {
+				Collector<Tuple2<Integer, Integer>> out) {
 
 			if (value.f0.f1  > superstep) {
 				out.collect(value.f0);


Mime
View raw message