flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [3/3] flink git commit: [FLINK-2138] [streaming] Added docs and tests for partitioning
Date Sat, 11 Jul 2015 12:15:29 GMT
[FLINK-2138] [streaming] Added docs and tests for partitioning

Closes #872


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f3aeb7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f3aeb7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f3aeb7e

Branch: refs/heads/master
Commit: 3f3aeb7e0a99f2f2af521fa880dc9d11743610f6
Parents: bc8d7c4
Author: Gábor Hermann <reckoner42@gmail.com>
Authored: Wed Jul 1 14:26:33 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Sat Jul 11 14:01:16 2015 +0200

----------------------------------------------------------------------
 docs/apis/programming_guide.md                  |  13 ++
 docs/apis/streaming_guide.md                    |  19 ++
 .../flink/streaming/api/PartitionerTest.java    | 229 +++++++++++++++++++
 3 files changed, 261 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f3aeb7e/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 17903a9..edf2003 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -958,6 +958,19 @@ val result = in.partitionByHash(0).mapPartition { ... }
     </tr>
     </tr>
     <tr>
+      <td><strong>Custom Partitioning</strong></td>
+      <td>
+        <p>Manually specify a partitioning over the data.
+          <br/>
+          <i>Note</i>: This method works only on single field keys.</p>
+{% highlight scala %}
+val in: DataSet[(Int, String)] = // [...]
+val result = in
+  .partitionCustom(partitioner: Partitioner[K], key)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
       <td><strong>Sort Partition</strong></td>
       <td>
         <p>Locally sorts all partitions of a data set on a specified field in a specified
order. 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f3aeb7e/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 7d8ab6d..e337ea8 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -311,6 +311,25 @@ Usage: `dataStream.broadcast()`
  * *Global*: All data points are directed to the first instance of the operator. 
 Usage: `dataStream.global()`
 
+Custom partitioning can also be used by giving a Partitioner function and a single field
key to partition on, similarly to the batch API.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String,Integer>> in = // [...]
+DataStream<Tuple2<String,Integer>> result = in
+    .partitionCustom(Partitioner<K> partitioner, key)
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+val in: DataStream[(Int, String)] = // [...]
+val result = in
+    .partitionCustom(partitioner: Partitioner[K], key)
+{% endhighlight %}
+</div>
+</div>
+
 By default *Forward* partitioning is used. 
 
 Partitioning does not remain in effect after a transformation, so it needs to be set again
for subsequent operations.

http://git-wip-us.apache.org/repos/asf/flink/blob/3f3aeb7e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
new file mode 100644
index 0000000..c858834
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestListResultSink;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.junit.Test;
+
+/**
+ * IT case that tests the different stream partitioning schemes.
+ */
+public class PartitionerTest {
+
+	public static final int PARALLELISM = 3;
+	public static final int MEMORY_SIZE = 32;
+
+	@Test
+	public void partitionerTest() {
+
+		TestListResultSink<Tuple2<Integer, String>> hashPartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+		TestListResultSink<Tuple2<Integer, String>> customPartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+		TestListResultSink<Tuple2<Integer, String>> broadcastPartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+		TestListResultSink<Tuple2<Integer, String>> forwardPartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+		TestListResultSink<Tuple2<Integer, String>> rebalancePartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+		TestListResultSink<Tuple2<Integer, String>> globalPartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+
+
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORY_SIZE);
+		DataStream<Tuple1<String>> src = env.fromElements(
+				new Tuple1<String>("a"),
+				new Tuple1<String>("b"),
+				new Tuple1<String>("b"),
+				new Tuple1<String>("a"),
+				new Tuple1<String>("a"),
+				new Tuple1<String>("c"),
+				new Tuple1<String>("a")
+		);
+
+		// partition by hash
+		src
+				.partitionByHash(0)
+				.map(new SubtaskIndexAssigner())
+				.addSink(hashPartitionResultSink);
+
+		// partition custom
+		DataStream<Tuple2<Integer, String>> partitionCustom = src
+				.partitionCustom(new Partitioner<String>() {
+					@Override
+					public int partition(String key, int numPartitions) {
+						if (key.equals("c")) {
+							return 2;
+						} else {
+							return 0;
+						}
+					}
+				}, 0)
+				.map(new SubtaskIndexAssigner());
+
+		partitionCustom.addSink(customPartitionResultSink);
+
+		// partition broadcast
+		src.broadcast().map(new SubtaskIndexAssigner()).addSink(broadcastPartitionResultSink);
+
+		// partition forward
+		src.map(new SubtaskIndexAssigner()).addSink(forwardPartitionResultSink);
+
+		// partition rebalance
+		src.rebalance().map(new SubtaskIndexAssigner()).addSink(rebalancePartitionResultSink);
+
+		// partition global
+		src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink);
+
+		try {
+			env.execute();
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+
+		List<Tuple2<Integer, String>> hashPartitionResult = hashPartitionResultSink.getResult();
+		List<Tuple2<Integer, String>> customPartitionResult = customPartitionResultSink.getResult();
+		List<Tuple2<Integer, String>> broadcastPartitionResult = broadcastPartitionResultSink.getResult();
+		List<Tuple2<Integer, String>> forwardPartitionResult = forwardPartitionResultSink.getResult();
+		List<Tuple2<Integer, String>> rebalancePartitionResult = rebalancePartitionResultSink.getResult();
+		List<Tuple2<Integer, String>> globalPartitionResult = globalPartitionResultSink.getResult();
+
+		verifyHashPartitioning(hashPartitionResult);
+		verifyCustomPartitioning(customPartitionResult);
+		verifyBroadcastPartitioning(broadcastPartitionResult);
+		verifyRebalancePartitioning(forwardPartitionResult);
+		verifyRebalancePartitioning(rebalancePartitionResult);
+		verifyGlobalPartitioning(globalPartitionResult);
+	}
+
+	private static void verifyHashPartitioning(List<Tuple2<Integer, String>> hashPartitionResult)
{
+		HashMap<String, Integer> verifier = new HashMap<String, Integer>();
+		for (Tuple2<Integer, String> elem : hashPartitionResult) {
+			Integer subtaskIndex = verifier.get(elem.f1);
+			if (subtaskIndex == null) {
+				verifier.put(elem.f1, elem.f0);
+			} else if (subtaskIndex != elem.f0) {
+				fail();
+			}
+		}
+	}
+
+	private static void verifyCustomPartitioning(List<Tuple2<Integer, String>> customPartitionResult)
{
+		for (Tuple2<Integer, String> stringWithSubtask : customPartitionResult) {
+			if (stringWithSubtask.f1.equals("c")) {
+				assertEquals(new Integer(2), stringWithSubtask.f0);
+			} else {
+				assertEquals(new Integer(0), stringWithSubtask.f0);
+			}
+		}
+	}
+
+	private static void verifyBroadcastPartitioning(List<Tuple2<Integer, String>>
broadcastPartitionResult) {
+		List<Tuple2<Integer, String>> expected = Arrays.asList(
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "b"),
+				new Tuple2<Integer, String>(0, "b"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "c"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(1, "a"),
+				new Tuple2<Integer, String>(1, "b"),
+				new Tuple2<Integer, String>(1, "b"),
+				new Tuple2<Integer, String>(1, "a"),
+				new Tuple2<Integer, String>(1, "a"),
+				new Tuple2<Integer, String>(1, "c"),
+				new Tuple2<Integer, String>(1, "a"),
+				new Tuple2<Integer, String>(2, "a"),
+				new Tuple2<Integer, String>(2, "b"),
+				new Tuple2<Integer, String>(2, "b"),
+				new Tuple2<Integer, String>(2, "a"),
+				new Tuple2<Integer, String>(2, "a"),
+				new Tuple2<Integer, String>(2, "c"),
+				new Tuple2<Integer, String>(2, "a"));
+
+		assertEquals(
+				new HashSet<Tuple2<Integer, String>>(expected),
+				new HashSet<Tuple2<Integer, String>>(broadcastPartitionResult));
+	}
+
+	private static void verifyRebalancePartitioning(List<Tuple2<Integer, String>>
rebalancePartitionResult) {
+		List<Tuple2<Integer, String>> expected = Arrays.asList(
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(1, "b"),
+				new Tuple2<Integer, String>(2, "b"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(1, "a"),
+				new Tuple2<Integer, String>(2, "c"),
+				new Tuple2<Integer, String>(0, "a"));
+
+		assertEquals(
+				new HashSet<Tuple2<Integer, String>>(expected),
+				new HashSet<Tuple2<Integer, String>>(rebalancePartitionResult));
+	}
+
+	private static void verifyGlobalPartitioning(List<Tuple2<Integer, String>> globalPartitionResult)
{
+		List<Tuple2<Integer, String>> expected = Arrays.asList(
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "b"),
+				new Tuple2<Integer, String>(0, "b"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "c"),
+				new Tuple2<Integer, String>(0, "a"));
+
+		assertEquals(
+				new HashSet<Tuple2<Integer, String>>(expected),
+				new HashSet<Tuple2<Integer, String>>(globalPartitionResult));
+	}
+
+	private static class SubtaskIndexAssigner
+			extends RichMapFunction<Tuple1<String>, Tuple2<Integer, String>> {
+
+		private int indexOfSubtask;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			RuntimeContext runtimeContext = getRuntimeContext();
+			indexOfSubtask = runtimeContext.getIndexOfThisSubtask();
+		}
+
+		@Override
+		public Tuple2<Integer, String> map(Tuple1<String> value) throws Exception {
+			return new Tuple2<Integer, String>(indexOfSubtask, value.f0);
+		}
+	}
+}


Mime
View raw message