flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-3595] [runtime] Eagerly destroy buffer pools on cancelling
Date Mon, 04 Apr 2016 19:28:29 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 f40ffd7da -> e0dc5c137


[FLINK-3595] [runtime] Eagerly destroy buffer pools on cancelling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0dc5c13
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0dc5c13
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0dc5c13

Branch: refs/heads/release-1.0
Commit: e0dc5c137894eb4f1751028a1ac90c57336abaf1
Parents: f40ffd7
Author: Ufuk Celebi <uce@apache.org>
Authored: Thu Mar 10 12:02:25 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Apr 4 21:28:07 2016 +0200

----------------------------------------------------------------------
 .../partition/consumer/SingleInputGate.java     |  26 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  43 ++-
 .../buffer/LocalBufferPoolDestroyTest.java      | 144 +++++++++
 .../partition/consumer/SingleInputGateTest.java |  96 ++++++
 .../TaskCancelAsyncProducerConsumerITCase.java  | 309 +++++++++++++++++++
 5 files changed, 605 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0dc5c13/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index efee27c..bf8bc73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -375,15 +375,19 @@ public class SingleInputGate implements InputGate {
 
 	@Override
 	public void requestPartitions() throws IOException, InterruptedException {
-		// Sanity check
-		if (numberOfInputChannels != inputChannels.size()) {
-			throw new IllegalStateException("Bug in input gate setup logic: mismatch between" +
-					"number of total input channels and the currently set number of input " +
-					"channels.");
-		}
-
 		synchronized (requestLock) {
 			if (!requestedPartitionsFlag) {
+				if (isReleased) {
+					throw new IllegalStateException("Already released.");
+				}
+
+				// Sanity checks
+				if (numberOfInputChannels != inputChannels.size()) {
+					throw new IllegalStateException("Bug in input gate setup logic: mismatch between" +
+							"number of total input channels and the currently set number of input " +
+							"channels.");
+				}
+
 				for (InputChannel inputChannel : inputChannels.values()) {
 					inputChannel.requestSubpartition(consumedSubpartitionIndex);
 				}
@@ -404,14 +408,14 @@ public class SingleInputGate implements InputGate {
 			return null;
 		}
 
-		if (isReleased) {
-			throw new IllegalStateException("Already released.");
-		}
-
 		requestPartitions();
 
 		InputChannel currentChannel = null;
 		while (currentChannel == null) {
+			if (isReleased) {
+				throw new IllegalStateException("Released");
+			}
+
 			currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dc5c13/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f2d6025..3d8dbc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -831,7 +831,13 @@ public class Task implements Runnable {
 						// because the canceling may block on user code, we cancel from a separate thread
 						// we do not reuse the async call handler, because that one may be blocked, in which
 						// case the canceling could not continue
-						Runnable canceler = new TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask);
+						Runnable canceler = new TaskCanceler(
+								LOG,
+								invokable,
+								executingThread,
+								taskNameWithSubtask,
+								producedPartitions,
+								inputGates);
 						Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
 								"Canceler for " + taskNameWithSubtask);
 						cancelThread.setDaemon(true);
@@ -1075,12 +1081,23 @@ public class Task implements Runnable {
 		private final AbstractInvokable invokable;
 		private final Thread executer;
 		private final String taskName;
+		private final ResultPartition[] producedPartitions;
+		private final SingleInputGate[] inputGates;
+
+		public TaskCanceler(
+				Logger logger,
+				AbstractInvokable invokable,
+				Thread executer,
+				String taskName,
+				ResultPartition[] producedPartitions,
+				SingleInputGate[] inputGates) {
 
-		public TaskCanceler(Logger logger, AbstractInvokable invokable, Thread executer, String taskName) {
 			this.logger = logger;
 			this.invokable = invokable;
 			this.executer = executer;
 			this.taskName = taskName;
+			this.producedPartitions = producedPartitions;
+			this.inputGates = inputGates;
 		}
 
 		@Override
@@ -1095,6 +1112,28 @@ public class Task implements Runnable {
 					logger.error("Error while canceling the task", t);
 				}
 
+				// Early release of input and output buffer pools. We do this
+				// in order to unblock async Threads, which produce/consume the
+				// intermediate streams outside of the main Task Thread.
+				//
+				// Don't do this before cancelling the invokable. Otherwise we
+				// will get misleading errors in the logs.
+				for (ResultPartition partition : producedPartitions) {
+					try {
+						partition.destroyBufferPool();
+					} catch (Throwable t) {
+						LOG.error("Failed to release result partition buffer pool.", t);
+					}
+				}
+
+				for (SingleInputGate inputGate : inputGates) {
+					try {
+						inputGate.releaseAllResources();
+					} catch (Throwable t) {
+						LOG.error("Failed to release input gate.", t);
+					}
+				}
+
 				// interrupt the running thread initially
 				executer.interrupt();
 				try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dc5c13/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
new file mode 100644
index 0000000..18e2136
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemoryType;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class LocalBufferPoolDestroyTest {
+
+	/**
+	 * Tests that a blocking request fails properly if the buffer pool is
+	 * destroyed.
+	 *
+	 * <p>Starts a Thread, which triggers an unsatisfiable blocking buffer
+	 * request. After making sure that the Thread is actually waiting in the
+	 * blocking call, the buffer pool is destroyed and we check whether the
+	 * request Thread threw the expected Exception.
+	 */
+	@Test
+	public void testDestroyWhileBlockingRequest() throws Exception {
+		AtomicReference<Exception> asyncException = new AtomicReference<>();
+
+		NetworkBufferPool networkBufferPool = null;
+		LocalBufferPool localBufferPool = null;
+
+		try {
+			networkBufferPool = new NetworkBufferPool(1, 4096, MemoryType.HEAP);
+			localBufferPool = new LocalBufferPool(networkBufferPool, 1);
+
+			// Drain buffer pool
+			assertNotNull(localBufferPool.requestBuffer());
+			assertNull(localBufferPool.requestBuffer());
+
+			// Start request Thread
+			Thread thread = new Thread(new BufferRequestTask(localBufferPool, asyncException));
+			thread.start();
+
+			// Wait for request
+			boolean success = false;
+
+			for (int i = 0; i < 50; i++) {
+				StackTraceElement[] stackTrace = thread.getStackTrace();
+				success = isInBlockingBufferRequest(stackTrace);
+
+				if (success) {
+					break;
+				} else {
+					// Retry
+					Thread.sleep(500);
+				}
+			}
+
+			// Verify that Thread was in blocking request
+			assertTrue("Did not trigger blocking buffer request.", success);
+
+			// Destroy the buffer pool
+			localBufferPool.lazyDestroy();
+
+			// Wait for Thread to finish
+			thread.join();
+
+			// Verify expected Exception
+			assertNotNull("Did not throw expected Exception", asyncException.get());
+			assertTrue(asyncException.get() instanceof IllegalStateException);
+		} finally {
+			if (localBufferPool != null) {
+				localBufferPool.lazyDestroy();
+			}
+
+			if (networkBufferPool != null) {
+				networkBufferPool.destroyAllBufferPools();
+				networkBufferPool.destroy();
+			}
+		}
+	}
+
+	/**
+	 * Returns whether the stack trace represents a Thread in a blocking buffer
+	 * request.
+	 *
+	 * @param stackTrace Stack trace of the Thread to check
+	 *
+	 * @return Flag indicating whether the Thread is in a blocking buffer
+	 * request or not
+	 */
+	private boolean isInBlockingBufferRequest(StackTraceElement[] stackTrace) {
+		if (stackTrace.length >= 3) {
+			return stackTrace[0].getMethodName().equals("wait") &&
+					stackTrace[1].getMethodName().equals("requestBuffer") &&
+					stackTrace[2].getMethodName().equals("requestBufferBlocking");
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Task triggering a blocking buffer request (the test assumes that no
+	 * buffer is available).
+	 */
+	private static class BufferRequestTask implements Runnable {
+
+		private final BufferPool bufferPool;
+		private final AtomicReference<Exception> asyncException;
+
+		public BufferRequestTask(BufferPool bufferPool, AtomicReference<Exception> asyncException) {
+			this.bufferPool = bufferPool;
+			this.asyncException = asyncException;
+		}
+
+		@Override
+		public void run() {
+			try {
+				String msg = "Test assumption violated: expected no available buffer";
+				assertNull(msg, bufferPool.requestBuffer());
+
+				bufferPool.requestBufferBlocking();
+			} catch (Exception t) {
+				asyncException.set(t);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dc5c13/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 397abed..c4bb785 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -42,8 +42,10 @@ import org.junit.Test;
 import scala.Tuple2;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -190,6 +192,100 @@ public class SingleInputGateTest {
 				any(ResultPartitionID.class), anyInt(), any(BufferProvider.class));
 	}
 
+	/**
+	 * Tests that the release of the input gate is noticed while polling the
+	 * channels for available data.
+	 */
+	@Test
+	public void testReleaseWhilePollingChannel() throws Exception {
+		final AtomicReference<Exception> asyncException = new AtomicReference<>();
+
+		// Setup the input gate with a single channel that does nothing
+		final SingleInputGate inputGate = new SingleInputGate(
+				"InputGate",
+				new JobID(),
+				new ExecutionAttemptID(),
+				new IntermediateDataSetID(),
+				0,
+				1,
+				mock(PartitionStateChecker.class));
+
+		InputChannel unknown = new UnknownInputChannel(
+				inputGate,
+				0,
+				new ResultPartitionID(),
+				new ResultPartitionManager(),
+				new TaskEventDispatcher(),
+				new LocalConnectionManager(),
+				new Tuple2<>(0, 0));
+
+		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
+
+		// Start the consumer in a separate Thread
+		Thread asyncConsumer = new Thread() {
+			@Override
+			public void run() {
+				try {
+					inputGate.getNextBufferOrEvent();
+				} catch (Exception e) {
+					asyncException.set(e);
+				}
+			}
+		};
+		asyncConsumer.start();
+
+		// Wait for blocking queue poll call and release input gate
+		boolean success = false;
+		for (int i = 0; i < 50; i++) {
+			if (asyncConsumer != null && asyncConsumer.isAlive()) {
+				StackTraceElement[] stackTrace = asyncConsumer.getStackTrace();
+				success = isInBlockingQueuePoll(stackTrace);
+			}
+
+			if (success) {
+				break;
+			} else {
+				// Retry
+				Thread.sleep(500);
+			}
+		}
+
+		// Verify that async consumer is in blocking request
+		assertTrue("Did not trigger blocking buffer request.", success);
+
+		// Release the input gate
+		inputGate.releaseAllResources();
+
+		// Wait for Thread to finish and verify expected Exceptions. If the
+		// input gate status is not properly checked during requests, this
+		// call will never return.
+		asyncConsumer.join();
+
+		assertNotNull(asyncException.get());
+		assertEquals(IllegalStateException.class, asyncException.get().getClass());
+	}
+
+	/**
+	 * Returns whether the stack trace represents a Thread in a blocking queue
+	 * poll call.
+	 *
+	 * @param stackTrace Stack trace of the Thread to check
+	 *
+	 * @return Flag indicating whether the Thread is in a blocking queue poll
+	 * call.
+	 */
+	private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) {
+		for (StackTraceElement elem : stackTrace) {
+			if (elem.getMethodName().equals("poll") &&
+					elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {
+
+				return true;
+			}
+		}
+
+		return false;
+	}
+
 	// ---------------------------------------------------------------------------------------------
 
 	static void verifyBufferOrEvent(

http://git-wip-us.apache.org/repos/asf/flink/blob/e0dc5c13/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
new file mode 100644
index 0000000..305ecbf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
+
+	// The Exceptions thrown by the producer/consumer Threads
+	private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
+	private static volatile Exception ASYNC_CONSUMER_EXCEPTION;
+
+	// The Threads producing/consuming the intermediate stream
+	private static volatile Thread ASYNC_PRODUCER_THREAD;
+	private static volatile Thread ASYNC_CONSUMER_THREAD;
+
+	/**
+	 * Tests that a task waiting on an async producer/consumer that is stuck
+	 * in a blocking buffer request can be properly cancelled.
+	 *
+	 * <p>This is currently required for the Flink Kafka sources, which spawn
+	 * a separate Thread consuming from Kafka and producing the intermediate
+	 * streams in the spawned Thread instead of the main task Thread.
+	 */
+	@Test
+	public void testCancelAsyncProducerAndConsumer() throws Exception {
+		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+		TestingCluster flink = null;
+
+		try {
+			// Cluster
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 8);
+
+			flink = new TestingCluster(config, true);
+			flink.start();
+
+			// Job with async producer and consumer
+			JobVertex producer = new JobVertex("AsyncProducer");
+			producer.setParallelism(1);
+			producer.setInvokableClass(AsyncProducer.class);
+
+			JobVertex consumer = new JobVertex("AsyncConsumer");
+			consumer.setParallelism(1);
+			consumer.setInvokableClass(AsyncConsumer.class);
+			consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE);
+
+			SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID());
+			producer.setSlotSharingGroup(slot);
+			consumer.setSlotSharingGroup(slot);
+
+			JobGraph jobGraph = new JobGraph(producer, consumer);
+
+			// Submit job and wait until running
+			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
+			flink.submitJobDetached(jobGraph);
+
+			Object msg = new WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+			Future<?> runningFuture = jobManager.ask(msg, deadline.timeLeft());
+			Await.ready(runningFuture, deadline.timeLeft());
+
+			// Wait for blocking requests, cancel and wait for cancellation
+			msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
+			Future<?> cancelledFuture = jobManager.ask(msg, deadline.timeLeft());
+
+			boolean producerBlocked = false;
+			for (int i = 0; i < 50; i++) {
+				Thread thread = ASYNC_PRODUCER_THREAD;
+
+				if (thread != null && thread.isAlive()) {
+					StackTraceElement[] stackTrace = thread.getStackTrace();
+					producerBlocked = isInBlockingBufferRequest(stackTrace);
+				}
+
+				if (producerBlocked) {
+					break;
+				} else {
+					// Retry
+					Thread.sleep(500);
+				}
+			}
+
+			// Verify that async producer is in blocking request
+			assertTrue("Producer thread is not blocked.", producerBlocked);
+
+			boolean consumerBlocked = false;
+			for (int i = 0; i < 50; i++) {
+				Thread thread = ASYNC_CONSUMER_THREAD;
+
+				if (thread != null && thread.isAlive()) {
+					StackTraceElement[] stackTrace = thread.getStackTrace();
+					consumerBlocked = isInBlockingQueuePoll(stackTrace);
+				}
+
+				if (consumerBlocked) {
+					break;
+				} else {
+					// Retry
+					Thread.sleep(500);
+				}
+			}
+
+			// Verify that async consumer is in blocking request
+			assertTrue("Consumer thread is not blocked.", consumerBlocked);
+
+			msg = new CancelJob(jobGraph.getJobID());
+			Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft());
+			Await.ready(cancelFuture, deadline.timeLeft());
+
+			Await.ready(cancelledFuture, deadline.timeLeft());
+
+			// Verify the expected Exceptions
+			assertNotNull(ASYNC_PRODUCER_EXCEPTION);
+			assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass());
+
+			assertNotNull(ASYNC_CONSUMER_EXCEPTION);
+			assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
+		} finally {
+			if (flink != null) {
+				flink.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * Returns whether the stack trace represents a Thread in a blocking buffer
+	 * request.
+	 *
+	 * @param stackTrace Stack trace of the Thread to check
+	 *
+	 * @return Flag indicating whether the Thread is in a blocking buffer
+	 * request or not
+	 */
+	private boolean isInBlockingBufferRequest(StackTraceElement[] stackTrace) {
+		return stackTrace.length >= 3 && stackTrace[0].getMethodName().equals("wait") &&
+				stackTrace[1].getMethodName().equals("requestBuffer") &&
+				stackTrace[2].getMethodName().equals("requestBufferBlocking");
+	}
+
+	/**
+	 * Returns whether the stack trace represents a Thread in a blocking queue
+	 * poll call.
+	 *
+	 * @param stackTrace Stack trace of the Thread to check
+	 *
+	 * @return Flag indicating whether the Thread is in a blocking queue poll
+	 * call.
+	 */
+	private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) {
+		for (StackTraceElement elem : stackTrace) {
+			if (elem.getMethodName().equals("poll") &&
+					elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {
+
+				return true;
+			}
+		}
+
+		return false;
+	}
+
+	/**
+	 * Invokable emitting records in a separate Thread (not the main Task
+	 * thread).
+	 */
+	public static class AsyncProducer extends AbstractInvokable {
+
+		@Override
+		public void invoke() throws Exception {
+			Thread producer = new ProducerThread(getEnvironment().getWriter(0));
+
+			// Publish the async producer for the main test Thread
+			ASYNC_PRODUCER_THREAD = producer;
+
+			producer.start();
+
+			// Wait for the producer Thread to finish. This is executed in the
+			// main Task thread and will be interrupted on cancellation.
+			while (producer.isAlive()) {
+				try {
+					producer.join();
+				} catch (InterruptedException ignored) {
+				}
+			}
+		}
+
+		/**
+		 * The Thread emitting the records.
+		 */
+		private static class ProducerThread extends Thread {
+
+			private final RecordWriter<LongValue> recordWriter;
+
+			public ProducerThread(ResultPartitionWriter partitionWriter) {
+				this.recordWriter = new RecordWriter<>(partitionWriter);
+			}
+
+			@Override
+			public void run() {
+				LongValue current = new LongValue(0);
+
+				try {
+					while (true) {
+						current.setValue(current.getValue() + 1);
+						recordWriter.emit(current);
+						recordWriter.flush();
+					}
+				} catch (Exception e) {
+					ASYNC_PRODUCER_EXCEPTION = e;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Invokable consuming buffers in a separate Thread (not the main Task
+	 * thread).
+	 */
+	public static class AsyncConsumer extends AbstractInvokable {
+
+		@Override
+		public void invoke() throws Exception {
+			Thread consumer = new ConsumerThread(getEnvironment().getInputGate(0));
+
+			// Publish the async consumer for the main test Thread
+			ASYNC_CONSUMER_THREAD = consumer;
+
+			consumer.start();
+
+			// Wait for the consumer Thread to finish. This is executed in the
+			// main Task thread and will be interrupted on cancellation.
+			while (consumer.isAlive()) {
+				try {
+					consumer.join();
+				} catch (InterruptedException ignored) {
+				}
+			}
+		}
+
+		/**
+		 * The Thread consuming buffers.
+		 */
+		private static class ConsumerThread extends Thread {
+
+			private final InputGate inputGate;
+
+			public ConsumerThread(InputGate inputGate) {
+				this.inputGate = inputGate;
+			}
+
+			@Override
+			public void run() {
+				try {
+					while (true) {
+						inputGate.getNextBufferOrEvent();
+					}
+				} catch (Exception e) {
+					ASYNC_CONSUMER_EXCEPTION = e;
+				}
+			}
+		}
+	}
+}


Mime
View raw message