flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/4] flink git commit: [hotfix] [streaming api] Minor cleanup in WindowedStream and AllWindowedStream
Date Mon, 23 Jan 2017 18:45:14 GMT
[hotfix] [streaming api] Minor cleanup in WindowedStream and AllWindowedStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/159f51b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/159f51b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/159f51b2

Branch: refs/heads/release-1.2
Commit: 159f51b23dfdf2b6ccba728dbbe4c517ca532dbc
Parents: 4697b97
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Jan 23 14:55:48 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Jan 23 19:16:03 2017 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/AllWindowedStream.java     | 12 ++++++------
 .../flink/streaming/api/datastream/WindowedStream.java  | 12 ++++++------
 2 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/159f51b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 0f0e947..6c57391 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -53,6 +53,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A {@code AllWindowedStream} represents a data stream where the stream of
  * elements is split into windows based on a
@@ -122,12 +124,10 @@ public class AllWindowedStream<T, W extends Window> {
 	 */
 	@PublicEvolving
 	public AllWindowedStream<T, W> allowedLateness(Time lateness) {
-		long millis = lateness.toMilliseconds();
-		if (millis < 0) {
-			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
-		} else if (windowAssigner.isEventTime()) {
-			this.allowedLateness = millis;
-		}
+		final long millis = lateness.toMilliseconds();
+		checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
+
+		this.allowedLateness = millis;
 		return this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/159f51b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 2df3621..b20e67a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -63,6 +63,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A {@code WindowedStream} represents a data stream where elements are grouped by
  * key, and for each key, the stream of elements is split into windows based on a
@@ -140,12 +142,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 */
 	@PublicEvolving
 	public WindowedStream<T, K, W> allowedLateness(Time lateness) {
-		long millis = lateness.toMilliseconds();
-		if (millis < 0) {
-			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
-		} else if (windowAssigner.isEventTime()) {
-			this.allowedLateness = millis;
-		}
+		final long millis = lateness.toMilliseconds();
+		checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
+
+		this.allowedLateness = millis;
 		return this;
 	}
 


Mime
View raw message