flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [28/34] incubator-flink git commit: [FLINK-1279] [streaming] Forward partitioning changed to use round-robin method
Date Fri, 05 Dec 2014 17:26:33 GMT
[FLINK-1279] [streaming] Forward partitioning changed to use round-robin method


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f0cc5d64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f0cc5d64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f0cc5d64

Branch: refs/heads/master
Commit: f0cc5d64b4361845aebef43124b77da51e0b7824
Parents: 64baa00
Author: Gyula Fora <gyfora@apache.org>
Authored: Thu Nov 27 13:17:41 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Dec 5 16:45:10 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    |  7 ++--
 .../streaming/api/datastream/DataStream.java    | 14 ++-----
 .../api/datastream/IterativeDataStream.java     |  6 +--
 .../partitioner/BroadcastPartitioner.java       | 10 ++++-
 .../partitioner/DistributePartitioner.java      | 20 ++++-----
 .../partitioner/FieldsPartitioner.java          |  6 +--
 .../partitioner/ForwardPartitioner.java         | 43 --------------------
 .../partitioner/GlobalPartitioner.java          |  8 ++--
 .../partitioner/ShufflePartitioner.java         |  7 ++--
 .../partitioner/StreamPartitioner.java          | 27 ++++++++----
 .../apache/flink/streaming/api/PrintTest.java   |  1 +
 .../partitioner/DistributePartitionerTest.java  |  2 +-
 .../partitioner/ForwardPartitionerTest.java     | 16 ++++----
 13 files changed, 65 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 03198ce..8a8595a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -37,8 +37,8 @@ import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
 import org.apache.flink.streaming.api.streamvertex.StreamVertex;
-import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy;
 import org.apache.flink.streaming.state.OperatorState;
 import org.apache.flink.streaming.util.serialization.TypeWrapper;
 import org.slf4j.Logger;
@@ -80,7 +80,6 @@ public class JobGraphBuilder {
 	private Map<String, Long> iterationWaitTime;
 	private Map<String, Map<String, OperatorState<?>>> operatorStates;
 
-
 	/**
 	 * Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
 	 * and consists of sources, tasks (intermediate vertices) and sinks.
@@ -205,7 +204,7 @@ public class JobGraphBuilder {
 	 */
 	public void addIterationTail(String vertexName, String iterationTail, String iterationID,
 			int parallelism, long waitTime) {
-		
+
 		if (bufferTimeout.get(iterationTail) == 0) {
 			throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
 		}
@@ -355,7 +354,7 @@ public class JobGraphBuilder {
 
 		StreamConfig config = new StreamConfig(upStreamVertex.getConfiguration());
 
-		if (partitionerObject.getClass().equals(ForwardPartitioner.class)) {
+		if (partitionerObject.getStrategy() == PartitioningStrategy.FORWARD) {
 			downStreamVertex
 					.connectNewDataSetAsInput(upStreamVertex, DistributionPattern.POINTWISE);
 		} else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index fd19f73..55f5e71 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -66,7 +66,6 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.partitioner.DistributePartitioner;
 import org.apache.flink.streaming.partitioner.FieldsPartitioner;
-import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.FieldsKeySelector;
@@ -127,7 +126,7 @@ public class DataStream<OUT> {
 		this.jobGraphBuilder = environment.getJobGraphBuilder();
 		this.userDefinedNames = new ArrayList<String>();
 		this.selectAll = false;
-		this.partitioner = new ForwardPartitioner<OUT>();
+		this.partitioner = new DistributePartitioner<OUT>(true);
 		this.outTypeWrapper = outTypeWrapper;
 		this.mergedStreams = new ArrayList<DataStream<OUT>>();
 		this.mergedStreams.add(this);
@@ -159,13 +158,6 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Partitioning strategy on the stream.
-	 */
-	public static enum ConnectionType {
-		SHUFFLE, BROADCAST, FIELD, FORWARD, DISTRIBUTE
-	}
-
-	/**
 	 * Returns the ID of the {@link DataStream}.
 	 * 
 	 * @return ID of the DataStream
@@ -341,7 +333,7 @@ public class DataStream<OUT> {
 	 * @return The DataStream with shuffle partitioning set.
 	 */
 	public DataStream<OUT> forward() {
-		return setConnectionType(new ForwardPartitioner<OUT>());
+		return setConnectionType(new DistributePartitioner<OUT>(true));
 	}
 
 	/**
@@ -351,7 +343,7 @@ public class DataStream<OUT> {
 	 * @return The DataStream with shuffle partitioning set.
 	 */
 	public DataStream<OUT> distribute() {
-		return setConnectionType(new DistributePartitioner<OUT>());
+		return setConnectionType(new DistributePartitioner<OUT>(false));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 41616c9..29f5eec 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.streaming.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.partitioner.DistributePartitioner;
 
 /**
  * The iterative data stream represents the start of an iteration in a
@@ -91,8 +91,8 @@ public class IterativeDataStream<IN> extends
 
 		for (DataStream<IN> stream : iterationTail.mergedStreams) {
 			String inputID = stream.getId();
-			jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<IN>(), 0,
-					name, false);
+			jobGraphBuilder.setEdge(inputID, returnStream.getId(), new DistributePartitioner<IN>(
+					true), 0, name, false);
 		}
 
 		return iterationTail;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
index d8abea6..3813c8a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
@@ -26,16 +26,21 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
  * @param <T>
  *            Type of the Tuple
  */
-public class BroadcastPartitioner<T> implements StreamPartitioner<T> {
+public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
 	int[] returnArray;
 	boolean set;
+	int setNumber;
+
+	public BroadcastPartitioner() {
+		super(PartitioningStrategy.BROADCAST);
+	}
 
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
 			int numberOfOutputChannels) {
-		if (set) {
+		if (set && setNumber == numberOfOutputChannels) {
 			return returnArray;
 		} else {
 			this.returnArray = new int[numberOfOutputChannels];
@@ -43,6 +48,7 @@ public class BroadcastPartitioner<T> implements StreamPartitioner<T> {
 				returnArray[i] = i;
 			}
 			set = true;
+			setNumber = numberOfOutputChannels;
 			return returnArray;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
index 6e95c5f..7299397 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
@@ -27,22 +27,20 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
  * @param <T>
  *            Type of the Tuple
  */
-public class DistributePartitioner<T> implements StreamPartitioner<T> {
+public class DistributePartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private int currentChannelIndex;
-	private int[] returnArray;
-	
-	public DistributePartitioner() {
-		this.currentChannelIndex = 0;
-		this.returnArray = new int[1];
+	private int[] returnArray = new int[] {-1};
+
+	public DistributePartitioner(boolean forward) {
+		super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
 	}
-	
+
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
 			int numberOfOutputChannels) {
-		returnArray[0] = currentChannelIndex;
-		currentChannelIndex = (currentChannelIndex + 1) % numberOfOutputChannels;
-		return returnArray;
+		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
+
+		return this.returnArray;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
index 0b6a8cc..e61555d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
@@ -28,15 +28,15 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
  * @param <T>
  *            Type of the Tuple
  */
-public class FieldsPartitioner<T> implements StreamPartitioner<T> {
+public class FieldsPartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
-	private int[] returnArray;
+	private int[] returnArray = new int[1];;
 	KeySelector<T, ?> keySelector;
 
 	public FieldsPartitioner(KeySelector<T, ?> keySelector) {
+		super(PartitioningStrategy.FIELDS);
 		this.keySelector = keySelector;
-		this.returnArray = new int[1];
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ForwardPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ForwardPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ForwardPartitioner.java
deleted file mode 100755
index d9ca822..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ForwardPartitioner.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-/**
- * Partitioner that forwards the tuples to the local subtask of the output vertex
- *
- * @param <T>
- *            Type of the Tuple
- */
-public class ForwardPartitioner<T> implements StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray;
-
-	public ForwardPartitioner() {
-		this.returnArray = new int[]{0};
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		return returnArray;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
index 73b171e..c73ca71 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
@@ -21,13 +21,13 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 //Group to the partitioner with the lowest id
-public class GlobalPartitioner<T> implements StreamPartitioner<T> {
+public class GlobalPartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
-	
-	private int[] returnArray;
+
+	private int[] returnArray = new int[] { 0 };
 
 	public GlobalPartitioner() {
-		this.returnArray = new int[] { 0 };
+		super(PartitioningStrategy.GLOBAL);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
index e5bfeb7..318de3f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
@@ -29,16 +29,15 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
  * @param <T>
  *            Type of the Tuple
  */
-public class ShufflePartitioner<T> implements StreamPartitioner<T> {
+public class ShufflePartitioner<T> extends StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
 	private Random random = new Random();
 
-	private int[] returnArray;
+	private int[] returnArray = new int[1];
 
 	public ShufflePartitioner() {
-		this.random = new Random();
-		this.returnArray = new int[1];
+		super(PartitioningStrategy.SHUFFLE);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
index fb920f0..15b4f43 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
@@ -22,12 +22,23 @@ import org.apache.flink.runtime.io.network.api.ChannelSelector;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
-/**
- * Empty interface to encapsulate partitioners.
- *
- * @param <T>
- *            Type of the Tuple
- */
-public interface StreamPartitioner<T> extends ChannelSelector<SerializationDelegate<StreamRecord<T>>>,
-		Serializable {
+public abstract class StreamPartitioner<T> implements
+		ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
+
+	public enum PartitioningStrategy {
+
+		FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, FIELDS;
+
+	}
+
+	private static final long serialVersionUID = 1L;
+	private PartitioningStrategy strategy;
+
+	public StreamPartitioner(PartitioningStrategy strategy) {
+		this.strategy = strategy;
+	}
+
+	public PartitioningStrategy getStrategy() {
+		return strategy;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index dd8652e..d1d5b1e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 public class PrintTest implements Serializable {
 
+	private static final long serialVersionUID = 1L;
 	private static final long MEMORYSIZE = 32;
 
 	private static final class IdentityMap implements MapFunction<Long, Long> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
index 6c0a3f6..0675242 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
@@ -34,7 +34,7 @@ public class DistributePartitionerTest {
 	
 	@Before
 	public void setPartitioner() {
-		distributePartitioner = new DistributePartitioner<Tuple>();
+		distributePartitioner = new DistributePartitioner<Tuple>(false);
 	}
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0cc5d64/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
index 440cfcd..b381d85 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
@@ -26,17 +26,17 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class ForwardPartitionerTest {
-	
-	private ForwardPartitioner<Tuple> forwardPartitioner;
+
+	private DistributePartitioner<Tuple> forwardPartitioner;
 	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
-	
+
 	@Before
 	public void setPartitioner() {
-		forwardPartitioner = new ForwardPartitioner<Tuple>();
+		forwardPartitioner = new DistributePartitioner<Tuple>(true);
 	}
-	
+
 	@Test
 	public void testSelectChannelsLength() {
 		sd.setInstance(streamRecord);
@@ -44,12 +44,12 @@ public class ForwardPartitionerTest {
 		assertEquals(1, forwardPartitioner.selectChannels(sd, 2).length);
 		assertEquals(1, forwardPartitioner.selectChannels(sd, 1024).length);
 	}
-	
+
 	@Test
 	public void testSelectChannelsInterval() {
 		sd.setInstance(streamRecord);
 		assertEquals(0, forwardPartitioner.selectChannels(sd, 1)[0]);
-		assertEquals(0, forwardPartitioner.selectChannels(sd, 2)[0]);
-		assertEquals(0, forwardPartitioner.selectChannels(sd, 1024)[0]);
+		assertEquals(1, forwardPartitioner.selectChannels(sd, 2)[0]);
+		assertEquals(2, forwardPartitioner.selectChannels(sd, 1024)[0]);
 	}
 }


Mime
View raw message