flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [22/28] git commit: [streaming] Mutable and Immutable invoke in CoInvokable
Date Fri, 29 Aug 2014 19:03:55 GMT
[streaming] Mutable and Immutable invoke in CoInvokable


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/82aa0056
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/82aa0056
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/82aa0056

Branch: refs/heads/master
Commit: 82aa005641ba37c50d9c08fd80783cba24e55a88
Parents: a103950
Author: jfeher <feherj@gmail.com>
Authored: Tue Aug 26 14:08:15 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200

----------------------------------------------------------------------
 .../api/invokable/operator/co/CoInvokable.java  | 30 ++++++++++++++++++--
 1 file changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/82aa0056/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 246fa4f..7baf687 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -67,7 +67,16 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokabl
 		this.reuse2 = serializer2.createInstance();
 	}
 
+	
 	public void invoke() throws Exception {
+		if (this.isMutable) {
+			mutableInvoke();
+		} else {
+			immutableInvoke();
+		}
+	}
+	
+	protected void immutableInvoke() throws Exception {
 		while (true) {
 			int next = recordIterator.next(reuse1, reuse2);
 			if (next == 0) {
@@ -81,9 +90,26 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokabl
 			}
 		}
 	}
+	
+	protected void mutableInvoke() throws Exception {
+		while (true) {
+			int next = recordIterator.next(reuse1, reuse2);
+			if (next == 0) {
+				break;
+			} else if (next == 1) {
+				handleStream1();
+			} else {
+				handleStream2();
+			}
+		}
+	}
 
-	public abstract void handleStream1() throws Exception;
+	protected abstract void handleStream1() throws Exception;
 
-	public abstract void handleStream2() throws Exception;
+	protected abstract void handleStream2() throws Exception;
+	
+	protected abstract void coUSerFunction1() throws Exception;
+	
+	protected abstract void coUserFunction2() throws Exception;
 
 }


Mime
View raw message