flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/3] flink git commit: [distributed runtime] Throw interrupted exception during partition request client creation
Date Tue, 03 Mar 2015 09:37:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master 2a528712d -> 940704156


[distributed runtime] Throw interrupted exception during partition request client creation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94070415
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94070415
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94070415

Branch: refs/heads/master
Commit: 9407041565b5402cd6c1923fbfa764e01a5db178
Parents: 6da093a
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Mar 2 20:08:33 2015 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Tue Mar 3 10:36:56 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/io/network/ConnectionManager.java      |  2 +-
 .../runtime/io/network/netty/NettyConnectionManager.java |  2 +-
 .../io/network/netty/PartitionRequestClientFactory.java  | 11 +++--------
 .../io/network/partition/consumer/InputChannel.java      |  2 +-
 .../runtime/io/network/partition/consumer/InputGate.java |  2 +-
 .../network/partition/consumer/RemoteInputChannel.java   |  2 +-
 .../io/network/partition/consumer/SingleInputGate.java   |  4 ++--
 .../io/network/partition/consumer/UnionInputGate.java    |  2 +-
 .../org/apache/flink/streaming/io/CoRecordReader.java    |  2 +-
 9 files changed, 12 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index d478e0f..76f8bbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -34,7 +34,7 @@ public interface ConnectionManager {
 	/**
 	 * Creates a {@link PartitionRequestClient} instance for the given {@link RemoteAddress}.
 	 */
-	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException;
+	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException;
 
 	/**
 	 * Closes opened ChannelConnections in case of a resource release

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 5d03c15..260ea7e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -49,7 +49,7 @@ public class NettyConnectionManager implements ConnectionManager {
 	}
 
 	@Override
-	public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+	public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException {
 		return partitionRequestClientFactory.createPartitionRequestClient(remoteAddress);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index d7e6efd..d4c022b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -48,7 +48,7 @@ class PartitionRequestClientFactory {
 	 * Atomically establishes a TCP connection to the given remote address and
 	 * creates a {@link PartitionRequestClient} instance for this connection.
 	 */
-	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException {
 		Object entry;
 		PartitionRequestClient client = null;
 
@@ -182,15 +182,10 @@ class PartitionRequestClientFactory {
 
 		private volatile Throwable error;
 
-		private PartitionRequestClient waitForChannel() throws IOException {
+		private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
 			synchronized (connectLock) {
 				while (error == null && partitionRequestClient == null) {
-					try {
-						connectLock.wait(2000);
-					}
-					catch (InterruptedException e) {
-						throw new RuntimeException("Wait for channel connection interrupted.");
-					}
+					connectLock.wait(2000);
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 31b67ca..7173566 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -92,7 +92,7 @@ public abstract class InputChannel {
 	 * The queue index to request depends on which sub task the channel belongs
 	 * to and is specified by the consumer of this channel.
 	 */
-	public abstract void requestIntermediateResultPartition(int queueIndex) throws IOException;
+	public abstract void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException;
 
 	/**
 	 * Returns the next buffer from the consumed subpartition.

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 8d28084..43cdd29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -29,7 +29,7 @@ public interface InputGate {
 
 	public boolean isFinished();
 
-	public void requestPartitions() throws IOException;
+	public void requestPartitions() throws IOException, InterruptedException;
 
 	public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index daf94e6..d50ddc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -85,7 +85,7 @@ public class RemoteInputChannel extends InputChannel {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void requestIntermediateResultPartition(int queueIndex) throws IOException {
+	public void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException {
 		if (partitionRequestClient == null) {
 			LOG.debug("Requesting REMOTE queue {} from partition {} produced by {}.", queueIndex, partitionId, producerExecutionId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 0383cca..19898c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -192,7 +192,7 @@ public class SingleInputGate implements InputGate {
 		}
 	}
 
-	public void updateInputChannel(PartitionInfo partitionInfo) throws IOException {
+	public void updateInputChannel(PartitionInfo partitionInfo) throws IOException, InterruptedException {
 		synchronized (requestLock) {
 			if (releasedResourcesFlag) {
 				// There was a race with a task failure/cancel
@@ -273,7 +273,7 @@ public class SingleInputGate implements InputGate {
 	}
 
 	@Override
-	public void requestPartitions() throws IOException {
+	public void requestPartitions() throws IOException, InterruptedException {
 		if (!requestedPartitionsFlag) {
 			// Sanity check
 			if (numberOfInputChannels != inputChannels.size()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 4994f13..5a7a5b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -120,7 +120,7 @@ public class UnionInputGate implements InputGate {
 	}
 
 	@Override
-	public void requestPartitions() throws IOException {
+	public void requestPartitions() throws IOException, InterruptedException {
 		if (!requestedPartitionsFlag) {
 			for (InputGate inputGate : inputGates) {
 				inputGate.requestPartitions();

http://git-wip-us.apache.org/repos/asf/flink/blob/94070415/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index bb3a659..84b08f7 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -78,7 +78,7 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 		bufferReader2.registerListener(this);
 	}
 
-	public void requestPartitionsOnce() throws IOException {
+	public void requestPartitionsOnce() throws IOException, InterruptedException {
 		if (!hasRequestedPartitions) {
 			bufferReader1.requestPartitions();
 			bufferReader2.requestPartitions();


Mime
View raw message