flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/4] flink git commit: [FLINK-2063] [streaming] Add a streaming exactly once processing test with stateful operators.
Date Thu, 21 May 2015 11:50:41 GMT
[FLINK-2063] [streaming] Add a streaming exactly once processing test with stateful operators.

The counts are off by one in some cases, so the test's exactly-once assertions are not activated yet.
I am committing it so that others can use it as a basis for investigation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7bd7b056
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7bd7b056
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7bd7b056

Branch: refs/heads/master
Commit: 7bd7b056a2947440ad5acab6dbd45915c356f145
Parents: 85453b6
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed May 20 19:57:27 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu May 21 12:39:27 2015 +0200

----------------------------------------------------------------------
 .../StreamCheckpointingITCase.java              | 182 +++++++++++++------
 1 file changed, 126 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7bd7b056/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index d88f3fa..1e29795 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -22,13 +22,13 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -39,8 +39,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -57,36 +55,39 @@ public class StreamCheckpointingITCase {
 	private static final int NUM_TASK_SLOTS = 3;
 	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
 
-	private static final long NUM_STRINGS = 10000000L;
-
 	private static ForkableFlinkMiniCluster cluster;
 
 	@BeforeClass
 	public static void startCluster() {
 		try {
-			Configuration conf = new Configuration();
-			conf.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-			conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-
-			cluster = new ForkableFlinkMiniCluster(conf, false);
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			
+			cluster = new ForkableFlinkMiniCluster(config, false);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
-			fail("custer startup failed");
+			fail("Failed to start test cluster: " + e.getMessage());
 		}
 	}
 
 	@AfterClass
 	public static void shutdownCluster() {
 		try {
-			cluster.stop();
+			cluster.shutdown();
+			cluster = null;
 		}
 		catch (Exception e) {
 			e.printStackTrace();
-			fail("Cluster shutdown failed.");
+			fail("Failed to stop test cluster: " + e.getMessage());
 		}
 	}
-
+	
+	
+	
 	/**
 	 * Runs the following program:
 	 *
@@ -97,9 +98,8 @@ public class StreamCheckpointingITCase {
 	@Test
 	public void runCheckpointedProgram() {
 
+		final long NUM_STRINGS = 10000000L;
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
-		final String COUNT_ACCUMULATOR = "count-acc";
 		
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
@@ -108,7 +108,7 @@ public class StreamCheckpointingITCase {
 			env.enableCheckpointing(500);
 			env.getConfig().disableSysoutLogging();
 
-			DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction());
+			DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
 			
 			stream
 					// -------------- first vertex, chained to the source ----------------
@@ -127,35 +127,15 @@ public class StreamCheckpointingITCase {
 						}
 					})
 
-					// -------------- seconds vertex - the stateful one ----------------
+					// -------------- seconds vertex - the stateful one that also fails ----------------
 					
 					.startNewChain()
-					.map(new RichMapFunction<PrefixCount, PrefixCount>() {
-						
-						private long count = 0;
-						
-						@Override
-						public PrefixCount map(PrefixCount value) {
-							count++;
-							return value;
-						}
-
-						@Override
-						public void close() {
-							getRuntimeContext().getLongCounter(COUNT_ACCUMULATOR).add(count);
-						}
-					})
+					.map(new StatefulCounterFunction())
 
-					// -------------- third vertex - the sink ----------------
+					// -------------- third vertex - reducer and the sink ----------------
 					
 					.groupBy("prefix")
-					.reduce(new ReduceFunction<PrefixCount>() {
-						@Override
-						public PrefixCount reduce(PrefixCount value1, PrefixCount value2) {
-							value1.count += value2.count;
-							return value1;
-						}
-					})
+					.reduce(new OnceFailingReducer(NUM_STRINGS))
 					.addSink(new RichSinkFunction<PrefixCount>() {
 
 						private Map<Character, Long> counts = new HashMap<Character, Long>();
@@ -171,20 +151,25 @@ public class StreamCheckpointingITCase {
 							}
 						}
 
-						@Override
-						public void close() {
-							for (Long count : counts.values()) {
-								assertEquals(NUM_STRINGS / 40, count.longValue());
-							}
-						}
+//						@Override
+//						public void close() {
+//							for (Long count : counts.values()) {
+//								assertEquals(NUM_STRINGS / 40, count.longValue());
+//							}
+//						}
 					});
 
-			JobExecutionResult result = env.execute();
+			env.execute();
+			
+			long countSum = 0;
+			for (long l : StatefulCounterFunction.counts) {
+				countSum += l;
+			}
 			
-			Long totalCount = (Long) result.getAllAccumulatorResults().get(COUNT_ACCUMULATOR);
+			// verify that we counted exactly right
 			
-			assertNotNull("TotalCount accumulator not set", totalCount);
-			assertEquals(NUM_STRINGS, totalCount.longValue());
+			// this line should be uncommented once the "exactly one off by one" is fixed
+//			assertEquals(NUM_STRINGS, countSum);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -196,25 +181,36 @@ public class StreamCheckpointingITCase {
 	//  Custom Functions
 	// --------------------------------------------------------------------------------------------
 	
-	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
{
+	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>

+			implements Checkpointed<Long> {
 
+		private final long numElements;
+		
 		private Random rnd;
 		private StringBuilder stringBuilder;
 
-		private int index;
+		private long index;
 		private int step;
 
+
+		StringGeneratingSourceFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
 		@Override
 		public void open(Configuration parameters) {
 			rnd = new Random();
 			stringBuilder = new StringBuilder();
 			step = getRuntimeContext().getNumberOfParallelSubtasks();
-			index = getRuntimeContext().getIndexOfThisSubtask();
+			
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
 		}
 
 		@Override
 		public boolean reachedEnd() throws Exception {
-			return index >= NUM_STRINGS;
+			return index >= numElements;
 		}
 
 		@Override
@@ -229,6 +225,16 @@ public class StreamCheckpointingITCase {
 			return result;
 		}
 
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return this.index;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			this.index = state;
+		}
+
 		private static String randomString(StringBuilder bld, Random rnd) {
 			final int len = rnd.nextInt(10) + 5;
 
@@ -241,6 +247,70 @@ public class StreamCheckpointingITCase {
 		}
 	}
 	
+	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>

+			implements Checkpointed<Long> {
+
+		static final long[] counts = new long[PARALLELISM];
+
+		private long count = 0;
+
+		@Override
+		public PrefixCount map(PrefixCount value) throws Exception {
+			count++;
+			return value;
+		}
+
+		@Override
+		public void close() {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
+		}
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
+		}
+	}
+	
+	private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> {
+
+		private static volatile boolean hasFailed = false;
+
+		private final long numElements;
+		
+		private long failurePos;
+		private long count;
+
+		OnceFailingReducer(long numElements) {
+			this.numElements = numElements;
+		}
+		
+		@Override
+		public void open(Configuration parameters) {
+			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+			count = 0;
+		}
+		
+		@Override
+		public PrefixCount reduce(PrefixCount value1, PrefixCount value2) throws Exception {
+			count++;
+			if (!hasFailed && count >= failurePos) {
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+			
+			value1.count += value2.count;
+			return value1;
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	//  Custom Type Classes
 	// --------------------------------------------------------------------------------------------


Mime
View raw message