kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: rework JavaDoc for windowing related public API
Date Tue, 24 Jan 2017 19:02:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 90b9a8a3c -> 740abdb7b


MINOR: rework JavaDoc for windowing related public API

 - also some code refactoring and bug fixes

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Eno Thereska, Guozhang Wang

Closes #2337 from mjsax/javaDocImprovements4

(cherry picked from commit b938c03b0aa7a0cebdaad0cbb22689cc719c6ebc)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/740abdb7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/740abdb7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/740abdb7

Branch: refs/heads/0.10.2
Commit: 740abdb7bf6dff841a09baf84f1bacd88401f50e
Parents: 90b9a8a
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Jan 24 11:02:34 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jan 24 11:02:44 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/JoinWindows.java      |  90 ++++++++++++----
 .../kafka/streams/kstream/SessionWindows.java   |  60 +++++++----
 .../kafka/streams/kstream/TimeWindows.java      | 107 ++++++++++++-------
 .../kafka/streams/kstream/UnlimitedWindows.java |  83 ++++++++++----
 .../apache/kafka/streams/kstream/Window.java    |  40 ++++---
 .../apache/kafka/streams/kstream/Windowed.java  |  28 ++++-
 .../apache/kafka/streams/kstream/Windows.java   |  61 +++++++----
 .../kstream/internals/SessionWindow.java        |  18 ++--
 .../streams/kstream/internals/TimeWindow.java   |  37 ++++++-
 .../kstream/internals/UnlimitedWindow.java      |  34 +++++-
 10 files changed, 403 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/740abdb7/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 6dd1a85..f20e39f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,20 +17,23 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 import java.util.Map;
 
 /**
  * The window specifications used for joins.
  * <p>
- * A {@link JoinWindows} instance defines a join over two stream on the same key and a maximum time difference.
+ * A {@link JoinWindows} instance defines a maximum time difference for a {@link KStream#join(KStream, ValueJoiner,
+ * JoinWindows) join over two streams} on the same key.
  * In SQL-style you would express this join as
- * <pre>
+ * <pre>{@code
  *     SELECT * FROM stream1, stream2
  *     WHERE
  *       stream1.key = stream2.key
  *       AND
  *       stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after
- * </pre>
+ * }</pre>
  * There are three different window configuration supported:
  * <ul>
  *     <li>before = after = time-difference</li>
@@ -40,9 +43,27 @@ import java.util.Map;
  * A join is symmetric in the sense, that a join specification on the first stream returns the same result record as
  * a join specification on the second stream with flipped before and after values.
  * <p>
- * Both values (before and after) must not result in an "inverse" window,
- * i.e., lower-interval-bound must not be larger than upper-interval.bound.
+ * Both values (before and after) must not result in an "inverse" window, i.e., upper-interval bound cannot be smaller
+ * than lower-interval bound.
+ * <p>
+ * {@link JoinWindows} are sliding windows, thus, they are aligned to the actual record timestamps.
+ * This implies, that each input record defines its own window with start and end time being relative to the record's
+ * timestamp.
+ * <p>
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see UnlimitedWindows
+ * @see SessionWindows
+ * @see KStream#join(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#join(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
+@InterfaceStability.Unstable
 public class JoinWindows extends Windows<Window> {
 
     /** Maximum time difference for tuples that are before the join tuple. */
@@ -52,50 +73,61 @@ public class JoinWindows extends Windows<Window> {
 
     private JoinWindows(final long beforeMs, final long afterMs) {
         if (beforeMs + afterMs < 0) {
-            throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative");
+            throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
     }
 
     /**
-     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}.
-     * ({@code timeDifference} must not be negative)
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifferenceMs},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifferenceMs} earlier or later than
+     * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference    join window interval
+     * @param timeDifferenceMs join window interval in milliseconds
+     * @throws IllegalArgumentException if {@code timeDifferenceMs} is negative
      */
     public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException {
         return new JoinWindows(timeDifferenceMs, timeDifferenceMs);
     }
 
     /**
-     * Specifies that records of the same key are joinable if their timestamps are within
-     * the join window interval, and if the timestamp of a record from the secondary stream is
-     * earlier than or equal to the timestamp of a record from the first stream.
+     * Changes the start window boundary to {@code timeDifferenceMs} but keep the end window boundary as is.
+     * Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
+     * {@code timeDifferenceMs} earlier than the timestamp of the record from the primary stream.
+     * {@code timeDifferenceMs} can be negative but it's absolute value must not be larger than current window "after"
+     * value (which would result in a negative window size).
      *
-     * @param timeDifference    join window interval
+     * @param timeDifferenceMs relative window start time in milliseconds
+     * @throws IllegalArgumentException if the resulting window size is negative
      */
     public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException {
         return new JoinWindows(timeDifferenceMs, afterMs);
     }
 
     /**
-     * Specifies that records of the same key are joinable if their timestamps are within
-     * the join window interval, and if the timestamp of a record from the secondary stream
-     * is later than or equal to the timestamp of a record from the first stream.
+     * Changes the end window boundary to {@code timeDifferenceMs} but keep the start window boundary as is.
+     * Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
+     * {@code timeDifferenceMs} later than the timestamp of the record from the primary stream.
+     * {@code timeDifferenceMs} can be negative but it's absolute value must not be larger than current window "before"
+     * value (which would result in a negative window size).
      *
-     * @param timeDifference    join window interval
+     * @param timeDifferenceMs relative window end time in milliseconds
+     * @throws IllegalArgumentException if the resulting window size is negative
      */
     public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException {
         return new JoinWindows(beforeMs, timeDifferenceMs);
     }
 
     /**
-     * Not supported by {@link JoinWindows}. Throws {@link UnsupportedOperationException}.
+     * Not supported by {@link JoinWindows}.
+     * Throws {@link UnsupportedOperationException}.
+     *
+     * @throws UnsupportedOperationException at every invocation
      */
     @Override
     public Map<Long, Window> windowsFor(final long timestamp) {
-        throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
+        throw new UnsupportedOperationException("windowsFor() is not supported by JoinWindows.");
     }
 
     @Override
@@ -103,6 +135,13 @@ public class JoinWindows extends Windows<Window> {
         return beforeMs + afterMs;
     }
 
+    /**
+     * {@inheritDoc}
+     *
+     * @param durationMs the window retention time in milliseconds
+     * @return itself
+     * @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
+     */
     @Override
     public JoinWindows until(final long durationMs) throws IllegalArgumentException {
         if (durationMs < size()) {
@@ -112,6 +151,13 @@ public class JoinWindows extends Windows<Window> {
         return this;
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For {@link TimeWindows} the maintain duration is at least as small as the window size.
+     *
+     * @return the window maintain duration
+     */
     @Override
     public long maintainMs() {
         return Math.max(super.maintainMs(), size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/740abdb7/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index bed6c3d..8918f3e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -16,18 +16,16 @@
  */
 package org.apache.kafka.streams.kstream;
 
-
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 /**
  * A session based window specification used for aggregating events into sessions.
  * <p>
  * Sessions represent a period of activity separated by a defined gap of inactivity.
- * Any events processed that fall within the inactivity gap of any existing sessions
- * are merged into the existing sessions. If the event falls outside of the session gap
- * then a new session will be created.
+ * Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions.
+ * If the event falls outside of the session gap then a new session will be created.
  * <p>
- * For example, If we have a session gap of 5 and the following data arrives:
+ * For example, if we have a session gap of 5 and the following data arrives:
  * <pre>
  * +--------------------------------------+
  * |    key    |    value    |    time    |
@@ -39,13 +37,12 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * |    A      |     3       |     20     |
  * +-----------+-------------+------------+
  * </pre>
- * <p>
- * We'd have 2 sessions for key A. 1 starting from time 10 and ending at time 12 and another
- * starting and ending at time 20. The length of the session is driven by the timestamps of
- * the data within the session
+ * We'd have 2 sessions for key A.
+ * One starting from time 10 and ending at time 12 and another starting and ending at time 20.
+ * The length of the session is driven by the timestamps of the data within the session.
+ * Thus, session windows are no fixed-size windows (c.f. {@link TimeWindows} and {@link JoinWindows}).
  * <p>
  * If we then received another record:
- * <p>
  * <pre>
  * +--------------------------------------+
  * |    key    |    value    |    time    |
@@ -53,9 +50,21 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  * |    A      |     4       |     16     |
  * +-----------+-------------+------------+
  * </pre>
- * <p>
  * The previous 2 sessions would be merged into a single session with start time 10 and end time 20.
  * The aggregate value for this session would be the result of aggregating all 4 values.
+ * <p>
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#count(SessionWindows, String)
+ * @see KGroupedStream#count(SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
+ * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
 @InterfaceStability.Unstable
 public class SessionWindows {
@@ -69,27 +78,30 @@ public class SessionWindows {
     }
 
     /**
-     * Create a new SessionWindows with the specified inactivity gap
-     * @param inactivityGapMs  the gap of inactivity between sessions
-     * @return a new SessionWindows with the provided inactivity gap
-     * and default maintain duration
+     * Create a new window specification with the specified inactivity gap in milliseconds.
+     *
+     * @param inactivityGapMs the gap of inactivity between sessions in milliseconds
+     * @return a new window specification with default maintain duration of 1 day
+     *
+     * @throws IllegalArgumentException if {@code inactivityGapMs} is zero or negative
      */
     public static SessionWindows with(final long inactivityGapMs) {
-        if (inactivityGapMs < 1) {
+        if (inactivityGapMs <= 0) {
             throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
         }
         return new SessionWindows(inactivityGapMs);
     }
 
     /**
-     * Set the window maintain duration in milliseconds of streams time.
+     * Set the window maintain duration (retention time) in milliseconds.
      * This retention time is a guaranteed <i>lower bound</i> for how long a window will be maintained.
      *
-     * @return  itself
+     * @return itself
+     * @throws IllegalArgumentException if {@code durationMs} is smaller than window gap
      */
     public SessionWindows until(final long durationMs) throws IllegalArgumentException {
         if (durationMs < gapMs) {
-            throw new IllegalArgumentException("Window retentin time (durationMs) cannot be smaller than window gap.");
+            throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
         }
         maintainDurationMs = durationMs;
 
@@ -97,14 +109,20 @@ public class SessionWindows {
     }
 
     /**
-     * @return the inactivityGap
+     * Return the specified gap for the session windows in milliseconds.
+     *
+     * @return the inactivity gap of the specified windows
      */
     public long inactivityGap() {
         return gapMs;
     }
 
     /**
-     * @return the minimum amount of time a window will be maintained for.
+     * Return the window maintain duration (retention time) in milliseconds.
+     * <p>
+     * For {@link SessionWindows} the maintain duration is at least as small as the window gap.
+     *
+     * @return the window maintain duration
      */
     public long maintainMs() {
         return Math.max(maintainDurationMs, gapMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/740abdb7/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 11df228..87e7f2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,79 +17,94 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 
 import java.util.HashMap;
 import java.util.Map;
 
 /**
- * The time-based window specifications used for aggregations.
+ * The fixed-size time-based window specifications used for aggregations.
  * <p>
- * The semantics of a time-based window are: Every T1 (advance) time-units, compute the aggregate total for T2 (size) time-units.
+ * The semantics of time-based aggregation windows are: Every T1 (advance) milliseconds, compute the aggregate total for
+ * T2 (size) milliseconds.
  * <ul>
- *     <li> If {@code advance < size} a hopping windows is defined: <br />
- *          it discretize a stream into overlapping windows, which implies that a record maybe contained in one and or more "adjacent" windows.</li>
+ *     <li> If {@code advance < size} a hopping windows is defined:<br />
+ *          it discretize a stream into overlapping windows, which implies that a record maybe contained in one and or
+ *          more "adjacent" windows.</li>
  *     <li> If {@code advance == size} a tumbling window is defined:<br />
- *          it discretize a stream into non-overlapping windows, which implies that a record is only ever contained in one and only one tumbling window.</li>
+ *          it discretize a stream into non-overlapping windows, which implies that a record is only ever contained in
+ *          one and only one tumbling window.</li>
  * </ul>
+ * Thus, the specified {@link TimeWindow}s are aligned to the epoch.
+ * Aligned to the epoch means, that the first window starts at timestamp zero.
+ * For example, hopping windows with size of 5000ms and advance of 3000ms, have window boundaries
+ * [0;5000),[3000;8000),... and not [1000;6000),[4000;9000),... or even something "random" like [1452;6452),[4452;9452),...
+ * <p>
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#count(Windows, String)
+ * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer, Windows, String)
+ * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
+@InterfaceStability.Unstable
 public class TimeWindows extends Windows<TimeWindow> {
 
-    /**
-     * The size of the window, i.e. how long a window lasts.
-     * The window size's effective time unit is determined by the semantics of the topology's
-     * configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
-     */
+    /** The size of the windows in milliseconds. */
     public final long sizeMs;
 
     /**
-     * The size of the window's advance interval, i.e. by how much a window moves forward relative
-     * to the previous one. The interval's effective time unit is determined by the semantics of
-     * the topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
+     * The size of the window's advance interval in milliseconds, i.e., by how much a window moves forward relative to
+     * the previous one.
      */
     public final long advanceMs;
 
-    private TimeWindows(final long sizeMs, final long advanceMs) throws IllegalArgumentException {
+    private TimeWindows(final long sizeMs, final long advanceMs) {
         this.sizeMs = sizeMs;
         this.advanceMs = advanceMs;
     }
 
     /**
-     * Returns a window definition with the given window size, and with the advance interval being
-     * equal to the window size. Think: [N * size, N * size + size), with N denoting the N-th
-     * window.
+     * Return a window definition with the given window size, and with the advance interval being equal to the window
+     * size.
+     * The time interval represented by the the N-th window is: {@code [N * size, N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code advance == size}.
      *
-     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less,
-     * non-overlapping windows. Tumbling windows are a specialization of hopping windows.
-     *
-     * @param size The size of the window, with the requirement that size &gt; 0.
-     *             The window size's effective time unit is determined by the semantics of the
-     *             topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
-     * @return a new window definition
+     * @param sizeMs The size of the window in milliseconds
+     * @return a new window definition with default maintain duration of 1 day
+     * @throws IllegalArgumentException if the specified window size is zero or negative
      */
     public static TimeWindows of(final long sizeMs) throws IllegalArgumentException {
         if (sizeMs <= 0) {
-            throw new IllegalArgumentException("Window sizeMs must be larger than zero.");
+            throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
         }
         return new TimeWindows(sizeMs, sizeMs);
     }
 
     /**
-     * Returns a window definition with the original size, but advance ("hop") the window by the given
-     * interval, which specifies by how much a window moves forward relative to the previous one.
-     * Think: [N * advanceInterval, N * advanceInterval + size), with N denoting the N-th window.
-     *
+     * Return a window definition with the original size, but advance ("hop") the window by the given interval, which
+     * specifies by how much a window moves forward relative to the previous one.
+     * The time interval represented by the the N-th window is: {@code [N * advance, N * advance + size)}.
+     * <p>
      * This provides the semantics of hopping windows, which are fixed-sized, overlapping windows.
      *
-     * @param interval The advance interval ("hop") of the window, with the requirement that
-     *                 0 &lt; interval &le; size. The interval's effective time unit is
-     *                 determined by the semantics of the topology's configured
-     *                 {@link org.apache.kafka.streams.processor.TimestampExtractor}.
-     * @return a new window definition
+     * @param advanceMs The advance interval ("hop") in milliseconds of the window, with the requirement that
+     *                  {@code 0 < advanceMs &le; sizeMs}.
+     * @return a new window definition with default maintain duration of 1 day
+     * @throws IllegalArgumentException if the advance interval is negative, zero, or larger-or-equal the window size
      */
     public TimeWindows advanceBy(final long advanceMs) {
         if (advanceMs <= 0 || advanceMs > sizeMs) {
-            throw new IllegalArgumentException(String.format("AdvanceMs must lie within interval (0, %d]", sizeMs));
+            throw new IllegalArgumentException(String.format("AdvanceMs must lie within interval (0, %d].", sizeMs));
         }
         return new TimeWindows(sizeMs, advanceMs);
     }
@@ -111,6 +126,13 @@ public class TimeWindows extends Windows<TimeWindow> {
         return sizeMs;
     }
 
+    /**
+     * {@inheritDoc}
+     *
+     * @param durationMs the window retention time
+     * @return itself
+     * @throws IllegalArgumentException if {@code duration} is smaller than the window size
+     */
     public TimeWindows until(final long durationMs) throws IllegalArgumentException {
         if (durationMs < sizeMs) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
@@ -119,6 +141,13 @@ public class TimeWindows extends Windows<TimeWindow> {
         return this;
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For {@link TimeWindows} the maintain duration is at least as small as the window size.
+     *
+     * @return the window maintain duration
+     */
     @Override
     public long maintainMs() {
         return Math.max(super.maintainMs(), sizeMs);
@@ -143,4 +172,4 @@ public class TimeWindows extends Windows<TimeWindow> {
         return result;
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/740abdb7/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 8605f9d..9d5669c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,14 +17,33 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
 
 import java.util.HashMap;
 import java.util.Map;
 
 /**
- * The unlimited window specifications.
+ * The unlimited window specifications used for aggregations.
+ * <p>
+ * An unlimited time window is also called landmark window.
+ * It has a fixed starting point while its window end is defined as infinite.
+ * With this regard, it is a fixed-size window with infinite window size.
+ * <p>
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see SessionWindows
+ * @see JoinWindows
+ * @see KGroupedStream#count(Windows, String)
+ * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer, Windows, String)
+ * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
+@InterfaceStability.Unstable
 public class UnlimitedWindows extends Windows<UnlimitedWindow> {
 
     private static final long DEFAULT_START_TIMESTAMP_MS = 0L;
@@ -32,10 +51,7 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
     /** The start timestamp of the window. */
     public final long startMs;
 
-    private UnlimitedWindows(final long startMs) throws IllegalArgumentException {
-        if (startMs < 0) {
-            throw new IllegalArgumentException("startMs must be > 0 (you provided " + startMs + ")");
-        }
+    private UnlimitedWindows(final long startMs) {
         this.startMs = startMs;
     }
 
@@ -49,11 +65,15 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
     /**
      * Return a new unlimited window for the specified start timestamp.
      *
-     * @param start  the window start time
-     * @return       a new unlimited window that starts at {@code start}
+     * @param startMs the window start time
+     * @return a new unlimited window that starts at {@code startMs}
+     * @throws IllegalArgumentException if the start time is negative
      */
-    public UnlimitedWindows startOn(final long start) throws IllegalArgumentException {
-        return new UnlimitedWindows(start);
+    public UnlimitedWindows startOn(final long startMs) throws IllegalArgumentException {
+        if (startMs < 0) {
+            throw new IllegalArgumentException("Window start time (startMs) cannot be negative.");
+        }
+        return new UnlimitedWindows(startMs);
     }
 
     @Override
@@ -68,11 +88,39 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
         return windows;
     }
 
+    /**
+     * {@inheritDoc}
+     * As unlimited windows have conceptually infinite size, this methods just returns {@link Long#MAX_VALUE}.
+     *
+     * @return the size of the specified windows which is {@link Long#MAX_VALUE}
+     */
     @Override
     public long size() {
         return Long.MAX_VALUE;
     }
 
+    /**
+     * Throws an {@link IllegalArgumentException} because the retention time for unlimited windows is always infinite
+     * and cannot be changed.
+     *
+     * @throws IllegalArgumentException on every invocation
+     */
+    @Override
+    public UnlimitedWindows until(final long durationMs) {
+        throw new IllegalArgumentException("Window retention time (durationMs) cannot be set for UnlimitedWindows.");
+    }
+
+    /**
+     * {@inheritDoc}
+     * The retention time for unlimited windows in infinite and thus represented as {@link Long#MAX_VALUE}.
+     *
+     * @return the window retention time that is {@link Long#MAX_VALUE}
+     */
+    @Override
+    public long maintainMs() {
+        return Long.MAX_VALUE;
+    }
+
     @Override
     public final boolean equals(final Object o) {
         if (o == this) {
@@ -92,13 +140,4 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
         return (int) (startMs ^ (startMs >>> 32));
     }
 
-    @Override
-    public UnlimitedWindows until(final long durationMs) {
-        throw new IllegalArgumentException("Window retention time (durationMs) cannot be set for UnlimitedWindows.");
-    }
-
-    @Override
-    public long maintainMs() {
-        return Long.MAX_VALUE;
-    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/740abdb7/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index 9c6edc0..2d2439d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -5,33 +5,45 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 /**
  * A single window instance, defined by its start and end timestamp.
+ * {@link Window} is agnostic if start/end boundaries are inclusive or exclusive; this is defined by concrete
+ * window implementations.
+ * <p>
+ * To specify how {@link Window} boundaries are defined use {@link Windows}.
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see Windows
+ * @see org.apache.kafka.streams.kstream.internals.TimeWindow
+ * @see org.apache.kafka.streams.kstream.internals.SessionWindow
+ * @see org.apache.kafka.streams.kstream.internals.UnlimitedWindow
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
+@InterfaceStability.Unstable
 public abstract class Window {
 
     protected final long startMs;
     protected final long endMs;
 
     /**
-     * Create a new window for the given start time (inclusive) and end time (exclusive).
+     * Create a new window for the given start and end time.
      *
-     * @param start  the start timestamp of the window (inclusive)
-     * @param end    the end timestamp of the window (exclusive)
-     * @throws IllegalArgumentException if {@code start} or {@code end} is negative or if {@code end} is smaller than
-     * {@code start}
+     * @param startMs the start timestamp of the window
+     * @param endMs   the end timestamp of the window
+     * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than {@code startMs}
      */
     public Window(long startMs, long endMs) throws IllegalArgumentException {
         if (startMs < 0) {
@@ -45,14 +57,14 @@ public abstract class Window {
     }
 
     /**
-     * Return the start timestamp of this window, inclusive
+     * Return the start timestamp of this window.
      */
     public long start() {
         return startMs;
     }
 
     /**
-     * Return the end timestamp of this window, exclusive
+     * Return the end timestamp of this window.
      */
     public long end() {
         return endMs;
@@ -60,9 +72,11 @@ public abstract class Window {
 
     /**
      * Check if the given window overlaps with this window.
+     * Should throw an {@link IllegalArgumentException} if the {@code other} window has a different type than {@code
+     * this} window.
      *
-     * @param other  another window
-     * @return       {@code true} if {@code other} overlaps with this window&mdash;{@code false} otherwise
+     * @param other another window of the same type
+     * @return {@code true} if {@code other} overlaps with this window&mdash;{@code false} otherwise
      */
     public abstract boolean overlap(final Window other);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/740abdb7/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 81357c1..2941136 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -17,13 +17,31 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 /**
- * Used to represent windowed stream aggregations (e.g. as returned by
- * {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)}),
- * which have the type {@code <Windowed<K>, V>}.
+ * The result key type of a windowed stream aggregation.
+ * <p>
+ * If a {@link KStream} gets grouped and aggregated using a window-aggregation the resulting {@link KTable} is a
+ * so-called "windowed {@link KTable}" with a combined key type that encodes the corresponding aggregation window and
+ * the original record key.
+ * Thus, a windowed {@link KTable} has type {@link Windowed &lt;Windowed&lt;K&gt;,V&gt;}
  *
- * @param <K> Type of the key
+ * @param <K> type of the key
+ * @see KGroupedStream#count(Windows, String)
+ * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#count(SessionWindows, String)
+ * @see KGroupedStream#count(SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer, Windows, String)
+ * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
+ * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
  */
+@InterfaceStability.Unstable
 public class Windowed<K> {
 
     private final K key;
@@ -47,7 +65,7 @@ public class Windowed<K> {
     /**
      * Return the window containing the values associated with this key.
      *
-     * @return  the window containing the values
+     * @return the window containing the values
      */
     public Window window() {
         return window;

http://git-wip-us.apache.org/repos/asf/kafka/blob/740abdb7/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 29b61fd..ea0c728 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,13 +16,25 @@
  */
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 import java.util.Map;
 
 /**
- * The window specification interface that can be extended for windowing operation in joins and aggregations.
+ * The window specification interface for fixed size windows that is used to define window boundaries and window
+ * maintain duration.
+ * <p>
+ * If not explicitly specified, the default maintain duration is 1 day.
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
  *
- * @param <W>   type of the window instance
+ * @param <W> type of the window instance
+ * @see TimeWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see SessionWindows
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
+@InterfaceStability.Unstable
 public abstract class Windows<W extends Window> {
 
     private static final int DEFAULT_NUM_SEGMENTS = 3;
@@ -39,10 +51,12 @@ public abstract class Windows<W extends Window> {
     }
 
     /**
-     * Set the window maintain duration in milliseconds of streams time.
+     * Set the window maintain duration (retention time) in milliseconds.
      * This retention time is a guaranteed <i>lower bound</i> for how long a window will be maintained.
      *
-     * @return  itself
+     * @param durationMs the window retention time in milliseconds
+     * @return itself
+     * @throws IllegalArgumentException if {@code durationMs} is negative
      */
     // This should always get overridden to provide the correct return type and thus to avoid a cast
     public Windows<W> until(final long durationMs) throws IllegalArgumentException {
@@ -55,10 +69,21 @@ public abstract class Windows<W extends Window> {
     }
 
     /**
-     * Specify the number of segments to be used for rolling the window store,
-     * this function is not exposed to users but can be called by developers that extend this JoinWindows specs.
+     * Return the window maintain duration (retention time) in milliseconds.
+     *
+     * @return the window maintain duration
+     */
+    public long maintainMs() {
+        return maintainDurationMs;
+    }
+
+    /**
+     * Set the number of segments to be used for rolling the window store.
+     * This function is not exposed to users but can be called by developers that extend this class.
      *
-     * @return  itself
+     * @param segments the number of segments to be used
+     * @return itself
+     * @throws IllegalArgumentException if specified segments is small than 2
      */
     protected Windows<W> segments(final int segments) throws IllegalArgumentException {
         if (segments < 2) {
@@ -70,21 +95,17 @@ public abstract class Windows<W extends Window> {
     }
 
     /**
-     * Return the window maintain duration in milliseconds of streams time.
+     * Create all windows that contain the provided timestamp, indexed by non-negative window start timestamps.
      *
-     * @return the window maintain duration in milliseconds of streams time
+     * @param timestamp the timestamp window should get created for
+     * @return a map of {@code windowStartTimestamp -> Window} entries
      */
-    public long maintainMs() {
-        return maintainDurationMs;
-    }
+    public abstract Map<Long, W> windowsFor(final long timestamp);
 
     /**
-     * Creates all windows that contain the provided timestamp, indexed by non-negative window start timestamps.
+     * Return the size of the specified windows in milliseconds.
      *
-     * @param timestamp  the timestamp window should get created for
-     * @return  a map of {@code windowStartTimestamp -> Window} entries
+     * @return the size of the specified windows
      */
-    public abstract Map<Long, W> windowsFor(final long timestamp);
-
     public abstract long size();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/740abdb7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
index cf72752..2ea273c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java
@@ -18,38 +18,44 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.kstream.Window;
 
 /**
  * A session window covers a closed time interval with its start and end timestamp both being an inclusive boundary.
+ * <p>
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
  *
  * @see TimeWindow
  * @see UnlimitedWindow
  * @see org.apache.kafka.streams.kstream.SessionWindows
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
+@InterfaceStability.Unstable
 public final class SessionWindow extends Window {
 
     /**
      * Create a new window for the given start time and end time (both inclusive).
      *
-     * @param start  the start timestamp of the window
-     * @param end    the end timestamp of the window
+     * @param startMs the start timestamp of the window
+     * @param endMs   the end timestamp of the window
+     * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than {@code startMs}
      */
-    public SessionWindow(final long startMs, final long endMs) {
+    public SessionWindow(final long startMs, final long endMs) throws IllegalArgumentException {
         super(startMs, endMs);
     }
 
     /**
      * Check if the given window overlaps with this window.
      *
-     * @param other  another window
-     * @return       {@code true} if {@code other} overlaps with this window&mdash;{@code false} otherwise
+     * @param other another window
+     * @return {@code true} if {@code other} overlaps with this window&mdash;{@code false} otherwise
      * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
      */
     public boolean overlap(final Window other) throws IllegalArgumentException {
         if (getClass() != other.getClass()) {
             throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type "
-                + other.getClass());
+                + other.getClass() + ".");
         }
         final SessionWindow otherWindow = (SessionWindow) other;
         return !(otherWindow.endMs < startMs || endMs < otherWindow.startMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/740abdb7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
index bf98f94..ab805ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
@@ -14,28 +14,57 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.kstream.Window;
 
+/**
+ * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end
+ * timestamp as exclusive boundary.
+ * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows
+ * window specification}) will have the same size.
+ * <p>
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see SessionWindow
+ * @see UnlimitedWindow
+ * @see org.apache.kafka.streams.kstream.TimeWindows
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
+ */
+@InterfaceStability.Unstable
 public class TimeWindow extends Window {
 
-    public TimeWindow(long startMs, long endMs) {
+    /**
+     * Create a new window for the given start time (inclusive) and end time (exclusive).
+     *
+     * @param startMs the start timestamp of the window (inclusive)
+     * @param endMs   the end timestamp of the window (exclusive)
+     * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than or equal to
+     * {@code startMs}
+     */
+    public TimeWindow(final long startMs, final long endMs) throws IllegalArgumentException {
         super(startMs, endMs);
         if (startMs == endMs) {
             throw new IllegalArgumentException("Window endMs must be greater than window startMs.");
         }
     }
 
+    /**
+     * Check if the given window overlaps with this window.
+     *
+     * @param other another window
+     * @return {@code true} if {@code other} overlaps with this window&mdash;{@code false} otherwise
+     * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
+     */
     @Override
     public boolean overlap(final Window other) throws IllegalArgumentException {
         if (getClass() != other.getClass()) {
             throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type "
-                + other.getClass());
+                + other.getClass() + ".");
         }
         final TimeWindow otherWindow = (TimeWindow) other;
         return startMs < otherWindow.endMs && otherWindow.startMs < endMs;
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/740abdb7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
index 7fb7c53..311169e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
@@ -14,24 +14,52 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.streams.kstream.Window;
 
+/**
+ * {@link UnlimitedWindow} is an "infinite" large window with a fixed (inclusive) start time.
+ * All windows of the same {@link org.apache.kafka.streams.kstream.UnlimitedWindows window specification} will have the
+ * same start time.
+ * To make the window size "infinite" end time is set to {@link Long#MAX_VALUE}.
+ * <p>
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see TimeWindow
+ * @see SessionWindow
+ * @see org.apache.kafka.streams.kstream.UnlimitedWindows
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
+ */
+@InterfaceStability.Unstable
 public class UnlimitedWindow extends Window {
 
+    /**
+     * Create a new window for the given start time (inclusive).
+     *
+     * @param startMs the start timestamp of the window (inclusive)
+     * @throws IllegalArgumentException if {@code start} is negative
+     */
     public UnlimitedWindow(final long startMs) {
         super(startMs, Long.MAX_VALUE);
     }
 
+    /**
+     * Returns {@code true} if the given window is of the same type, because all unlimited windows overlap with each
+     * other due to their infinite size.
+     *
+     * @param other another window
+     * @return {@code true}
+     * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
+     */
     @Override
     public boolean overlap(final Window other) {
         if (getClass() != other.getClass()) {
             throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type "
-                + other.getClass());
+                + other.getClass() + ".");
         }
         return true;
     }
 
-}
\ No newline at end of file
+}


Mime
View raw message