flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/6] flink git commit: [hotfix] [streaming api] Minor cleanup in WindowedStream and AllWindowedStream
Date Tue, 24 Jan 2017 18:19:57 GMT
[hotfix] [streaming api] Minor cleanup in WindowedStream and AllWindowedStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6342d6db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6342d6db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6342d6db

Branch: refs/heads/master
Commit: 6342d6db1de5f38a921732e35abd83e6c5b9305a
Parents: 87af841
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Jan 23 14:55:48 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/AllWindowedStream.java     | 12 ++++++------
 .../flink/streaming/api/datastream/WindowedStream.java  | 11 +++++------
 2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6342d6db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index bd11de3..5de1774 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -53,6 +53,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A {@code AllWindowedStream} represents a data stream where the stream of
  * elements is split into windows based on a
@@ -122,12 +124,10 @@ public class AllWindowedStream<T, W extends Window> {
 	 */
 	@PublicEvolving
 	public AllWindowedStream<T, W> allowedLateness(Time lateness) {
-		long millis = lateness.toMilliseconds();
-		if (millis < 0) {
-			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
-		} else if (windowAssigner.isEventTime()) {
-			this.allowedLateness = millis;
-		}
+		final long millis = lateness.toMilliseconds();
+		checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
+
+		this.allowedLateness = millis;
 		return this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6342d6db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index c360ea1..c74bad7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -66,6 +66,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -145,12 +146,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 */
 	@PublicEvolving
 	public WindowedStream<T, K, W> allowedLateness(Time lateness) {
-		long millis = lateness.toMilliseconds();
-		if (millis < 0) {
-			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
-		} else if (windowAssigner.isEventTime()) {
-			this.allowedLateness = millis;
-		}
+		final long millis = lateness.toMilliseconds();
+		checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
+
+		this.allowedLateness = millis;
 		return this;
 	}
 


Mime
View raw message