flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [02/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
Date Mon, 12 Jan 2015 08:16:10 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 9a999fd..7aab050 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -19,46 +19,45 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.FutureTask;
-
 import akka.actor.ActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.io.network.gates.InputChannelResult;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-import org.apache.flink.runtime.io.network.gates.OutputGate;
-import org.apache.flink.runtime.io.network.gates.RecordAvailabilityListener;
-import org.apache.flink.runtime.io.network.serialization.AdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.api.reader.BufferReader;
+import org.apache.flink.runtime.io.network.api.reader.MockIteratorBufferReader;
+import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.FutureTask;
 
-public class MockEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner {
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MockEnvironment implements Environment {
 	
 	private final MemoryManager memManager;
 
@@ -70,184 +69,114 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 
 	private final Configuration taskConfiguration;
 
-	private final List<InputGate<DeserializationDelegate<Record>>> inputs;
+	private final List<BufferReader> inputs;
 
-	private final List<OutputGate> outputs;
+	private final List<BufferWriter> outputs;
 
 	private final JobID jobID = new JobID();
 
-	private final Buffer mockBuffer;
-	
 	private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
-	
+
+	private final int bufferSize;
 
 	public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.jobConfiguration = new Configuration();
 		this.taskConfiguration = new Configuration();
-		this.inputs = new LinkedList<InputGate<DeserializationDelegate<Record>>>();
-		this.outputs = new LinkedList<OutputGate>();
+		this.inputs = new LinkedList<BufferReader>();
+		this.outputs = new LinkedList<BufferWriter>();
 
 		this.memManager = new DefaultMemoryManager(memorySize, 1);
 		this.ioManager = new IOManagerAsync();
 		this.inputSplitProvider = inputSplitProvider;
-		this.mockBuffer = new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, null);
+		this.bufferSize = bufferSize;
 	}
 
-	public void addInput(MutableObjectIterator<Record> inputIterator) {
-		int id = inputs.size();
-		inputs.add(new MockInputGate(id, inputIterator));
-	}
+	public MockIteratorBufferReader<Record> addInput(MutableObjectIterator<Record> inputIterator) {
+		try {
+			final MockIteratorBufferReader<Record> reader = new MockIteratorBufferReader<Record>(bufferSize, Record.class, inputIterator);
 
-	public void addOutput(List<Record> outputList) {
-		int id = outputs.size();
-		outputs.add(new MockOutputGate(id, outputList));
-	}
+			inputs.add(reader.getMock());
 
-	@Override
-	public Configuration getTaskConfiguration() {
-		return this.taskConfiguration;
-	}
-
-	@Override
-	public MemoryManager getMemoryManager() {
-		return this.memManager;
-	}
-	
-	@Override
-	public IOManager getIOManager() {
-		return this.ioManager;
+			return reader;
+		}
+		catch (Throwable t) {
+			throw new RuntimeException("Error setting up mock readers: " + t.getMessage(), t);
+		}
 	}
 
-	@Override
-	public JobID getJobID() {
-		return this.jobID;
-	}
+	public void addOutput(final List<Record> outputList) {
+		try {
+			// The record-oriented writers wrap the buffer writer. We mock it
+			// to collect the returned buffers and deserialize the content to
+			// the output list
+			BufferProvider mockBufferProvider = mock(BufferProvider.class);
+			when(mockBufferProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
 
-	@Override
-	public Buffer requestBuffer(int minBufferSize) throws IOException {
-		return mockBuffer;
-	}
+				@Override
+				public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+					return new Buffer(new MemorySegment(new byte[bufferSize]), mock(BufferRecycler.class));
+				}
+			});
 
-	@Override
-	public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
-		return mockBuffer;
-	}
+			BufferWriter mockWriter = mock(BufferWriter.class);
+			when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
+			when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
 
-	@Override
-	public int getBufferSize() {
-		return this.mockBuffer.size();
-	}
+			final Record record = new Record();
+			final RecordDeserializer<Record> deserializer = new AdaptiveSpanningRecordDeserializer<Record>();
 
-	@Override
-	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
-		return BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED;
-	}
+			// Add records from the buffer to the output list
+			doAnswer(new Answer<Void>() {
 
-	@Override
-	public int getNumberOfChannels() {
-		return 1;
-	}
+				@Override
+				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+					Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
 
-	@Override
-	public void setDesignatedNumberOfBuffers(int numBuffers) {
+					deserializer.setNextBuffer(buffer);
 
-	}
+					while (deserializer.hasUnfinishedData()) {
+						RecordDeserializer.DeserializationResult result = deserializer.getNextRecord(record);
 
-	@Override
-	public void clearLocalBufferPool() {
+						if (result.isFullRecord()) {
+							outputList.add(record.createCopy());
+						}
 
-	}
+						if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
+								|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
+							break;
+						}
+					}
 
-	@Override
-	public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
+					return null;
+				}
+			}).when(mockWriter).writeBuffer(any(Buffer.class), anyInt());
 
+			outputs.add(mockWriter);
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			fail(t.getMessage());
+		}
 	}
 
 	@Override
-	public void logBufferUtilization() {
-
+	public Configuration getTaskConfiguration() {
+		return this.taskConfiguration;
 	}
 
 	@Override
-	public void reportAsynchronousEvent() {
-
+	public MemoryManager getMemoryManager() {
+		return this.memManager;
 	}
 
-	private static class MockInputGate extends InputGate<DeserializationDelegate<Record>> {
-		
-		private MutableObjectIterator<Record> it;
-
-		public MockInputGate(int id, MutableObjectIterator<Record> it) {
-			super(new JobID(), new GateID(), id);
-			this.it = it;
-		}
-
-		@Override
-		public void registerRecordAvailabilityListener(final RecordAvailabilityListener<DeserializationDelegate<Record>> listener) {
-			super.registerRecordAvailabilityListener(listener);
-			this.notifyRecordIsAvailable(0);
-		}
-		
-		@Override
-		public InputChannelResult readRecord(DeserializationDelegate<Record> target) throws IOException, InterruptedException {
-
-			Record reuse = target != null ? target.getInstance() : null;
-
-			// Handle NonReusingDeserializationDelegate, which by default
-			// does not have a Record instance
-			if (reuse == null && target != null) {
-				reuse = new Record();
-				target.setInstance(reuse);
-			}
-			
-			if (it.next(reuse) != null) {
-				// everything comes from the same source channel and buffer in this mock
-				notifyRecordIsAvailable(0);
-				return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
-			} else {
-				return InputChannelResult.END_OF_STREAM;
-			}
-		}
+	@Override
+	public IOManager getIOManager() {
+		return this.ioManager;
 	}
 
-	private class MockOutputGate extends OutputGate {
-		
-		private List<Record> out;
-
-		private RecordDeserializer<Record> deserializer;
-
-		private Record record;
-
-		public MockOutputGate(int index, List<Record> outList) {
-			super(new JobID(), new GateID(), index);
-			this.out = outList;
-			this.deserializer = new AdaptiveSpanningRecordDeserializer<Record>();
-			this.record = new Record();
-		}
-
-		@Override
-		public void sendBuffer(Buffer buffer, int targetChannel) throws IOException, InterruptedException {
-
-			this.deserializer.setNextMemorySegment(MockEnvironment.this.mockBuffer.getMemorySegment(), MockEnvironment.this.mockBuffer.size());
-
-			while (this.deserializer.hasUnfinishedData()) {
-				DeserializationResult result = this.deserializer.getNextRecord(this.record);
-
-				if (result.isFullRecord()) {
-					this.out.add(this.record.createCopy());
-				}
-
-				if (result == DeserializationResult.LAST_RECORD_FROM_BUFFER ||
-					result == DeserializationResult.PARTIAL_RECORD) {
-					break;
-				}
-			}
-		}
-
-		@Override
-		public int getNumChannels() {
-			return 1;
-		}
+	@Override
+	public JobID getJobID() {
+		return this.jobID;
 	}
 
 	@Override
@@ -256,7 +185,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 	}
 
 	@Override
-	public int getCurrentNumberOfSubtasks() {
+	public int getNumberOfSubtasks() {
 		return 1;
 	}
 
@@ -276,96 +205,50 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 	}
 
 	@Override
-	public GateID getNextUnboundInputGateID() {
+	public String getTaskNameWithSubtasks() {
 		return null;
 	}
 
 	@Override
-	public int getNumberOfOutputGates() {
-		return this.outputs.size();
+	public ActorRef getJobManager() {
+		throw new UnsupportedOperationException("getAccumulatorProtocolProxy() is not supported by MockEnvironment");
 	}
 
 	@Override
-	public int getNumberOfInputGates() {
-		return this.inputs.size();
-	}
-
-	@Override
-	public Set<ChannelID> getOutputChannelIDs() {
-		throw new IllegalStateException("getOutputChannelIDs called on MockEnvironment");
-	}
-
-	@Override
-	public Set<ChannelID> getInputChannelIDs() {
-		throw new IllegalStateException("getInputChannelIDs called on MockEnvironment");
-	}
-
-	@Override
-	public Set<GateID> getOutputGateIDs() {
-		throw new IllegalStateException("getOutputGateIDs called on MockEnvironment");
-	}
-
-	@Override
-	public Set<GateID> getInputGateIDs() {
-		throw new IllegalStateException("getInputGateIDs called on MockEnvironment");
-	}
-
-	@Override
-	public Set<ChannelID> getOutputChannelIDsOfGate(final GateID gateID) {
-		throw new IllegalStateException("getOutputChannelIDsOfGate called on MockEnvironment");
-	}
-
-	@Override
-	public Set<ChannelID> getInputChannelIDsOfGate(final GateID gateID) {
-		throw new IllegalStateException("getInputChannelIDsOfGate called on MockEnvironment");
-	}
-
-	@Override
-	public ActorRef getAccumulator() {
-		throw new UnsupportedOperationException("Accumulators are not supported by the MockEnvironment.");
-	}
-
-	@Override
-	public OutputGate createAndRegisterOutputGate() {
-		return this.outputs.remove(0);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate() {
-		return (InputGate<T>) this.inputs.remove(0);
+	public ClassLoader getUserClassLoader() {
+		return getClass().getClassLoader();
 	}
 
 	@Override
-	public int getNumberOfOutputChannels() {
-		return this.outputs.size();
+	public Map<String, FutureTask<Path>> getCopyTask() {
+		return null;
 	}
 
 	@Override
-	public int getNumberOfInputChannels() {
-		return this.inputs.size();
+	public BufferWriter getWriter(int index) {
+		return outputs.get(index);
 	}
 
 	@Override
-	public ClassLoader getUserClassLoader() {
-		return getClass().getClassLoader();
+	public BufferWriter[] getAllWriters() {
+		return outputs.toArray(new BufferWriter[outputs.size()]);
 	}
 
 	@Override
-	public BufferProvider getOutputBufferProvider() {
-		return this;
+	public BufferReader getReader(int index) {
+		return inputs.get(index);
 	}
 
 	@Override
-	public Map<String, FutureTask<Path>> getCopyTask() {
-		return null;
+	public BufferReader[] getAllReaders() {
+		return inputs.toArray(new BufferReader[inputs.size()]);
 	}
 
 	@Override
 	public JobVertexID getJobVertexId() {
 		return new JobVertexID(new byte[16]);
 	}
-	
+
 	@Override
 	public BroadcastVariableManager getBroadcastVariableManager() {
 		return this.bcVarManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 9351929..23cb23b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import java.util.List;
-
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -27,8 +25,9 @@ import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.java.record.io.DelimitedInputFormat;
 import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.api.reader.MockIteratorBufferReader;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.PactDriver;
@@ -40,6 +39,8 @@ import org.apache.flink.util.MutableObjectIterator;
 import org.junit.After;
 import org.junit.Assert;
 
+import java.util.List;
+
 public abstract class TaskTestBase {
 	
 	protected long memorySize = 0;
@@ -54,11 +55,23 @@ public abstract class TaskTestBase {
 		this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider, bufferSize);
 	}
 
-	public void addInput(MutableObjectIterator<Record> input, int groupId) {
-		this.mockEnv.addInput(input);
+	public MockIteratorBufferReader<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
+		final MockIteratorBufferReader<Record> reader = addInput(input, groupId, true);
+
+		return reader;
+	}
+
+	public MockIteratorBufferReader<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
+		final MockIteratorBufferReader<Record> reader = this.mockEnv.addInput(input);
 		TaskConfig conf = new TaskConfig(this.mockEnv.getTaskConfiguration());
 		conf.addInputToGroup(groupId);
 		conf.setInputSerializer(RecordSerializerFactory.get(), groupId);
+
+		if (read) {
+			reader.read();
+		}
+
+		return reader;
 	}
 
 	public void addOutput(List<Record> output) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
index a717f55..58eaad0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/OutputEmitterTest.java
@@ -38,7 +38,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.runtime.io.network.api.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.operators.shipping.OutputEmitter;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.plugable.SerializationDelegate;

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/RecordOutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/RecordOutputEmitterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/RecordOutputEmitterTest.java
index 308b41e..b831a7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/RecordOutputEmitterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/RecordOutputEmitterTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.api.common.distributions.UniformIntegerDistribution;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.runtime.io.network.api.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.operators.shipping.RecordOutputEmitter;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.types.DeserializationException;

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 4355298..46c23c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -26,43 +26,42 @@ import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
-
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartitionInfo;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.types.IntegerRecord;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -71,7 +70,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
-
 public class TaskManagerTest {
 
 	private static ActorSystem system;
@@ -103,8 +101,8 @@ public class TaskManagerTest {
 
 				final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
 						new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
-						Collections.<GateDeploymentDescriptor>emptyList(),
-						Collections.<GateDeploymentDescriptor>emptyList(),
+						Collections.<PartitionDeploymentDescriptor>emptyList(),
+						Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 				new Within(duration("1 seconds")){
@@ -142,14 +140,14 @@ public class TaskManagerTest {
 
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
-						Collections.<GateDeploymentDescriptor>emptyList(),
-						Collections.<GateDeploymentDescriptor>emptyList(),
+						Collections.<PartitionDeploymentDescriptor>emptyList(),
+						Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7,
 						new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
-						Collections.<GateDeploymentDescriptor>emptyList(),
-						Collections.<GateDeploymentDescriptor>emptyList(),
+						Collections.<PartitionDeploymentDescriptor>emptyList(),
+						Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 				final FiniteDuration d = duration("1 second");
@@ -244,15 +242,15 @@ public class TaskManagerTest {
 				final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
 				final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
-						new Configuration(), new Configuration(), Sender.class.getName(),
-						Collections.<GateDeploymentDescriptor>emptyList(),
-						Collections.<GateDeploymentDescriptor>emptyList(),
+						new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
+						Collections.<PartitionDeploymentDescriptor>emptyList(),
+						Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 				final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
-						new Configuration(), new Configuration(), Receiver.class.getName(),
-						Collections.<GateDeploymentDescriptor>emptyList(),
-						Collections.<GateDeploymentDescriptor>emptyList(),
+						new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
+						Collections.<PartitionDeploymentDescriptor>emptyList(),
+						Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 				new Within(duration("1 second")){
@@ -307,29 +305,35 @@ public class TaskManagerTest {
 			final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 			final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-			final ChannelID senderId = new ChannelID();
-			final ChannelID receiverId = new ChannelID();
-
-			ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator(receiverId)));
+			ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator()));
 			final ActorRef tm = createTaskManager(jm);
 
-			ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor(senderId, receiverId);
+			IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
+
+			List<PartitionDeploymentDescriptor> irpdd = new ArrayList<PartitionDeploymentDescriptor>();
+			irpdd.add(new PartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, IntermediateResultPartitionType.PIPELINED, 1));
+
+			PartitionConsumerDeploymentDescriptor ircdd =
+					new PartitionConsumerDeploymentDescriptor(
+							new IntermediateDataSetID(),
+							new PartitionInfo[]{
+									new PartitionInfo(partitionId, eid1, PartitionInfo.PartitionLocation.LOCAL, null)
+							},
+							0);
 
 			final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
-					new Configuration(), new Configuration(), Sender.class.getName(),
-					Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))),
-					Collections.<GateDeploymentDescriptor>emptyList(),
-					new ArrayList<BlobKey>(), 0);
+					new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
+					irpdd, Collections.<PartitionConsumerDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), 0);
 
 			final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
-					new Configuration(), new Configuration(), Receiver.class.getName(),
-					Collections.<GateDeploymentDescriptor>emptyList(),
-					Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))),
+					new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
+					Collections.<PartitionDeploymentDescriptor>emptyList(),
+					Collections.singletonList(ircdd),
 					new ArrayList<BlobKey>(), 0);
 
 			final FiniteDuration d = duration("1 second");
 
-			new Within(d){
+			new Within(d) {
 
 				@Override
 				protected void run() {
@@ -346,8 +350,8 @@ public class TaskManagerTest {
 						Task t1 = tasks.get(eid1);
 						Task t2 = tasks.get(eid2);
 
-			// wait until the tasks are done. rare thread races may cause the tasks to be done before
-			// we get to the check, so we need to guard the check
+						// wait until the tasks are done. rare thread races may cause the tasks to be done before
+						// we get to the check, so we need to guard the check
 						if (t1 != null) {
 							Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid1),
 									timeout);
@@ -358,15 +362,16 @@ public class TaskManagerTest {
 							Future<Object> response = Patterns.ask(tm, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(eid2),
 									timeout);
 							Await.ready(response, d);
-				assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
-			}
+							assertEquals(ExecutionState.FINISHED, t2.getExecutionState());
+						}
 
 						tm.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
 						tasks = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks
 								.class).asJava();
 
 						assertEquals(0, tasks.size());
-					}catch (Exception e) {
+					}
+					catch (Exception e) {
 						e.printStackTrace();
 						fail(e.getMessage());
 
@@ -378,7 +383,7 @@ public class TaskManagerTest {
 	
 	@Test
 	public void testCancellingDependentAndStateUpdateFails() {
-		
+
 		// this tests creates two tasks. the sender sends data, and fails to send the
 		// state update back to the job manager
 		// the second one blocks to be canceled
@@ -392,30 +397,37 @@ public class TaskManagerTest {
 			final ExecutionAttemptID eid1 = new ExecutionAttemptID();
 			final ExecutionAttemptID eid2 = new ExecutionAttemptID();
 
-			final ChannelID senderId = new ChannelID();
-			final ChannelID receiverId = new ChannelID();
-
-			ActorRef jm = system.actorOf(Props.create(new SimpleLookupFailingUpdateJobManagerCreator(receiverId)));
+			ActorRef jm = system.actorOf(Props.create(new SimpleLookupFailingUpdateJobManagerCreator()));
 			final ActorRef tm = createTaskManager(jm);
 
-			ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor(senderId, receiverId);
+			IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
+
+			List<PartitionDeploymentDescriptor> irpdd = new ArrayList<PartitionDeploymentDescriptor>();
+			irpdd.add(new PartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, IntermediateResultPartitionType.PIPELINED, 1));
+
+			PartitionConsumerDeploymentDescriptor ircdd =
+					new PartitionConsumerDeploymentDescriptor(
+							new IntermediateDataSetID(),
+							new PartitionInfo[]{
+									new PartitionInfo(partitionId, eid1, PartitionInfo.PartitionLocation.LOCAL, null)
+							},
+							0);
 
 			final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1,
-					new Configuration(), new Configuration(), Sender.class.getName(),
-					Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))),
-					Collections.<GateDeploymentDescriptor>emptyList(),
+					new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
+					irpdd, Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 
 			final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7,
-					new Configuration(), new Configuration(), ReceiverBlocking.class.getName(),
-					Collections.<GateDeploymentDescriptor>emptyList(),
-					Collections.singletonList(new GateDeploymentDescriptor(Collections.singletonList(cdd))),
+					new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
+					Collections.<PartitionDeploymentDescriptor>emptyList(),
+					Collections.singletonList(ircdd),
 					new ArrayList<BlobKey>(), 0);
 
 			final FiniteDuration d = duration("1 second");
 
 			new Within(d){
-			
+
 				@Override
 				protected void run() {
 					try {
@@ -466,7 +478,6 @@ public class TaskManagerTest {
 		}};
 	}
 
-	
 	// --------------------------------------------------------------------------------------------
 
 	public static class SimpleJobManager extends UntypedActor{
@@ -483,18 +494,13 @@ public class TaskManagerTest {
 		}
 	}
 
-	public static class SimpleLookupJobManager extends SimpleJobManager{
-		private final ChannelID receiverID;
+	public static class SimpleLookupJobManager extends SimpleJobManager {
 
-		public SimpleLookupJobManager(ChannelID receiverID){
-			this.receiverID = receiverID;
-		}
 		@Override
 		public void onReceive(Object message) throws Exception {
-			if(message instanceof JobManagerMessages.LookupConnectionInformation){
-				getSender().tell(new JobManagerMessages.ConnectionInformation(ConnectionInfoLookupResponse
-						.createReceiverFoundAndReady(receiverID)), getSelf());
-			}else{
+			if (message instanceof JobManagerMessages.ScheduleOrUpdateConsumers) {
+				getSender().tell(new JobManagerMessages.ConsumerNotificationResult(true, scala.Option.<Throwable>apply(null)), getSelf());
+			} else {
 				super.onReceive(message);
 			}
 		}
@@ -502,10 +508,6 @@ public class TaskManagerTest {
 
 	public static class SimpleLookupFailingUpdateJobManager extends SimpleLookupJobManager{
 
-		public SimpleLookupFailingUpdateJobManager(ChannelID receiverID) {
-			super(receiverID);
-		}
-
 		@Override
 		public void onReceive(Object message) throws Exception{
 			if(message instanceof JobManagerMessages.UpdateTaskExecutionState){
@@ -516,32 +518,19 @@ public class TaskManagerTest {
 		}
 	}
 
-	@SuppressWarnings("serial")
 	public static class SimpleLookupJobManagerCreator implements Creator<SimpleLookupJobManager>{
-		private final ChannelID receiverID;
-
-		public SimpleLookupJobManagerCreator(ChannelID receiverID){
-			this.receiverID = receiverID;
-		}
 
 		@Override
 		public SimpleLookupJobManager create() throws Exception {
-			return new SimpleLookupJobManager(receiverID);
+			return new SimpleLookupJobManager();
 		}
 	}
 
-	@SuppressWarnings("serial")
-	public static class SimpleLookupFailingUpdateJobManagerCreator implements
-			Creator<SimpleLookupFailingUpdateJobManager>{
-		private final ChannelID receiverID;
-
-		public SimpleLookupFailingUpdateJobManagerCreator(ChannelID receiverID){
-			this.receiverID = receiverID;
-		}
+	public static class SimpleLookupFailingUpdateJobManagerCreator implements Creator<SimpleLookupFailingUpdateJobManager>{
 
 		@Override
 		public SimpleLookupFailingUpdateJobManager create() throws Exception {
-			return new SimpleLookupFailingUpdateJobManager(receiverID);
+			return new SimpleLookupFailingUpdateJobManager();
 		}
 	}
 
@@ -591,58 +580,4 @@ public class TaskManagerTest {
 			}
 		}
 	}
-	
-	public static final class Sender extends AbstractInvokable {
-
-		private RecordWriter<IntegerRecord> writer;
-		
-		@Override
-		public void registerInputOutput() {
-			writer = new RecordWriter<IntegerRecord>(this);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			writer.initializeSerializers();
-			writer.emit(new IntegerRecord(42));
-			writer.emit(new IntegerRecord(1337));
-			writer.flush();
-		}
-	}
-	
-	public static final class Receiver extends AbstractInvokable {
-
-		private RecordReader<IntegerRecord> reader;
-		
-		@Override
-		public void registerInputOutput() {
-			reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			IntegerRecord i1 = reader.next();
-			IntegerRecord i2 = reader.next();
-			IntegerRecord i3 = reader.next();
-			
-			if (i1.getValue() != 42 || i2.getValue() != 1337 || i3 != null) {
-				throw new Exception("Wrong Data Received");
-			}
-		}
-	}
-	
-	public static final class ReceiverBlocking extends AbstractInvokable {
-
-		@Override
-		public void registerInputOutput() {
-			new RecordReader<IntegerRecord>(this, IntegerRecord.class);
-		}
-
-		@Override
-		public void invoke() throws Exception {
-			synchronized(this) {
-				wait();
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 4b0a6ec..89d1c4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -18,23 +18,18 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicReference;
-
 import akka.actor.ActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.MockNetworkEnvironment;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -44,6 +39,19 @@ import org.apache.flink.util.ExceptionUtils;
 import org.junit.Test;
 import org.mockito.Matchers;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class TaskTest {
 
 	@Test
@@ -257,16 +265,16 @@ public class TaskTest {
 			
 			TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
 					new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
-					Collections.<GateDeploymentDescriptor>emptyList(), 
-					Collections.<GateDeploymentDescriptor>emptyList(),
+					Collections.<PartitionDeploymentDescriptor>emptyList(),
+					Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 			
 			Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
 			
-			RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(),
+			RuntimeEnvironment env = new RuntimeEnvironment(mock(ActorRef.class), task, tdd, getClass().getClassLoader(),
 					mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
-					mock(ActorRef.class), new BroadcastVariableManager());
-			
+					new BroadcastVariableManager(), MockNetworkEnvironment.getMock());
+
 			task.setEnvironment(env);
 			
 			assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
@@ -295,16 +303,16 @@ public class TaskTest {
 			
 			TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
 					new Configuration(), new Configuration(), TestInvokableWithException.class.getName(),
-					Collections.<GateDeploymentDescriptor>emptyList(), 
-					Collections.<GateDeploymentDescriptor>emptyList(),
+					Collections.<PartitionDeploymentDescriptor>emptyList(),
+					Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 			
 			Task task = new Task(jid, vid, 2, 7, eid, "TestTask", taskManager);
 			
-			RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, getClass().getClassLoader(),
+			RuntimeEnvironment env = new RuntimeEnvironment(mock(ActorRef.class), task, tdd, getClass().getClassLoader(),
 					mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
-					mock(ActorRef.class), new BroadcastVariableManager());
-			
+					new BroadcastVariableManager(), MockNetworkEnvironment.getMock());
+
 			task.setEnvironment(env);
 			
 			assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
index 6bfea64..d9bd232 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 
 public class DiscardingRecycler implements BufferRecycler {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestBufferProvider.java
deleted file mode 100644
index 844f8aa..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestBufferProvider.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.testutils;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.BufferRecycler;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-
-public class TestBufferProvider implements BufferProvider {
-	
-	private final BufferRecycler recycler = new DiscardingRecycler();
-	
-	private final Random rnd = new Random();
-	
-	private final int sizeOfMemorySegments;
-	
-	private final float probabilityForNoneAvailable;
-	
-	
-	public TestBufferProvider(int sizeOfMemorySegments) {
-		this(sizeOfMemorySegments, -1.0f);
-	}
-	
-	public TestBufferProvider(int sizeOfMemorySegments, float probabilityForNoneAvailable) {
-		this.sizeOfMemorySegments = sizeOfMemorySegments;
-		this.probabilityForNoneAvailable = probabilityForNoneAvailable;
-	}
-
-	@Override
-	public Buffer requestBuffer(int sizeOfBuffer) throws IOException {
-		if (rnd.nextFloat() < this.probabilityForNoneAvailable) {
-			return null;
-		} else {
-			MemorySegment segment = new MemorySegment(new byte[this.sizeOfMemorySegments]);
-			return new Buffer(segment, sizeOfBuffer, this.recycler);
-		}
-	}
-
-	@Override
-	public Buffer requestBufferBlocking(int sizeOfBuffer) throws IOException, InterruptedException {
-		MemorySegment segment = new MemorySegment(new byte[this.sizeOfMemorySegments]);
-		return new Buffer(segment, sizeOfBuffer, this.recycler);
-	}
-
-	@Override
-	public int getBufferSize() {
-		return Integer.MAX_VALUE;
-	}
-	
-	@Override
-	public void reportAsynchronousEvent() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
index 6bd45ec..a2587a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
@@ -50,28 +50,33 @@ public class AtomicDisposableReferenceCounterTest {
 	public void testConcurrentIncrementAndDecrement() throws InterruptedException, ExecutionException, TimeoutException {
 		final Random random = new Random();
 
-		final ExecutorService executor = Executors.newCachedThreadPool();
+		final ExecutorService executor = Executors.newFixedThreadPool(2);
 
-		final MockIncrementer incrementer = new MockIncrementer();
+		try {
+			final MockIncrementer incrementer = new MockIncrementer();
 
-		final MockDecrementer decrementer = new MockDecrementer();
+			final MockDecrementer decrementer = new MockDecrementer();
 
-		// Repeat this to provoke races
-		for (int i = 0; i < 256; i++) {
-			final AtomicDisposableReferenceCounter counter = new AtomicDisposableReferenceCounter();
-			incrementer.setCounter(counter);
-			decrementer.setCounter(counter);
+			// Repeat this to provoke races
+			for (int i = 0; i < 256; i++) {
+				final AtomicDisposableReferenceCounter counter = new AtomicDisposableReferenceCounter();
+				incrementer.setCounter(counter);
+				decrementer.setCounter(counter);
 
-			counter.incrementReferenceCounter();
+				counter.incrementReferenceCounter();
 
-			// Randomly decide which one should be first as the first task usually will win the race
-			boolean incrementFirst = random.nextBoolean();
+				// Randomly decide which one should be first as the first task usually will win the race
+				boolean incrementFirst = random.nextBoolean();
 
-			Future<Boolean> success1 = executor.submit(incrementFirst ? incrementer : decrementer);
-			Future<Boolean> success2 = executor.submit(incrementFirst ? decrementer : incrementer);
+				Future<Boolean> success1 = executor.submit(incrementFirst ? incrementer : decrementer);
+				Future<Boolean> success2 = executor.submit(incrementFirst ? decrementer : incrementer);
 
-			// Only one of the two should win the race and return true
-			assertTrue(success1.get() ^ success2.get());
+				// Only one of the two should win the race and return true
+				assertTrue(success1.get() ^ success2.get());
+			}
+		}
+		finally {
+			executor.shutdownNow();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java
new file mode 100644
index 0000000..ac6744e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.util;
+
+import org.junit.Assert;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
+import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
+import org.apache.flink.runtime.io.network.api.serialization.types.Util;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+
+public class DataInputOutputSerializerTest {
+
+	@Test
+	public void testWrapAsByteBuffer() {
+		SerializationTestType randomInt = Util.randomRecord(SerializationTestTypeFactory.INT);
+
+		DataOutputSerializer serializer = new DataOutputSerializer(randomInt.length());
+		MemorySegment segment = new MemorySegment(new byte[randomInt.length()]);
+
+		try {
+			// empty buffer, read buffer should be empty
+			ByteBuffer wrapper = serializer.wrapAsByteBuffer();
+
+			Assert.assertEquals(0, wrapper.position());
+			Assert.assertEquals(0, wrapper.limit());
+
+			// write to data output, read buffer should still be empty
+			randomInt.write(serializer);
+
+			Assert.assertEquals(0, wrapper.position());
+			Assert.assertEquals(0, wrapper.limit());
+
+			// get updated read buffer, read buffer should contain written data
+			wrapper = serializer.wrapAsByteBuffer();
+
+			Assert.assertEquals(0, wrapper.position());
+			Assert.assertEquals(randomInt.length(), wrapper.limit());
+
+			// clear data output, read buffer should still contain written data
+			serializer.clear();
+
+			Assert.assertEquals(0, wrapper.position());
+			Assert.assertEquals(randomInt.length(), wrapper.limit());
+
+			// get updated read buffer, should be empty
+			wrapper = serializer.wrapAsByteBuffer();
+
+			Assert.assertEquals(0, wrapper.position());
+			Assert.assertEquals(0, wrapper.limit());
+
+			// write to data output and read back to memory
+			randomInt.write(serializer);
+			wrapper = serializer.wrapAsByteBuffer();
+
+			segment.put(0, wrapper, randomInt.length());
+
+			Assert.assertEquals(randomInt.length(), wrapper.position());
+			Assert.assertEquals(randomInt.length(), wrapper.limit());
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail("Test encountered an unexpected exception.");
+		}
+	}
+
+	@Test
+	public void testRandomValuesWriteRead() {
+		final int numElements = 100000;
+		final ArrayDeque<SerializationTestType> reference = new ArrayDeque<SerializationTestType>();
+
+		DataOutputSerializer serializer = new DataOutputSerializer(1);
+
+		for (SerializationTestType value : Util.randomRecords(numElements)) {
+			reference.add(value);
+
+			try {
+				value.write(serializer);
+			} catch (IOException e) {
+				e.printStackTrace();
+				Assert.fail("Test encountered an unexpected exception.");
+			}
+		}
+
+		DataInputDeserializer deserializer = new DataInputDeserializer(serializer.wrapAsByteBuffer());
+
+		for (SerializationTestType expected : reference) {
+			try {
+				SerializationTestType actual = expected.getClass().newInstance();
+				actual.read(deserializer);
+
+				Assert.assertEquals(expected, actual);
+			} catch (Exception e) {
+				e.printStackTrace();
+				Assert.fail("Test encountered an unexpected exception.");
+			}
+		}
+
+		reference.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/util/LRUCacheMapTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/LRUCacheMapTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/LRUCacheMapTest.java
new file mode 100644
index 0000000..dfc100f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/LRUCacheMapTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+
+public class LRUCacheMapTest {
+
+	@Test
+	public void testGetAndRemoveLRUWhenEmpty() {
+		final LRUCacheMap<Integer, Integer> lruCache = new LRUCacheMap<Integer, Integer>();
+
+		Assert.assertNull(lruCache.get(1));
+
+		Assert.assertNull(lruCache.removeLRU());
+
+		Assert.assertEquals(0, lruCache.size());
+	}
+
+	@Test
+	public void testPutGetLRURemove() {
+		final LRUCacheMap<Integer, Integer> lruCache = new LRUCacheMap<Integer, Integer>();
+
+		lruCache.put(0, 0);
+		lruCache.put(1, 1);
+		lruCache.put(2, 2);
+		lruCache.put(3, 3);
+		lruCache.put(4, 4); // LRU -> 0, 1, 2, 3, 4
+
+		Assert.assertEquals(0, lruCache.getLRU().intValue()); // 1, 2, 3, 4, 0
+
+		Assert.assertEquals(1, lruCache.getLRU().intValue()); // 2, 3, 4, 0, 1
+
+		lruCache.put(5, 5); // 2, 3, 4, 0, 1, 5
+
+		Assert.assertEquals(2, lruCache.getLRU().intValue()); // 3, 4, 0, 1, 5, 2
+		Assert.assertEquals(3, lruCache.getLRU().intValue()); // 4, 0, 1, 5, 2, 3
+		Assert.assertEquals(4, lruCache.getLRU().intValue()); // 0, 1, 5, 2, 3, 4
+		Assert.assertEquals(0, lruCache.getLRU().intValue()); // 1, 5, 2, 3, 4, 0
+		Assert.assertEquals(1, lruCache.getLRU().intValue()); // 5, 2, 3, 4, 0, 1
+		Assert.assertEquals(5, lruCache.getLRU().intValue()); // 2, 3, 4, 0, 1, 5
+		Assert.assertEquals(2, lruCache.getLRU().intValue()); // 3, 4, 0, 1, 5, 2
+
+		Assert.assertEquals(2, lruCache.remove(2).intValue()); // 3, 4, 0, 1, 5
+		Assert.assertEquals(3, lruCache.removeLRU().intValue()); // 4, 0, 1, 5
+		Assert.assertEquals(4, lruCache.removeLRU().intValue()); // 0, 1, 5
+		Assert.assertEquals(0, lruCache.removeLRU().intValue()); // 1, 5
+		Assert.assertEquals(1, lruCache.removeLRU().intValue()); // 5
+		Assert.assertEquals(5, lruCache.removeLRU().intValue());
+
+		Assert.assertTrue(lruCache.isEmpty());
+	}
+
+	@Test
+	public void testPutGetRemoveLRU() {
+		final LRUCacheMap<Integer, Integer> lruCache = new LRUCacheMap<Integer, Integer>();
+
+		lruCache.put(0, 0);
+		lruCache.put(1, 1);
+		lruCache.put(2, 2);
+		lruCache.put(3, 3);
+		lruCache.put(4, 4); // LRU -> 0, 1, 2, 3, 4
+
+		lruCache.get(1); // 0, 2, 3, 4, 1
+
+		Assert.assertEquals(0, lruCache.removeLRU().intValue()); // 2, 3, 4, 1
+
+		lruCache.get(2); // 3, 4, 1, 2
+
+		lruCache.put(5, 5); // 3, 4, 1, 2, 5
+
+		Assert.assertEquals(3, lruCache.removeLRU().intValue());
+		Assert.assertEquals(4, lruCache.removeLRU().intValue());
+		Assert.assertEquals(1, lruCache.removeLRU().intValue());
+		Assert.assertEquals(2, lruCache.removeLRU().intValue());
+		Assert.assertEquals(5, lruCache.removeLRU().intValue());
+	}
+
+	@Test
+	public void testPutAndRemoveLRU() {
+		final LRUCacheMap<Integer, Integer> lruCache = new LRUCacheMap<Integer, Integer>();
+
+		final int numEntries = 100;
+
+		// --------------------------------------------------------------------
+
+		for (int i = 0; i < numEntries; i++) {
+			// 1. Add entries with ascending keys to the cache,
+			lruCache.put(i, i);
+		}
+
+		for (int i = 0; i < numEntries; i++) {
+			// 2. remove the least recently used element
+			int lru = lruCache.removeLRU();
+
+			// 3. and verify that it's in insertion order.
+			Assert.assertEquals(i, lru);
+		}
+
+		// Verify that the cache is empty after all entries have been removed
+		Assert.assertEquals(0, lruCache.size());
+		Assert.assertNull(lruCache.removeLRU());
+	}
+
+	@Test
+	public void testPutRandomGetRemoveLRU() {
+		final LRUCacheMap<Integer, Integer> lruCache = new LRUCacheMap<Integer, Integer>();
+
+		final Random random = new Random();
+
+		final int numEntries = 5;
+
+		final int[] expectedLruOrder = new int[numEntries];
+
+		// --------------------------------------------------------------------
+
+		for (int i = 0; i < numEntries; i++) {
+			// 1. Add ascending numbers to the cache,
+			lruCache.put(i, i);
+
+			// Keeps track of the expected LRU access order for the element
+			// with key i (the array index). Initially, LRU order is same as
+			// the insertion order.
+			expectedLruOrder[i] = i;
+		}
+
+		for (int i = 0; i < numEntries * 10; i++) {
+			final int randomKey = random.nextInt(numEntries);
+
+			int currentPosition = expectedLruOrder[randomKey];
+
+			for (int j = 0; j < numEntries; j++) {
+				if (expectedLruOrder[j] > currentPosition) {
+					expectedLruOrder[j]--;
+				}
+			}
+
+			expectedLruOrder[randomKey] = numEntries - 1;
+
+			// 2. access random entries,
+			lruCache.get(randomKey);
+		}
+
+		for (int i = 0; i < numEntries; i++) {
+			// 3. remove the least recently used element,
+			int lru = lruCache.removeLRU();
+
+			// 4. and verify that it's in LRU access order.
+			Assert.assertEquals(expectedLruOrder[lru], i);
+		}
+
+		// Verify that the cache is empty after all entries have been removed
+		Assert.assertEquals(0, lruCache.size());
+		Assert.assertNull(lruCache.removeLRU());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/EventNotificationHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/EventNotificationHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/EventNotificationHandlerTest.java
new file mode 100644
index 0000000..625b93f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/EventNotificationHandlerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.util.event;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.task.IntegerTaskEvent;
+import org.apache.flink.runtime.event.task.StringTaskEvent;
+import org.junit.Test;
+
+/**
+ * This class contains unit tests for the {@link EventNotificationHandler}.
+ * 
+ */
+public class EventNotificationHandlerTest {
+	/**
+	 * A test implementation of an {@link EventListener}.
+	 * 
+	 */
+	private static class TestEventListener implements EventListener<TaskEvent> {
+
+		/**
+		 * The event that was last received by this event listener.
+		 */
+		private TaskEvent receivedEvent = null;
+
+		/**
+		 * {@inheritDoc}
+		 * @param event
+		 */
+		@Override
+		public void onEvent(TaskEvent event) {
+
+			this.receivedEvent = event;
+		}
+
+		/**
+		 * Returns the event which was last received by this event listener. If no event
+		 * has been received so far the return value is <code>null</code>.
+		 * 
+		 * @return the event which was last received, possibly <code>null</code>
+		 */
+		public TaskEvent getLastReceivedEvent() {
+
+			return this.receivedEvent;
+		}
+	}
+
+	/**
+	 * Tests the publish/subscribe mechanisms implemented in the {@link EventNotificationHandler}.
+	 */
+	@Test
+	public void testEventNotificationManager() {
+
+		final EventNotificationHandler evm = new EventNotificationHandler();
+		final TestEventListener listener = new TestEventListener();
+
+		evm.subscribe(listener, StringTaskEvent.class);
+
+		final StringTaskEvent stringTaskEvent1 = new StringTaskEvent("Test 1");
+		final StringTaskEvent stringTaskEvent2 = new StringTaskEvent("Test 2");
+
+		evm.publish(stringTaskEvent1);
+		evm.publish(new IntegerTaskEvent(5));
+
+		assertNotNull(listener.getLastReceivedEvent());
+		StringTaskEvent receivedStringEvent = (StringTaskEvent) listener.getLastReceivedEvent();
+		assertEquals(stringTaskEvent1, receivedStringEvent);
+
+		evm.unsubscribe(listener, StringTaskEvent.class);
+
+		evm.publish(stringTaskEvent2);
+
+		assertNotNull(listener.getLastReceivedEvent());
+		receivedStringEvent = (StringTaskEvent) listener.getLastReceivedEvent();
+		assertEquals(stringTaskEvent1, receivedStringEvent);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index c496975..020231e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -207,14 +207,14 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       sender1.setInvokableClass(classOf[Sender])
       sender2.setInvokableClass(classOf[Sender])
-      receiver.setInvokableClass(classOf[AgnosticReceiver])
+      receiver.setInvokableClass(classOf[AgnosticTertiaryReceiver])
 
       sender1.setParallelism(num_tasks)
       sender2.setParallelism(2 * num_tasks)
       receiver.setParallelism(3 * num_tasks)
 
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
-      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
+      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
 
       val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 
@@ -251,7 +251,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       receiver.setParallelism(3 * num_tasks)
 
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
-      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
+      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
 
       val jobGraph = new JobGraph("Bipartite Job", sender1, receiver, sender2)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index 820f43f..57a49cf 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -20,18 +20,14 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern,
-AbstractJobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{AgnosticBinaryReceiver, Receiver}
+import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph}
+import org.apache.flink.runtime.jobmanager.Tasks.{Sender, AgnosticBinaryReceiver, Receiver}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmissionSuccess,
-SubmitJob}
-import org.apache.flink.runtime.taskmanager.TaskManagerTest.Sender
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmissionSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
-import org.scalatest.{Matchers, WordSpecLike, BeforeAndAfterAll}
-import scala.concurrent.duration._
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
 @RunWith(classOf[JUnitRunner])
 class SlotSharingITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with
@@ -103,7 +99,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       receiver.setSlotSharingGroup(sharingGroup)
 
       receiver.connectNewDataSetAsInput(sender1, DistributionPattern.POINTWISE)
-      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.BIPARTITE)
+      receiver.connectNewDataSetAsInput(sender2, DistributionPattern.ALL_TO_ALL)
 
       val jobGraph = new JobGraph("Bipartite job", sender1, sender2, receiver)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
index efeb397..912ed95 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
@@ -18,20 +18,16 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.{PoisonPill, ActorSystem}
+import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern,
-AbstractJobVertex}
+import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, SubmissionSuccess,
-SubmitJob}
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning,
-WaitForAllVerticesToBeRunning}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, SubmissionSuccess, SubmitJob}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning, WaitForAllVerticesToBeRunningOrFinished}
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
-import org.scalatest.{WordSpecLike, Matchers, BeforeAndAfterAll}
-import scala.concurrent.duration._
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
 @RunWith(classOf[JUnitRunner])
 class TaskManagerFailsITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
@@ -67,7 +63,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
           jm ! SubmitJob(jobGraph)
           expectMsg(SubmissionSuccess(jobGraph.getJobID))
 
-          jm ! WaitForAllVerticesToBeRunning(jobID)
+          jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
           expectMsg(AllVerticesRunning(jobID))
 
           // kill one task manager

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index 4cc3bae..cede7f2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -18,22 +18,17 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.{PoisonPill, ActorSystem}
+import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern,
-AbstractJobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{Sender, BlockingReceiver}
+import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph}
+import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
-import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, SubmissionSuccess,
-SubmitJob}
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning,
-WaitForAllVerticesToBeRunning}
+import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, SubmissionSuccess, SubmitJob}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning, WaitForAllVerticesToBeRunningOrFinished}
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, WordSpecLike, Matchers}
-
-import scala.concurrent.duration._
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
 @RunWith(classOf[JUnitRunner])
 class TaskManagerFailsWithSlotSharingITCase(_system: ActorSystem) extends TestKit(_system) with
@@ -75,7 +70,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
           jm ! SubmitJob(jobGraph)
           expectMsg(SubmissionSuccess(jobGraph.getJobID))
 
-          jm ! WaitForAllVerticesToBeRunning(jobID)
+          jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
           expectMsg(AllVerticesRunning(jobID))
 
           //kill task manager

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index fb2daf6..316f340 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import org.apache.flink.runtime.io.network.api.{RecordReader, RecordWriter}
+import org.apache.flink.runtime.io.network.api.reader.RecordReader
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
 import org.apache.flink.runtime.types.IntegerRecord
 
@@ -53,12 +54,11 @@ object Tasks {
   class Sender extends AbstractInvokable{
     var writer: RecordWriter[IntegerRecord] = _
     override def registerInputOutput(): Unit = {
-      writer = new RecordWriter[IntegerRecord](this)
+      writer = new RecordWriter[IntegerRecord](getEnvironment().getWriter(0))
     }
 
     override def invoke(): Unit = {
       try{
-        writer.initializeSerializers()
         writer.emit(new IntegerRecord(42))
         writer.emit(new IntegerRecord(1337))
         writer.flush()
@@ -72,7 +72,9 @@ object Tasks {
     var reader: RecordReader[IntegerRecord] = _
 
     override def registerInputOutput(): Unit = {
-      reader = new RecordReader[IntegerRecord](this, classOf[IntegerRecord])
+      val env = getEnvironment()
+
+      reader = new RecordReader[IntegerRecord](env.getReader(0), classOf[IntegerRecord])
     }
 
     override def invoke(): Unit = {
@@ -115,7 +117,9 @@ object Tasks {
     var reader: RecordReader[IntegerRecord] = _
 
     override def registerInputOutput(): Unit = {
-      reader = new RecordReader[IntegerRecord](this, classOf[IntegerRecord])
+      val env = getEnvironment()
+
+      reader = new RecordReader[IntegerRecord](env.getReader(0), classOf[IntegerRecord])
     }
 
     override def invoke(): Unit = {
@@ -128,13 +132,35 @@ object Tasks {
     var reader2: RecordReader[IntegerRecord] = _
 
     override def registerInputOutput(): Unit = {
-      reader1 = new RecordReader[IntegerRecord](this, classOf[IntegerRecord])
-      reader2 = new RecordReader[IntegerRecord](this, classOf[IntegerRecord])
+      val env = getEnvironment()
+
+      reader1 = new RecordReader[IntegerRecord](env.getReader(0), classOf[IntegerRecord])
+      reader2 = new RecordReader[IntegerRecord](env.getReader(1), classOf[IntegerRecord])
+    }
+
+    override def invoke(): Unit = {
+      while(reader1.next() != null){}
+      while(reader2.next() != null){}
+    }
+  }
+
+  class AgnosticTertiaryReceiver extends AbstractInvokable {
+    var reader1: RecordReader[IntegerRecord] = _
+    var reader2: RecordReader[IntegerRecord] = _
+    var reader3: RecordReader[IntegerRecord] = _
+
+    override def registerInputOutput(): Unit = {
+      val env = getEnvironment()
+
+      reader1 = new RecordReader[IntegerRecord](env.getReader(0), classOf[IntegerRecord])
+      reader2 = new RecordReader[IntegerRecord](env.getReader(1), classOf[IntegerRecord])
+      reader3 = new RecordReader[IntegerRecord](env.getReader(2), classOf[IntegerRecord])
     }
 
     override def invoke(): Unit = {
       while(reader1.next() != null){}
       while(reader2.next() != null){}
+      while(reader3.next() != null){}
     }
   }
 
@@ -142,12 +168,10 @@ object Tasks {
     var writer: RecordWriter[IntegerRecord] = _
 
     override def registerInputOutput(): Unit = {
-      writer = new RecordWriter[IntegerRecord](this)
+      writer = new RecordWriter[IntegerRecord](getEnvironment().getWriter(0))
     }
 
     override def invoke(): Unit = {
-      writer.initializeSerializers()
-
       throw new Exception("Test exception")
     }
   }
@@ -156,12 +180,10 @@ object Tasks {
     var writer: RecordWriter[IntegerRecord] = _
 
     override def registerInputOutput(): Unit = {
-      writer = new RecordWriter[IntegerRecord](this)
+      writer = new RecordWriter[IntegerRecord](getEnvironment().getWriter(0))
     }
 
     override def invoke(): Unit = {
-      writer.initializeSerializers()
-
       if(Math.random() < 0.05){
         throw new Exception("Test exception")
       }else{
@@ -173,7 +195,7 @@ object Tasks {
 
   class ExceptionReceiver extends AbstractInvokable {
     override def registerInputOutput(): Unit = {
-      new RecordReader[IntegerRecord](this, classOf[IntegerRecord])
+      new RecordReader[IntegerRecord](getEnvironment().getReader(0), classOf[IntegerRecord])
     }
 
     override def invoke(): Unit = {
@@ -197,7 +219,7 @@ object Tasks {
     }
 
     override def registerInputOutput(): Unit = {
-      new RecordWriter[IntegerRecord](this)
+      new RecordWriter[IntegerRecord](getEnvironment().getWriter(0))
     }
 
     override def invoke(): Unit = {
@@ -208,7 +230,7 @@ object Tasks {
 
   class BlockingReceiver extends AbstractInvokable {
     override def registerInputOutput(): Unit = {
-      new RecordReader[IntegerRecord](this, classOf[IntegerRecord])
+      new RecordReader[IntegerRecord](getEnvironment.getReader(0), classOf[IntegerRecord])
     }
 
     override def invoke(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 059b7fa..3b86a35 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -38,6 +38,9 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
 
   val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
 
+  val waitForAllVerticesToBeRunningOrFinished =
+    scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
+
   override def archiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist)
 
   abstract override def receiveWithLogMessages: Receive = {
@@ -62,8 +65,19 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
         val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
         waitForAllVerticesToBeRunning += jobID -> (waiting + sender)
       }
+    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
+      if(checkIfAllVerticesRunningOrFinished(jobID)){
+        sender ! AllVerticesRunning(jobID)
+      }else{
+        currentJobs.get(jobID) match {
+          case Some((eg, _)) => eg.registerExecutionListener(self)
+          case None =>
+        }
+        val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
+        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender)
+      }
     case ExecutionStateChanged(jobID, _, _, _, _, _, _, _, _) =>
-      val cleanup = waitForAllVerticesToBeRunning.get(jobID) match {
+      val cleanupRunning = waitForAllVerticesToBeRunning.get(jobID) match {
         case Some(listeners) if checkIfAllVerticesRunning(jobID) =>
           for(listener <- listeners){
             listener ! AllVerticesRunning(jobID)
@@ -72,9 +86,22 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
         case _ => false
       }
 
-      if(cleanup){
+      if(cleanupRunning){
         waitForAllVerticesToBeRunning.remove(jobID)
       }
+
+      val cleanupRunningOrFinished = waitForAllVerticesToBeRunningOrFinished.get(jobID) match {
+        case Some(listeners) if checkIfAllVerticesRunningOrFinished(jobID) =>
+          for(listener <- listeners){
+            listener ! AllVerticesRunning(jobID)
+          }
+          true
+        case _ => false
+      }
+
+      if (cleanupRunningOrFinished) {
+        waitForAllVerticesToBeRunningOrFinished.remove(jobID)
+      }
     case NotifyWhenJobRemoved(jobID) => {
       val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager)
 
@@ -97,4 +124,16 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
       case None => false
     }
   }
+  // True iff jobID is in currentJobs and every execution vertex is RUNNING or FINISHED; false for unknown jobs.
+  def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
+    currentJobs.get(jobID) match {
+      case Some((eg, _)) =>
+        eg.getAllExecutionVertices.forall {
+          case vertex =>
+            (vertex.getExecutionState == ExecutionState.RUNNING
+              || vertex.getExecutionState == ExecutionState.FINISHED)
+        }
+      case None => false
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 7941226..cdd81cd 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -35,6 +35,7 @@ object TestingJobManagerMessages {
   case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph
 
   case class WaitForAllVerticesToBeRunning(jobID: JobID)
+  case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID)
   case class AllVerticesRunning(jobID: JobID)
 
   case class NotifyWhenJobRemoved(jobID: JobID)

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index b1aa437..f1fe3e5 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,13 +19,14 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
+import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.jobgraph.JobID
+import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.{ActorLogMessages}
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
+
 import scala.concurrent.duration._
 
 trait TestingTaskManager extends ActorLogMessages {
@@ -62,7 +63,16 @@ trait TestingTaskManager extends ActorLogMessages {
       sender ! ResponseBroadcastVariablesWithReferences(
         bcVarManager.getNumberOfVariablesWithReferences)
     }
-    
+
+    case RequestNumActiveConnections => {
+      networkEnvironment match {
+        case Some(ne) => sender ! ResponseNumActiveConnections(
+          ne.getConnectionManager.getNumberOfActiveConnections)
+
+        case None => sender ! ResponseNumActiveConnections(0)
+      }
+    }
+
     case NotifyWhenJobRemoved(jobID) => {
       if(runningTasks.values.exists(_.getJobID == jobID)){
         val set = waitForJobRemoval.getOrElse(jobID, Set())

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index 38cc829..bbbf3d2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -36,6 +36,9 @@ object TestingTaskManagerMessages {
   
   case class ResponseBroadcastVariablesWithReferences(number: Int)
 
+  case object RequestNumActiveConnections
+  case class ResponseNumActiveConnections(number: Int)
+
   case class CheckIfJobRemoved(jobID: JobID)
   
   case object RequestRunningTasks

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index b4da64d..1a1098d 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -18,20 +18,18 @@
 
 package org.apache.flink.test.util;
 
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import scala.concurrent.duration.FiniteDuration;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.junit.Assert;
-import scala.concurrent.duration.FiniteDuration;
-
 public abstract class AbstractTestBase extends TestBaseUtils {
 
 
@@ -66,9 +64,6 @@ public abstract class AbstractTestBase extends TestBaseUtils {
 	public void stopCluster() throws Exception {
 		stopCluster(executor, timeout);
 
-			int numActiveConnections = 0;
-
-			Assert.assertEquals("Not all network connections were released.", 0, numActiveConnections);
 		deleteAllTempFiles();
 	}
 


Mime
View raw message