kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7222: Add Windows grace period (#5369)
Date Tue, 14 Aug 2018 16:53:30 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b3771ba  KAFKA-7222: Add Windows grace period (#5369)
b3771ba is described below

commit b3771ba22acad7870e38ff7f58820c5b50946787
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Tue Aug 14 11:53:25 2018 -0500

    KAFKA-7222: Add Windows grace period (#5369)
    
    Part I of KIP-328:
    
    * add grace period to Windows
    * deprecate retention/maintainMs and segmentInterval from Windows
    * record expired records in the store with a new metric
    * record late record drops as a new metric instead of as a "skipped record"
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../apache/kafka/streams/kstream/JoinWindows.java  | 31 +++-----
 .../apache/kafka/streams/kstream/Materialized.java | 29 +++++++
 .../streams/kstream/SessionWindowedKStream.java    |  4 +-
 .../kafka/streams/kstream/SessionWindows.java      | 65 ++++++++++++----
 .../kafka/streams/kstream/TimeWindowedKStream.java |  6 +-
 .../apache/kafka/streams/kstream/TimeWindows.java  | 33 ++++----
 .../kafka/streams/kstream/UnlimitedWindows.java    | 30 ++++----
 .../org/apache/kafka/streams/kstream/Windows.java  | 55 +++++++++++--
 .../streams/kstream/internals/KStreamImpl.java     | 42 +++++-----
 .../internals/KStreamSessionWindowAggregate.java   | 35 +++++++--
 .../kstream/internals/KStreamWindowAggregate.java  | 22 +++---
 .../kstream/internals/MaterializedInternal.java    |  5 ++
 .../internals/SessionWindowedKStreamImpl.java      | 16 +++-
 .../kstream/internals/TimeWindowedKStreamImpl.java | 52 ++++++++++---
 .../streams/kstream/internals/metrics/Sensors.java | 42 ++++++++++
 .../org/apache/kafka/streams/state/Stores.java     | 14 +++-
 .../internals/RocksDBSegmentedBytesStore.java      | 41 ++++++++--
 .../RocksDbSessionBytesStoreSupplier.java          |  1 +
 .../internals/RocksDbWindowBytesStoreSupplier.java |  1 +
 .../kafka/streams/state/internals/Segments.java    |  3 +-
 .../org/apache/kafka/streams/TopologyTest.java     |  4 +-
 .../integration/InternalTopicIntegrationTest.java  | 10 ++-
 .../KStreamAggregationIntegrationTest.java         |  6 +-
 .../kafka/streams/kstream/JoinWindowsTest.java     | 88 ++++++++-------------
 .../kafka/streams/kstream/SessionWindowsTest.java  | 34 ++++----
 .../kafka/streams/kstream/TimeWindowsTest.java     | 49 ++++--------
 .../streams/kstream/UnlimitedWindowsTest.java      | 10 +++
 .../apache/kafka/streams/kstream/WindowsTest.java  | 18 +++++
 .../streams/kstream/internals/KStreamImplTest.java | 58 +++++++++++---
 ...KStreamSessionWindowAggregateProcessorTest.java | 61 +++++++++++++--
 .../internals/KStreamWindowAggregateTest.java      | 90 ++++++++++++++++++++--
 .../kstream/internals/KStreamWindowReduceTest.java | 29 +++++--
 .../processor/internals/StandbyTaskTest.java       |  8 +-
 .../internals/testutil/LogCaptureAppender.java     |  5 ++
 .../state/internals/CachingSessionStoreTest.java   |  2 +-
 .../state/internals/CachingWindowStoreTest.java    |  2 +-
 .../internals/RocksDBSegmentedBytesStoreTest.java  | 63 +++++++++++++--
 .../state/internals/RocksDBSessionStoreTest.java   |  4 +-
 .../state/internals/RocksDBWindowStoreTest.java    |  2 +-
 .../kafka/test/InternalMockProcessorContext.java   |  1 +
 40 files changed, 769 insertions(+), 302 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 863ae95..bd31175 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -133,12 +133,20 @@ public final class JoinWindows extends Windows<Window> {
         return beforeMs + afterMs;
     }
 
+    @Override
+    public JoinWindows grace(final long millisAfterWindowEnd) {
+        super.grace(millisAfterWindowEnd);
+        return this;
+    }
+
     /**
      * @param durationMs the window retention time in milliseconds
      * @return itself
      * @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
+     * @deprecated since 2.1. Use {@link JoinWindows#grace(long)} instead.
      */
     @Override
+    @Deprecated
     public JoinWindows until(final long durationMs) throws IllegalArgumentException {
         if (durationMs < size()) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
@@ -153,30 +161,11 @@ public final class JoinWindows extends Windows<Window> {
      * For {@link TimeWindows} the maintain duration is at least as small as the window size.
      *
      * @return the window maintain duration
+     * @deprecated since 2.1. Use {@link JoinWindows#gracePeriodMs()} instead.
      */
     @Override
+    @Deprecated
     public long maintainMs() {
         return Math.max(super.maintainMs(), size());
     }
-
-    @Override
-    public final boolean equals(final Object o) {
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof JoinWindows)) {
-            return false;
-        }
-
-        final JoinWindows other = (JoinWindows) o;
-        return beforeMs == other.beforeMs && afterMs == other.afterMs;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = (int) (beforeMs ^ (beforeMs >>> 32));
-        result = 31 * result + (int) (afterMs ^ (afterMs >>> 32));
-        return result;
-    }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index 89603fa..15ec6ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.state.StoreSupplier;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -60,6 +61,7 @@ public class Materialized<K, V, S extends StateStore> {
     protected boolean loggingEnabled = true;
     protected boolean cachingEnabled = true;
     protected Map<String, String> topicConfig = new HashMap<>();
+    protected Duration retention;
 
     private Materialized(final StoreSupplier<S> storeSupplier) {
         this.storeSupplier = storeSupplier;
@@ -81,6 +83,7 @@ public class Materialized<K, V, S extends StateStore> {
         this.loggingEnabled = materialized.loggingEnabled;
         this.cachingEnabled = materialized.cachingEnabled;
         this.topicConfig = materialized.topicConfig;
+        this.retention = materialized.retention;
     }
 
     /**
@@ -101,6 +104,10 @@ public class Materialized<K, V, S extends StateStore> {
     /**
      * Materialize a {@link WindowStore} using the provided {@link WindowBytesStoreSupplier}.
      *
+     * Important: Custom subclasses are allowed here, but they should respect the retention contract:
+     * Window stores are required to retain windows at least as long as (window size + window grace period).
+     * Stores constructed via {@link org.apache.kafka.streams.state.Stores} already satisfy this contract.
+     *
      * @param supplier the {@link WindowBytesStoreSupplier} used to materialize the store
      * @param <K>      key type of the store
      * @param <V>      value type of the store
@@ -114,6 +121,10 @@ public class Materialized<K, V, S extends StateStore> {
     /**
      * Materialize a {@link SessionStore} using the provided {@link SessionBytesStoreSupplier}.
      *
+     * Important: Custom subclasses are allowed here, but they should respect the retention contract:
+     * Session stores are required to retain windows at least as long as (session inactivity gap + session grace period).
+     * Stores constructed via {@link org.apache.kafka.streams.state.Stores} already satisfy this contract.
+     *
      * @param supplier the {@link SessionBytesStoreSupplier} used to materialize the store
      * @param <K>      key type of the store
      * @param <V>      value type of the store
@@ -222,4 +233,22 @@ public class Materialized<K, V, S extends StateStore> {
         return this;
     }
 
+    /**
+     * Configure retention period for window and session stores. Ignored for key/value stores.
+     *
+     * Overridden by pre-configured store suppliers
+     * ({@link Materialized#as(SessionBytesStoreSupplier)} or {@link Materialized#as(WindowBytesStoreSupplier)}).
+     *
+     * Note that the retention period must be at least long enough to contain the windowed data's entire life cycle,
+     * from window-start through window-end, and for the entire grace period.
+     *
+     * @return itself
+     */
+    public Materialized<K, V, S> withRetention(final long retentionMs) {
+        if (retentionMs < 0) {
+            throw new IllegalArgumentException("Retention must not be negative.");
+        }
+        retention = Duration.ofMillis(retentionMs);
+        return this;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index 36e7823..c5c44c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.SessionStore;
 
+import java.time.Duration;
+
 /**
  * {@code SessionWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
  * It is an intermediate representation after a grouping and windowing of a {@link KStream} before an aggregation is applied to the
@@ -36,7 +38,7 @@ import org.apache.kafka.streams.state.SessionStore;
  * They have no fixed time boundaries, rather the size of the window is determined by the records.
  * Please see {@link SessionWindows} for more details.
  * <p>
- * {@link SessionWindows} are retained until their retention time expires (c.f. {@link SessionWindows#until(long)}).
+ * New events are added to {@link SessionWindows} until their grace period ends (see {@link SessionWindows#grace(Duration)}).
  *
  * Furthermore, updates are sent downstream into a windowed {@link KTable} changelog stream, where
  * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index fc1fb9f..96aea0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+
+import java.time.Duration;
 
-import java.util.Objects;
 
 /**
  * A session based window specification used for aggregating events into sessions.
@@ -67,10 +69,13 @@ public final class SessionWindows {
 
     private final long gapMs;
     private final long maintainDurationMs;
+    private final Duration grace;
+
 
-    private SessionWindows(final long gapMs, final long maintainDurationMs) {
+    private SessionWindows(final long gapMs, final long maintainDurationMs, final Duration grace) {
         this.gapMs = gapMs;
         this.maintainDurationMs = maintainDurationMs;
+        this.grace = grace;
     }
 
     /**
@@ -86,7 +91,7 @@ public final class SessionWindows {
             throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
         }
         final long oneDayMs = 24 * 60 * 60_000L;
-        return new SessionWindows(inactivityGapMs, oneDayMs);
+        return new SessionWindows(inactivityGapMs, oneDayMs, null);
     }
 
     /**
@@ -95,13 +100,50 @@ public final class SessionWindows {
      *
      * @return itself
      * @throws IllegalArgumentException if {@code durationMs} is smaller than window gap
+     *
+     * @deprecated since 2.1. Use {@link Materialized#withRetention(long)}
+     *             or directly configure the retention in a store supplier and use
+     *             {@link Materialized#as(SessionBytesStoreSupplier)}.
      */
+    @Deprecated
     public SessionWindows until(final long durationMs) throws IllegalArgumentException {
         if (durationMs < gapMs) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
         }
 
-        return new SessionWindows(gapMs, durationMs);
+        return new SessionWindows(gapMs, durationMs, null);
+    }
+
+    /**
+     * Reject late events that arrive more than {@code millisAfterWindowEnd}
+     * after the end of their window.
+     *
+     * Note that new events may change the boundaries of session windows, so aggressive
+     * close times can lead to surprising results in which a too-late event is rejected and then
+     * a subsequent event moves the window boundary forward.
+     *
+     * @param millisAfterWindowEnd The grace period to admit late-arriving events to a window.
+     * @return this updated builder
+     */
+    public SessionWindows grace(final long millisAfterWindowEnd) {
+        if (millisAfterWindowEnd < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
+        return new SessionWindows(
+            gapMs,
+            maintainDurationMs,
+            Duration.ofMillis(millisAfterWindowEnd)
+        );
+    }
+
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
+    public long gracePeriodMs() {
+
+        // NOTE: in the future, when we remove maintainMs,
+        // we should default the grace period to 24h to maintain the default behavior,
+        // or we can default to (24h - gapMs) if you want to be super accurate.
+        return grace != null ? grace.toMillis() : maintainMs() - inactivityGap();
     }
 
     /**
@@ -119,22 +161,11 @@ public final class SessionWindows {
      * For {@code SessionWindows} the maintain duration is at least as small as the window gap.
      *
      * @return the window maintain duration
+     * @deprecated since 2.1. Use {@link Materialized#withRetention(long)} instead.
      */
+    @Deprecated
     public long maintainMs() {
         return Math.max(maintainDurationMs, gapMs);
     }
 
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        final SessionWindows that = (SessionWindows) o;
-        return gapMs == that.gapMs &&
-                maintainDurationMs == that.maintainDurationMs;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(gapMs, maintainDurationMs);
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 03bc986..d6f4082 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -25,6 +25,8 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.WindowStore;
 
+import java.time.Duration;
+
 /**
  * {@code TimeWindowedKStream} is an abstraction of a <i>windowed</i> record stream of {@link KeyValue} pairs.
  * It is an intermediate representation of a {@link KStream} in order to apply a windowed aggregation operation on the original
@@ -38,7 +40,9 @@ import org.apache.kafka.streams.state.WindowStore;
  * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
  * The result is written into a local windowed {@link KeyValueStore} (which is basically an ever-updating
  * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
- * Windows are retained until their retention time expires (c.f. {@link Windows#until(long)}).
+ *
+ * New events are added to windows until their grace period ends (see {@link Windows#grace(Duration)}).
+ *
  * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
  * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index c2b910d..808b006 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -119,12 +120,22 @@ public final class TimeWindows extends Windows<TimeWindow> {
         return sizeMs;
     }
 
+    @Override
+    public TimeWindows grace(final long millisAfterWindowEnd) {
+        super.grace(millisAfterWindowEnd);
+        return this;
+    }
+
     /**
      * @param durationMs the window retention time
      * @return itself
      * @throws IllegalArgumentException if {@code duration} is smaller than the window size
+     *
+     * @deprecated since 2.1. Use {@link Materialized#withRetention(long)} or directly configure the retention in a store supplier
+     *             and use {@link Materialized#as(WindowBytesStoreSupplier)}.
      */
     @Override
+    @Deprecated
     public TimeWindows until(final long durationMs) throws IllegalArgumentException {
         if (durationMs < sizeMs) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
@@ -139,29 +150,11 @@ public final class TimeWindows extends Windows<TimeWindow> {
      * For {@code TimeWindows} the maintain duration is at least as small as the window size.
      *
      * @return the window maintain duration
+     * @deprecated since 2.1. Use {@link Materialized#withRetention(long)} instead.
      */
     @Override
+    @Deprecated
     public long maintainMs() {
         return Math.max(super.maintainMs(), sizeMs);
     }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof TimeWindows)) {
-            return false;
-        }
-        final TimeWindows other = (TimeWindows) o;
-        return sizeMs == other.sizeMs && advanceMs == other.advanceMs;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = (int) (sizeMs ^ (sizeMs >>> 32));
-        result = 31 * result + (int) (advanceMs ^ (advanceMs >>> 32));
-        return result;
-    }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index a3b9338..e795a2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -96,9 +96,11 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
      * Throws an {@link IllegalArgumentException} because the retention time for unlimited windows is always infinite
      * and cannot be changed.
      *
-     * @throws IllegalArgumentException on every invocation
+     * @throws IllegalArgumentException on every invocation.
+     * @deprecated since 2.1.
      */
     @Override
+    @Deprecated
     public UnlimitedWindows until(final long durationMs) {
         throw new IllegalArgumentException("Window retention time (durationMs) cannot be set for UnlimitedWindows.");
     }
@@ -108,29 +110,23 @@ public final class UnlimitedWindows extends Windows<UnlimitedWindow> {
      * The retention time for unlimited windows in infinite and thus represented as {@link Long#MAX_VALUE}.
      *
      * @return the window retention time that is {@link Long#MAX_VALUE}
+     * @deprecated since 2.1. Use {@link Materialized#withRetention(long)} instead.
      */
     @Override
+    @Deprecated
     public long maintainMs() {
         return Long.MAX_VALUE;
     }
 
+    /**
+     * Throws an {@link IllegalArgumentException} because the window never ends and the
+     * grace period is therefore meaningless.
+     *
+     * @throws IllegalArgumentException on every invocation
+     */
     @Override
-    public boolean equals(final Object o) {
-        if (o == this) {
-            return true;
-        }
-
-        if (!(o instanceof UnlimitedWindows)) {
-            return false;
-        }
-
-        final UnlimitedWindows other = (UnlimitedWindows) o;
-        return startMs == other.startMs;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) (startMs ^ (startMs >>> 32));
+    public UnlimitedWindows grace(final long millisAfterWindowEnd) {
+        throw new IllegalArgumentException("Grace period cannot be set for UnlimitedWindows.");
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 53ead1e..adfc88a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -17,15 +17,15 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 
+import java.time.Duration;
 import java.util.Map;
 
 /**
- * The window specification interface for fixed size windows that is used to define window boundaries and window
- * maintain duration.
- * <p>
- * If not explicitly specified, the default maintain duration is 1 day.
- * For time semantics, see {@link TimestampExtractor}.
+ * The window specification interface for fixed size windows that is used to define window boundaries and grace period.
+ *
+ * Grace period defines how long to wait on late events, where lateness is defined as (stream_time - record_timestamp).
  *
  * @param <W> type of the window instance
  * @see TimeWindows
@@ -39,17 +39,54 @@ public abstract class Windows<W extends Window> {
     private long maintainDurationMs = 24 * 60 * 60 * 1000L; // default: one day
     @Deprecated public int segments = 3;
 
+    private Duration grace;
+
     protected Windows() {}
 
     /**
+     * Reject late events that arrive more than {@code millisAfterWindowEnd}
+     * after the end of their window.
+     *
+     * Lateness is defined as (stream_time - record_timestamp).
+     *
+     * @param millisAfterWindowEnd The grace period to admit late-arriving events to a window.
+     * @return this updated builder
+     */
+    public Windows<W> grace(final long millisAfterWindowEnd) {
+        if (millisAfterWindowEnd < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
+        grace = Duration.ofMillis(millisAfterWindowEnd);
+
+        return this;
+    }
+
+    /**
+     * Return the window grace period (the time to admit
+     * late-arriving events after the end of the window).
+     *
+     * Lateness is defined as (stream_time - record_timestamp).
+     */
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
+    public long gracePeriodMs() {
+        // NOTE: in the future, when we remove maintainMs,
+        // we should default the grace period to 24h to maintain the default behavior,
+        // or we can default to (24h - size) if you want to be super accurate.
+        return grace != null ? grace.toMillis() : maintainMs() - size();
+    }
+
+    /**
      * Set the window maintain duration (retention time) in milliseconds.
      * This retention time is a guaranteed <i>lower bound</i> for how long a window will be maintained.
      *
      * @param durationMs the window retention time in milliseconds
      * @return itself
      * @throws IllegalArgumentException if {@code durationMs} is negative
+     * @deprecated since 2.1. Use {@link Materialized#withRetention(long)}
+     *             or directly configure the retention in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
      */
-    // This should always get overridden to provide the correct return type and thus to avoid a cast
+    @Deprecated
     public Windows<W> until(final long durationMs) throws IllegalArgumentException {
         if (durationMs < 0) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be negative.");
@@ -63,7 +100,9 @@ public abstract class Windows<W extends Window> {
      * Return the window maintain duration (retention time) in milliseconds.
      *
      * @return the window maintain duration
+     * @deprecated since 2.1. Use {@link Materialized#withRetention(long)} instead.
      */
+    @Deprecated
     public long maintainMs() {
         return maintainDurationMs;
     }
@@ -72,8 +111,9 @@ public abstract class Windows<W extends Window> {
      * Return the segment interval in milliseconds.
      *
      * @return the segment interval
+     * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
      */
-    @SuppressWarnings("deprecation") // The deprecation is on the public visibility of segments. We intend to make the field private later.
+    @Deprecated
     public long segmentInterval() {
         // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
         final long minimumSegmentInterval = 60_000L;
@@ -81,6 +121,7 @@ public abstract class Windows<W extends Window> {
         return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
     }
 
+
     /**
      * Set the number of segments to be used for rolling the window store.
      * This function is not exposed to users but can be called by developers that extend this class.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index becb03d..803faf6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -194,7 +194,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name,
                                                                                                    processorParameters,
-                                                                                     true);
+                                                                                                   true);
 
         mapProcessorNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, mapProcessorNode);
@@ -216,7 +216,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
 
 
-        final ProcessorGraphNode<? super  K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name,
+        final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name,
                                                                                                          processorParameters,
                                                                                                          repartitionRequired);
         builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
@@ -235,7 +235,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name,
                                                                                             processorParameters,
-                                                                            false);
+                                                                                            false);
         builder.addGraphNode(this.streamsGraphNode, printNode);
     }
 
@@ -250,7 +250,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name,
                                                                                               processorParameters,
-                                                                                true);
+                                                                                              true);
         flatMapNode.keyChangingOperation(true);
 
         builder.addGraphNode(this.streamsGraphNode, flatMapNode);
@@ -375,8 +375,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
 
         final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name,
-                                                                              processorParameters,
-                                                                              repartitionRequired);
+                                                                                              processorParameters,
+                                                                                              repartitionRequired);
         builder.addGraphNode(this.streamsGraphNode, foreachNode);
     }
 
@@ -595,7 +595,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final String repartitionedSourceName = createRepartitionedSource(builder,
                                                                          keySerde,
                                                                          valSerde,
-                                                                          null,
+                                                                         null,
                                                                          name,
                                                                          optimizableRepartitionNodeBuilder);
 
@@ -716,9 +716,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(processorSupplier, name);
 
         final StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(name,
-                                                                                  processorParameters,
-                                                                                  new String[]{},
-                                                                                  null);
+                                                                                        processorParameters,
+                                                                                        new String[] {},
+                                                                                        null);
         builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, false, streamTableJoinNode);
@@ -819,17 +819,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     }
 
-    private static <K, V> StoreBuilder<WindowStore<K, V>> createWindowedStateStore(final JoinWindows windows,
-                                                                                   final Serde<K> keySerde,
-                                                                                   final Serde<V> valueSerde,
-                                                                                   final String storeName) {
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
+    private static <K, V> StoreBuilder<WindowStore<K, V>> joinWindowStoreBuilder(final String joinName,
+                                                                                 final JoinWindows windows,
+                                                                                 final Serde<K> keySerde,
+                                                                                 final Serde<V> valueSerde) {
         return Stores.windowStoreBuilder(
             Stores.persistentWindowStore(
-                storeName,
-                windows.maintainMs(),
+                joinName + "-store",
+                windows.size() + windows.gracePeriodMs(),
                 windows.size(),
-                true,
-                windows.segmentInterval()
+                true
             ),
             keySerde,
             valueSerde
@@ -865,11 +865,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             final StreamsGraphNode thisStreamsGraphNode = ((AbstractStream) lhs).streamsGraphNode;
             final StreamsGraphNode otherStreamsGraphNode = ((AbstractStream) other).streamsGraphNode;
 
-            final StoreBuilder<WindowStore<K1, V1>> thisWindowStore =
-                createWindowedStateStore(windows, joined.keySerde(), joined.valueSerde(), joinThisName + "-store");
 
+            final StoreBuilder<WindowStore<K1, V1>> thisWindowStore =
+                joinWindowStoreBuilder(joinThisName, windows, joined.keySerde(), joined.valueSerde());
             final StoreBuilder<WindowStore<K1, V2>> otherWindowStore =
-                createWindowedStateStore(windows, joined.keySerde(), joined.otherValueSerde(), joinOtherName + "-store");
+                joinWindowStoreBuilder(joinOtherName, windows, joined.keySerde(), joined.otherValueSerde());
 
             final KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.name());
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 35dd7a6..5a3c897 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -23,9 +24,11 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -73,12 +76,17 @@ class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSup
         private SessionStore<K, Agg> store;
         private TupleForwarder<Windowed<K>, Agg> tupleForwarder;
         private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
             metrics = (StreamsMetricsImpl) context.metrics();
+            lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
+
             store = (SessionStore<K, Agg>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
         }
@@ -96,6 +104,8 @@ class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSup
                 return;
             }
 
+            final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs();
+
             final long timestamp = context().timestamp();
             final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
             final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
@@ -117,16 +127,25 @@ class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSup
                 }
             }
 
-            agg = aggregator.apply(key, value, agg);
-            final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
-            if (!mergedWindow.equals(newSessionWindow)) {
-                for (final KeyValue<Windowed<K>, Agg> session : merged) {
-                    store.remove(session.key);
-                    tupleForwarder.maybeForward(session.key, null, session.value);
+            if (mergedWindow.end() > closeTime) {
+                if (!mergedWindow.equals(newSessionWindow)) {
+                    for (final KeyValue<Windowed<K>, Agg> session : merged) {
+                        store.remove(session.key);
+                        tupleForwarder.maybeForward(session.key, null, session.value);
+                    }
                 }
+
+                agg = aggregator.apply(key, value, agg);
+                final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
+                store.put(sessionKey, agg);
+                tupleForwarder.maybeForward(sessionKey, agg, null);
+            } else {
+                LOG.debug(
+                    "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
+                    key, context().topic(), context().partition(), context().offset(), context().timestamp(), mergedWindow.start(), mergedWindow.end(), closeTime
+                );
+                lateRecordDropSensor.record();
             }
-            store.put(sessionKey, agg);
-            tupleForwarder.maybeForward(sessionKey, agg, null);
         }
 
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 40702f0..5754284 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -16,11 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -68,23 +70,24 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
         private TupleForwarder<Windowed<K>, Agg> tupleForwarder;
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            this.internalProcessorContext = (InternalProcessorContext) context;
+            internalProcessorContext = (InternalProcessorContext) context;
 
             metrics = (StreamsMetricsImpl) context.metrics();
 
+            lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext);
+
             windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
         }
 
         @Override
         public void process(final K key, final V value) {
-            // if the key is null, we do not need proceed aggregating the record
-            // the record with the table
             if (key == null) {
                 log.warn(
                     "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
@@ -96,14 +99,15 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
 
             // first get the matching windows
             final long timestamp = context().timestamp();
-            final long expiryTime = internalProcessorContext.streamTime() - windows.maintainMs();
+            final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs();
 
             final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
 
             // try update the window, and create the new window for the rest of unmatched window that do not exist yet
             for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
                 final Long windowStart = entry.getKey();
-                if (windowStart > expiryTime) {
+                final long windowEnd = entry.getValue().end();
+                if (windowEnd > closeTime) {
                     Agg oldAgg = windowStore.fetch(key, windowStart);
 
                     if (oldAgg == null) {
@@ -116,11 +120,11 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
                     windowStore.put(key, newAgg, windowStart);
                     tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
                 } else {
-                    log.warn(
-                        "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{}] expiration=[{}]",
-                        key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, expiryTime
+                    log.debug(
+                        "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
+                        key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, windowEnd, closeTime
                     );
-                    metrics.skippedRecordsSensor().record();
+                    lateRecordDropSensor.record();
                 }
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 2bc7326..1955dea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.StoreSupplier;
 
+import java.time.Duration;
 import java.util.Map;
 
 public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> {
@@ -73,4 +74,8 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
     public boolean isQueryable() {
         return queriable;
     }
+
+    Duration retention() {
+        return retention;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 54611f7..98076e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -174,12 +174,26 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implemen
         );
     }
 
+    @SuppressWarnings("deprecation") // continuing to support SessionWindows#maintainMs in fallback mode
     private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> materialized) {
         SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
+            // NOTE: in the future, when we remove SessionWindows#maintainMs(), we should set the default retention
+            // to be (windows.inactivityGap() + windows.gracePeriodMs()). This will yield the same default behavior.
+            final long retentionPeriod = materialized.retention() != null ? materialized.retention().toMillis() : windows.maintainMs();
+
+            if ((windows.inactivityGap() + windows.gracePeriodMs()) > retentionPeriod) {
+                throw new IllegalArgumentException("The retention period of the session store "
+                                                       + materialized.storeName()
+                                                       + " must be no smaller than the session inactivity gap plus the"
+                                                       + " grace period."
+                                                       + " Got gap=[" + windows.inactivityGap() + "],"
+                                                       + " grace=[" + windows.gracePeriodMs() + "],"
+                                                       + " retention=[" + retentionPeriod + "]");
+            }
             supplier = Stores.persistentSessionStore(
                 materialized.storeName(),
-                windows.maintainMs()
+                retentionPeriod
             );
         }
         final StoreBuilder<SessionStore<K, VR>> builder = Stores.sessionStoreBuilder(
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 0daaf0d..5c5cfb2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -56,10 +56,9 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
                             final boolean repartitionRequired,
                             final StreamsGraphNode streamsGraphNode) {
         super(builder, name, sourceNodes, streamsGraphNode);
-        Objects.requireNonNull(windows, "windows can't be null");
         this.valSerde = valSerde;
         this.keySerde = keySerde;
-        this.windows = windows;
+        this.windows = Objects.requireNonNull(windows, "windows can't be null");
         this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name, streamsGraphNode);
     }
 
@@ -164,16 +163,51 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
         );
     }
 
+    @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
     private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
         WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
-            supplier = Stores.persistentWindowStore(
-                materialized.storeName(),
-                windows.maintainMs(),
-                windows.size(),
-                false,
-                windows.segmentInterval()
-            );
+            if (materialized.retention() != null) {
+                // new style retention: use Materialized retention and default segmentInterval
+                final long retentionPeriod = materialized.retention().toMillis();
+
+                if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
+                    throw new IllegalArgumentException("The retention period of the window store "
+                                                           + name + " must be no smaller than its window size plus the grace period."
+                                                           + " Got size=[" + windows.size() + "],"
+                                                           + " grace=[" + windows.gracePeriodMs() + "],"
+                                                           + " retention=[" + retentionPeriod + "]");
+                }
+
+                supplier = Stores.persistentWindowStore(
+                    materialized.storeName(),
+                    retentionPeriod,
+                    windows.size(),
+                    false
+                );
+
+            } else {
+                // old style retention: use deprecated Windows retention/segmentInterval.
+
+                // NOTE: in the future, when we remove Windows#maintainMs(), we should set the default retention
+                // to be (windows.size() + windows.gracePeriodMs()). This will yield the same default behavior.
+
+                if ((windows.size() + windows.gracePeriodMs()) > windows.maintainMs()) {
+                    throw new IllegalArgumentException("The retention period of the window store "
+                                                           + name + " must be no smaller than its window size plus the grace period."
+                                                           + " Got size=[" + windows.size() + "],"
+                                                           + " grace=[" + windows.gracePeriodMs() + "],"
+                                                           + " retention=[" + windows.maintainMs() + "]");
+                }
+
+                supplier = Stores.persistentWindowStore(
+                    materialized.storeName(),
+                    windows.maintainMs(),
+                    windows.size(),
+                    false,
+                    windows.segmentInterval()
+                );
+            }
         }
         final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(
             supplier,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
new file mode 100644
index 0000000..04c7150
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.metrics;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+public class Sensors {
+    private Sensors() {}
+
+    public static Sensor lateRecordDropSensor(final InternalProcessorContext context) {
+        final StreamsMetricsImpl metrics = context.metrics();
+        final Sensor sensor = metrics.nodeLevelSensor(
+            context.taskId().toString(),
+            context.currentNode().name(),
+            "late-record-drop",
+            Sensor.RecordingLevel.INFO
+        );
+        StreamsMetricsImpl.addInvocationRateAndCount(
+            sensor,
+            "stream-processor-node-metrics",
+            metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", context.currentNode().name()),
+            "late-record-drop"
+        );
+        return sensor;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 3bda28d..6e965fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -142,7 +142,10 @@ public class Stores {
     /**
      * Create a persistent {@link WindowBytesStoreSupplier}.
      * @param name                  name of the store (cannot be {@code null})
-     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     * @param retentionPeriod       length of time to retain data in the store (cannot be negative).
+     *                              Note that the retention period must be at least long enough to contain the
+     *                              windowed data's entire life cycle, from window-start through window-end,
+     *                              and for the entire grace period.
      * @param numSegments           number of db segments (cannot be zero or negative)
      * @param windowSize            size of the windows that are stored (cannot be negative). Note: the window size
      *                              is not stored with the records, so this value is used to compute the keys that
@@ -177,6 +180,9 @@ public class Stores {
      * Create a persistent {@link WindowBytesStoreSupplier}.
      * @param name                  name of the store (cannot be {@code null})
      * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     *                              Note that the retention period must be at least long enough to contain the
+     *                              windowed data's entire life cycle, from window-start through window-end,
+     *                              and for the entire grace period.
      * @param windowSize            size of the windows (cannot be negative)
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
@@ -194,6 +200,9 @@ public class Stores {
      * Create a persistent {@link WindowBytesStoreSupplier}.
      * @param name                  name of the store (cannot be {@code null})
      * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     *                              Note that the retention period must be at least long enough to contain the
+     *                              windowed data's entire life cycle, from window-start through window-end,
+     *                              and for the entire grace period.
      * @param segmentInterval       size of segments in ms (cannot be negative)
      * @param windowSize            size of the windows (cannot be negative)
      * @param retainDuplicates      whether or not to retain duplicates.
@@ -227,6 +236,9 @@ public class Stores {
      * Create a persistent {@link SessionBytesStoreSupplier}.
      * @param name              name of the store (cannot be {@code null})
      * @param retentionPeriod   length ot time to retain data in the store (cannot be negative)
+     *                          Note that the retention period must be at least long enough to contain the
+     *                          windowed data's entire life cycle, from window-start through window-end,
+     *                          and for the entire grace period.
      * @return an instance of a {@link  SessionBytesStoreSupplier}
      */
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 6a9284c..fccb6c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -25,6 +26,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
@@ -38,20 +40,26 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+
 class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     private static final Logger LOG = LoggerFactory.getLogger(RocksDBSegmentedBytesStore.class);
     private final String name;
     private final Segments segments;
+    private final String metricScope;
     private final KeySchema keySchema;
     private InternalProcessorContext context;
     private volatile boolean open;
     private Set<Segment> bulkLoadSegments;
+    private Sensor expiredRecordSensor;
 
     RocksDBSegmentedBytesStore(final String name,
+                               final String metricScope,
                                final long retention,
                                final long segmentInterval,
                                final KeySchema keySchema) {
         this.name = name;
+        this.metricScope = metricScope;
         this.keySchema = keySchema;
         this.segments = new Segments(name, retention, segmentInterval);
     }
@@ -79,26 +87,26 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
                                    keySchema.hasNextCondition(keyFrom, keyTo, from, to),
                                    binaryFrom, binaryTo);
     }
-    
+
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
-        
+
         final List<Segment> searchSpace = segments.allSegments();
-        
+
         return new SegmentIterator(searchSpace.iterator(),
                                    keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE),
                                    null, null);
     }
-    
+
     @Override
     public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom, final long timeTo) {
         final List<Segment> searchSpace = segments.segments(timeFrom, timeTo);
-        
+
         return new SegmentIterator(searchSpace.iterator(),
                                    keySchema.hasNextCondition(null, null, timeFrom, timeTo),
                                    null, null);
     }
-    
+
     @Override
     public void remove(final Bytes key) {
         final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
@@ -110,9 +118,11 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
 
     @Override
     public void put(final Bytes key, final byte[] value) {
-        final long segmentId = segments.segmentId(keySchema.segmentTimestamp(key));
+        final long timestamp = keySchema.segmentTimestamp(key);
+        final long segmentId = segments.segmentId(timestamp);
         final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
         if (segment == null) {
+            expiredRecordSensor.record();
             LOG.debug("Skipping record for expired segment.");
         } else {
             segment.put(key, value);
@@ -137,6 +147,23 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = (InternalProcessorContext) context;
 
+        final StreamsMetricsImpl metrics = this.context.metrics();
+
+        final String taskName = context.taskId().toString();
+
+        expiredRecordSensor = metrics.storeLevelSensor(
+            taskName,
+            name(),
+            "expired-window-record-drop",
+            Sensor.RecordingLevel.INFO
+        );
+        addInvocationRateAndCount(
+            expiredRecordSensor,
+            "stream-" + metricScope + "-metrics",
+            metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
+            "expired-window-record-drop"
+        );
+
         keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()));
 
         segments.openExisting(this.context);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index 5610fb2..e88755b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -40,6 +40,7 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
     public SessionStore<Bytes, byte[]> get() {
         final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(
             name,
+            metricsScope(),
             retentionPeriod,
             segmentIntervalMs(),
             new SessionKeySchema());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index 5c7b099..b9b7279 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -49,6 +49,7 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
     public WindowStore<Bytes, byte[]> get() {
         final RocksDBSegmentedBytesStore segmentedBytesStore = new RocksDBSegmentedBytesStore(
                 name,
+                metricsScope(),
                 retentionPeriod,
                 segmentInterval,
                 new WindowKeySchema()
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 8e6c524..cf858b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -72,7 +72,8 @@ class Segments {
     }
 
     Segment getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
-        final long minLiveSegment = segmentId(context.streamTime() - retentionPeriod);
+        final long minLiveTimestamp = context.streamTime() - retentionPeriod;
+        final long minLiveSegment = segmentId(minLiveTimestamp);
 
         final Segment toReturn;
         if (segmentId >= minLiveSegment) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index eeb08ac..caf2b10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -188,7 +188,7 @@ public class TopologyTest {
     public void shouldNotAllowToAddProcessorWithNullParents() {
         topology.addSource("source", "topic-1");
         try {
-            topology.addProcessor("processor", new MockProcessorSupplier(), null);
+            topology.addProcessor("processor", new MockProcessorSupplier(), (String) null);
             fail("Should throw NullPointerException for processor when null parent names are provided");
         } catch (final NullPointerException expected) { }
     }
@@ -228,7 +228,7 @@ public class TopologyTest {
         topology.addSource("source", "topic-1");
         topology.addProcessor("processor", new MockProcessorSupplier(), "source");
         try {
-            topology.addSink("sink", "topic-2", null);
+            topology.addSink("sink", "topic-2", (String) null);
             fail("Should throw NullPointerException for sink when null parent names are provided");
         } catch (final NullPointerException expected) { }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index a099cea..fe7ee26 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -197,9 +197,13 @@ public class InternalTopicIntegrationTest {
                 return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
             }
         })
-                .groupBy(MockMapper.<String, String>selectValueMapper())
-                .windowedBy(TimeWindows.of(1000).until(2000))
-                .count(Materialized.<String, Long, WindowStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("CountWindows"));
+            .groupBy(MockMapper.<String, String>selectValueMapper())
+            .windowedBy(TimeWindows.of(1000).grace(0L))
+            .count(
+                Materialized
+                    .<String, Long, WindowStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("CountWindows")
+                    .withRetention(2_000L)
+            );
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp);
         streams.start();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index fecb8ea..c16950e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -455,7 +455,6 @@ public class KStreamAggregationIntegrationTest {
     @Test
     public void shouldCountSessionWindows() throws Exception {
         final long sessionGap = 5 * 60 * 1000L;
-        final long maintainMillis = sessionGap * 3;
 
         final long t1 = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
         final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"),
@@ -518,7 +517,7 @@ public class KStreamAggregationIntegrationTest {
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-                .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
+                .windowedBy(SessionWindows.with(sessionGap))
                 .count()
                 .toStream()
                 .transform(() -> new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
@@ -554,7 +553,6 @@ public class KStreamAggregationIntegrationTest {
     @Test
     public void shouldReduceSessionWindows() throws Exception {
         final long sessionGap = 1000L; // something to do with time
-        final long maintainMillis = sessionGap * 3;
 
         final long t1 = mockTime.milliseconds();
         final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"),
@@ -617,7 +615,7 @@ public class KStreamAggregationIntegrationTest {
         final String userSessionsStore = "UserSessionsStore";
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-                .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
+                .windowedBy(SessionWindows.with(sessionGap))
                 .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
                 .toStream()
                 .foreach((key, value) -> {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index ab973e8..5576b93 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -19,63 +19,27 @@ package org.apache.kafka.streams.kstream;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 
 public class JoinWindowsTest {
 
-    private static long anySize = 123L;
-    private static long anyOtherSize = 456L; // should be larger than anySize
-
-    @Test
-    public void shouldHaveSaneEqualsAndHashCode() {
-        final JoinWindows w1 = JoinWindows.of(anySize);
-        final JoinWindows w2 = JoinWindows.of(anySize);
-
-        // Reflexive
-        assertEquals(w1, w1);
-        assertEquals(w1.hashCode(), w1.hashCode());
-
-        // Symmetric
-        assertEquals(w1, w2);
-        assertEquals(w2, w1);
-        assertEquals(w1.hashCode(), w2.hashCode());
-
-        final JoinWindows w3 = JoinWindows.of(w2.afterMs).before(anyOtherSize);
-        final JoinWindows w4 = JoinWindows.of(anyOtherSize).after(w2.afterMs);
-        assertEquals(w3, w4);
-        assertEquals(w4, w3);
-        assertEquals(w3.hashCode(), w4.hashCode());
-
-        // Inequality scenarios
-        assertNotEquals("must be false for null", null, w1);
-        assertNotEquals("must be false for different window types", UnlimitedWindows.of(), w1);
-        assertNotEquals("must be false for different types", new Object(), w1);
-
-        final JoinWindows differentWindowSize = JoinWindows.of(w1.afterMs + 1);
-        assertNotEquals("must be false when window sizes are different", differentWindowSize, w1);
-
-        final JoinWindows differentWindowSize2 = JoinWindows.of(w1.afterMs).after(w1.afterMs + 1);
-        assertNotEquals("must be false when window sizes are different", differentWindowSize2, w1);
-
-        final JoinWindows differentWindowSize3 = JoinWindows.of(w1.afterMs).before(w1.beforeMs + 1);
-        assertNotEquals("must be false when window sizes are different", differentWindowSize3, w1);
-    }
+    private static final long ANY_SIZE = 123L;
+    private static final long ANY_OTHER_SIZE = 456L; // should be larger than ANY_SIZE
 
     @Test
     public void validWindows() {
-        JoinWindows.of(anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
-            .before(anySize)                    // [ -anySize ; anyOtherSize ]
-            .before(0)                          // [ 0 ; anyOtherSize ]
-            .before(-anySize)                   // [ anySize ; anyOtherSize ]
-            .before(-anyOtherSize);             // [ anyOtherSize ; anyOtherSize ]
-
-        JoinWindows.of(anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
-            .after(anySize)                     // [ -anyOtherSize ; anySize ]
-            .after(0)                           // [ -anyOtherSize ; 0 ]
-            .after(-anySize)                    // [ -anyOtherSize ; -anySize ]
-            .after(-anyOtherSize);              // [ -anyOtherSize ; -anyOtherSize ]
+        JoinWindows.of(ANY_OTHER_SIZE)   // [ -ANY_OTHER_SIZE ; ANY_OTHER_SIZE ]
+                   .before(ANY_SIZE)                    // [ -ANY_SIZE ; ANY_OTHER_SIZE ]
+                   .before(0)                          // [ 0 ; ANY_OTHER_SIZE ]
+                   .before(-ANY_SIZE)                   // [ ANY_SIZE ; ANY_OTHER_SIZE ]
+                   .before(-ANY_OTHER_SIZE);             // [ ANY_OTHER_SIZE ; ANY_OTHER_SIZE ]
+
+        JoinWindows.of(ANY_OTHER_SIZE)   // [ -ANY_OTHER_SIZE ; ANY_OTHER_SIZE ]
+                   .after(ANY_SIZE)                     // [ -ANY_OTHER_SIZE ; ANY_SIZE ]
+                   .after(0)                           // [ -ANY_OTHER_SIZE ; 0 ]
+                   .after(-ANY_SIZE)                    // [ -ANY_OTHER_SIZE ; -ANY_SIZE ]
+                   .after(-ANY_OTHER_SIZE);              // [ -ANY_OTHER_SIZE ; -ANY_OTHER_SIZE ]
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -85,9 +49,9 @@ public class JoinWindowsTest {
 
     @Test
     public void endTimeShouldNotBeBeforeStart() {
-        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        final JoinWindows windowSpec = JoinWindows.of(ANY_SIZE);
         try {
-            windowSpec.after(-anySize - 1);
+            windowSpec.after(-ANY_SIZE - 1);
             fail("window end time should not be before window start time");
         } catch (final IllegalArgumentException e) {
             // expected
@@ -96,25 +60,27 @@ public class JoinWindowsTest {
 
     @Test
     public void startTimeShouldNotBeAfterEnd() {
-        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        final JoinWindows windowSpec = JoinWindows.of(ANY_SIZE);
         try {
-            windowSpec.before(-anySize - 1);
+            windowSpec.before(-ANY_SIZE - 1);
             fail("window start time should not be after window end time");
         } catch (final IllegalArgumentException e) {
             // expected
         }
     }
 
+    @Deprecated
     @Test
     public void untilShouldSetMaintainDuration() {
-        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        final JoinWindows windowSpec = JoinWindows.of(ANY_SIZE);
         final long windowSize = windowSpec.size();
         assertEquals(windowSize, windowSpec.until(windowSize).maintainMs());
     }
 
+    @Deprecated
     @Test
     public void retentionTimeMustNoBeSmallerThanWindowSize() {
-        final JoinWindows windowSpec = JoinWindows.of(anySize);
+        final JoinWindows windowSpec = JoinWindows.of(ANY_SIZE);
         final long windowSize = windowSpec.size();
         try {
             windowSpec.until(windowSize - 1);
@@ -124,4 +90,16 @@ public class JoinWindowsTest {
         }
     }
 
+    @Test
+    public void gracePeriodShouldEnforceBoundaries() {
+        JoinWindows.of(3L).grace(0L);
+
+        try {
+            JoinWindows.of(3L).grace(-1L);
+            fail("should not accept negatives");
+        } catch (final IllegalArgumentException e) {
+            //expected
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index d0e5996..c464c75 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -18,9 +18,6 @@ package org.apache.kafka.streams.kstream;
 
 import org.junit.Test;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -32,12 +29,26 @@ public class SessionWindowsTest {
         assertEquals(anyGap, SessionWindows.with(anyGap).inactivityGap());
     }
 
+    @Deprecated
     @Test
     public void shouldSetWindowRetentionTime() {
         final long anyRetentionTime = 42L;
         assertEquals(anyRetentionTime, SessionWindows.with(1).until(anyRetentionTime).maintainMs());
     }
 
+
+    @Test
+    public void gracePeriodShouldEnforceBoundaries() {
+        SessionWindows.with(3L).grace(0L);
+
+        try {
+            SessionWindows.with(3L).grace(-1L);
+            fail("should not accept negatives");
+        } catch (final IllegalArgumentException e) {
+            //expected
+        }
+    }
+
     @Test(expected = IllegalArgumentException.class)
     public void windowSizeMustNotBeNegative() {
         SessionWindows.with(-1);
@@ -48,12 +59,14 @@ public class SessionWindowsTest {
         SessionWindows.with(0);
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated apis
     @Test
     public void retentionTimeShouldBeGapIfGapIsLargerThanDefaultRetentionTime() {
         final long windowGap = 2 * SessionWindows.with(1).maintainMs();
         assertEquals(windowGap, SessionWindows.with(windowGap).maintainMs());
     }
 
+    @Deprecated
     @Test
     public void retentionTimeMustNotBeNegative() {
         final SessionWindows windowSpec = SessionWindows.with(42);
@@ -64,19 +77,4 @@ public class SessionWindowsTest {
             // expected
         }
     }
-
-    @Test
-    public void shouldBeEqualWhenGapAndMaintainMsAreTheSame() {
-        assertThat(SessionWindows.with(5), equalTo(SessionWindows.with(5)));
-    }
-
-    @Test
-    public void shouldNotBeEqualWhenMaintainMsDifferent() {
-        assertThat(SessionWindows.with(5), not(equalTo(SessionWindows.with(5).until(10))));
-    }
-
-    @Test
-    public void shouldNotBeEqualWhenGapIsDifferent() {
-        assertThat(SessionWindows.with(5), not(equalTo(SessionWindows.with(10))));
-    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index a90c86f..b8d3bfd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -22,7 +22,6 @@ import org.junit.Test;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 public class TimeWindowsTest {
@@ -40,49 +39,19 @@ public class TimeWindowsTest {
         assertEquals(anyAdvance, TimeWindows.of(ANY_SIZE).advanceBy(anyAdvance).advanceMs);
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test
     public void shouldSetWindowRetentionTime() {
         assertEquals(ANY_SIZE, TimeWindows.of(ANY_SIZE).until(ANY_SIZE).maintainMs());
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test
     public void shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
         final long windowSize = 2 * TimeWindows.of(1).maintainMs();
         assertEquals(windowSize, TimeWindows.of(windowSize).maintainMs());
     }
 
-    @Test
-    public void shouldHaveSaneEqualsAndHashCode() {
-        final TimeWindows w1 = TimeWindows.of(ANY_SIZE);
-        final TimeWindows w2 = TimeWindows.of(w1.sizeMs);
-
-        // Reflexive
-        assertEquals(w1, w1);
-        assertEquals(w1.hashCode(), w1.hashCode());
-
-        // Symmetric
-        assertEquals(w1, w2);
-        assertEquals(w2, w1);
-        assertEquals(w1.hashCode(), w2.hashCode());
-
-        // Transitive
-        final TimeWindows w3 = TimeWindows.of(w2.sizeMs);
-        assertEquals(w2, w3);
-        assertEquals(w1, w3);
-        assertEquals(w1.hashCode(), w3.hashCode());
-
-        // Inequality scenarios
-        assertNotEquals("must be false for null", null, w1);
-        assertNotEquals("must be false for different window types", UnlimitedWindows.of(), w1);
-        assertNotEquals("must be false for different types", new Object(), w1);
-
-        final TimeWindows differentWindowSize = TimeWindows.of(w1.sizeMs + 1);
-        assertNotEquals("must be false when window sizes are different", differentWindowSize, w1);
-
-        final TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advanceMs - 1);
-        assertNotEquals("must be false when advance intervals are different", differentAdvanceInterval, w1);
-    }
-
     @Test(expected = IllegalArgumentException.class)
     public void windowSizeMustNotBeZero() {
         TimeWindows.of(0);
@@ -115,6 +84,7 @@ public class TimeWindowsTest {
         }
     }
 
+    @Deprecated
     @Test
     public void advanceIntervalMustNotBeLargerThanWindowSize() {
         final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE);
@@ -126,6 +96,7 @@ public class TimeWindowsTest {
         }
     }
 
+    @Deprecated
     @Test
     public void retentionTimeMustNoBeSmallerThanWindowSize() {
         final TimeWindows windowSpec = TimeWindows.of(ANY_SIZE);
@@ -138,6 +109,18 @@ public class TimeWindowsTest {
     }
 
     @Test
+    public void gracePeriodShouldEnforceBoundaries() {
+        TimeWindows.of(3L).grace(0L);
+
+        try {
+            TimeWindows.of(3L).grace(-1L);
+            fail("should not accept negatives");
+        } catch (final IllegalArgumentException e) {
+            //expected
+        }
+    }
+
+    @Test
     public void shouldComputeWindowsForHoppingWindows() {
         final TimeWindows windows = TimeWindows.of(12L).advanceBy(5L);
         final Map<Long, TimeWindow> matched = windows.windowsFor(21L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
index c5e1ce5..9798a81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
@@ -51,6 +51,16 @@ public class UnlimitedWindowsTest {
     }
 
     @Test
+    public void gracePeriodShouldNotBeSettable() {
+        try {
+            UnlimitedWindows.of().grace(0L);
+            fail("should not be able to set grace period");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        }
+    }
+
+    @Test
     public void shouldIncludeRecordsThatHappenedOnWindowStart() {
         final UnlimitedWindows w = UnlimitedWindows.of().startOn(anyStartTime);
         final Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(w.startMs);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
index fc097ca..12ff166 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
@@ -21,6 +21,7 @@ import org.junit.Test;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class WindowsTest {
 
@@ -37,6 +38,7 @@ public class WindowsTest {
         }
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test
     public void shouldSetNumberOfSegments() {
         final int anySegmentSizeLargerThanOne = 5;
@@ -49,17 +51,33 @@ public class WindowsTest {
         );
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test
     public void shouldSetWindowRetentionTime() {
         final int anyNotNegativeRetentionTime = 42;
         assertEquals(anyNotNegativeRetentionTime, new TestWindows().until(anyNotNegativeRetentionTime).maintainMs());
     }
 
+
+    @Test
+    public void gracePeriodShouldEnforceBoundaries() {
+        new TestWindows().grace(0L);
+
+        try {
+            new TestWindows().grace(-1L);
+            fail("should not accept negatives");
+        } catch (final IllegalArgumentException e) {
+            //expected
+        }
+    }
+
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test(expected = IllegalArgumentException.class)
     public void numberOfSegmentsMustBeAtLeastTwo() {
         new TestWindows().segments(1);
     }
 
+    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test(expected = IllegalArgumentException.class)
     public void retentionTimeMustNotBeNegative() {
         new TestWindows().until(-1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index c41b19b..354fa0a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -19,11 +19,11 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
@@ -60,6 +60,7 @@ import java.util.regex.Pattern;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -243,34 +244,69 @@ public class KStreamImplTest {
         assertThat(mockProcessors.get(1).processed, equalTo(Collections.singletonList("b:v1")));
     }
 
+    @SuppressWarnings("deprecation") // specifically testing the deprecated variant
     @Test
-    public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
+    public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreatedWithRetention() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> kStream = builder.stream("topic-1", stringConsumed);
         final ValueJoiner<String, String, String> valueJoiner = MockValueJoiner.instance(":");
         final long windowSize = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
         final KStream<String, String> stream = kStream
-                        .map(new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
-                            @Override
-                            public KeyValue<? extends String, ? extends String> apply(final String key, final String value) {
-                                return KeyValue.pair(value, value);
-                            }
-                        });
+            .map(new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
+                @Override
+                public KeyValue<? extends String, ? extends String> apply(final String key, final String value) {
+                    return KeyValue.pair(value, value);
+                }
+            });
         stream.join(kStream,
                     valueJoiner,
                     JoinWindows.of(windowSize).until(3 * windowSize),
                     Joined.with(Serdes.String(),
                                 Serdes.String(),
                                 Serdes.String()))
-                .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
+              .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
+
+        final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build();
+
+        final SourceNode originalSourceNode = topology.source("topic-1");
+
+        for (final SourceNode sourceNode : topology.sources()) {
+            if (sourceNode.name().equals(originalSourceNode.name())) {
+                assertNull(sourceNode.getTimestampExtractor());
+            } else {
+                assertThat(sourceNode.getTimestampExtractor(), instanceOf(FailOnInvalidTimestamp.class));
+            }
+        }
+    }
+
+    @Test
+    public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> kStream = builder.stream("topic-1", stringConsumed);
+        final ValueJoiner<String, String, String> valueJoiner = MockValueJoiner.instance(":");
+        final long windowSize = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
+        final KStream<String, String> stream = kStream
+            .map(new KeyValueMapper<String, String, KeyValue<? extends String, ? extends String>>() {
+                @Override
+                public KeyValue<? extends String, ? extends String> apply(final String key, final String value) {
+                    return KeyValue.pair(value, value);
+                }
+            });
+        stream.join(
+            kStream,
+            valueJoiner,
+            JoinWindows.of(windowSize).grace(3L * windowSize),
+            Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
+        )
+              .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
 
         final ProcessorTopology topology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build();
 
         final SourceNode originalSourceNode = topology.source("topic-1");
 
-        for (final SourceNode sourceNode: topology.sources()) {
+        for (final SourceNode sourceNode : topology.sources()) {
             if (sourceNode.name().equals(originalSourceNode.name())) {
-                assertEquals(sourceNode.getTimestampExtractor(), null);
+                assertNull(sourceNode.getTimestampExtractor());
             } else {
                 assertThat(sourceNode.getTimestampExtractor(), instanceOf(FailOnInvalidTimestamp.class));
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 6b5e577..74cd7bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Merger;
@@ -36,6 +39,7 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -46,6 +50,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -78,7 +84,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     };
     private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator =
         new KStreamSessionWindowAggregate<>(
-            SessionWindows.with(GAP_MS).until(3 * GAP_MS),
+            SessionWindows.with(GAP_MS),
             STORE_NAME,
             initializer,
             aggregator,
@@ -88,13 +94,23 @@ public class KStreamSessionWindowAggregateProcessorTest {
     private final Processor<String, String> processor = sessionAggregator.get();
     private SessionStore<String, Long> sessionStore;
     private InternalMockProcessorContext context;
+    private Metrics metrics;
 
 
     @Before
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext(stateDir,
-            Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache(new LogContext("testCache "), 100000, new MockStreamsMetrics(new Metrics()))) {
+        metrics = new Metrics();
+        final MockStreamsMetrics metrics = new MockStreamsMetrics(KStreamSessionWindowAggregateProcessorTest.this.metrics);
+        context = new InternalMockProcessorContext(
+            stateDir,
+            Serdes.String(),
+            Serdes.String(),
+            metrics,
+            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 100000, metrics)
+        ) {
             @Override
             public <K, V> void forward(final K key, final V value) {
                 results.add(KeyValue.pair(key, value));
@@ -107,9 +123,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     private void initStore(final boolean enableCaching) {
         final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, GAP_MS * 3),
-                Serdes.String(),
-                Serdes.Long())
-                .withLoggingDisabled();
+                                                                                                 Serdes.String(),
+                                                                                                 Serdes.Long())
+            .withLoggingDisabled();
 
         if (enableCaching) {
             storeBuilder.withCachingEnabled();
@@ -316,4 +332,37 @@ public class KStreamSessionWindowAggregateProcessorTest {
         assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
         assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-3] offset=[-2]"));
     }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingLateRecord() {
+        LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
+            SessionWindows.with(10L).grace(10L),
+            STORE_NAME,
+            initializer,
+            aggregator,
+            sessionMerger
+        ).get();
+
+        initStore(false);
+        processor.init(context);
+        context.setStreamTime(20);
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("A", "1");
+        LogCaptureAppender.unregister(appender);
+
+        final Metric dropMetric = metrics.metrics().get(new MetricName(
+            "late-record-drop-total",
+            "stream-processor-node-metrics",
+            "The total number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "test"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "TESTING_NODE")
+            )
+        ));
+        assertEquals(1.0, dropMetric.metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index dd2cf05..af7cff6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -46,6 +47,8 @@ import org.junit.Test;
 import java.util.List;
 import java.util.Properties;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -246,6 +249,7 @@ public class KStreamWindowAggregateTest {
         }
     }
 
+    @Deprecated // testing deprecated functionality (behavior of until)
     @Test
     public void shouldLogAndMeterWhenSkippingExpiredWindow() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -263,6 +267,7 @@ public class KStreamWindowAggregateTest {
             .map((key, value) -> new KeyValue<>(key.toString(), value))
             .to("output");
 
+        LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
             driver.pipeInput(recordFactory.create(topic, "k", "100", 100L));
@@ -275,15 +280,84 @@ public class KStreamWindowAggregateTest {
             driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
             LogCaptureAppender.unregister(appender);
 
-            assertThat(getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue(), equalTo(7.0));
+            final MetricName metricName = new MetricName(
+                "late-record-drop-total",
+                "stream-processor-node-metrics",
+                "The total number of occurrence of late-record-drop operations.",
+                mkMap(
+                    mkEntry("client-id", "topology-test-driver-virtual-thread"),
+                    mkEntry("task-id", "0_0"),
+                    mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
+                )
+            );
+            assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0));
+            assertThat(appender.getMessages(), hasItems(
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
+            ));
+
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/110]", "+100", 100);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5", 5);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5+6", 6);
+            assertThat(driver.readOutput("output"), nullValue());
+        }
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
+        stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .windowedBy(TimeWindows.of(10).advanceBy(5).grace(90L))
+            .aggregate(
+                () -> "",
+                MockAggregator.toStringInstance("+"),
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+            )
+            .toStream()
+            .map((key, value) -> new KeyValue<>(key.toString(), value))
+            .to("output");
+
+        LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            driver.pipeInput(recordFactory.create(topic, "k", "100", 100L));
+            driver.pipeInput(recordFactory.create(topic, "k", "0", 0L));
+            driver.pipeInput(recordFactory.create(topic, "k", "1", 1L));
+            driver.pipeInput(recordFactory.create(topic, "k", "2", 2L));
+            driver.pipeInput(recordFactory.create(topic, "k", "3", 3L));
+            driver.pipeInput(recordFactory.create(topic, "k", "4", 4L));
+            driver.pipeInput(recordFactory.create(topic, "k", "5", 5L));
+            driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
+            LogCaptureAppender.unregister(appender);
+
+            final MetricName metricName = new MetricName(
+                "late-record-drop-total",
+                "stream-processor-node-metrics",
+                "The total number of occurrence of late-record-drop operations.",
+                mkMap(
+                    mkEntry("client-id", "topology-test-driver-virtual-thread"),
+                    mkEntry("task-id", "0_0"),
+                    mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
+                )
+            );
+            assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0));
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0] expiration=[0]"
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
             ));
 
             OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index 4cf9324..bc8ca95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -34,6 +36,8 @@ import org.junit.Test;
 
 import java.util.Properties;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
@@ -68,6 +72,7 @@ public class KStreamWindowReduceTest {
         }
     }
 
+    @Deprecated // testing deprecated functionality (behavior of until)
     @Test
     public void shouldLogAndMeterOnExpiredEvent() {
 
@@ -83,6 +88,7 @@ public class KStreamWindowReduceTest {
 
 
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            LogCaptureAppender.setClassLoggerToDebug(KStreamWindowReduce.class);
             final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
             driver.pipeInput(recordFactory.create("TOPIC", "k", "100", 100L));
             driver.pipeInput(recordFactory.create("TOPIC", "k", "0", 0L));
@@ -93,13 +99,24 @@ public class KStreamWindowReduceTest {
             driver.pipeInput(recordFactory.create("TOPIC", "k", "5", 5L));
             LogCaptureAppender.unregister(appender);
 
-            assertThat(getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue(), equalTo(5.0));
+            final Metric dropMetric = driver.metrics().get(new MetricName(
+                "late-record-drop-total",
+                "stream-processor-node-metrics",
+                "The total number of occurrence of late-record-drop operations.",
+                mkMap(
+                    mkEntry("client-id", "topology-test-driver-virtual-thread"),
+                    mkEntry("task-id", "0_0"),
+                    mkEntry("processor-node-id", "KSTREAM-REDUCE-0000000002")
+                )
+            ));
+
+            assertThat(dropMetric.metricValue(), equalTo(5.0));
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[1] timestamp=[0] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[2] timestamp=[1] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[3] timestamp=[2] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[4] timestamp=[3] window=[0] expiration=[0]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[5] timestamp=[4] window=[0] expiration=[0]"
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[1] timestamp=[0] window=[0,5) expiration=[5]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[2] timestamp=[1] window=[0,5) expiration=[5]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[3] timestamp=[2] window=[0,5) expiration=[5]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[4] timestamp=[3] window=[0,5) expiration=[5]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[5] timestamp=[4] window=[0,5) expiration=[5]"
             ));
 
             OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/105]", "100", 100);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b27e16f..c822fc3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -229,10 +229,12 @@ public class StandbyTaskTest {
         final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder().setApplicationId(applicationId);
 
         final InternalStreamsBuilder builder = new InternalStreamsBuilder(internalTopologyBuilder);
-        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>())
+
+        builder
+            .stream(Collections.singleton("topic"), new ConsumedInternal<>())
             .groupByKey()
-            .windowedBy(TimeWindows.of(60_000).until(120_000))
-            .count(Materialized.as(storeName));
+            .windowedBy(TimeWindows.of(60_000).grace(0L))
+            .count(Materialized.<Object, Long, WindowStore<Bytes, byte[]>>as(storeName).withRetention(120_000L));
 
         builder.buildAndOptimizeTopology();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
index b6f5769..ffb8799 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals.testutil;
 
 
 import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 
@@ -33,6 +34,10 @@ public class LogCaptureAppender extends AppenderSkeleton {
         return logCaptureAppender;
     }
 
+    public static void setClassLoggerToDebug(final Class<?> clazz) {
+        Logger.getLogger(clazz).setLevel(Level.DEBUG);
+    }
+
     public static void unregister(final LogCaptureAppender logCaptureAppender) {
         Logger.getRootLogger().removeAppender(logCaptureAppender);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 47e79c9..f06e129 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -68,7 +68,7 @@ public class CachingSessionStoreTest {
     public void setUp() {
         final SessionKeySchema schema = new SessionKeySchema();
         schema.init("topic");
-        underlying = new RocksDBSegmentedBytesStore("test", 0L, SEGMENT_INTERVAL, schema);
+        underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema);
         final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
         cachingStore = new CachingSessionStore<>(sessionStore, Serdes.String(), Serdes.String(), SEGMENT_INTERVAL);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 0e7f88a..1c8dd7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -79,7 +79,7 @@ public class CachingWindowStoreTest {
     @Before
     public void setUp() {
         keySchema = new WindowKeySchema();
-        underlying = new RocksDBSegmentedBytesStore("test", 0, SEGMENT_INTERVAL, keySchema);
+        underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0, SEGMENT_INTERVAL, keySchema);
         final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false, WINDOW_SIZE);
         cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
         cachingStore = new CachingWindowStore<>(windowStore, Serdes.String(), Serdes.String(), WINDOW_SIZE, SEGMENT_INTERVAL);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index cffd73f..93e3452 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -27,6 +29,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -55,11 +58,15 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.SimpleTimeZone;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 
@@ -88,22 +95,22 @@ public class RocksDBSegmentedBytesStoreTest {
         schema.init("topic");
 
         if (schema instanceof SessionKeySchema) {
-            windows[0] = new SessionWindow(10, 10);
-            windows[1] = new SessionWindow(500, 1000);
-            windows[2] = new SessionWindow(1000, 1500);
+            windows[0] = new SessionWindow(10L, 10L);
+            windows[1] = new SessionWindow(500L, 1000L);
+            windows[2] = new SessionWindow(1_000L, 1_500L);
             windows[3] = new SessionWindow(30_000L, 60_000L);
         }
         if (schema instanceof WindowKeySchema) {
-
-            windows[0] = timeWindowForSize(10, windowSizeForTimeWindow);
-            windows[1] = timeWindowForSize(500, windowSizeForTimeWindow);
-            windows[2] = timeWindowForSize(1000, windowSizeForTimeWindow);
+            windows[0] = timeWindowForSize(10L, windowSizeForTimeWindow);
+            windows[1] = timeWindowForSize(500L, windowSizeForTimeWindow);
+            windows[2] = timeWindowForSize(1_000L, windowSizeForTimeWindow);
             windows[3] = timeWindowForSize(60_000L, windowSizeForTimeWindow);
         }
 
 
         bytesStore = new RocksDBSegmentedBytesStore(
             storeName,
+            "metrics-scope",
             retention,
             segmentInterval,
             schema
@@ -276,6 +283,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
         bytesStore = new RocksDBSegmentedBytesStore(
             storeName,
+            "metrics-scope",
             retention,
             segmentInterval,
             schema
@@ -312,6 +320,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
         bytesStore = new RocksDBSegmentedBytesStore(
             storeName,
+            "metrics-scope",
             retention,
             segmentInterval,
             schema
@@ -402,6 +411,46 @@ public class RocksDBSegmentedBytesStoreTest {
         }
     }
 
+    @Test
+    public void shouldLogAndMeasureExpiredRecords() {
+        LogCaptureAppender.setClassLoggerToDebug(RocksDBSegmentedBytesStore.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        context.setStreamTime(Math.max(retention, segmentInterval) * 2);
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(5));
+
+        LogCaptureAppender.unregister(appender);
+
+        final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+
+        final Metric dropTotal = metrics.get(new MetricName(
+            "expired-window-record-drop-total",
+            "stream-metrics-scope-metrics",
+            "The total number of occurrence of expired-window-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "mock"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("metrics-scope-id", "bytes-store")
+            )
+        ));
+
+        final Metric dropRate = metrics.get(new MetricName(
+            "expired-window-record-drop-rate",
+            "stream-metrics-scope-metrics",
+            "The average number of occurrence of expired-window-record-drop operation per second.",
+            mkMap(
+                mkEntry("client-id", "mock"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("metrics-scope-id", "bytes-store")
+            )
+        ));
+
+        assertEquals(1.0, dropTotal.metricValue());
+        assertNotEquals(0.0, dropRate.metricValue());
+        final List<String> messages = appender.getMessages();
+        assertThat(messages, hasItem("Skipping record for expired segment."));
+    }
+
     private Set<String> segmentDirs() {
         final File windowDir = new File(stateDir, storeName);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index cf4b90d..a80e28b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -53,7 +53,7 @@ public class RocksDBSessionStoreTest {
         schema.init("topic");
 
         final RocksDBSegmentedBytesStore bytesStore =
-                new RocksDBSegmentedBytesStore("session-store", 10_000L, 60_000L, schema);
+                new RocksDBSegmentedBytesStore("session-store", "metrics-scope", 10_000L, 60_000L, schema);
 
         sessionStore = new RocksDBSessionStore<>(bytesStore,
                                                  Serdes.String(),
@@ -154,7 +154,7 @@ public class RocksDBSessionStoreTest {
     @Test
     public void shouldFetchExactKeys() {
         final RocksDBSegmentedBytesStore bytesStore =
-                new RocksDBSegmentedBytesStore("session-store", 0x7a00000000000000L, 0x7a00000000000000L, new SessionKeySchema());
+                new RocksDBSegmentedBytesStore("session-store", "metrics-scope", 0x7a00000000000000L, 0x7a00000000000000L, new SessionKeySchema());
 
         sessionStore = new RocksDBSessionStore<>(bytesStore,
                                                  Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index b0057e5..b49a5c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -828,7 +828,7 @@ public class RocksDBWindowStoreTest {
     @Test
     public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() {
         windowStore = new RocksDBWindowStore<>(
-            new RocksDBSegmentedBytesStore(windowName, retentionPeriod, segmentInterval, new WindowKeySchema()),
+            new RocksDBSegmentedBytesStore(windowName, "metrics-scope", retentionPeriod, segmentInterval, new WindowKeySchema()),
             Serdes.Integer(),
             new SerdeThatDoesntHandleNull(),
             false,
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index e3a9ce7..ec8d328 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -136,6 +136,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
             metrics,
             null,
             cache);
+        super.setCurrentNode(new ProcessorNode("TESTING_NODE"));
         this.stateDir = stateDir;
         this.keySerde = keySerde;
         this.valSerde = valSerde;


Mime
View raw message