flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [10/11] flink git commit: [hotfix] [kafka connector] Replace funky loop with simple division in FixedPartitioner
Date Thu, 28 Jan 2016 13:57:01 GMT
[hotfix] [kafka connector] Replace funky loop with simple division in FixedPartitioner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9637ee78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9637ee78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9637ee78

Branch: refs/heads/master
Commit: 9637ee78846e4df5ef328c620cc991d394056f61
Parents: 1ea5e13
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Jan 27 12:20:59 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jan 28 13:41:38 2016 +0100

----------------------------------------------------------------------
 .../kafka/partitioner/FixedPartitioner.java     | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9637ee78/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
index d9dcfc1..9b848e0 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -54,27 +54,23 @@ import java.io.Serializable;
 public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
 	private static final long serialVersionUID = 1627268846962918126L;
 
-	int targetPartition = -1;
+	private int targetPartition = -1;
 
 	@Override
 	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
-		int p = 0;
-		for (int i = 0; i < parallelInstances; i++) {
-			if (i == parallelInstanceId) {
-				targetPartition = partitions[p];
-				return;
-			}
-			if (++p == partitions.length) {
-				p = 0;
-			}
+		if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {
+			throw new IllegalArgumentException();
 		}
+		
+		this.targetPartition = partitions[parallelInstanceId % partitions.length];
 	}
 
 	@Override
 	public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-		if (targetPartition == -1) {
+		if (targetPartition >= 0) {
+			return targetPartition;
+		} else {
 			throw new RuntimeException("The partitioner has not been initialized properly");
 		}
-		return targetPartition;
 	}
 }


Mime
View raw message