flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/7] flink git commit: [FLINK-2009] Force chaining for System Operators
Date Tue, 19 May 2015 14:37:59 GMT
[FLINK-2009] Force chaining for System Operators

The system operators are, for example, the StreamDiscretizer and
WindowBuffer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a04f0912
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a04f0912
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a04f0912

Branch: refs/heads/master
Commit: a04f091217cedccb00bb21a4686350e7709adea9
Parents: 58865ff
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon May 18 15:12:50 2015 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue May 19 16:35:49 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/SingleOutputStreamOperator.java       |  2 +-
 .../api/environment/StreamExecutionEnvironment.java      |  2 +-
 .../streaming/api/graph/StreamingJobGraphGenerator.java  | 11 +++++++----
 .../apache/flink/streaming/api/operators/StreamFold.java |  2 +-
 .../flink/streaming/api/operators/StreamOperator.java    |  7 +++++--
 .../flink/streaming/api/operators/StreamSource.java      |  2 ++
 .../api/operators/windowing/StreamDiscretizer.java       |  2 ++
 .../api/operators/windowing/StreamWindowBuffer.java      |  2 +-
 .../api/operators/windowing/WindowFlattener.java         |  2 +-
 .../streaming/api/operators/windowing/WindowMerger.java  |  2 +-
 .../api/operators/windowing/WindowPartitioner.java       |  2 +-
 .../apache/flink/streaming/api/scala/DataStream.scala    |  2 +-
 .../streaming/api/scala/StreamExecutionEnvironment.scala |  2 +-
 13 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 0b2462b..bebfff0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -157,7 +157,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	/**
 	 * Turns off chaining for this operator so thread co-location will not be
 	 * used as an optimization. </p> Chaining can be turned off for the whole
-	 * job by {@link StreamExecutionEnvironment#disableOperatorChaning()}
+	 * job by {@link StreamExecutionEnvironment#disableOperatorChaining()}
 	 * however it is not advised for performance considerations.
 	 * 
 	 * @return The operator with chaining disabled

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 908b392..820cfed 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -195,7 +195,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 
 	 * @return StreamExecutionEnvironment with chaining disabled.
 	 */
-	public StreamExecutionEnvironment disableOperatorChaning() {
+	public StreamExecutionEnvironment disableOperatorChaining() {
 		streamGraph.setChaining(false);
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 0d314f1..799862a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -315,13 +315,16 @@ public class StreamingJobGraphGenerator {
 				&& outOperator != null
 				&& upStreamVertex.getSlotSharingID() == downStreamVertex.getSlotSharingID()
 				&& upStreamVertex.getSlotSharingID() != -1
-				&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
-				&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator
-						.getChainingStrategy() == ChainingStrategy.ALWAYS)
+				&& (outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS ||
+					outOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS)
+				&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
+					headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS ||
+					headOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS)
 				&& (edge.getPartitioner().getStrategy() == PartitioningStrategy.FORWARD || downStreamVertex
 						.getParallelism() == 1)
 				&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
-				&& streamGraph.isChainingEnabled();
+				&& (streamGraph.isChainingEnabled() ||
+					outOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS);
 	}
 
 	private void setSlotSharing() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
index 1b12e89..6f956ca 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
@@ -34,7 +34,7 @@ public class StreamFold<IN, OUT>
 		super(folder);
 		this.accumulator = initialValue;
 		this.outTypeInformation = outTypeInformation;
-		this.chainingStrategy = ChainingStrategy.ALWAYS;
+		this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 6fd3a3c..43ab2ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -61,9 +61,12 @@ public interface StreamOperator<OUT> extends Serializable {
 	 * to <b>NEVER</b>, the operator will not be chained to the preceding or succeeding
 	 * operators.</p> <b>HEAD</b> strategy marks a start of a new chain, so
that the
 	 * operator will not be chained to preceding operators, only succeding ones.
-	 * 
+	 *
+	 * <b>FORCE_ALWAYS</b> will enable chaining even if chaining is disabled on
the execution
+	 * environment. This should only be used by system-level operators, not operators implemented
+	 * by users.
 	 */
 	public static enum ChainingStrategy {
-		ALWAYS, NEVER, HEAD
+		FORCE_ALWAYS, ALWAYS, NEVER, HEAD
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 187f4f0..2e4d313 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -23,6 +23,8 @@ public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT,
SourceFunc
 
 	public StreamSource(SourceFunction<OUT> sourceFunction) {
 		super(sourceFunction);
+
+		this.chainingStrategy = ChainingStrategy.HEAD;
 	}
 
 	public void run() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
index ff9a96d..fbe6e44 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
@@ -56,6 +56,8 @@ public class StreamDiscretizer<IN>
 
 		this.isActiveTrigger = triggerPolicy instanceof ActiveTriggerPolicy;
 		this.isActiveEviction = evictionPolicy instanceof ActiveEvictionPolicy;
+
+		this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
 	}
 
 	public TriggerPolicy<IN> getTrigger() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
index 7d153f4..b9de698 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
@@ -34,7 +34,7 @@ public class StreamWindowBuffer<T>
 
 	public StreamWindowBuffer(WindowBuffer<T> buffer) {
 		this.buffer = buffer;
-		setChainingStrategy(ChainingStrategy.ALWAYS);
+		setChainingStrategy(ChainingStrategy.FORCE_ALWAYS);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
index fb9ee22..f1b0ee2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
@@ -29,7 +29,7 @@ public class WindowFlattener<T> extends AbstractStreamOperator<T>
 		implements OneInputStreamOperator<StreamWindow<T>, T> {
 
 	public WindowFlattener() {
-		chainingStrategy = ChainingStrategy.ALWAYS;
+		chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
index 46f5d4e..12dd239 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
@@ -37,7 +37,7 @@ public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
 	public WindowMerger() {
 		this.windows = new HashMap<Integer, StreamWindow<T>>();
 
-		chainingStrategy = ChainingStrategy.ALWAYS;
+		chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
index c1e701d..14a055a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
@@ -35,7 +35,7 @@ public class WindowPartitioner<T> extends AbstractStreamOperator<StreamWindow<T>
 	public WindowPartitioner(KeySelector<T, ?> keySelector) {
 		this.keySelector = keySelector;
 
-		chainingStrategy = ChainingStrategy.ALWAYS;
+		chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
 	}
 
 	public WindowPartitioner(int numberOfSplits) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 08b2535..89fd999 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -103,7 +103,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   /**
    * Turns off chaining for this operator so thread co-location will not be
    * used as an optimization. </p> Chaining can be turned off for the whole
-   * job by {@link StreamExecutionEnvironment#disableOperatorChaning()}
+   * job by {@link StreamExecutionEnvironment#disableOperatorChaining()}
    * however it is not advised for performance considerations.
    * 
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/a04f0912/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 21b2e71..686fc23 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -133,7 +133,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * 
    */
   def disableOperatorChaning(): StreamExecutionEnvironment = {
-    javaEnv.disableOperatorChaning()
+    javaEnv.disableOperatorChaining()
     this
   }
 


Mime
View raw message