flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/5] flink git commit: [FLINK-3242] Also Set User-specified StateBackend without Checkpointing
Date Thu, 28 Jan 2016 15:32:18 GMT
[FLINK-3242] Also Set User-specified StateBackend without Checkpointing

Before, the user-specified StateBackedn would not be set when generating the
JobGraph if checkpointing was disabled.

This closes #1516


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83b88c2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83b88c2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83b88c2c

Branch: refs/heads/master
Commit: 83b88c2c606f0d36bc04a7250629eb00516af919
Parents: f6d2ce9
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Jan 18 11:53:31 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jan 28 14:30:28 2016 +0100

----------------------------------------------------------------------
 .../api/graph/StreamingJobGraphGenerator.java   |   2 +-
 .../runtime/state/StateBackendITCase.java       | 134 +++++++++++++++++++
 2 files changed, 135 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 50c6a15..56b16a4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -317,10 +317,10 @@ public class StreamingJobGraphGenerator {
 
 		final CheckpointConfig ceckpointCfg = streamGraph.getCheckpointConfig();
 		
+		config.setStateBackend(streamGraph.getStateBackend());
 		config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled());
 		if (ceckpointCfg.isCheckpointingEnabled()) {
 			config.setCheckpointMode(ceckpointCfg.getCheckpointingMode());
-			config.setStateBackend(streamGraph.getStateBackend());
 		}
 		else {
 			// the "at-least-once" input handler is slightly cheaper (in the absence of checkpoints),

http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
new file mode 100644
index 0000000..cdfef85
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.state;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
+
+	/**
+	 * Verify that the user-specified state backend is used even if checkpointing is disabled.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testStateBackendWithoutCheckpointing() throws Exception {
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(1);
+
+		see.setNumberOfExecutionRetries(0);
+		see.setStateBackend(new FailingStateBackend());
+
+
+		see.fromElements(new Tuple2<>("Hello", 1))
+			.keyBy(0)
+			.map(new RichMapFunction<Tuple2<String,Integer>, String>() {
+				private static final long serialVersionUID = 1L;
+
+				@Override
+				public void open(Configuration parameters) throws Exception {
+					super.open(parameters);
+					getRuntimeContext().getKeyValueState("test", String.class, "");
+				}
+
+				@Override
+				public String map(Tuple2<String, Integer> value) throws Exception {
+					return value.f0;
+				}
+			})
+			.print();
+
+		boolean caughtSuccess = false;
+		try {
+			see.execute();
+		} catch (JobExecutionException e) {
+			if (e.getCause() instanceof SuccessException) {
+				caughtSuccess = true;
+			} else {
+				throw e;
+			}
+		}
+
+		assertTrue(caughtSuccess);
+	}
+
+
+	public static class FailingStateBackend extends StateBackend<FailingStateBackend>
{
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void initializeForJob(Environment env) throws Exception {
+			throw new SuccessException();
+		}
+
+		@Override
+		public void disposeAllStateForCurrentJob() throws Exception {
+
+		}
+
+		@Override
+		public void close() throws Exception {
+
+		}
+
+		@Override
+		public <K, V> KvState<K, V, FailingStateBackend> createKvState(String stateId,
+			String stateName,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<V> valueSerializer,
+			V defaultValue) throws Exception {
+			return null;
+		}
+
+		@Override
+		public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID,
+			long timestamp) throws Exception {
+			return null;
+		}
+
+		@Override
+		public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S
state,
+			long checkpointID,
+			long timestamp) throws Exception {
+			return null;
+		}
+	}
+
+	static final class SuccessException extends Exception {
+		private static final long serialVersionUID = -9218191172606739598L;
+	}
+
+}


Mime
View raw message