flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/2] flink git commit: [FLINK-3536] Make clearer distinction between event time and processing time
Date Mon, 29 Feb 2016 19:52:38 GMT
[FLINK-3536] Make clearer distinction between event time and processing time

This brings it more in line with *ProcessingTimeWindows and makes it
clear what type of window assigner it is.

The old name, i.e. SlidingTimeWindows and TumblingTimeWindows is still
available but deprecated.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9922d10a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9922d10a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9922d10a

Branch: refs/heads/master
Commit: 9922d10a0f3e291bb7e6f75ccb70baecb5c2bcd9
Parents: 0ac2b1a
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Feb 29 14:56:29 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Feb 29 20:29:11 2016 +0100

----------------------------------------------------------------------
 docs/apis/streaming/index.md                    |  16 +--
 docs/apis/streaming/time.md                     |   2 +-
 docs/apis/streaming/windows.md                  |  31 +++--
 .../streaming/examples/join/WindowJoin.java     |   4 +-
 .../scala/examples/join/WindowJoin.scala        |   4 +-
 .../api/datastream/CoGroupedStreams.java        |   2 +-
 .../streaming/api/datastream/DataStream.java    |  12 +-
 .../streaming/api/datastream/JoinedStreams.java |   2 +-
 .../streaming/api/datastream/KeyedStream.java   |  12 +-
 .../assigners/SlidingEventTimeWindows.java      | 112 +++++++++++++++++++
 .../assigners/SlidingProcessingTimeWindows.java |   4 +-
 .../windowing/assigners/SlidingTimeWindows.java |  73 ++----------
 .../assigners/TumblingEventTimeWindows.java     |  98 ++++++++++++++++
 .../TumblingProcessingTimeWindows.java          |   4 +-
 .../assigners/TumblingTimeWindows.java          |  60 ++--------
 .../windowing/AllWindowTranslationTest.java     |  36 +++---
 .../operators/windowing/CoGroupJoinITCase.java  |   8 +-
 .../windowing/NonKeyedWindowOperatorTest.java   |  10 +-
 .../windowing/TimeWindowTranslationTest.java    |   8 +-
 .../operators/windowing/WindowFoldITCase.java   |   6 +-
 .../operators/windowing/WindowOperatorTest.java |  12 +-
 .../windowing/WindowTranslationTest.java        |  30 ++---
 .../streaming/timestamp/TimestampITCase.java    |   6 +-
 .../streaming/api/scala/CoGroupedStreams.scala  |   2 +-
 .../flink/streaming/api/scala/DataStream.scala  |   4 +-
 .../streaming/api/scala/JoinedStreams.scala     |   2 +-
 .../flink/streaming/api/scala/KeyedStream.scala |   4 +-
 .../api/scala/AllWindowTranslationTest.scala    |  26 ++---
 .../streaming/api/scala/CoGroupJoinITCase.scala |   8 +-
 .../streaming/api/scala/WindowFoldITCase.scala  |   6 +-
 .../api/scala/WindowTranslationTest.scala       |  22 ++--
 31 files changed, 364 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/docs/apis/streaming/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
index b8a3541..3741a46 100644
--- a/docs/apis/streaming/index.md
+++ b/docs/apis/streaming/index.md
@@ -293,7 +293,7 @@ keyedStream.maxBy("key");
             key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
             See <a href="windows.html">windows</a> for a complete description of windows.
     {% highlight java %}
-dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
+dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
     {% endhighlight %}
         </p>
           </td>
@@ -307,7 +307,7 @@ dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 s
               <p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
                gathered in one task for the windowAll operator.</p>
   {% highlight java %}
-dataStream.windowAll(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
+dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
   {% endhighlight %}
           </td>
         </tr>
@@ -410,7 +410,7 @@ dataStream.union(otherStream1, otherStream2, ...);
     {% highlight java %}
 dataStream.join(otherStream)
     .where(0).equalTo(1)
-    .window(TumblingTimeWindows.of(Time.seconds(3)))
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
     .apply (new JoinFunction () {...});
     {% endhighlight %}
           </td>
@@ -422,7 +422,7 @@ dataStream.join(otherStream)
     {% highlight java %}
 dataStream.coGroup(otherStream)
     .where(0).equalTo(1)
-    .window(TumblingTimeWindows.of(Time.seconds(3)))
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
     .apply (new CoGroupFunction () {...});
     {% endhighlight %}
           </td>
@@ -669,7 +669,7 @@ keyedStream.maxBy("key")
             key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
             See <a href="windows.html">windows</a> for a description of windows.
     {% highlight scala %}
-dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
+dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
     {% endhighlight %}
         </p>
           </td>
@@ -683,7 +683,7 @@ dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))) // Last 5 se
               <p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
                gathered in one task for the windowAll operator.</p>
   {% highlight scala %}
-dataStream.windowAll(TumblingTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
+dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
   {% endhighlight %}
           </td>
         </tr>
@@ -759,7 +759,7 @@ dataStream.union(otherStream1, otherStream2, ...)
     {% highlight scala %}
 dataStream.join(otherStream)
     .where(0).equalTo(1)
-    .window(TumblingTimeWindows.of(Time.seconds(3)))
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
     .apply { ... }
     {% endhighlight %}
           </td>
@@ -771,7 +771,7 @@ dataStream.join(otherStream)
     {% highlight scala %}
 dataStream.coGroup(otherStream)
     .where(0).equalTo(1)
-    .window(TumblingTimeWindows.of(Time.seconds(3)))
+    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
     .apply {}
     {% endhighlight %}
           </td>

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/docs/apis/streaming/time.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/time.md b/docs/apis/streaming/time.md
index c20319c..facd0a5 100644
--- a/docs/apis/streaming/time.md
+++ b/docs/apis/streaming/time.md
@@ -62,7 +62,7 @@ windowing you would use window assigners such as `SlidingProcessingTimeWindows`
 `TumblingProcessingTimeWindows`.
 
 In order to work with event time semantics, i.e. if you want to use window assigners such as
-`TumblingTimeWindows` or `SlidingTimeWindows`, you need to follow these steps:
+`TumblingEventTimeWindows` or `SlidingEventTimeWindows`, you need to follow these steps:
 
 - Set `enableTimestamps()`, as well the interval for watermark emission
 (`setAutoWatermarkInterval(long milliseconds)`) in `ExecutionConfig`.

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/docs/apis/streaming/windows.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/windows.md b/docs/apis/streaming/windows.md
index 01da34e..5f239fc 100644
--- a/docs/apis/streaming/windows.md
+++ b/docs/apis/streaming/windows.md
@@ -191,7 +191,7 @@ window, and every time execution is triggered, 10 elements are retained in the w
 <div data-lang="java" markdown="1">
 {% highlight java %}
 keyedStream
-    .window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1))
+    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
     .trigger(CountTrigger.of(100))
     .evictor(CountEvictor.of(10));
 {% endhighlight %}
@@ -200,7 +200,7 @@ keyedStream
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 keyedStream
-    .window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1))
+    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
     .trigger(CountTrigger.of(100))
     .evictor(CountEvictor.of(10))
 {% endhighlight %}
@@ -214,7 +214,7 @@ The `WindowAssigner` defines how incoming elements are assigned to windows. A wi
 that has a begin-value, and an end-value corresponding to a begin-time and end-time. Elements with timestamp (according
 to some notion of time described above within these values are part of the window).
 
-For example, the `SlidingTimeWindows`
+For example, the `SlidingEventTimeWindows`
 assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that
 time starts from 0 and is measured in milliseconds. Then, we have 6 windows
 that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], and [5000, 10000]. Each incoming
@@ -256,7 +256,7 @@ stream.window(GlobalWindows.create());
             watermark with value higher than its end-value is received.
           </p>
       {% highlight java %}
-stream.window(TumblingTimeWindows.of(Time.seconds(1)));
+stream.window(TumblingEventTimeWindows.of(Time.seconds(1)));
       {% endhighlight %}
         </td>
       </tr>
@@ -270,7 +270,7 @@ stream.window(TumblingTimeWindows.of(Time.seconds(1)));
 	          watermark with value higher than its end-value is received.
           </p>
     {% highlight java %}
-stream.window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1)));
+stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)));
     {% endhighlight %}
         </td>
       </tr>
@@ -338,7 +338,7 @@ stream.window(GlobalWindows.create)
             watermark with value higher than its end-value is received.
             </p>
       {% highlight scala %}
-stream.window(TumblingTimeWindows.of(Time.seconds(1)))
+stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))
       {% endhighlight %}
           </td>
         </tr>
@@ -352,7 +352,7 @@ stream.window(TumblingTimeWindows.of(Time.seconds(1)))
             watermark with value higher than its end-value is received.
           </p>
     {% highlight scala %}
-stream.window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
+stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
     {% endhighlight %}
         </td>
       </tr>
@@ -743,8 +743,7 @@ stream.countWindow(1000)
         <td>
     {% highlight java %}
 stream.window(GlobalWindows.create())
-  .trigger(CountTrigger.of(1000)
-  .evictor(CountEvictor.of(1000)))
+  .trigger(PurgingTrigger.of(CountTrigger.of(size)))
     {% endhighlight %}
         </td>
       </tr>
@@ -772,7 +771,7 @@ stream.timeWindow(Time.seconds(5))
 	</td>
         <td>
     {% highlight java %}
-stream.window(TumblingTimeWindows.of((Time.seconds(5)))
+stream.window(TumblingEventTimeWindows.of((Time.seconds(5)))
   .trigger(EventTimeTrigger.create())
     {% endhighlight %}
         </td>
@@ -786,7 +785,7 @@ stream.timeWindow(Time.seconds(5), Time.seconds(1))
 	</td>
         <td>
     {% highlight java %}
-stream.window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
+stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
   .trigger(EventTimeTrigger.create())
     {% endhighlight %}
         </td>
@@ -800,7 +799,7 @@ stream.timeWindow(Time.seconds(5))
 	</td>
         <td>
     {% highlight java %}
-stream.window(TumblingTimeWindows.of((Time.seconds(5)))
+stream.window(TumblingProcessingTimeWindows.of((Time.seconds(5)))
   .trigger(ProcessingTimeTrigger.create())
     {% endhighlight %}
         </td>
@@ -814,7 +813,7 @@ stream.timeWindow(Time.seconds(5), Time.seconds(1))
 	</td>
         <td>
     {% highlight java %}
-stream.window(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
+stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
   .trigger(ProcessingTimeTrigger.create())
     {% endhighlight %}
         </td>
@@ -834,7 +833,7 @@ same:
 <div data-lang="java" markdown="1">
 {% highlight java %}
 nonKeyedStream
-    .windowAll(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1))
+    .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
     .trigger(CountTrigger.of(100))
     .evictor(CountEvictor.of(10));
 {% endhighlight %}
@@ -843,7 +842,7 @@ nonKeyedStream
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 nonKeyedStream
-    .windowAll(SlidingTimeWindows.of(Time.seconds(5), Time.seconds(1))
+    .windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
     .trigger(CountTrigger.of(100))
     .evictor(CountEvictor.of(10))
 {% endhighlight %}
@@ -992,4 +991,4 @@ nonKeyedStream.countWindowAll(1000, 100)
 </table>
 
 </div>
-</div>
\ No newline at end of file
+</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 8c52747..f57d216 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.util.Random;
@@ -84,7 +84,7 @@ public class WindowJoin {
 				.join(salaries)
 				.where(new NameKeySelector())
 				.equalTo(new NameKeySelector())
-				.window(TumblingTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
+				.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
 				.apply(new MyJoinFunction());
 
 		// emit result

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 50a2216..7ea9c70 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.scala.examples.join
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 
 import scala.Stream._
@@ -56,7 +56,7 @@ object WindowJoin {
     val joined = grades.join(salaries)
         .where(_.name)
         .equalTo(_.name)
-        .window(SlidingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
+        .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
         .apply { (g, s) => Person(g.name, g.grade, s.salary) }
 
     if (params.has("output")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 713433c..a9a64af 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -65,7 +65,7 @@ import static java.util.Objects.requireNonNull;
  * DataStream<T> result = one.coGroup(two)
  *     .where(new MyFirstKeySelector())
  *     .equalTo(new MyFirstKeySelector())
- *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  *     .apply(new MyCoGroupFunction());
  * } </pre>
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 1cd8ade..d1a2115 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -66,9 +66,9 @@ import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -594,7 +594,7 @@ public class DataStream<T> {
 	 * Windows this {@code DataStream} into tumbling time windows.
 	 *
 	 * <p>
-	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
+	 * This is a shortcut for either {@code .window(TumblingEventTimeWindows.of(size))} or
 	 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
 	 * set using
 	 *
@@ -611,7 +611,7 @@ public class DataStream<T> {
 		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
 			return windowAll(TumblingProcessingTimeWindows.of(size));
 		} else {
-			return windowAll(TumblingTimeWindows.of(size));
+			return windowAll(TumblingEventTimeWindows.of(size));
 		}
 	}
 
@@ -619,7 +619,7 @@ public class DataStream<T> {
 	 * Windows this {@code DataStream} into sliding time windows.
 	 *
 	 * <p>
-	 * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
+	 * This is a shortcut for either {@code .window(SlidingEventTimeWindows.of(size, slide))} or
 	 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
 	 * set using
 	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
@@ -635,7 +635,7 @@ public class DataStream<T> {
 		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
 			return windowAll(SlidingProcessingTimeWindows.of(size, slide));
 		} else {
-			return windowAll(SlidingTimeWindows.of(size, slide));
+			return windowAll(SlidingEventTimeWindows.of(size, slide));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index f131b6e..86c6226 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -57,7 +57,7 @@ import static java.util.Objects.requireNonNull;
  * DataStream<T> result = one.join(two)
  *     .where(new MyFirstKeySelector())
  *     .equalTo(new MyFirstKeySelector())
- *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  *     .apply(new MyJoinFunction());
  * } </pre>
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 39a887e..37941e1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -38,9 +38,9 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -165,7 +165,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * Windows this {@code KeyedStream} into tumbling time windows.
 	 *
 	 * <p>
-	 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
+	 * This is a shortcut for either {@code .window(TumblingEventTimeWindows.of(size))} or
 	 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
 	 * set using
 	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
@@ -176,7 +176,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
 			return window(TumblingProcessingTimeWindows.of(size));
 		} else {
-			return window(TumblingTimeWindows.of(size));
+			return window(TumblingEventTimeWindows.of(size));
 		}
 	}
 
@@ -184,7 +184,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * Windows this {@code KeyedStream} into sliding time windows.
 	 *
 	 * <p>
-	 * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
+	 * This is a shortcut for either {@code .window(SlidingEventTimeWindows.of(size, slide))} or
 	 * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
 	 * set using
 	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
@@ -195,7 +195,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
 			return window(SlidingProcessingTimeWindows.of(size, slide));
 		} else {
-			return window(SlidingTimeWindows.of(size, slide));
+			return window(SlidingEventTimeWindows.of(size, slide));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
new file mode 100644
index 0000000..89b216e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
+ * elements. Windows can possibly overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute, every 10 seconds:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)));
+ * } </pre>
+ */
+@PublicEvolving
+public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private final long size;
+
+	private final long slide;
+
+	protected SlidingEventTimeWindows(long size, long slide) {
+		this.size = size;
+		this.slide = slide;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		if (timestamp > Long.MIN_VALUE) {
+			List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
+			long lastStart = timestamp - timestamp % slide;
+			for (long start = lastStart;
+				start > timestamp - size;
+				start -= slide) {
+				windows.add(new TimeWindow(start, start + size));
+			}
+			return windows;
+		} else {
+			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
+					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
+					"'DataStream.assignTimestampsAndWatermarks(...)'?");
+		}
+	}
+
+	public long getSize() {
+		return size;
+	}
+
+	public long getSlide() {
+		return slide;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return EventTimeTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "SlidingEventTimeWindows(" + size + ", " + slide + ")";
+	}
+
+	/**
+	 * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to sliding time windows based on the element timestamp.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param slide The slide interval of the generated windows.
+	 * @return The time policy.
+	 */
+	public static SlidingEventTimeWindows of(Time size, Time slide) {
+		return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
+	}
+
+	@Override
+	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new TimeWindow.Serializer();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index 8ac709c..4b91986 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -39,7 +39,7 @@ import java.util.List;
  * DataStream<Tuple2<String, Integer>> in = ...;
  * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
  * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- *   keyed.window(SlidingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
+ *   keyed.window(SlidingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
  * } </pre>
  */
 public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
@@ -86,7 +86,7 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
 	}
 
 	/**
-	 * Creates a new {@code SlidingTimeWindows} {@link WindowAssigner} that assigns
+	 * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
 	 * elements to sliding time windows based on the element timestamp.
 	 *
 	 * @param size The size of the generated windows.

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 900e254..581bbe1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -19,94 +19,35 @@
 package org.apache.flink.streaming.api.windowing.assigners;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 
 /**
  * A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
  * elements. Windows can possibly overlap.
  *
- * <p>
- * For example, in order to window into windows of 1 minute, every 10 seconds:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
- *   keyed.window(SlidingTimeWindows.of(Time.minutes(1), Time.seconds(10)));
- * } </pre>
+ * @deprecated Please use {@link SlidingEventTimeWindows}.
  */
 @PublicEvolving
-public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+@Deprecated
+public class SlidingTimeWindows extends SlidingEventTimeWindows {
 	private static final long serialVersionUID = 1L;
 
-	private final long size;
-
-	private final long slide;
-
 	private SlidingTimeWindows(long size, long slide) {
-		this.size = size;
-		this.slide = slide;
-	}
-
-	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-		if (timestamp > Long.MIN_VALUE) {
-			List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
-			long lastStart = timestamp - timestamp % slide;
-			for (long start = lastStart;
-				start > timestamp - size;
-				start -= slide) {
-				windows.add(new TimeWindow(start, start + size));
-			}
-			return windows;
-		} else {
-			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
-					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
-					"'DataStream.assignTimestampsAndWatermarks(...)'?");
-		}
-	}
-
-	public long getSize() {
-		return size;
-	}
-
-	public long getSlide() {
-		return slide;
-	}
-
-	@Override
-	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
-		return EventTimeTrigger.create();
-	}
-
-	@Override
-	public String toString() {
-		return "SlidingTimeWindows(" + size + ", " + slide + ")";
+		super(size, slide);
 	}
 
 	/**
 	 * Creates a new {@code SlidingTimeWindows} {@link WindowAssigner} that assigns
 	 * elements to sliding time windows based on the element timestamp.
 	 *
+	 * @deprecated Please use {@link SlidingEventTimeWindows#of(Time, Time)}.
+	 *
 	 * @param size The size of the generated windows.
 	 * @param slide The slide interval of the generated windows.
 	 * @return The time policy.
 	 */
+	@Deprecated()
 	public static SlidingTimeWindows of(Time size, Time slide) {
 		return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
 	}
-
-	@Override
-	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
-		return new TimeWindow.Serializer();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
new file mode 100644
index 0000000..1f61281
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
+ * } </pre>
+ */
+@PublicEvolving
+public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	private long size;
+
+	protected TumblingEventTimeWindows(long size) {
+		this.size = size;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		if (timestamp > Long.MIN_VALUE) {
+			// Long.MIN_VALUE is currently assigned when no timestamp is present
+			long start = timestamp - (timestamp % size);
+			return Collections.singletonList(new TimeWindow(start, start + size));
+		} else {
+			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
+					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
+					"'DataStream.assignTimestampsAndWatermarks(...)'?");
+		}
+	}
+
+	public long getSize() {
+		return size;
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return EventTimeTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "TumblingEventTimeWindows(" + size + ")";
+	}
+
+	/**
+	 * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp.
+	 *
+	 * @param size The size of the generated windows.
+	 * @return The time policy.
+	 */
+	public static TumblingEventTimeWindows of(Time size) {
+		return new TumblingEventTimeWindows(size.toMilliseconds());
+	}
+
+	@Override
+	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new TimeWindow.Serializer();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 83f3d0c..436a9ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -38,7 +38,7 @@ import java.util.Collections;
  * DataStream<Tuple2<String, Integer>> in = ...;
  * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
  * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- *   keyed.window(TumblingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
+ *   keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
  * } </pre>
  */
 public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
@@ -72,7 +72,7 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
 	}
 
 	/**
-	 * Creates a new {@code TumblingTimeWindows} {@link WindowAssigner} that assigns
+	 * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
 	 * elements to time windows based on the element timestamp.
 	 *
 	 * @param size The size of the generated windows.

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index 2b97691..156b1e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -19,80 +19,34 @@
 package org.apache.flink.streaming.api.windowing.assigners;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.util.Collection;
-import java.util.Collections;
 
 /**
  * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
  * elements. Windows cannot overlap.
  *
- * <p>
- * For example, in order to window into windows of 1 minute:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
- *   keyed.window(TumblingTimeWindows.of(Time.minutes(1)));
- * } </pre>
+ * @deprecated Please use {@link TumblingEventTimeWindows}.
  */
 @PublicEvolving
-public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
+@Deprecated
+public class TumblingTimeWindows extends TumblingEventTimeWindows {
 	private static final long serialVersionUID = 1L;
 
-	private long size;
-
 	private TumblingTimeWindows(long size) {
-		this.size = size;
-	}
-
-	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-		if (timestamp > Long.MIN_VALUE) {
-			// Long.MIN_VALUE is currently assigned when no timestamp is present
-			long start = timestamp - (timestamp % size);
-			return Collections.singletonList(new TimeWindow(start, start + size));
-		} else {
-			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
-					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
-					"'DataStream.assignTimestampsAndWatermarks(...)'?");
-		}
-	}
-
-	public long getSize() {
-		return size;
-	}
-
-	@Override
-	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
-		return EventTimeTrigger.create();
-	}
-
-	@Override
-	public String toString() {
-		return "TumblingTimeWindows(" + size + ")";
+		super(size);
 	}
 
 	/**
 	 * Creates a new {@code TumblingTimeWindows} {@link WindowAssigner} that assigns
 	 * elements to time windows based on the element timestamp.
 	 *
+	 * @deprecated Please use {@link TumblingEventTimeWindows#of(Time)}.
+	 *
 	 * @param size The size of the generated windows.
 	 * @return The time policy.
 	 */
+	@Deprecated()
 	public static TumblingTimeWindows of(Time size) {
 		return new TumblingTimeWindows(size.toMilliseconds());
 	}
-
-	@Override
-	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
-		return new TimeWindow.Serializer();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index f6e3dcc..463395e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -65,7 +65,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DummyReducer reducer = new DummyReducer();
 
 		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -73,11 +73,11 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
 		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ReducingWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
@@ -95,7 +95,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
 		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
 	}
 
@@ -110,7 +110,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DummyReducer reducer = new DummyReducer();
 
 		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.trigger(CountTrigger.of(100))
 				.reduce(reducer);
 
@@ -119,11 +119,11 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
 		Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ReducingWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
@@ -142,7 +142,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
 		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
 	}
 
@@ -157,7 +157,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DummyReducer reducer = new DummyReducer();
 
 		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.evictor(CountEvictor.of(100))
 				.reduce(reducer);
 
@@ -166,12 +166,12 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
 		EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
 		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
 				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
@@ -191,7 +191,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
 		EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
 		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
 		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
 	}
@@ -210,7 +210,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DummyFolder folder = new DummyFolder();
 
 		DataStream<Integer> window1 = source
-				.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.fold(0, folder);
 
 		OneInputTransformation<Tuple2<String, Integer>, Integer> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window1.getTransformation();
@@ -218,11 +218,11 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
 		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof FoldingWindowBuffer.Factory);
 
 		DataStream<Integer> window2 = source
-				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.evictor(CountEvictor.of(13))
 				.fold(0, folder);
 
@@ -231,7 +231,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
 		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof ListWindowBuffer.Factory);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
index 198649d..c40874c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
@@ -109,7 +109,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 		source1.coGroup(source2)
 				.where(new Tuple2KeyExtractor())
 				.equalTo(new Tuple2KeyExtractor())
-				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
 				.apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() {
 					@Override
 					public void coGroup(Iterable<Tuple2<String, Integer>> first,
@@ -207,7 +207,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 		source1.join(source2)
 				.where(new Tuple3KeyExtractor())
 				.equalTo(new Tuple3KeyExtractor())
-				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
 				.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
 					@Override
 					public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
@@ -284,7 +284,7 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 		source1.join(source1)
 				.where(new Tuple3KeyExtractor())
 				.equalTo(new Tuple3KeyExtractor())
-				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
 				.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
 					@Override
 					public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index 406f3b0..9fc11b7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -26,12 +26,11 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ReduceIterableAllWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
@@ -45,7 +44,6 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuff
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -79,7 +77,7 @@ public class NonKeyedWindowOperatorTest {
 		final int WINDOW_SLIDE = 1;
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
-				SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+				SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceIterableAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
@@ -155,7 +153,7 @@ public class NonKeyedWindowOperatorTest {
 		final int WINDOW_SIZE = 3;
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
-				TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				windowBufferFactory,
 				new ReduceIterableAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index f214941..87da045 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -30,8 +30,8 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -111,7 +111,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof WindowOperator);
 		WindowOperator winOperator1 = (WindowOperator) operator1;
 		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
@@ -134,7 +134,7 @@ public class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator2 instanceof WindowOperator);
 		WindowOperator winOperator2 = (WindowOperator) operator2;
 		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
 		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
index b05af54..3b859f0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
@@ -85,7 +85,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 
 		source1
 				.keyBy(0)
-				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
 				.fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
 					@Override
 					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,
@@ -149,7 +149,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
 
 		source1
-				.windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
 				.fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
 					@Override
 					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index a1f08ad..3277940 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -31,8 +31,8 @@ import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunct
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
@@ -137,7 +137,7 @@ public class WindowOperatorTest {
 			inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+				SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -173,7 +173,7 @@ public class WindowOperatorTest {
 			inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-			SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
+			SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
 			new TimeWindow.Serializer(),
 			new TupleKeySelector(),
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -268,7 +268,7 @@ public class WindowOperatorTest {
 			inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
 				new TimeWindow.Serializer(),
 				new TupleKeySelector(),
 				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
@@ -303,7 +303,7 @@ public class WindowOperatorTest {
 			inputType.createSerializer(new ExecutionConfig()));
 
 		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-			TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+			TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
 			new TimeWindow.Serializer(),
 			new TupleKeySelector(),
 			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 30bb840..cacfc26 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -30,8 +30,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -65,7 +65,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 			.keyBy(0)
-			.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+			.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 			.reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
 				@Override
 				public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
@@ -90,7 +90,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -98,12 +98,12 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator1 instanceof WindowOperator);
 		WindowOperator winOperator1 = (WindowOperator) operator1;
 		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
-				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
@@ -120,7 +120,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator2 instanceof WindowOperator);
 		WindowOperator winOperator2 = (WindowOperator) operator2;
 		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
 		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
 	}
 
@@ -136,7 +136,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.trigger(CountTrigger.of(100))
 				.reduce(reducer);
 
@@ -145,12 +145,12 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator1 instanceof WindowOperator);
 		WindowOperator winOperator1 = (WindowOperator) operator1;
 		Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
-				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
@@ -169,7 +169,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator2 instanceof WindowOperator);
 		WindowOperator winOperator2 = (WindowOperator) operator2;
 		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
 		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
 	}
 
@@ -185,7 +185,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.evictor(CountEvictor.of(100))
 				.reduce(reducer);
 
@@ -194,13 +194,13 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
 		EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
 		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
 		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
-				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
 				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -220,7 +220,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
 		EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
 		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
 		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
 		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index d92aee9..d918ba8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.NoOpSink;
@@ -560,7 +560,7 @@ public class TimestampITCase {
 
 		source1
 				.keyBy(0)
-				.window(TumblingTimeWindows.of(Time.seconds(5)))
+				.window(TumblingEventTimeWindows.of(Time.seconds(5)))
 				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
 					@Override
 					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)  {
@@ -591,7 +591,7 @@ public class TimestampITCase {
 
 		source1
 				.keyBy(0)
-				.window(TumblingTimeWindows.of(Time.seconds(5)))
+				.window(TumblingEventTimeWindows.of(Time.seconds(5)))
 				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
 					@Override
 					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)  {

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index 4cce9e2..52c53d5 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -51,7 +51,7 @@ import scala.collection.JavaConverters._
  * val result = one.coGroup(two)
  *     .where(new MyFirstKeySelector())
  *     .equalTo(new MyFirstKeySelector())
- *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  *     .apply(new MyCoGroupFunction())
  * } }}}
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 5e9c307..5f99ad8 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -572,7 +572,7 @@ class DataStream[T](stream: JavaStream[T]) {
   /**
    * Windows this DataStream into tumbling time windows.
    *
-   * This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or
+   * This is a shortcut for either `.window(TumblingEventTimeWindows.of(size))` or
    * `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic
    * set using
    * [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].
@@ -590,7 +590,7 @@ class DataStream[T](stream: JavaStream[T]) {
   /**
    * Windows this DataStream into sliding time windows.
    *
-   * This is a shortcut for either `.window(SlidingTimeWindows.of(size, slide))` or
+   * This is a shortcut for either `.window(SlidingEventTimeWindows.of(size, slide))` or
    * `.window(SlidingProcessingTimeWindows.of(size, slide))` depending on the time characteristic
    * set using
    * [[StreamExecutionEnvironment.setStreamTimeCharacteristic]].

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index d2fb013..4d09dae 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -49,7 +49,7 @@ import org.apache.flink.util.Collector
  * val result = one.join(two)
  *     .where {t => ... }
  *     .equal {t => ... }
- *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  *     .apply(new MyJoinFunction())
  * } }}}
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 00a4bbb..1647dd3 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -52,7 +52,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
   /**
    * Windows this [[KeyedStream]] into tumbling time windows.
    *
-   * This is a shortcut for either `.window(TumblingTimeWindows.of(size))` or
+   * This is a shortcut for either `.window(TumblingEventTimeWindows.of(size))` or
    * `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic
    * set using
    * [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]]
@@ -85,7 +85,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
   /**
    * Windows this [[KeyedStream]] into sliding time windows.
    *
-   * This is a shortcut for either `.window(SlidingTimeWindows.of(size))` or
+   * This is a shortcut for either `.window(SlidingEventTimeWindows.of(size))` or
    * `.window(SlidingProcessingTimeWindows.of(size))` depending on the time characteristic
    * set using
    * [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]]


Mime
View raw message