flink-commits mailing list archives

From: u..@apache.org
Subject: [03/13] flink git commit: [FLINK-1350] [runtime] Add blocking result partition variant
Date: Wed, 18 Mar 2015 16:48:52 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 007ad5e..68b0e6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -22,8 +22,8 @@ import akka.actor.ActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.util.ExceptionUtils;
 import org.junit.Test;
-import org.mockito.Matchers;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -51,7 +50,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -67,7 +65,7 @@ public class TaskTest {
 			final RuntimeEnvironment env = mock(RuntimeEnvironment.class);
 			
 			Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class));
+			doNothing().when(task).unregisterTask();
 			task.setEnvironment(env);
 			
 			assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
@@ -86,8 +84,8 @@ public class TaskTest {
 			
 			task.markFailed(new Exception("test"));
 			assertTrue(ExecutionState.CANCELED == task.getExecutionState());
-			
-			verify(task, times(1)).notifyExecutionStateChange(ExecutionState.CANCELED, null);
+
+			verify(task).unregisterTask();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -103,11 +101,12 @@ public class TaskTest {
 			final ExecutionAttemptID eid = new ExecutionAttemptID();
 			
 			final Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class));
+			doNothing().when(task).unregisterTask();
 			
 			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 			
 			Thread operation = new Thread() {
+				@Override
 				public void run() {
 					try {
 						assertTrue(task.markAsFinished());
@@ -135,8 +134,8 @@ public class TaskTest {
 			}
 			
 			assertEquals(ExecutionState.FINISHED, task.getExecutionState());
-			
-			verify(task).notifyExecutionStateChange(ExecutionState.FINISHED, null);
+
+			verify(task).unregisterTask();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -152,11 +151,12 @@ public class TaskTest {
 			final ExecutionAttemptID eid = new ExecutionAttemptID();
 			
 			final Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class));
-			
+			doNothing().when(task).unregisterTask();
+
 			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 			
 			Thread operation = new Thread() {
+				@Override
 				public void run() {
 					try {
 						task.markFailed(new Exception("test exception message"));
@@ -185,7 +185,7 @@ public class TaskTest {
 			
 			// make sure the final state is correct and the task manager knows the changes
 			assertEquals(ExecutionState.FAILED, task.getExecutionState());
-			verify(task).notifyExecutionStateChange(Matchers.eq(ExecutionState.FAILED), any(Throwable.class));
+			verify(task).unregisterTask();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -201,7 +201,7 @@ public class TaskTest {
 			final ExecutionAttemptID eid = new ExecutionAttemptID();
 
 			final Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class));
+			doNothing().when(task).unregisterTask();
 			
 			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 			
@@ -210,6 +210,7 @@ public class TaskTest {
 			final OneShotLatch afterCanceling = new OneShotLatch();
 			
 			Thread operation = new Thread() {
+				@Override
 				public void run() {
 					try {
 						toRunning.trigger();
@@ -245,7 +246,7 @@ public class TaskTest {
 			
 			// make sure the final state is correct and the task manager knows the changes
 			assertEquals(ExecutionState.CANCELED, task.getExecutionState());
-			verify(task).notifyExecutionStateChange(ExecutionState.CANCELED, null);
+			verify(task).unregisterTask();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -262,12 +263,12 @@ public class TaskTest {
 			
 			TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
 					new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(),
-					Collections.<PartitionDeploymentDescriptor>emptyList(),
-					Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+					Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 			
 			Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class));
+			doNothing().when(task).unregisterTask();
 			
 			RuntimeEnvironment env = new RuntimeEnvironment(mock(ActorRef.class), task, tdd, getClass().getClassLoader(),
 					mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
@@ -281,8 +282,8 @@ public class TaskTest {
 			task.getEnvironment().getExecutingThread().join();
 			
 			assertEquals(ExecutionState.FINISHED, task.getExecutionState());
-			
-			verify(task).notifyExecutionStateChange(ExecutionState.FINISHED, null);
+
+			verify(task).unregisterTask();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -299,12 +300,12 @@ public class TaskTest {
 			
 			TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7,
 					new Configuration(), new Configuration(), TestInvokableWithException.class.getName(),
-					Collections.<PartitionDeploymentDescriptor>emptyList(),
-					Collections.<PartitionConsumerDeploymentDescriptor>emptyList(),
+					Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+					Collections.<InputGateDeploymentDescriptor>emptyList(),
 					new ArrayList<BlobKey>(), 0);
 			
 			Task task = spy(new Task(jid, vid, 2, 7, eid, "TestTask", ActorRef.noSender()));
-			doNothing().when(task).notifyExecutionStateChange(any(ExecutionState.class), any(Throwable.class));
+			doNothing().when(task).unregisterTask();
 			
 			RuntimeEnvironment env = new RuntimeEnvironment(mock(ActorRef.class), task, tdd, getClass().getClassLoader(),
 					mock(MemoryManager.class), mock(IOManager.class), mock(InputSplitProvider.class),
@@ -318,11 +319,8 @@ public class TaskTest {
 			task.getEnvironment().getExecutingThread().join();
 			
 			assertEquals(ExecutionState.FAILED, task.getExecutionState());
-			
-			verify(task).notifyExecutionStateChange(Matchers.eq(ExecutionState.FAILED), any(Throwable.class));
-			verify(task, times(0)).notifyExecutionStateChange(ExecutionState.CANCELING, null);
-			verify(task, times(0)).notifyExecutionStateChange(ExecutionState.CANCELED, null);
-			verify(task, times(0)).notifyExecutionStateChange(ExecutionState.FINISHED, null);
+
+			verify(task).unregisterTask();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
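
The assertions in this file all lean on the same Mockito spy pattern: stub the side-effecting callback with doNothing(), drive the state transition, then verify the callback fired exactly once. A standalone sketch of that pattern with placeholder names (Worker stands in for, but is not, Flink's Task):

import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import org.junit.Test;

class Worker {
	void unregister() { /* real side effect, stubbed out in the test */ }
	void finish() { unregister(); }
}

public class WorkerTest {
	@Test
	public void testFinishUnregisters() {
		Worker worker = spy(new Worker());
		doNothing().when(worker).unregister(); // keep the real finish(), stub the callback

		worker.finish();

		verify(worker).unregister(); // verify(...) defaults to times(1)
	}
}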

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
index d9bd232..466d1d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DiscardingRecycler.java
@@ -24,5 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 public class DiscardingRecycler implements BufferRecycler {
 
 	@Override
-	public void recycle(MemorySegment memSeg) {}
+	public void recycle(MemorySegment memSeg) {
+		memSeg.free();
+	}
 }
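
The recycler now frees the segment instead of silently dropping it, presumably to release the memory eagerly and to make use-after-recycle bugs in tests fail fast rather than read stale data. A generic illustration of that fail-fast idea with hypothetical stand-in types (plain Java, not Flink's MemorySegment or Buffer):

class Segment {
	private byte[] data = new byte[16];

	byte get(int i) {
		if (data == null) {
			throw new IllegalStateException("segment was freed");
		}
		return data[i];
	}

	void free() {
		data = null; // later access throws instead of returning stale bytes
	}
}

public class FailFastRecyclingDemo {
	public static void main(String[] args) {
		Segment seg = new Segment();
		seg.free(); // mirrors what the recycler now does in recycle(...)
		try {
			seg.get(0); // use-after-recycle fails immediately
		} catch (IllegalStateException e) {
			System.out.println("caught: " + e.getMessage());
		}
	}
}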

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
index a2587a0..e016a50 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/AtomicDisposableReferenceCounterTest.java
@@ -37,13 +37,26 @@ public class AtomicDisposableReferenceCounterTest {
 	public void testSerialIncrementAndDecrement() {
 		AtomicDisposableReferenceCounter counter = new AtomicDisposableReferenceCounter();
 
-		assertTrue(counter.incrementReferenceCounter());
+		assertTrue(counter.increment());
 
-		assertTrue(counter.decrementReferenceCounter());
+		assertTrue(counter.decrement());
 
-		assertFalse(counter.incrementReferenceCounter());
+		assertFalse(counter.increment());
 
-		assertFalse(counter.decrementReferenceCounter());
+		assertFalse(counter.decrement());
+	}
+
+	@Test
+	public void testSerialIncrementAndDecrementWithCustomDisposeCount() {
+		AtomicDisposableReferenceCounter counter = new AtomicDisposableReferenceCounter(-2);
+
+		assertTrue(counter.increment());
+
+		assertFalse(counter.decrement());
+
+		assertFalse(counter.decrement());
+
+		assertTrue(counter.decrement());
 	}
 
 	@Test
@@ -63,7 +76,7 @@ public class AtomicDisposableReferenceCounterTest {
 				incrementer.setCounter(counter);
 				decrementer.setCounter(counter);
 
-				counter.incrementReferenceCounter();
+				counter.increment();
 
 				// Randomly decide which one should be first as the first task usually will win the race
 				boolean incrementFirst = random.nextBoolean();
@@ -90,7 +103,7 @@ public class AtomicDisposableReferenceCounterTest {
 
 		@Override
 		public Boolean call() throws Exception {
-			return counter.incrementReferenceCounter();
+			return counter.increment();
 		}
 	}
 
@@ -104,7 +117,7 @@ public class AtomicDisposableReferenceCounterTest {
 
 		@Override
 		public Boolean call() throws Exception {
-			return counter.decrementReferenceCounter();
+			return counter.decrement();
 		}
 	}
 }
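
The new constructor parameter moves the dispose threshold: by default the first decrement back to zero disposes the counter, while a dispose count of -2 delays disposal until the count reaches -2. A minimal sketch consistent with the assertions above (illustrative only, not Flink's implementation):

public class RefCounterSketch {

	private final int disposeCount; // decrementing down to this value disposes the counter
	private int count;
	private boolean disposed;

	public RefCounterSketch() {
		this(0);
	}

	public RefCounterSketch(int disposeCount) {
		this.disposeCount = disposeCount;
	}

	// Returns true if a reference was acquired, false if already disposed.
	public synchronized boolean increment() {
		if (disposed) {
			return false;
		}
		count++;
		return true;
	}

	// Returns true exactly once: when this call lowers the count to the
	// dispose threshold and thereby disposes the counter.
	public synchronized boolean decrement() {
		if (disposed) {
			return false;
		}
		if (--count <= disposeCount) {
			disposed = true;
			return true;
		}
		return false;
	}

	public static void main(String[] args) {
		RefCounterSketch c = new RefCounterSketch(-2);
		System.out.println(c.increment()); // true  (0 -> 1)
		System.out.println(c.decrement()); // false (1 -> 0, threshold is -2)
		System.out.println(c.decrement()); // false (0 -> -1)
		System.out.println(c.decrement()); // true  (-1 -> -2, disposed)
	}
}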

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
index b7af589..be9adf4 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.io;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
@@ -31,15 +31,15 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 
 	private OutputFlusher outputFlusher;
 
-	public StreamRecordWriter(BufferWriter writer) {
+	public StreamRecordWriter(ResultPartitionWriter writer) {
 		this(writer, new RoundRobinChannelSelector<T>(), 1000);
 	}
 
-	public StreamRecordWriter(BufferWriter writer, ChannelSelector<T> channelSelector) {
+	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
 		this(writer, channelSelector, 1000);
 	}
 
-	public StreamRecordWriter(BufferWriter writer, ChannelSelector<T> channelSelector, long timeout) {
+	public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout) {
 		super(writer, channelSelector);
 
 		this.timeout = timeout;
@@ -70,7 +70,7 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 		}
 		@Override
 		public void run() {
-			while (running && !writer.isFinished()) {
+			while (running) {
 				try {
 					flush();
 					Thread.sleep(timeout);
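
With the move from BufferWriter to ResultPartitionWriter, the flusher loop drops the writer.isFinished() check and relies on the running flag alone, presumably because the new writer interface no longer exposes a finished state. A generic sketch of this periodic-flusher pattern (names assumed, not Flink's classes):

class PeriodicFlusher extends Thread {

	private final Runnable flushAction;
	private final long timeoutMillis;
	private volatile boolean running = true;

	PeriodicFlusher(Runnable flushAction, long timeoutMillis) {
		this.flushAction = flushAction;
		this.timeoutMillis = timeoutMillis;
	}

	void terminate() {
		running = false;
		interrupt(); // wake the thread if it is sleeping between flushes
	}

	@Override
	public void run() {
		// Mirrors the new loop condition: only the running flag stops the thread.
		while (running) {
			flushAction.run();
			try {
				Thread.sleep(timeoutMillis);
			} catch (InterruptedException e) {
				// fall through and re-check the running flag
			}
		}
	}
}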

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
index d30c241..8296010 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
@@ -26,10 +26,10 @@ import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 71b774f..6e86896 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -34,6 +34,14 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	private JobExecutionResult latestExecutionResult;
 	
 	private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;
+
+	/**
+	 * The number of times a test should be repeated.
+	 *
+	 * <p> This is useful for runtime changes, which affect resource management. Running certain
+	 * tests repeatedly might help to discover resource leaks, race conditions etc.
+	 */
+	private int numberOfTestRepetitions = 1;
 	
 	private boolean isCollectionExecution;
 
@@ -50,6 +58,10 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 		this.degreeOfParallelism = degreeOfParallelism;
 		setTaskManagerNumSlots(degreeOfParallelism);
 	}
+
+	public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
+		this.numberOfTestRepetitions = numberOfTestRepetitions;
+	}
 	
 	public int getDegreeOfParallelism() {
 		return isCollectionExecution ? 1 : degreeOfParallelism;
@@ -101,20 +113,24 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 			TestEnvironment env = new TestEnvironment(this.executor, this.degreeOfParallelism);
 			env.getConfig().enableObjectReuse();
 			env.setAsContext();
-			
-			// call the test program
-			try {
-				testProgram();
-				this.latestExecutionResult = env.latestResult;
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				Assert.fail("Error while calling the test program: " + e.getMessage());
+
+			// Possibly run the test multiple times
+			for (int i = 0; i < numberOfTestRepetitions; i++) {
+				// call the test program
+				try {
+					testProgram();
+					this.latestExecutionResult = env.latestResult;
+				}
+				catch (Exception e) {
+					System.err.println(e.getMessage());
+					e.printStackTrace();
+					Assert.fail("Error while calling the test program: " + e.getMessage());
+				}
+
+				Assert.assertNotNull("The test program never triggered an execution.",
+						this.latestExecutionResult);
 			}
 			
-			Assert.assertNotNull("The test program never triggered an execution.", this.latestExecutionResult);
-			
 			// post-submit
 			try {
 				postSubmit();
@@ -150,18 +166,22 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 			env.getConfig().disableObjectReuse();
 			env.setAsContext();
 
-			// call the test program
-			try {
-				testProgram();
-				this.latestExecutionResult = env.latestResult;
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				Assert.fail("Error while calling the test program: " + e.getMessage());
-			}
+			// Possibly run the test multiple times
+			for (int i = 0; i < numberOfTestRepetitions; i++) {
+				// call the test program
+				try {
+					testProgram();
+					this.latestExecutionResult = env.latestResult;
+				}
+				catch (Exception e) {
+					System.err.println(e.getMessage());
+					e.printStackTrace();
+					Assert.fail("Error while calling the test program: " + e.getMessage());
+				}
 
-			Assert.assertNotNull("The test program never triggered an execution.", this.latestExecutionResult);
+				Assert.assertNotNull("The test program never triggered an execution.",
+						this.latestExecutionResult);
+			}
 
 			// post-submit
 			try {
@@ -246,7 +266,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 						Assert.fail("Cannot compare tuple fields");
 					}
 					
-					int cmp = ((Comparable<Object>) obj1).compareTo((Comparable<Object>) obj2);
+					int cmp = ((Comparable<Object>) obj1).compareTo(obj2);
 					if (cmp != 0) {
 						return cmp;
 					}
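
The repetition loop lets a single test subclass exercise job setup and teardown repeatedly against the same mini cluster; KMeansForTestITCase further down in this commit uses it with ten repetitions. A minimal usage sketch (hypothetical test class, built only from APIs that appear elsewhere in this commit):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.test.util.JavaProgramTestBase;

public class RepeatedJobITCase extends JavaProgramTestBase {

	public RepeatedJobITCase() {
		// Re-run testProgram() five times to surface resource leaks and races.
		setNumberOfTestRepetitions(5);
	}

	@Override
	protected void testProgram() throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		env.generateSequence(0, 1000)
				.output(new DiscardingOutputFormat<Long>());

		env.execute("Repeated job");
	}
}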

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
index bd4e63a..b88eb4e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
@@ -28,9 +28,9 @@ public class WordCountITCase extends JavaProgramTestBase {
 	protected String resultPath;
 
 	public WordCountITCase(){
-//		setDegreeOfParallelism(4);
-//		setNumTaskManagers(2);
-//		setTaskManagerNumSlots(2);
+		setDegreeOfParallelism(4);
+		setNumTaskManagers(2);
+		setTaskManagerNumSlots(2);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansForTestITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansForTestITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansForTestITCase.java
new file mode 100644
index 0000000..732bd06
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansForTestITCase.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.localDistributed.PackagedProgramEndToEndITCase;
+import org.apache.flink.test.testdata.KMeansData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.testjar.KMeansForTest;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * This K-Means is a copy of {@link KMeansForTest} from the {@link PackagedProgramEndToEndITCase},
+ * which detected a problem with the wiring of blocking intermediate results reproducibly with
+ * multiple runs, whereas other tests didn't.
+ *
+ * <p> The code is copied here, because the packaged program test removes the classes from the
+ * classpath.
+ *
+ * <p> It's safe to remove this test in the future.
+ */
+public class KMeansForTestITCase extends JavaProgramTestBase {
+
+	protected String dataPath;
+	protected String clusterPath;
+	protected String resultPath;
+
+	public KMeansForTestITCase(){
+		setNumTaskManagers(2);
+		setTaskManagerNumSlots(2);
+		setNumberOfTestRepetitions(10);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS);
+		clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		int numIterations = 20;
+
+		// get input data
+		DataSet<Point> points = env.readCsvFile(dataPath)
+				.fieldDelimiter("|")
+				.includeFields(true, true)
+				.types(Double.class, Double.class)
+				.map(new TuplePointConverter());
+
+		DataSet<Centroid> centroids = env.readCsvFile(clusterPath)
+				.fieldDelimiter("|")
+				.includeFields(true, true, true)
+				.types(Integer.class, Double.class, Double.class)
+				.map(new TupleCentroidConverter());
+
+		// set number of bulk iterations for KMeans algorithm
+		IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
+
+		DataSet<Centroid> newCentroids = points
+				// compute closest centroid for each point
+				.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
+						// count and sum point coordinates for each centroid
+				.map(new CountAppender())
+						// !test if key expressions are working!
+				.groupBy("field0").reduce(new CentroidAccumulator())
+						// compute new centroids from point counts and coordinate sums
+				.map(new CentroidAverager());
+
+		// feed new centroids back into next iteration
+		DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
+
+		DataSet<Tuple2<Integer, Point>> clusteredPoints = points
+				// assign points to final clusters
+				.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
+
+		// emit result
+		clusteredPoints.writeAsCsv(resultPath, "\n", " ");
+
+		env.execute("KMeansForTest");
+	}
+
+	// *************************************************************************
+	//     DATA TYPES
+	// *************************************************************************
+
+	/**
+	 * A simple two-dimensional point.
+	 */
+	public static class Point implements Serializable {
+
+		public double x, y;
+
+		public Point() {}
+
+		public Point(double x, double y) {
+			this.x = x;
+			this.y = y;
+		}
+
+		public Point add(Point other) {
+			x += other.x;
+			y += other.y;
+			return this;
+		}
+
+		public Point div(long val) {
+			x /= val;
+			y /= val;
+			return this;
+		}
+
+		public double euclideanDistance(Point other) {
+			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+		}
+
+		public void clear() {
+			x = y = 0.0;
+		}
+
+		@Override
+		public String toString() {
+			return x + " " + y;
+		}
+	}
+
+	/**
+	 * A simple two-dimensional centroid, basically a point with an ID.
+	 */
+	public static class Centroid extends Point {
+
+		public int id;
+
+		public Centroid() {}
+
+		public Centroid(int id, double x, double y) {
+			super(x,y);
+			this.id = id;
+		}
+
+		public Centroid(int id, Point p) {
+			super(p.x, p.y);
+			this.id = id;
+		}
+
+		@Override
+		public String toString() {
+			return id + " " + super.toString();
+		}
+	}
+
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	/** Converts a Tuple2<Double,Double> into a Point. */
+	public static final class TuplePointConverter extends RichMapFunction<Tuple2<Double, Double>, Point> {
+
+		@Override
+		public Point map(Tuple2<Double, Double> t) throws Exception {
+			return new Point(t.f0, t.f1);
+		}
+	}
+
+	/** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
+	public static final class TupleCentroidConverter extends RichMapFunction<Tuple3<Integer, Double, Double>, Centroid> {
+
+		@Override
+		public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception {
+			return new Centroid(t.f0, t.f1, t.f2);
+		}
+	}
+
+	/** Determines the closest cluster center for a data point. */
+	public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> {
+		private Collection<Centroid> centroids;
+
+		/** Reads the centroid values from a broadcast variable into a collection. */
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
+		}
+
+		@Override
+		public Tuple2<Integer, Point> map(Point p) throws Exception {
+
+			double minDistance = Double.MAX_VALUE;
+			int closestCentroidId = -1;
+
+			// check all cluster centers
+			for (Centroid centroid : centroids) {
+				// compute distance
+				double distance = p.euclideanDistance(centroid);
+
+				// update nearest cluster if necessary
+				if (distance < minDistance) {
+					minDistance = distance;
+					closestCentroidId = centroid.id;
+				}
+			}
+
+			// emit a new record with the center id and the data point.
+			return new Tuple2<Integer, Point>(closestCentroidId, p);
+		}
+	}
+
+	// Use this so that we can check whether POJOs and the POJO comparator also work
+	public static final class DummyTuple3IntPointLong {
+		public Integer field0;
+		public Point field1;
+		public Long field2;
+
+		public DummyTuple3IntPointLong() {}
+
+		DummyTuple3IntPointLong(Integer f0, Point f1, Long f2) {
+			this.field0 = f0;
+			this.field1 = f1;
+			this.field2 = f2;
+		}
+	}
+
+	/** Appends a count variable to the tuple. */
+	public static final class CountAppender extends RichMapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> {
+
+		@Override
+		public DummyTuple3IntPointLong map(Tuple2<Integer, Point> t) {
+			return new DummyTuple3IntPointLong(t.f0, t.f1, 1L);
+		}
+	}
+
+	/** Sums and counts point coordinates. */
+	public static final class CentroidAccumulator extends RichReduceFunction<DummyTuple3IntPointLong> {
+
+		@Override
+		public DummyTuple3IntPointLong reduce(DummyTuple3IntPointLong val1, DummyTuple3IntPointLong val2) {
+			return new DummyTuple3IntPointLong(val1.field0, val1.field1.add(val2.field1), val1.field2 + val2.field2);
+		}
+	}
+
+	/** Computes new centroid from coordinate sum and count of points. */
+	public static final class CentroidAverager extends RichMapFunction<DummyTuple3IntPointLong, Centroid> {
+
+		@Override
+		public Centroid map(DummyTuple3IntPointLong value) {
+			return new Centroid(value.field0, value.field1.div(value.field2));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
index 8cbd7ca..7bb094b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -409,7 +409,7 @@ public class ProcessFailureBatchRecoveryITCase {
 				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 
 				TaskManager.runTaskManager(cfg, TaskManager.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-tests/src/test/java/org/apache/flink/test/runtime/ConsumePipelinedAndBlockingResultITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/ConsumePipelinedAndBlockingResultITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/runtime/ConsumePipelinedAndBlockingResultITCase.java
new file mode 100644
index 0000000..007742b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/ConsumePipelinedAndBlockingResultITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class ConsumePipelinedAndBlockingResultITCase extends JavaProgramTestBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(1);
+
+		DataSet<Tuple1<Long>> pipelinedSource = env.fromElements(new Tuple1<Long>(1l));
+
+		DataSet<Tuple1<Long>> slowBlockingSource = env.generateSequence(0, 10).map(
+				new MapFunction<Long, Tuple1<Long>>() {
+					@Override
+					public Tuple1<Long> map(Long value) throws Exception {
+						Thread.sleep(200);
+
+						return new Tuple1<Long>(value);
+					}
+				}
+		);
+
+		slowBlockingSource.join(slowBlockingSource)
+				.where(0).equalTo(0).output(new DiscardingOutputFormat<Tuple2<Tuple1<Long>, Tuple1<Long>>>());
+
+		// Join the slow blocking and the pipelined source. This test should verify that this works
+		// w/o problems and the blocking result is not requested too early.
+		pipelinedSource.join(slowBlockingSource)
+				.where(0).equalTo(0)
+				.output(new DiscardingOutputFormat<Tuple2<Tuple1<Long>, Tuple1<Long>>>());
+
+		env.execute("Consume one pipelined and one blocking result test job");
+	}
+
+	@Override
+	protected boolean skipCollectionExecution() {
+		// Skip collection execution as it is independent of the runtime environment functionality,
+		// which is under test.
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-tests/src/test/java/org/apache/flink/test/runtime/JoinDeadlockITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/JoinDeadlockITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/runtime/JoinDeadlockITCase.java
new file mode 100644
index 0000000..2c4dca3
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/JoinDeadlockITCase.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+
+/**
+ * Tests a join, which leads to a deadlock with large data sizes and PIPELINED-only execution.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-1343">FLINK-1343</a>
+ */
+public class JoinDeadlockITCase extends JavaProgramTestBase {
+
+	protected String resultPath;
+
+	@Rule
+	public Timeout globalTimeout = new Timeout(120 * 1000); // Set timeout for deadlocks
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Long> longs = env.generateSequence(0, 100000);
+
+		DataSet<Tuple1<Long>> longT1 = longs.map(new TupleWrapper());
+		DataSet<Tuple1<Long>> longT2 = longT1.project(0);
+		DataSet<Tuple1<Long>> longT3 = longs.map(new TupleWrapper());
+
+		longT2.join(longT3).where(0).equalTo(0).projectFirst(0)
+				.join(longT1).where(0).equalTo(0).projectFirst(0)
+				.writeAsText(resultPath);
+
+		env.execute();
+	}
+
+	public static class TupleWrapper implements MapFunction<Long, Tuple1<Long>> {
+
+		@Override
+		public Tuple1<Long> map(Long l) throws Exception {
+			return new Tuple1<Long>(l);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-tests/src/test/java/org/apache/flink/test/runtime/SelfJoinDeadlockITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/SelfJoinDeadlockITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/runtime/SelfJoinDeadlockITCase.java
new file mode 100644
index 0000000..7729138
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/SelfJoinDeadlockITCase.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Tests a self-join, which leads to a deadlock with large data sizes and PIPELINED-only execution.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-1141">FLINK-1141</a>
+ */
+public class SelfJoinDeadlockITCase extends JavaProgramTestBase {
+
+	protected String resultPath;
+
+	@Rule
+	public Timeout globalTimeout = new Timeout(120 * 1000); // Set timeout for deadlocks
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Integer, String>> ds = env.createInput(new LargeJoinDataGeneratorInputFormat(1000000));
+
+		ds.join(ds).where(0).equalTo(1).with(new Joiner()).writeAsText(resultPath);
+
+		env.execute("Local Selfjoin Test Job");
+	}
+
+	@SuppressWarnings("serial")
+	public static class Joiner implements FlatJoinFunction<Tuple3<Integer, Integer, String>, Tuple3<Integer, Integer, String>, Tuple5<Integer, Integer, Integer, String, String>> {
+
+		@Override
+		public void join(Tuple3<Integer, Integer, String> in1, Tuple3<Integer, Integer, String> in2, Collector<Tuple5<Integer, Integer, Integer, String, String>> out) throws Exception {
+			out.collect(new Tuple5<Integer, Integer, Integer, String, String>(in1.f0, in1.f1, in2.f1, in1.f2, in2.f2));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	// Use custom input format to generate the data. Other available input formats (like collection
+	// input format) create data upfront and serialize it completely on the heap, which might
+	// break the test JVM heap sizes.
+	private static class LargeJoinDataGeneratorInputFormat extends GenericInputFormat<Tuple3<Integer, Integer, String>> implements NonParallelInput {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Random rand = new Random(42);
+
+		private final int toProduce;
+
+		private int produced;
+
+		public LargeJoinDataGeneratorInputFormat(int toProduce) {
+			this.toProduce = toProduce;
+		}
+
+		@Override
+		public boolean reachedEnd() throws IOException {
+			return produced >= toProduce;
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, String> nextRecord(Tuple3<Integer, Integer, String> reuse) throws IOException {
+			produced++;
+
+			return new Tuple3<Integer, Integer, String>(rand.nextInt(toProduce), rand.nextInt(toProduce), "aaa");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
index 44447d8..784e824 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
@@ -16,27 +16,27 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.util.testjar;
 
-import java.io.Serializable;
-import java.util.Collection;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.localDistributed.PackagedProgramEndToEndITCase;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
+
+import java.io.Serializable;
+import java.util.Collection;
 
 /**
- * This class belongs to the @see {@link PackagedProgramEndToEndITCase} test
+ * This class belongs to the {@link PackagedProgramEndToEndITCase} test.
  *
+ * <p> It's removed by Maven from classpath, so other tests must not depend on it.
  */
 @SuppressWarnings("serial")
 public class KMeansForTest implements Program {
@@ -45,8 +45,6 @@ public class KMeansForTest implements Program {
 	//     PROGRAM
 	// *************************************************************************
 
-
-
 	@Override
 	public Plan getPlan(String... args) {
 		if (args.length < 4) {
@@ -79,14 +77,14 @@ public class KMeansForTest implements Program {
 		IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
 
 		DataSet<Centroid> newCentroids = points
-			// compute closest centroid for each point
-			.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
-			// count and sum point coordinates for each centroid
-			.map(new CountAppender())
-			// !test if key expressions are working!
-			.groupBy("field0").reduce(new CentroidAccumulator())
-			// compute new centroids from point counts and coordinate sums
-			.map(new CentroidAverager());
+				// compute closest centroid for each point
+				.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
+						// count and sum point coordinates for each centroid
+				.map(new CountAppender())
+						// !test if key expressions are working!
+				.groupBy("field0").reduce(new CentroidAccumulator())
+						// compute new centroids from point counts and coordinate sums
+				.map(new CentroidAverager());
 
 		// feed new centroids back into next iteration
 		DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

