flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject incubator-flink git commit: [FLINK-1239] [streaming] IterateExample fix and parallelism setting in StreamExecutionEnvironment made public
Date Sun, 16 Nov 2014 19:05:46 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master c339c266f -> d7d723837


[FLINK-1239] [streaming] IterateExample fix and parallelism setting in StreamExecutionEnvironment
made public

Conflicts:
	flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d7d72383
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d7d72383
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d7d72383

Branch: refs/heads/master
Commit: d7d72383734b932810b86e084eaa55efe48a0d4b
Parents: c339c26
Author: ghermann <reckoner42@gmail.com>
Authored: Thu Nov 13 16:39:57 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Sun Nov 16 18:20:40 2014 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/streaming/api/JobGraphBuilder.java    | 4 ++++
 .../streaming/api/environment/StreamExecutionEnvironment.java   | 2 +-
 .../flink/streaming/examples/iteration/IterateExample.java      | 5 ++---
 3 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7d72383/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index df59be1..03198ce 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -205,6 +205,10 @@ public class JobGraphBuilder {
 	 */
 	public void addIterationTail(String vertexName, String iterationTail, String iterationID,
 			int parallelism, long waitTime) {
+		
+		if (bufferTimeout.get(iterationTail) == 0) {
+			throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
+		}
 
 		addVertex(vertexName, StreamIterationTail.class, null, null, null, parallelism);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7d72383/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 600a87a..7baabf9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -91,7 +91,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @param degreeOfParallelism
 	 *            The degree of parallelism
 	 */
-	protected StreamExecutionEnvironment setDegreeOfParallelism(int degreeOfParallelism) {
+	public StreamExecutionEnvironment setDegreeOfParallelism(int degreeOfParallelism) {
 		if (degreeOfParallelism < 1) {
 			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7d72383/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index f574718..8454f52 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -79,7 +79,7 @@ public class IterateExample {
 
 		// apply the step function to add new random value to the tuple and to
 		// increment the counter and split the output with the output selector
-		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
+		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle().setBufferTimeout(1)
 				.split(new MySelector());
 
 		// close the iteration by selecting the tuples that were directed to the
@@ -121,7 +121,6 @@ public class IterateExample {
 
 		@Override
 		public Tuple2<Double, Integer> map(Tuple2<Double, Integer> value) throws Exception
{
-
 			return new Tuple2<Double, Integer>(value.f0 + rnd.nextDouble(), value.f1 + 1);
 		}
 
@@ -135,7 +134,7 @@ public class IterateExample {
 
 		@Override
 		public void select(Tuple2<Double, Integer> value, Collection<String> outputs)
{
-			if (value.f0 > 200) {
+			if (value.f0 > 100) {
 				outputs.add("output");
 			} else {
 				outputs.add("iterate");


Mime
View raw message