flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-8117] [runtime] Eliminate modulo operation from round-robin partitioners
Date Wed, 22 Nov 2017 09:02:28 GMT
Repository: flink
Updated Branches:
  refs/heads/master c4107d4c3 -> 98241d513


[FLINK-8117] [runtime] Eliminate modulo operation from round-robin partitioners

This closes #5041


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98241d51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98241d51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98241d51

Branch: refs/heads/master
Commit: 98241d513befcee460a3f7af805a96c794c33ada
Parents: c4107d4
Author: Gabor Gevay <ggab90@gmail.com>
Authored: Mon Nov 20 15:12:17 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Nov 22 10:01:57 2017 +0100

----------------------------------------------------------------------
 .../io/network/api/writer/RoundRobinChannelSelector.java      | 5 ++++-
 .../runtime/partitioner/CustomPartitionerWrapper.java         | 2 +-
 .../streaming/runtime/partitioner/ForwardPartitioner.java     | 2 +-
 .../streaming/runtime/partitioner/GlobalPartitioner.java      | 2 +-
 .../streaming/runtime/partitioner/RebalancePartitioner.java   | 7 +++++--
 .../streaming/runtime/partitioner/RescalePartitioner.java     | 7 +++++--
 .../streaming/runtime/partitioner/ShufflePartitioner.java     | 2 +-
 7 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
index 46af5a7..c7d25e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
@@ -46,7 +46,10 @@ public class RoundRobinChannelSelector<T extends IOReadableWritable> implements
 	@Override
 	public int[] selectChannels(final T record, final int numberOfOutputChannels) {
 
-		this.nextChannelToSendTo[0] = (this.nextChannelToSendTo[0] + 1) % numberOfOutputChannels;
+		int newChannel = ++this.nextChannelToSendTo[0];
+		if (newChannel >= numberOfOutputChannels) {
+			this.nextChannelToSendTo[0] = 0;
+		}
 
 		return this.nextChannelToSendTo;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
index a51cede..f19c87d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private int[] returnArray = new int[1];
+	private final int[] returnArray = new int[1];
 	Partitioner<K> partitioner;
 	KeySelector<T, K> keySelector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
index 0ae737c..c952282 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class ForwardPartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private int[] returnArray = new int[] {0};
+	private final int[] returnArray = new int[] {0};
 
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
index 67eaa73..69c8d00 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class GlobalPartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private int[] returnArray = new int[] { 0 };
+	private final int[] returnArray = new int[] { 0 };
 
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
index a81f973..bb88d17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
@@ -31,12 +31,15 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class RebalancePartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private int[] returnArray = new int[] {-1};
+	private final int[] returnArray = new int[] {-1};
 
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
 			int numberOfOutputChannels) {
-		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
+		int newChannel = ++this.returnArray[0];
+		if (newChannel >= numberOfOutputChannels) {
+			this.returnArray[0] = 0;
+		}
 		return this.returnArray;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
index 9061523..b9af629 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
@@ -48,11 +48,14 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class RescalePartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private int[] returnArray = new int[] {-1};
+	private final int[] returnArray = new int[] {-1};
 
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
-		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
+		int newChannel = ++this.returnArray[0];
+		if (newChannel >= numberOfOutputChannels) {
+			this.returnArray[0] = 0;
+		}
 		return this.returnArray;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
index 60c3fbc..ddcbec7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
@@ -36,7 +36,7 @@ public class ShufflePartitioner<T> extends StreamPartitioner<T> {
 
 	private Random random = new Random();
 
-	private int[] returnArray = new int[1];
+	private final int[] returnArray = new int[1];
 
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,


Mime
View raw message