flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-4788] [streaming api] Fix state backend classloading from configuration
Date Mon, 10 Oct 2016 18:03:16 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1836e08f0 -> 9e17cbd6b


[FLINK-4788] [streaming api] Fix state backend classloading from configuration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e17cbd6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e17cbd6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e17cbd6

Branch: refs/heads/master
Commit: 9e17cbd6b768f73299a6a344fdf44539802fb76c
Parents: 1836e08
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Oct 10 14:33:57 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Oct 10 20:02:33 2016 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     |  3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 95 +++++++++-----------
 2 files changed, 43 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e17cbd6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8ada6d3..4893fed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -752,8 +752,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				backendName = "jobmanager";
 			}
 
-			backendName = backendName.toLowerCase();
-			switch (backendName) {
+			switch (backendName.toLowerCase()) {
 				case "jobmanager":
 					LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
 					stateBackend = new MemoryStateBackend();

http://git-wip-us.apache.org/repos/asf/flink/blob/9e17cbd6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 47a4090..8aae19f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import akka.actor.ActorRef;
-
 import akka.dispatch.Futures;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
@@ -44,8 +43,10 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackendFactory;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -60,10 +61,10 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
@@ -75,22 +76,20 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.PriorityQueue;
-import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class StreamTaskTest {
 
-		/**
+	/**
 	 * This test checks that cancel calls that are issued before the operator is
 	 * instantiated still lead to proper canceling.
 	 */
@@ -101,7 +100,7 @@ public class StreamTaskTest {
 		cfg.setStreamOperator(new SlowlyDeserializingOperator());
 		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		Task task = createTask(SourceStreamTask.class, cfg);
+		Task task = createTask(SourceStreamTask.class, cfg, new Configuration());
 
 		TestingExecutionStateListener testingExecutionStateListener = new TestingExecutionStateListener();
 
@@ -137,6 +136,24 @@ public class StreamTaskTest {
 		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
 	}
 
+	@Test
+	public void testStateBackendLoading() throws Exception {
+		Configuration taskManagerConfig = new Configuration();
+		taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
+
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
+		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		Task task = createTask(SourceStreamTask.class, cfg, taskManagerConfig);
+
+		task.startTaskThread();
+
+		// wait for clean termination
+		task.getExecutingThread().join();
+		assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+	}
+
 
 	// ------------------------------------------------------------------------
 	//  Test Utilities
@@ -144,9 +161,9 @@ public class StreamTaskTest {
 
 	private static class TestingExecutionStateListener implements TaskExecutionStateListener
{
 
-		ExecutionState executionState = null;
+		private ExecutionState executionState = null;
 
-		PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>> priorityQueue
= new PriorityQueue<>(
+		private final PriorityQueue<Tuple2<ExecutionState, Promise<ExecutionState>>>
priorityQueue = new PriorityQueue<>(
 			1,
 			new Comparator<Tuple2<ExecutionState, Promise<ExecutionState>>>() {
 				@Override
@@ -183,7 +200,11 @@ public class StreamTaskTest {
 		}
 	}
 
-	private Task createTask(Class<? extends AbstractInvokable> invokable, StreamConfig
taskConfig) throws Exception {
+	private Task createTask(
+			Class<? extends AbstractInvokable> invokable,
+			StreamConfig taskConfig,
+			Configuration taskManagerConfig) throws Exception {
+
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
 		
@@ -191,6 +212,7 @@ public class StreamTaskTest {
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 		PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
 		Executor executor = mock(Executor.class);
+
 		NetworkEnvironment network = mock(NetworkEnvironment.class);
 		when(network.getResultPartitionManager()).thenReturn(partitionManager);
 		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
@@ -221,8 +243,8 @@ public class StreamTaskTest {
 			mock(CheckpointResponder.class),
 			libCache,
 			mock(FileCache.class),
-			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
-			mock(TaskMetricGroup.class),
+			new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
+			new UnregisteredTaskMetricsGroup(),
 			consumableNotifier,
 			partitionStateChecker,
 			executor);
@@ -279,45 +301,12 @@ public class StreamTaskTest {
 		public void cancel() {}
 	}
 
-	// ------------------------------------------------------------------------
-	//  Test JobManager/TaskManager gateways
-	// ------------------------------------------------------------------------
-	
-	private static class DummyGateway implements ActorGateway {
+	public static final class MockStateBackend implements StateBackendFactory<AbstractStateBackend>
{
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Future<Object> ask(Object message, FiniteDuration timeout) {
-			return null;
-		}
-
-		@Override
-		public void tell(Object message) {}
-
-		@Override
-		public void tell(Object message, ActorGateway sender) {}
-
-		@Override
-		public void forward(Object message, ActorGateway sender) {}
-
-		@Override
-		public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout,
ExecutionContext executionContext) {
-			return null;
-		}
-
-		@Override
-		public String path() {
-			return null;
-		}
-
-		@Override
-		public ActorRef actor() {
-			return null;
-		}
-
-		@Override
-		public UUID leaderSessionID() {
-			return null;
+		public AbstractStateBackend createFromConfig(Configuration config) throws Exception {
+			return mock(AbstractStateBackend.class);
 		}
 	}
 }


Mime
View raw message