flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/3] flink git commit: [hotfix] Fix access to temp file directories in SpillingAdaptiveSpanningRecordDeserializer
Date Thu, 26 May 2016 19:17:25 GMT
[hotfix] Fix access to temp file directories in SpillingAdaptiveSpanningRecordDeserializer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf256c7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf256c7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf256c7f

Branch: refs/heads/master
Commit: bf256c7fbe05accdadc8470013879f567341d1aa
Parents: 5a7f4e3
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed May 25 17:04:31 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu May 26 21:15:20 2016 +0200

----------------------------------------------------------------------
 .../api/common/functions/RuntimeContext.java    |  2 +-
 .../org/apache/flink/metrics/MetricGroup.java   |  6 +--
 .../clusterframework/types/ResourceID.java      | 34 +++++++--------
 .../runtime/io/disk/iomanager/IOManager.java    | 15 ++++++-
 .../api/reader/AbstractRecordReader.java        | 12 +++++-
 .../network/api/reader/MutableRecordReader.java | 12 +++++-
 .../io/network/api/reader/RecordReader.java     | 17 +++++---
 ...llingAdaptiveSpanningRecordDeserializer.java | 16 ++-----
 .../task/IterationSynchronizationSinkTask.java  |  6 ++-
 .../flink/runtime/operators/BatchTask.java      | 18 +++++---
 .../flink/runtime/operators/DataSinkTask.java   |  8 +++-
 .../taskmanager/TaskManagerRuntimeInfo.java     | 35 ++++++++++++++--
 .../flink/runtime/taskmanager/TaskManager.scala |  3 +-
 .../SpanningRecordSerializationTest.java        |  4 +-
 .../network/serialization/LargeRecordsTest.java |  5 ++-
 .../SlotCountExceedingParallelismTest.java      |  5 ++-
 .../operators/drivers/TestTaskContext.java      |  3 +-
 .../testutils/BinaryOperatorTestBase.java       |  3 +-
 .../operators/testutils/DriverTestBase.java     |  3 +-
 .../operators/testutils/MockEnvironment.java    |  5 ++-
 .../testutils/UnaryOperatorTestBase.java        |  3 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  2 +-
 .../runtime/taskmanager/TaskCancelTest.java     | 10 +++--
 .../flink/runtime/taskmanager/TaskTest.java     |  2 +-
 .../apache/flink/runtime/jobmanager/Tasks.scala | 44 ++++++++++++++++----
 .../runtime/io/StreamInputProcessor.java        |  3 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  3 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  2 +-
 .../runtime/NetworkStackThroughputITCase.java   | 20 ++++++---
 30 files changed, 212 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index ed2f613..9a04b24 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -63,7 +63,7 @@ public interface RuntimeContext {
 	 * Returns the metric group for this parallel subtask.
 	 * 
 	 * @return The metric group for this parallel subtask.
-     */
+	 */
 	MetricGroup getMetricGroup();
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index a3832ff..6c9e044 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -21,14 +21,14 @@ package org.apache.flink.metrics;
 import org.apache.flink.annotation.PublicEvolving;
 
 /**
- * A MetricGroup is a named container for {@link Metric Metrics} and {@link MetricGroup MetricGroups}.
+ * A MetricGroup is a named container for {@link Metric Metrics} and further metric subgroups.
  * 
  * <p>Instances of this class can be used to register new metrics with Flink and to create a nested
  * hierarchy based on the group names.
  * 
  * <p>A MetricGroup is uniquely identified by it's place in the hierarchy and name.
  * 
- * <p>Metrics groups can be {@link #close() closed}. Upon closing, they de-register all metrics
+ * <p>Metrics groups can be {@link #close() closed}. Upon closing, the group de-registers all metrics
  * from any metrics reporter and any internal maps. Note that even closed metrics groups
  * return Counters, Gauges, etc to the code, to prevent exceptions in the monitored code.
  * These metrics simply do not get reported any more, when created on a closed group.
@@ -39,7 +39,7 @@ public interface MetricGroup {
 	// ------------------------------------------------------------------------
 	//  Closing
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Marks the group as closed.
 	 * Recursively unregisters all {@link Metric Metrics} contained in this group.

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
index e599456..9d82c76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
@@ -45,23 +45,11 @@ public class ResourceID implements Serializable {
 		return resourceId;
 	}
 
-	/**
-	 * Generate a random resource id.
-	 * @return A random resource id.
-	 */
-	public static ResourceID generate() {
-		return new ResourceID(new AbstractID().toString());
-	}
-
 	@Override
 	public final boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		} else if (o == null || !(o instanceof ResourceID)) {
-			return false;
-		} else {
-			return resourceId.equals(((ResourceID) o).resourceId);
-		}
+		return this == o ||
+				(o != null && o.getClass() == ResourceID.class && 
+					this.resourceId.equals(((ResourceID) o).resourceId));
 	}
 
 	@Override
@@ -71,8 +59,18 @@ public class ResourceID implements Serializable {
 
 	@Override
 	public String toString() {
-		return "ResourceID{" +
-			"resourceId='" + resourceId + '\'' +
-			'}';
+		return "ResourceID (" + resourceId + ')';
+	}
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Generate a random resource id.
+	 * @return A random resource id.
+	 */
+	public static ResourceID generate() {
+		return new ResourceID(new AbstractID().toString());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 0942f72..7904cc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -281,7 +282,19 @@ public abstract class IOManager {
 	public File[] getSpillingDirectories() {
 		return this.paths;
 	}
-	
+
+	/**
+	 * Gets the directories that the I/O manager spills to, as path strings.
+	 *
+	 * @return The directories that the I/O manager spills to, as path strings.
+	 */
+	public String[] getSpillingDirectoriesPaths() {
+		String[] strings = new String[this.paths.length];
+		for (int i = 0; i < strings.length; i++) {
+			strings[i] = paths[i].getAbsolutePath();
+		}
+		return strings;
+	}
 	
 	protected int getNextPathNum() {
 		final int next = this.nextPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index a784f54..48ac558 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -45,14 +45,22 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 
 	private boolean isFinished;
 
+	/**
+	 * Creates a new AbstractRecordReader that de-serializes records from the given input gate and
+	 * can spill partial records to disk, if they grow large.
+	 *
+	 * @param inputGate The input gate to read from.
+	 * @param tmpDirectories The temp directories. Used for spilling if the reader concurrently
+	 *                       reconstructs multiple large records.
+	 */
 	@SuppressWarnings("unchecked")
-	protected AbstractRecordReader(InputGate inputGate) {
+	protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
 		super(inputGate);
 
 		// Initialize one deserializer per input channel
 		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
 		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
+			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(tmpDirectories);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
index d7cc7e9..9836ba4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
@@ -25,8 +25,16 @@ import java.io.IOException;
 
 public class MutableRecordReader<T extends IOReadableWritable> extends AbstractRecordReader<T> implements MutableReader<T> {
 
-	public MutableRecordReader(InputGate inputGate) {
-		super(inputGate);
+	/**
+	 * Creates a new MutableRecordReader that de-serializes records from the given input gate and
+	 * can spill partial records to disk, if they grow large.
+	 * 
+	 * @param inputGate The input gate to read from.
+	 * @param tmpDirectories The temp directories. Used for spilling if the reader concurrently
+	 *                       reconstructs multiple large records.
+	 */
+	public MutableRecordReader(InputGate inputGate, String[] tmpDirectories) {
+		super(inputGate, tmpDirectories);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
index d45920e..9eed374 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
@@ -29,8 +29,16 @@ public class RecordReader<T extends IOReadableWritable> extends AbstractRecordRe
 
 	private T currentRecord;
 
-	public RecordReader(InputGate inputGate, Class<T> recordType) {
-		super(inputGate);
+	/**
+	 * Creates a new RecordReader that de-serializes records from the given input gate and
+	 * can spill partial records to disk, if they grow large.
+	 *
+	 * @param inputGate The input gate to read from.
+	 * @param tmpDirectories The temp directories. Used for spilling if the reader concurrently
+	 *                       reconstructs multiple large records.
+	 */
+	public RecordReader(InputGate inputGate, Class<T> recordType, String[] tmpDirectories) {
+		super(inputGate, tmpDirectories);
 
 		this.recordType = recordType;
 	}
@@ -73,10 +81,7 @@ public class RecordReader<T extends IOReadableWritable> extends AbstractRecordRe
 		try {
 			return recordType.newInstance();
 		}
-		catch (InstantiationException e) {
-			throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e);
-		}
-		catch (IllegalAccessException e) {
+		catch (InstantiationException | IllegalAccessException e) {
 			throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 49f7584..7e96390 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -65,18 +63,12 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 	private AccumulatorRegistry.Reporter reporter;
 
-	private transient Counter numRecordsIn;
-	private transient Counter numBytesIn;
+	private Counter numRecordsIn;
+	private Counter numBytesIn;
 
-	public SpillingAdaptiveSpanningRecordDeserializer() {
-		
-		String tempDirString = GlobalConfiguration.getString(
-				ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
-		String[] directories = tempDirString.split(",|" + File.pathSeparator);
-		
+	public SpillingAdaptiveSpanningRecordDeserializer(String[] tmpDirectories) {
 		this.nonSpanningWrapper = new NonSpanningWrapper();
-		this.spanningWrapper = new SpanningWrapper(directories);
+		this.spanningWrapper = new SpanningWrapper(tmpDirectories);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 2b710d2..f1ab93f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -72,7 +72,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 	
 	@Override
 	public void invoke() throws Exception {
-		this.headEventReader = new MutableRecordReader<IntValue>(getEnvironment().getInputGate(0));
+		this.headEventReader = new MutableRecordReader<IntValue>(
+				getEnvironment().getInputGate(0),
+				getEnvironment().getTaskManagerInfo().getTmpDirectories());
 
 		TaskConfig taskConfig = new TaskConfig(getTaskConfiguration());
 		
@@ -184,7 +186,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 		
 		// read (and thereby process all events in the handler's event handling functions)
 		try {
-			while (this.headEventReader.next(rec)) {
+			if (this.headEventReader.next(rec)) {
 				throw new RuntimeException("Synchronization task must not see any records!");
 			}
 		} catch (InterruptedException iex) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 546193c..f38b988 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -659,14 +659,18 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 
 			if (groupSize == 1) {
 				// non-union case
-				inputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
+				inputReaders[i] = new MutableRecordReader<IOReadableWritable>(
+						getEnvironment().getInputGate(currentReaderOffset),
+						getEnvironment().getTaskManagerInfo().getTmpDirectories());
 			} else if (groupSize > 1){
 				// union case
 				InputGate[] readers = new InputGate[groupSize];
 				for (int j = 0; j < groupSize; ++j) {
 					readers[j] = getEnvironment().getInputGate(currentReaderOffset + j);
 				}
-				inputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
+				inputReaders[i] = new MutableRecordReader<IOReadableWritable>(
+						new UnionInputGate(readers),
+						getEnvironment().getTaskManagerInfo().getTmpDirectories());
 			} else {
 				throw new Exception("Illegal input group size in task configuration: " + groupSize);
 			}
@@ -701,14 +705,18 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 			final int groupSize = this.config.getBroadcastGroupSize(i);
 			if (groupSize == 1) {
 				// non-union case
-				broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
+				broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(
+						getEnvironment().getInputGate(currentReaderOffset),
+						getEnvironment().getTaskManagerInfo().getTmpDirectories());
 			} else if (groupSize > 1){
 				// union case
 				InputGate[] readers = new InputGate[groupSize];
 				for (int j = 0; j < groupSize; ++j) {
 					readers[j] = getEnvironment().getInputGate(currentReaderOffset + j);
 				}
-				broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
+				broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(
+						new UnionInputGate(readers),
+						getEnvironment().getTaskManagerInfo().getTmpDirectories());
 			} else {
 				throw new Exception("Illegal input group size in task configuration: " + groupSize);
 			}
@@ -765,8 +773,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 	 *
 	 * NOTE: This method must be invoked after the invocation of {@code #initInputReaders()} and
 	 * {@code #initInputSerializersAndComparators(int)}!
-	 *
-	 * @param numInputs
 	 */
 	protected void initLocalStrategies(int numInputs) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 39bf23f..380edd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -332,10 +332,14 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		numGates += groupSize;
 		if (groupSize == 1) {
 			// non-union case
-			inputReader = new MutableRecordReader<DeserializationDelegate<IT>>(getEnvironment().getInputGate(0));
+			inputReader = new MutableRecordReader<DeserializationDelegate<IT>>(
+					getEnvironment().getInputGate(0),
+					getEnvironment().getTaskManagerInfo().getTmpDirectories());
 		} else if (groupSize > 1){
 			// union case
-			inputReader = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(getEnvironment().getAllInputGates()));
+			inputReader = new MutableRecordReader<IOReadableWritable>(
+					new UnionInputGate(getEnvironment().getAllInputGates()),
+					getEnvironment().getTaskManagerInfo().getTmpDirectories());
 		} else {
 			throw new Exception("Illegal input group size in task configuration: " + groupSize);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
index 8d06f10..9ac982e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.configuration.Configuration;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Encapsulation of TaskManager runtime information, like hostname and configuration.
  */
@@ -33,14 +36,32 @@ public class TaskManagerRuntimeInfo implements java.io.Serializable {
 	/** configuration that the TaskManager was started with */
 	private final Configuration configuration;
 
+	/** list of temporary file directories */
+	private final String[] tmpDirectories;
+	
 	/**
 	 * Creates a runtime info.
+	 * 
 	 * @param hostname The host name of the interface that the TaskManager uses to communicate.
 	 * @param configuration The configuration that the TaskManager was started with.
+	 * @param tmpDirectory The temporary file directory.   
 	 */
-	public TaskManagerRuntimeInfo(String hostname, Configuration configuration) {
-		this.hostname = hostname;
-		this.configuration = configuration;
+	public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String tmpDirectory) {
+		this(hostname, configuration, new String[] { tmpDirectory });
+	}
+	
+	/**
+	 * Creates a runtime info.
+	 * @param hostname The host name of the interface that the TaskManager uses to communicate.
+	 * @param configuration The configuration that the TaskManager was started with.
+	 * @param tmpDirectories The list of temporary file directories.   
+	 */
+	public TaskManagerRuntimeInfo(String hostname, Configuration configuration, String[] tmpDirectories) {
+		checkArgument(tmpDirectories.length > 0);
+		this.hostname = checkNotNull(hostname);
+		this.configuration = checkNotNull(configuration);
+		this.tmpDirectories = tmpDirectories;
+		
 	}
 
 	/**
@@ -58,4 +79,12 @@ public class TaskManagerRuntimeInfo implements java.io.Serializable {
 	public Configuration getConfiguration() {
 		return configuration;
 	}
+
+	/**
+	 * Gets the list of temporary file directories.
+	 * @return The list of temporary file directories.
+	 */
+	public String[] getTmpDirectories() {
+		return tmpDirectories;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a5cc18d..eb7a0ef 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -187,7 +187,8 @@ class TaskManager(
 
   private val runtimeInfo = new TaskManagerRuntimeInfo(
        connectionInfo.getHostname(),
-       new UnmodifiableConfiguration(config.configuration))
+       new UnmodifiableConfiguration(config.configuration),
+       config.tmpDirPaths)
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index 819a94f..9d0ee67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -106,7 +106,9 @@ public class SpanningRecordSerializationTest {
 	
 	private void testSpillingDeserializer(Util.MockRecords records, int segmentSize) throws Exception {
 		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>();
+		RecordDeserializer<SerializationTestType> deserializer = 
+				new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(
+						new String[] { System.getProperty("java.io.tmpdir") });
 		
 		test(records, segmentSize, serializer, deserializer);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
index d628596..1574fe9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -147,7 +147,10 @@ public class LargeRecordsTest {
 			final int SEGMENT_SIZE = 32 * 1024;
 
 			final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-			final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>();
+			
+			final RecordDeserializer<SerializationTestType> deserializer =
+					new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(
+							new String[] { System.getProperty("java.io.tmpdir") } );
 
 			final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 561bda3..e12faf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -30,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.types.IntValue;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -152,7 +152,8 @@ public class SlotCountExceedingParallelismTest {
 		public void invoke() throws Exception {
 			RecordReader<IntValue> reader = new RecordReader<>(
 					getEnvironment().getInputGate(0),
-					IntValue.class);
+					IntValue.class,
+					getEnvironment().getTaskManagerInfo().getTmpDirectories());
 
 			try {
 				final int numberOfSubtaskIndexesToReceive = getTaskConfiguration().getInteger(CONFIG_KEY, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 0300a07..15ad353 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -74,7 +74,8 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> {
 	
 	public TestTaskContext(long memoryInBytes) {
 		this.memoryManager = new MemoryManager(memoryInBytes, 1, 32 * 1024, MemoryType.HEAP, true);
-		this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
+		this.taskManageInfo = new TaskManagerRuntimeInfo(
+				"localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 2c3dcf1..6e9b817 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -110,7 +110,8 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
 		this.owner = new DummyInvokable();
 		this.taskConfig = new TaskConfig(new Configuration());
 		this.executionConfig = executionConfig;
-		this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
+		this.taskManageInfo = new TaskManagerRuntimeInfo(
+				"localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
 	}
 	
 	@Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 6381733..eb2a3a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -114,7 +114,8 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Ta
 		this.owner = new DummyInvokable();
 		this.taskConfig = new TaskConfig(new Configuration());
 		this.executionConfig = executionConfig;
-		this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
+		this.taskManageInfo = new TaskManagerRuntimeInfo(
+				"localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
 	}
 
 	@Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 31fd08c..b774b48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -208,7 +208,10 @@ public class MockEnvironment implements Environment {
 
 	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
+		return new TaskManagerRuntimeInfo(
+				"localhost",
+				new UnmodifiableConfiguration(new Configuration()),
+				System.getProperty("java.io.tmpdir"));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 77e18c6..50bb1ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -115,7 +115,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
 		this.executionConfig = executionConfig;
 		this.comparators = new ArrayList<TypeComparator<IN>>(2);
 
-		this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
+		this.taskManageInfo = new TaskManagerRuntimeInfo(
+				"localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
 	}
 
 	@Parameterized.Parameters

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index ae05ae9..e1f551c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -171,7 +171,7 @@ public class TaskAsyncCallTest {
 				new FiniteDuration(60, TimeUnit.SECONDS),
 				libCache,
 				mock(FileCache.class),
-				new TaskManagerRuntimeInfo("localhost", new Configuration()),
+				new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
 				mock(TaskMetricGroup.class));
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
index 09dd817..fc5a4a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -41,7 +40,9 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.types.IntValue;
+
 import org.junit.Test;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -247,10 +248,11 @@ public class TaskCancelTest {
 		@Override
 		public void invoke() throws Exception {
 			UnionInputGate union = new UnionInputGate(getEnvironment().getAllInputGates());
-			RecordReader<IntValue> reader = new RecordReader<>(union, IntValue.class);
+			RecordReader<IntValue> reader = new RecordReader<>(
+					union, IntValue.class, getEnvironment().getTaskManagerInfo().getTmpDirectories());
 
-			while (reader.next() != null) {
-			}
+			//noinspection StatementWithEmptyBody
+			while (reader.next() != null) {}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index f237c87..1762a7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -623,7 +623,7 @@ public class TaskTest {
 				new FiniteDuration(60, TimeUnit.SECONDS),
 				libCache,
 				mock(FileCache.class),
-				new TaskManagerRuntimeInfo("localhost", new Configuration()),
+				new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
 				mock(TaskMetricGroup.class));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index d871c3d..87c123a 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -64,7 +64,11 @@ object Tasks {
   class Forwarder extends AbstractInvokable {
 
     override def invoke(): Unit = {
-      val reader = new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue])
+      val reader = new RecordReader[IntValue](
+        getEnvironment.getInputGate(0),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
+      
       val writer = new RecordWriter[IntValue](getEnvironment.getWriter(0))
 
       try {
@@ -88,7 +92,10 @@ object Tasks {
   class Receiver extends AbstractInvokable {
 
     override def invoke(): Unit = {
-      val reader = new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue])
+      val reader = new RecordReader[IntValue](
+        getEnvironment.getInputGate(0),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
 
       val i1 = reader.next()
       val i2 = reader.next()
@@ -140,7 +147,10 @@ object Tasks {
   class AgnosticReceiver extends AbstractInvokable {
 
     override def invoke(): Unit = {
-      val reader= new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue])
+      val reader= new RecordReader[IntValue](
+        getEnvironment.getInputGate(0),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
 
       while(reader.next() != null){}
     }
@@ -149,8 +159,15 @@ object Tasks {
   class AgnosticBinaryReceiver extends AbstractInvokable {
 
     override def invoke(): Unit = {
-      val reader1 = new RecordReader[IntValue](getEnvironment.getInputGate(0), classOf[IntValue])
-      val reader2 = new RecordReader[IntValue](getEnvironment.getInputGate(1), classOf[IntValue])
+      val reader1 = new RecordReader[IntValue](
+        getEnvironment.getInputGate(0),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
+      
+      val reader2 = new RecordReader[IntValue](
+        getEnvironment.getInputGate(1),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
 
       while(reader1.next() != null){}
       while(reader2.next() != null){}
@@ -162,9 +179,20 @@ object Tasks {
     override def invoke(): Unit = {
       val env = getEnvironment
 
-      val reader1 = new RecordReader[IntValue](env.getInputGate(0), classOf[IntValue])
-      val reader2 = new RecordReader[IntValue](env.getInputGate(1), classOf[IntValue])
-      val reader3 = new RecordReader[IntValue](env.getInputGate(2), classOf[IntValue])
+      val reader1 = new RecordReader[IntValue](
+        env.getInputGate(0),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
+      
+      val reader2 = new RecordReader[IntValue](
+        env.getInputGate(1),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
+      
+      val reader3 = new RecordReader[IntValue](
+        env.getInputGate(2),
+        classOf[IntValue],
+        getEnvironment.getTaskManagerInfo.getTmpDirectories)
 
       while(reader1.next() != null){}
       while(reader2.next() != null){}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 1dde85b..ab69ab7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -117,7 +117,8 @@ public class StreamInputProcessor<IN> {
 		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
 		
 		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
+			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>(
+					ioManager.getSpillingDirectoriesPaths());
 		}
 
 		watermarks = new long[inputGate.getNumberOfInputChannels()];

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 07ada23..733e7fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -139,7 +139,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
 		
 		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
+			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<>(
+					ioManager.getSpillingDirectoriesPaths());
 		}
 
 		// determine which unioned channels belong to input 1 and which belong to input 2

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index c62c881..b2d0196 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
@@ -50,6 +49,7 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -303,7 +303,7 @@ public class StreamMockEnvironment implements Environment {
 
 	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
+		return new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index deda82f..f054e18 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -158,7 +158,7 @@ public class StreamTaskTest {
 				new FiniteDuration(60, TimeUnit.SECONDS),
 				libCache,
 				mock(FileCache.class),
-				new TaskManagerRuntimeInfo("localhost", new Configuration()),
+				new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
 				mock(TaskMetricGroup.class));
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/bf256c7f/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 06df46f..5506f55 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -18,27 +18,28 @@
 
 package org.apache.flink.test.runtime;
 
-import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
 import org.junit.Ignore;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.test.util.JavaProgramTestBase;
 
 @Ignore
 public class NetworkStackThroughputITCase {
@@ -195,7 +196,11 @@ public class NetworkStackThroughputITCase {
 
 		@Override
 		public void invoke() throws Exception {
-			RecordReader<SpeedTestRecord> reader = new RecordReader<>(getEnvironment().getInputGate(0), SpeedTestRecord.class);
+			RecordReader<SpeedTestRecord> reader = new RecordReader<>(
+					getEnvironment().getInputGate(0),
+					SpeedTestRecord.class,
+					getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
 			RecordWriter<SpeedTestRecord> writer = new RecordWriter<>(getEnvironment().getWriter(0));
 
 			try {
@@ -215,7 +220,10 @@ public class NetworkStackThroughputITCase {
 
 		@Override
 		public void invoke() throws Exception {
-			RecordReader<SpeedTestRecord> reader = new RecordReader<>(getEnvironment().getInputGate(0), SpeedTestRecord.class);
+			RecordReader<SpeedTestRecord> reader = new RecordReader<>(
+					getEnvironment().getInputGate(0),
+					SpeedTestRecord.class,
+					getEnvironment().getTaskManagerInfo().getTmpDirectories());
 
 			try {
 				boolean isSlow = getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false);


Mime
View raw message