flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [2/3] flink git commit: [FLINK-2324] [streaming] ITCase added for partitioned states
Date Thu, 30 Jul 2015 14:45:39 GMT
[FLINK-2324] [streaming] ITCase added for partitioned states


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58cd4ea6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58cd4ea6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58cd4ea6

Branch: refs/heads/master
Commit: 58cd4ea615bbbbec682ec82f6380bc30c12c5899
Parents: 0558644
Author: Gyula Fora <gyfora@apache.org>
Authored: Tue Jul 28 15:46:13 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Thu Jul 30 16:44:53 2015 +0200

----------------------------------------------------------------------
 .../PartitionedStateCheckpointingITCase.java    | 257 +++++++++++++++++++
 1 file changed, 257 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58cd4ea6/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
new file mode 100644
index 0000000..88361e2
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * A simple test that runs a streaming topology with checkpointing enabled.
+ * 
+ * The test triggers a failure after a while and verifies that, after
+ * completion, the state reflects the "exactly once" semantics.
+ * 
+ * It is designed to check partitioned states.
+ */
+@SuppressWarnings("serial")
+public class PartitionedStateCheckpointingITCase {
+
+	private static final int NUM_TASK_MANAGERS = 2;
+	private static final int NUM_TASK_SLOTS = 3;
+	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+
+	private static ForkableFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void startCluster() {
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+
+			cluster = new ForkableFlinkMiniCluster(config, false);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to start test cluster: " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void shutdownCluster() {
+		try {
+			cluster.shutdown();
+			cluster = null;
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to stop test cluster: " + e.getMessage());
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void runCheckpointedProgram() {
+
+		final long NUM_STRINGS = 10000000L;
+		assertTrue("Broken test setup", (NUM_STRINGS/2) % 40 == 0);
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost",
+					cluster.getJobManagerRPCPort());
+			env.setParallelism(PARALLELISM);
+			env.enableCheckpointing(500);
+			env.getConfig().disableSysoutLogging();
+
+			DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS
/ 2));
+			DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS
/ 2));
+
+			stream1.union(stream2)
+					.groupBy(new IdentityKeySelector<Integer>())
+					.map(new OnceFailingPartitionedSum(NUM_STRINGS))
+					.keyBy(0)
+					.addSink(new CounterSink());
+
+			env.execute();
+
+			// verify that we counted exactly right
+			for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.allSums.entrySet()) {
+				assertEquals(new Long(sum.getKey() * NUM_STRINGS / 40), sum.getValue());
+			}
+			System.out.println("new");
+			for (Long count : CounterSink.allCounts.values()) {
+				assertEquals(new Long(NUM_STRINGS / 40), count);
+			}
+			
+			assertEquals(40, CounterSink.allCounts.size());
+			assertEquals(40, OnceFailingPartitionedSum.allSums.size());
+			
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Custom Functions
+	// --------------------------------------------------------------------------------------------
+
+	private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer>
{
+
+		private final long numElements;
+
+		private OperatorState<Integer> index;
+		private int step;
+
+		private volatile boolean isRunning;
+
+		static final long[] counts = new long[PARALLELISM];
+
+		@Override
+		public void close() throws IOException {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value();
+		}
+
+		IntGeneratingSourceFunction(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws IOException {
+			step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+			index = getRuntimeContext().getOperatorState("index",
+					getRuntimeContext().getIndexOfThisSubtask(), false);
+
+			isRunning = true;
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			final Object lockingObject = ctx.getCheckpointLock();
+
+			while (isRunning && index.value() < numElements) {
+
+				synchronized (lockingObject) {
+					index.update(index.value() + step);
+					ctx.collect(index.value() % 40);
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	private static class OnceFailingPartitionedSum extends RichMapFunction<Integer, Tuple2<Integer,
Long>> {
+
+		private static Map<Integer, Long> allSums = new ConcurrentHashMap<Integer, Long>();
+		private static volatile boolean hasFailed = false;
+
+		private final long numElements;
+
+		private long failurePos;
+		private long count;
+
+		private OperatorState<Long> sum;
+
+		OnceFailingPartitionedSum(long numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws IOException {
+			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext()
+					.getNumberOfParallelSubtasks());
+			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext()
+					.getNumberOfParallelSubtasks());
+
+			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+			count = 0;
+			sum = getRuntimeContext().getOperatorState("sum", 0L, true);
+		}
+
+		@Override
+		public Tuple2<Integer, Long> map(Integer value) throws Exception {
+			count++;
+			if (!hasFailed && count >= failurePos) {
+				hasFailed = true;
+				throw new Exception("Test Failure");
+			}
+
+			long currentSum = sum.value() + value;
+			sum.update(currentSum);
+			allSums.put(value, currentSum);
+			return new Tuple2<Integer, Long>(value, currentSum);
+		}
+	}
+
+	private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>>
{
+
+		private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>();
+
+		private OperatorState<Long> counts;
+
+		@Override
+		public void open(Configuration parameters) throws IOException {
+			counts = getRuntimeContext().getOperatorState("count", 0L, true);
+		}
+
+		@Override
+		public void invoke(Tuple2<Integer, Long> value) throws Exception {
+			long currentCount = counts.value() + 1;
+			counts.update(currentCount);
+			allCounts.put(value.f0, currentCount);
+
+		}
+	}
+	
+	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+
+	}
+}


Mime
View raw message