flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-3071] Add asynchronous state materialization thread
Date Wed, 02 Dec 2015 13:26:22 GMT
Repository: flink
Updated Branches:
  refs/heads/master 04b38028d -> 0ccbd2779


[FLINK-3071] Add asynchronous state materialization thread

This also adds a test for asynchronous state handles.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ccbd277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ccbd277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ccbd277

Branch: refs/heads/master
Commit: 0ccbd27793a195b5762e8c563d75901f8cca4248
Parents: 04b3802
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Nov 27 16:25:02 2015 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Dec 2 14:17:51 2015 +0100

----------------------------------------------------------------------
 .../runtime/state/AsynchronousStateHandle.java  |  15 +-
 .../runtime/tasks/AsynchronousException.java    |  35 +++
 .../streaming/runtime/tasks/StreamTask.java     |  87 +++++++-
 .../streaming/runtime/tasks/TimerException.java |   2 +-
 .../tasks/StreamTaskAsyncCheckpointTest.java    | 211 +++++++++++++++++++
 .../runtime/tasks/StreamTaskTestHarness.java    |  33 ++-
 6 files changed, 364 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ccbd277/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
index 4e02caa..fee1efe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
@@ -23,10 +23,21 @@ package org.apache.flink.runtime.state;
  * of representing a materialized handle to state this would normally hold the (immutable)
state
  * internally and can materialize it if requested.
  */
-public interface AsynchronousStateHandle<T> extends StateHandle<T> {
+public abstract class AsynchronousStateHandle<T> implements StateHandle<T> {
+	private static final long serialVersionUID = 1L;
 
 	/**
 	 * Materializes the state held by this {@code AsynchronousStateHandle}.
 	 */
-	void materialize() throws Exception;
+	public abstract StateHandle<T> materialize() throws Exception;
+
+	@Override
+	public final T getState(ClassLoader userCodeClassLoader) throws Exception {
+		throw new UnsupportedOperationException("This must not be called. This is likely an internal bug.");
+	}
+
+	@Override
+	public final void discardState() throws Exception {
+		throw new UnsupportedOperationException("This must not be called. This is likely an internal bug.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccbd277/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
new file mode 100644
index 0000000..5b3fee0
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+/**
+ * {@code RuntimeException} for wrapping exceptions that are thrown in Threads that are not the
+ * main compute Thread.
+ */
+public class AsynchronousException extends RuntimeException {
+	private static final long serialVersionUID = 1L;
+
+	public AsynchronousException(Throwable cause) {
+		super(cause);
+	}
+
+	@Override
+	public String toString() {
+		return "AsynchronousException{" + getCause() + "}";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccbd277/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2942ba2..a1c76b1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -17,7 +17,9 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -30,6 +32,8 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.AsynchronousStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
@@ -132,9 +136,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	/** The state to be restored once the initialization is done */
 	private StreamTaskStateList lazyRestoreState;
 
-	/** This field is used to forward an exception that is caught in the timer thread. Subclasses
-	 * must ensure that exceptions stored here get thrown on the actual execution Thread. */
-	private volatile TimerException timerException;
+	/**
+	 * This field is used to forward an exception that is caught in the timer thread or other
+	 * asynchronous Threads. Subclasses must ensure that exceptions stored here get thrown on
the
+	 * actual execution Thread. */
+	private volatile AsynchronousException asyncException;
+
+	protected Set<Thread> asyncCheckpointThreads;
 	
 	/** Flag to mark the task "in operation", in which case check
 	 * needs to be initialized to true, so that early cancel() before invoke() behaves correctly
*/
@@ -181,6 +189,8 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			timerService = Executors.newSingleThreadScheduledExecutor(
 					new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
 
+			asyncCheckpointThreads = new HashSet<>();
+
 			// task specific initialization
 			init();
 			
@@ -247,6 +257,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			isRunning = false;
 
 			timerService.shutdownNow();
+
+			for (Thread checkpointThread: asyncCheckpointThreads) {
+				checkpointThread.interrupt();
+			}
+			asyncCheckpointThreads.clear();
 			
 			// release the output resources. this method should never fail.
 			if (operatorChain != null) {
@@ -342,6 +357,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			}
 			timerService.shutdown();
 		}
+
+		if (asyncCheckpointThreads != null) {
+			for (Thread checkpointThread : asyncCheckpointThreads) {
+				checkpointThread.interrupt();
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -422,7 +443,8 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+	@SuppressWarnings("unchecked,rawtypes")
+	public void triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception {
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
 		
 		synchronized (lock) {
@@ -438,20 +460,65 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				try {
 					final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
 					final StreamTaskState[] states = new StreamTaskState[allOperators.length];
-					
+
+					boolean hasAsyncStates = false;
+
 					for (int i = 0; i < states.length; i++) {
 						StreamOperator<?> operator = allOperators[i];
 						if (operator != null) {
 							StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
+							if (state.getOperatorState() instanceof AsynchronousStateHandle) {
+								hasAsyncStates = true;
+							}
+							if (state.getFunctionState() instanceof AsynchronousStateHandle) {
+								hasAsyncStates = true;
+							}
 							states[i] = state.isEmpty() ? null : state;
 						}
 					}
 
+
 					StreamTaskStateList allStates = new StreamTaskStateList(states);
+
 					if (allStates.isEmpty()) {
 						getEnvironment().acknowledgeCheckpoint(checkpointId);
-					} else {
+					} else if (!hasAsyncStates) {
 						getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
+					} else {
+						// start a Thread that does the asynchronous materialization and
+						// then sends the checkpoint acknowledge
+
+						Thread checkpointThread = new Thread() {
+							@Override
+							public void run() {
+								try {
+									for (StreamTaskState state : states) {
+										if (state != null) {
+											if (state.getFunctionState() instanceof AsynchronousStateHandle) {
+												AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>)
state.getFunctionState();
+												state.setFunctionState((StateHandle) asyncState.materialize());
+											}
+											if (state.getOperatorState() instanceof AsynchronousStateHandle) {
+												AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>)
state.getOperatorState();
+												state.setOperatorState((StateHandle) asyncState.materialize());
+											}
+										}
+									}
+									StreamTaskStateList allStates = new StreamTaskStateList(states);
+									getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
+								} catch (Exception e) {
+									LOG.error("Caught exception while materializing asynchronous checkpoints.", e);
+									if (asyncException == null) {
+										asyncException = new AsynchronousException(e);
+									}
+								}
+								asyncCheckpointThreads.remove(this);
+								LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId,
getName());
+							}
+						};
+
+						asyncCheckpointThreads.add(checkpointThread);
+						checkpointThread.start();
 					}
 				}
 				catch (Exception e) {
@@ -562,8 +629,8 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	}
 	
 	public void checkTimerException() throws TimerException {
-		if (timerException != null) {
-			throw timerException;
+		if (asyncException != null) {
+			throw asyncException;
 		}
 	}
 
@@ -616,8 +683,8 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					target.trigger(timestamp);
 				} catch (Throwable t) {
 					LOG.error("Caught exception while processing timer.", t);
-					if (task.timerException == null) {
-						task.timerException = new TimerException(t);
+					if (task.asyncException == null) {
+						task.asyncException = new TimerException(t);
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccbd277/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
index 3e1c1e5..0ea6ea5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
  * {@code RuntimeException} for wrapping exceptions that are thrown in the timer callback
of
  * the timer service in {@link StreamTask}.
  */
-public class TimerException extends RuntimeException {
+public class TimerException extends AsynchronousException {
 	private static final long serialVersionUID = 1L;
 
 	public TimerException(Throwable cause) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccbd277/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
new file mode 100644
index 0000000..319cbc8
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AsynchronousStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.OneShotLatch;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.lang.reflect.Field;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for asynchronous checkpoints.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+@SuppressWarnings("serial")
+public class StreamTaskAsyncCheckpointTest {
+
+	/**
+	 * This ensures that asynchronous state handles are actually materialized asynchronously.
+	 *
+	 * <p>We use latches to block at various stages and see if the code still continues through
+	 * the parts that are not asynchronous.
+	 * @throws Exception
+	 */
+	@Test
+	public void testAsyncCheckpoints() throws Exception {
+		final OneShotLatch delayCheckpointLatch = new OneShotLatch();
+		final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
+
+		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
+		
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task,
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		
+		streamConfig.setStreamOperator(new AsyncCheckpointOperator());
+
+		StreamMockEnvironment mockEnv = new StreamMockEnvironment(
+			testHarness.jobConfig,
+			testHarness.taskConfig,
+			testHarness.memorySize,
+			new MockInputSplitProvider(),
+			testHarness.bufferSize) {
+
+			@Override
+			public void acknowledgeCheckpoint(long checkpointId) {
+				super.acknowledgeCheckpoint(checkpointId);
+			}
+
+			@Override
+			public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
+				super.acknowledgeCheckpoint(checkpointId, state);
+
+				// block on the latch, to verify that triggerCheckpoint returns below,
+				// even though the async checkpoint would not finish
+				try {
+					delayCheckpointLatch.await();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+
+				assertTrue(state instanceof StreamTaskStateList);
+				StreamTaskStateList stateList = (StreamTaskStateList) state;
+
+				// should be only one state
+				StreamTaskState taskState = stateList.getState(this.getUserClassLoader())[0];
+				StateHandle<?> operatorState = taskState.getOperatorState();
+				assertTrue("It must be a TestStateHandle", operatorState instanceof TestStateHandle);
+				TestStateHandle testState = (TestStateHandle) operatorState;
+				assertEquals(42, testState.checkpointId);
+				assertEquals(17, testState.timestamp);
+
+				// we now know that the checkpoint went through
+				ensureCheckpointLatch.trigger();
+			}
+		};
+
+		testHarness.invoke(mockEnv);
+
+		// wait for the task to be running
+		for (Field field: StreamTask.class.getDeclaredFields()) {
+			if (field.getName().equals("isRunning")) {
+				field.setAccessible(true);
+				while (!field.getBoolean(task)) {
+					Thread.sleep(10);
+				}
+
+			}
+		}
+
+		task.triggerCheckpoint(42, 17);
+
+		// now we allow the checkpoint
+		delayCheckpointLatch.trigger();
+
+		// wait for the checkpoint to go through
+		ensureCheckpointLatch.await();
+
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+	}
+
+
+	// ------------------------------------------------------------------------
+
+	public static class AsyncCheckpointOperator
+		extends AbstractStreamOperator<String>
+		implements OneInputStreamOperator<String, String> {
+		@Override
+		public void processElement(StreamRecord<String> element) throws Exception {
+			// we also don't care
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			// not interested
+		}
+
+
+		@Override
+		public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp)
throws Exception {
+			StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+			AsynchronousStateHandle<String> asyncState =
+				new DataInputViewAsynchronousStateHandle(checkpointId, timestamp);
+
+			taskState.setOperatorState(asyncState);
+
+			return taskState;
+		}
+
+		@Override
+		public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception
{
+			super.restoreState(taskState, recoveryTimestamp);
+		}
+	}
+
+	private static class DataInputViewAsynchronousStateHandle extends AsynchronousStateHandle<String>
{
+
+		private final long checkpointId;
+		private final long timestamp;
+
+		public DataInputViewAsynchronousStateHandle(long checkpointId, long timestamp) {
+			this.checkpointId = checkpointId;
+			this.timestamp = timestamp;
+		}
+
+		@Override
+		public StateHandle<String> materialize() throws Exception {
+			return new TestStateHandle(checkpointId, timestamp);
+		}
+	}
+
+	private static class TestStateHandle implements StateHandle<String> {
+
+		public final long checkpointId;
+		public final long timestamp;
+
+		public TestStateHandle(long checkpointId, long timestamp) {
+			this.checkpointId = checkpointId;
+			this.timestamp = timestamp;
+		}
+
+		@Override
+		public String getState(ClassLoader userCodeClassLoader) throws Exception {
+			return null;
+		}
+
+		@Override
+		public void discardState() throws Exception {
+		}
+	}
+	
+	public static class DummyMapFunction<T> implements MapFunction<T, T> {
+		@Override
+		public T map(T value) { return value; }
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccbd277/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 655608b..2f4437d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -64,17 +64,17 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  */
 public class StreamTaskTestHarness<OUT> {
 
-	private static final int DEFAULT_MEMORY_MANAGER_SIZE = 1024 * 1024;
+	public  static final int DEFAULT_MEMORY_MANAGER_SIZE = 1024 * 1024;
 
-	private static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024;
+	public static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024;
 
-	protected long memorySize = 0;
-	protected int bufferSize = 0;
+	public long memorySize = 0;
+	public int bufferSize = 0;
 
 	protected StreamMockEnvironment mockEnv;
 	protected ExecutionConfig executionConfig;
-	private Configuration jobConfig;
-	private Configuration taskConfig;
+	public Configuration jobConfig;
+	public Configuration taskConfig;
 	protected StreamConfig streamConfig;
 
 	private AbstractInvokable task;
@@ -165,6 +165,27 @@ public class StreamTaskTestHarness<OUT> {
 		taskThread.start();
 	}
 
+	/**
+	 * Invoke the Task. This resets the output of any previous invocation. This will start a
new
+	 * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the
+	 * Task thread to finish running.
+	 *
+	 * <p>Variant for providing a custom environment.
+	 */
+	public void invoke(StreamMockEnvironment mockEnv) throws Exception {
+		this.mockEnv = mockEnv;
+
+		task.setEnvironment(mockEnv);
+
+		initializeInputs();
+		initializeOutput();
+
+		task.registerInputOutput();
+
+		taskThread = new TaskThread(task);
+		taskThread.start();
+	}
+
 	public void waitForTaskCompletion() throws Exception {
 		if (taskThread == null) {
 			throw new IllegalStateException("Task thread was not started.");


Mime
View raw message