flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/8] flink git commit: [FLINK-4842] Introduce test to enforce order of operator / udf lifecycles
Date Thu, 20 Oct 2016 14:15:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master 428419d59 -> 1e475c768


[FLINK-4842] Introduce test to enforce order of operator / udf lifecycles


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e475c76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e475c76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e475c76

Branch: refs/heads/master
Commit: 1e475c768ae0d7e13746a3ca6aa258141016d419
Parents: cab9cd4
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Thu Oct 13 11:32:19 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Oct 20 16:14:21 2016 +0200

----------------------------------------------------------------------
 .../AbstractUdfStreamOperatorLifecycleTest.java | 293 +++++++++++++++++++
 .../AbstractUdfStreamOperatorTest.java          | 219 --------------
 .../streaming/runtime/tasks/StreamTaskTest.java |   4 +-
 3 files changed, 295 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e475c76/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
new file mode 100644
index 0000000..cbb833b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test secures the lifecycle of AbstractUdfStreamOperator, including its UDF handling.
+ */
+public class AbstractUdfStreamOperatorLifecycleTest {
+
+	private static final List<String> EXPECTED_CALL_ORDER_FULL = Arrays.asList(
+			"OPERATOR::setup",
+			"UDF::setRuntimeContext",
+			"OPERATOR::initializeState",
+			"OPERATOR::open",
+			"UDF::open",
+			"OPERATOR::run",
+			"UDF::run",
+			"OPERATOR::snapshotState",
+			"OPERATOR::close",
+			"UDF::close",
+			"OPERATOR::dispose");
+
+	private static final List<String> EXPECTED_CALL_ORDER_CANCEL_RUNNING = Arrays.asList(
+			"OPERATOR::setup",
+			"UDF::setRuntimeContext",
+			"OPERATOR::initializeState",
+			"OPERATOR::open",
+			"UDF::open",
+			"OPERATOR::run",
+			"UDF::run",
+			"OPERATOR::cancel",
+			"UDF::cancel",
+			"OPERATOR::dispose",
+			"UDF::close");
+
+	private static final String ALL_METHODS_STREAM_OPERATOR = "[close[], dispose[], getChainingStrategy[], " +
+			"getMetricGroup[], initializeState[class org.apache.flink.streaming.runtime.tasks.OperatorStateHandles], " +
+			"notifyOfCompletedCheckpoint[long], open[], setChainingStrategy[class " +
+			"org.apache.flink.streaming.api.operators.ChainingStrategy], setKeyContextElement1[class " +
+			"org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
+			"setKeyContextElement2[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
+			"setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
+			"org.apache.flink.streaming.api.graph.StreamConfig, interface " +
+			"org.apache.flink.streaming.api.operators.Output], snapshotState[long, long, " +
+			"interface org.apache.flink.runtime.state.CheckpointStreamFactory]]";
+
+	private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" +
+			", open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface " +
+			"org.apache.flink.api.common.functions.RuntimeContext]]";
+
+	private static final List<String> ACTUAL_ORDER_TRACKING =
+			Collections.synchronizedList(new ArrayList<String>(EXPECTED_CALL_ORDER_FULL.size()));
+
+	@Test
+	public void testAllMethodsRegisteredInTest() {
+		List<String> methodsWithSignatureString = new ArrayList<>();
+		for (Method method : StreamOperator.class.getMethods()) {
+			methodsWithSignatureString.add(method.getName() + Arrays.toString(method.getParameterTypes()));
+		}
+		Collections.sort(methodsWithSignatureString);
+		Assert.assertEquals("It seems like new methods have been introduced to " + StreamOperator.class +
+				". Please register them with this test and ensure to document their position in the lifecycle " +
+				"(if applicable).", ALL_METHODS_STREAM_OPERATOR, methodsWithSignatureString.toString());
+
+		methodsWithSignatureString = new ArrayList<>();
+		for (Method method : RichFunction.class.getMethods()) {
+			methodsWithSignatureString.add(method.getName() + Arrays.toString(method.getParameterTypes()));
+		}
+		Collections.sort(methodsWithSignatureString);
+		Assert.assertEquals("It seems like new methods have been introduced to " + RichFunction.class +
+				". Please register them with this test and ensure to document their position in the lifecycle " +
+				"(if applicable).", ALL_METHODS_RICH_FUNCTION, methodsWithSignatureString.toString());
+	}
+
+	@Test
+	public void testLifeCycleFull() throws Exception {
+		ACTUAL_ORDER_TRACKING.clear();
+
+		Configuration taskManagerConfig = new Configuration();
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		MockSourceFunction srcFun = new MockSourceFunction();
+
+		cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, true));
+		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		Task task = StreamTaskTest.createTask(SourceStreamTask.class, cfg, taskManagerConfig);
+
+		task.startTaskThread();
+
+		LifecycleTrackingStreamSource.runStarted.await();
+
+		// wait for clean termination
+		task.getExecutingThread().join();
+		assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+		assertEquals(EXPECTED_CALL_ORDER_FULL, ACTUAL_ORDER_TRACKING);
+	}
+
+	@Test
+	public void testLifeCycleCancel() throws Exception {
+		ACTUAL_ORDER_TRACKING.clear();
+
+		Configuration taskManagerConfig = new Configuration();
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		MockSourceFunction srcFun = new MockSourceFunction();
+		cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, false));
+		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		Task task = StreamTaskTest.createTask(SourceStreamTask.class, cfg, taskManagerConfig);
+
+		task.startTaskThread();
+		LifecycleTrackingStreamSource.runStarted.await();
+
+		// this should cancel the task even though it is blocked on runFinished
+		task.cancelExecution();
+
+		// wait for clean termination
+		task.getExecutingThread().join();
+		assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+		assertEquals(EXPECTED_CALL_ORDER_CANCEL_RUNNING, ACTUAL_ORDER_TRACKING);
+	}
+
+	private static class MockSourceFunction extends RichSourceFunction<Long> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void run(SourceContext<Long> ctx) {
+			ACTUAL_ORDER_TRACKING.add("UDF::run");
+		}
+
+		@Override
+		public void cancel() {
+			ACTUAL_ORDER_TRACKING.add("UDF::cancel");
+		}
+
+		@Override
+		public void setRuntimeContext(RuntimeContext t) {
+			ACTUAL_ORDER_TRACKING.add("UDF::setRuntimeContext");
+			super.setRuntimeContext(t);
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("UDF::open");
+			super.open(parameters);
+		}
+
+		@Override
+		public void close() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("UDF::close");
+			super.close();
+		}
+	}
+
+	private static class LifecycleTrackingStreamSource<OUT, SRC extends SourceFunction<OUT>>
+			extends StreamSource<OUT, SRC> implements Serializable {
+
+		private static final long serialVersionUID = 2431488948886850562L;
+		private transient Thread testCheckpointer;
+
+		private final boolean simulateCheckpointing;
+
+		static OneShotLatch runStarted;
+		static OneShotLatch runFinish;
+
+		public LifecycleTrackingStreamSource(SRC sourceFunction, boolean simulateCheckpointing) {
+			super(sourceFunction);
+			this.simulateCheckpointing = simulateCheckpointing;
+			runStarted = new OneShotLatch();
+			runFinish = new OneShotLatch();
+		}
+
+		@Override
+		public void run(Object lockingObject, Output<StreamRecord<OUT>> collector) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("OPERATOR::run");
+			super.run(lockingObject, collector);
+			runStarted.trigger();
+			runFinish.await();
+		}
+
+		@Override
+		public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+			ACTUAL_ORDER_TRACKING.add("OPERATOR::setup");
+			super.setup(containingTask, config, output);
+			if (simulateCheckpointing) {
+				testCheckpointer = new Thread() {
+					@Override
+					public void run() {
+						long id = 0;
+						while (true) {
+							try {
+								Thread.sleep(50);
+								if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
+										new CheckpointMetaData(id++, System.currentTimeMillis()))) {
+									LifecycleTrackingStreamSource.runFinish.trigger();
+									break;
+								}
+							} catch (Exception e) {
+								e.printStackTrace();
+								Assert.fail();
+							}
+						}
+					}
+				};
+				testCheckpointer.start();
+			}
+		}
+
+		@Override
+		public void snapshotState(StateSnapshotContext context) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotState");
+			super.snapshotState(context);
+		}
+
+		@Override
+		public void initializeState(StateInitializationContext context) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("OPERATOR::initializeState");
+			super.initializeState(context);
+		}
+
+		@Override
+		public void open() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("OPERATOR::open");
+			super.open();
+		}
+
+		@Override
+		public void close() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("OPERATOR::close");
+			super.close();
+		}
+
+		@Override
+		public void cancel() {
+			ACTUAL_ORDER_TRACKING.add("OPERATOR::cancel");
+			super.cancel();
+		}
+
+		@Override
+		public void dispose() throws Exception {
+			ACTUAL_ORDER_TRACKING.add("OPERATOR::dispose");
+			super.dispose();
+			if (simulateCheckpointing) {
+				testCheckpointer.join();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e475c76/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java
deleted file mode 100644
index f5d633c..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.runtime.taskmanager.CheckpointResponder;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.net.URL;
-import java.util.Collections;
-import java.util.concurrent.Executor;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class AbstractUdfStreamOperatorTest {
-
-	@Test
-	public void testLifeCycle() throws Exception {
-
-		Configuration taskManagerConfig = new Configuration();
-
-		StreamConfig cfg = new StreamConfig(new Configuration());
-		cfg.setStreamOperator(new LifecycleTrackingStreamSource(new MockSourceFunction()));
-		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-		Task task = createTask(SourceStreamTask.class, cfg, taskManagerConfig);
-
-		task.startTaskThread();
-
-		// wait for clean termination
-		task.getExecutingThread().join();
-		assertEquals(ExecutionState.FINISHED, task.getExecutionState());
-	}
-
-	private static class MockSourceFunction extends RichSourceFunction<Long> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void run(SourceContext<Long> ctx) {
-		}
-
-		@Override
-		public void cancel() {
-		}
-
-		@Override
-		public void setRuntimeContext(RuntimeContext t) {
-			System.out.println("!setRuntimeContext");
-			super.setRuntimeContext(t);
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			System.out.println("!open");
-			super.open(parameters);
-		}
-
-		@Override
-		public void close() throws Exception {
-			System.out.println("!close");
-			super.close();
-		}
-	}
-
-	private Task createTask(
-			Class<? extends AbstractInvokable> invokable,
-			StreamConfig taskConfig,
-			Configuration taskManagerConfig) throws Exception {
-
-		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
-		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-
-		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
-		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
-		PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
-		Executor executor = mock(Executor.class);
-
-		NetworkEnvironment network = mock(NetworkEnvironment.class);
-		when(network.getResultPartitionManager()).thenReturn(partitionManager);
-		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
-		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
-				.thenReturn(mock(TaskKvStateRegistry.class));
-
-		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-				new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
-				new SerializedValue<>(new ExecutionConfig()),
-				"Test Task", 1, 0, 1, 0,
-				new Configuration(),
-				taskConfig.getConfiguration(),
-				invokable.getName(),
-				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-				Collections.<InputGateDeploymentDescriptor>emptyList(),
-				Collections.<BlobKey>emptyList(),
-				Collections.<URL>emptyList(),
-				0);
-
-		return new Task(
-				tdd,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				network,
-				mock(BroadcastVariableManager.class),
-				mock(TaskManagerConnection.class),
-				mock(InputSplitProvider.class),
-				mock(CheckpointResponder.class),
-				libCache,
-				mock(FileCache.class),
-				new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
-				new UnregisteredTaskMetricsGroup(),
-				consumableNotifier,
-				partitionStateChecker,
-				executor);
-	}
-
-	static class LifecycleTrackingStreamSource<OUT, SRC extends SourceFunction<OUT>>
-			extends StreamSource<OUT, SRC> implements Serializable {
-
-		//private transient final AtomicInteger currentState;
-
-		private static final long serialVersionUID = 2431488948886850562L;
-
-		public LifecycleTrackingStreamSource(SRC sourceFunction) {
-			super(sourceFunction);
-		}
-
-		@Override
-		public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
-			System.out.println("setup");
-			super.setup(containingTask, config, output);
-		}
-
-		@Override
-		public void snapshotState(StateSnapshotContext context) throws Exception {
-			System.out.println("snapshotState");
-			super.snapshotState(context);
-		}
-
-		@Override
-		public void initializeState(StateInitializationContext context) throws Exception {
-			System.out.println("initializeState");
-			super.initializeState(context);
-		}
-
-		@Override
-		public void open() throws Exception {
-			System.out.println("open");
-			super.open();
-		}
-
-		@Override
-		public void close() throws Exception {
-			System.out.println("close");
-			super.close();
-		}
-
-		@Override
-		public void dispose() throws Exception {
-			super.dispose();
-			System.out.println("dispose");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e475c76/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 8aae19f..94f6d5a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -200,13 +200,13 @@ public class StreamTaskTest {
 		}
 	}
 
-	private Task createTask(
+	public static Task createTask(
 			Class<? extends AbstractInvokable> invokable,
 			StreamConfig taskConfig,
 			Configuration taskManagerConfig) throws Exception {
 
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
-		when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+		when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader());
 		
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);


Mime
View raw message