kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: clean up window store interface to avoid confusion (#5359)
Date Sat, 04 Aug 2018 20:57:25 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b9f1179  MINOR: clean up window store interface to avoid confusion (#5359)
b9f1179 is described below

commit b9f11796944056a5b3c7440033d587d19290b3c3
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Sat Aug 4 15:57:18 2018 -0500

    MINOR: clean up window store interface to avoid confusion (#5359)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 .../apache/kafka/streams/kstream/internals/KStreamImpl.java | 13 ++-----------
 .../kafka/streams/kstream/internals/KStreamJoinWindow.java  | 13 +++----------
 .../main/java/org/apache/kafka/streams/state/Stores.java    | 10 +++++++++-
 .../java/org/apache/kafka/streams/state/WindowStore.java    | 11 ++++++++---
 .../kafka/streams/state/internals/CachingWindowStore.java   |  4 ++--
 .../state/internals/ChangeLoggingWindowBytesStore.java      |  6 +++---
 .../kafka/streams/state/internals/MeteredWindowStore.java   |  4 ++--
 .../kafka/streams/state/internals/RocksDBWindowStore.java   |  4 ++--
 8 files changed, 31 insertions(+), 34 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 2b37f24..becb03d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -871,22 +871,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K>
implements KStream<K, V
             final StoreBuilder<WindowStore<K1, V2>> otherWindowStore =
                 createWindowedStateStore(windows, joined.keySerde(), joined.otherValueSerde(),
joinOtherName + "-store");
 
-
-            final KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(
-                thisWindowStore.name(),
-                windows.beforeMs + windows.afterMs + 1,
-                windows.maintainMs()
-            );
+            final KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.name());
 
             final ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters(thisWindowedStream,
thisWindowStreamName);
             final ProcessorGraphNode<K1, V1> thisWindowedStreamsNode = new ProcessorGraphNode<>(thisWindowStreamName,
thisWindowStreamProcessorParams);
             builder.addGraphNode(thisStreamsGraphNode, thisWindowedStreamsNode);
 
-            final KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(
-                otherWindowStore.name(),
-                windows.beforeMs + windows.afterMs + 1,
-                windows.maintainMs()
-            );
+            final KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindowStore.name());
 
             final ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters(otherWindowedStream,
otherWindowStreamName);
             final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamName,
otherWindowStreamProcessorParams);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 895dab4..34756d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -27,15 +26,8 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K,
V> {
 
     private final String windowName;
 
-    /**
-     * @throws TopologyException if retention period of the join window is less than expected
-     */
-    KStreamJoinWindow(final String windowName, final long windowSizeMs, final long retentionPeriodMs)
{
+    KStreamJoinWindow(final String windowName) {
         this.windowName = windowName;
-
-        if (windowSizeMs > retentionPeriodMs)
-            throw new TopologyException("The retention period of the join window "
-                    + windowName + " must be no smaller than its window size.");
     }
 
     @Override
@@ -61,7 +53,8 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K, V>
{
             // since it will never be considered for join operations
             if (key != null) {
                 context().forward(key, value);
-                window.put(key, value);
+                // Every record basically starts a new window. We're using a window store mostly for the retention.
+                window.put(key, value, context().timestamp());
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 03eaa07..3bda28d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -144,7 +144,10 @@ public class Stores {
      * @param name                  name of the store (cannot be {@code null})
      * @param retentionPeriod       length of time to retain data in the store (cannot be
negative)
      * @param numSegments           number of db segments (cannot be zero or negative)
-     * @param windowSize            size of the windows (cannot be negative)
+     * @param windowSize            size of the windows that are stored (cannot be negative). Note: the window size
+     *                              is not stored with the records, so this value is used to compute the keys that
+     *                              the store returns. No effort is made to validate this parameter, so you must be
+     *                              careful to set it the same as the windowed keys you're actually storing.
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long,
boolean, long)} instead
@@ -211,6 +214,11 @@ public class Stores {
         if (segmentInterval < 1L) {
             throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
         }
+        if (windowSize > retentionPeriod) {
+            throw new IllegalArgumentException("The retention period of the window store "
+                                                   + name + " must be no smaller than its window size. Got size=["
+                                                   + windowSize + "], retention=[" + retentionPeriod + "]");
+        }
 
         return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval,
windowSize, retainDuplicates);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index ee4c19a..1685123 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -27,8 +27,12 @@ import org.apache.kafka.streams.processor.StateStore;
 public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
{
 
     /**
-     * Put a key-value pair with the current record time as the timestamp
-     * into the corresponding window
+     * Use the current record timestamp as the {@code windowStartTimestamp} and
+     * delegate to {@link WindowStore#put(Object, Object, long)}.
+     *
+     * It's highly recommended to use {@link WindowStore#put(Object, Object, long)} instead, as the record timestamp
+     * is unlikely to be the correct windowStartTimestamp in general.
+     *
      * @param key The key to associate the value to
      * @param value The value to update, it can be null;
      *              if the serialized bytes are also null it is interpreted as deletes
@@ -40,7 +44,8 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K,
V>
      * Put a key-value pair with the given timestamp into the corresponding window
      * @param key The key to associate the value to
      * @param value The value; can be null
+     * @param windowStartTimestamp The timestamp of the beginning of the window to put the key/value into
      * @throws NullPointerException If null is used for key.
      */
-    void put(K key, V value, long timestamp);
+    void put(K key, V value, long windowStartTimestamp);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 4347811..688e889 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -144,12 +144,12 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore
impl
     }
 
     @Override
-    public synchronized void put(final Bytes key, final byte[] value, final long timestamp)
{
+    public synchronized void put(final Bytes key, final byte[] value, final long windowStartTimestamp)
{
         // since this function may not access the underlying inner store, we need to validate
         // if store is open outside as well.
         validateStoreOpen();
         
-        final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
+        final Bytes keyBytes = WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp,
0);
         final LRUCacheEntry entry =
             new LRUCacheEntry(
                 value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index aa9cbe6..785aacd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -77,9 +77,9 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore
     }
 
     @Override
-    public void put(final Bytes key, final byte[] value, final long timestamp) {
-        bytesStore.put(key, value, timestamp);
-        changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp, maybeUpdateSeqnumForDups()),
value);
+    public void put(final Bytes key, final byte[] value, final long windowStartTimestamp)
{
+        bytesStore.put(key, value, windowStartTimestamp);
+        changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp,
maybeUpdateSeqnumForDups()), value);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 5a27ed4..5162eac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -108,10 +108,10 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public void put(final K key, final V value, final long timestamp) {
+    public void put(final K key, final V value, final long windowStartTimestamp) {
         final long startNs = time.nanoseconds();
         try {
-            inner.put(keyBytes(key), serdes.rawValue(value), timestamp);
+            inner.put(keyBytes(key), serdes.rawValue(value), windowStartTimestamp);
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key, value);
             throw new ProcessorStateException(message, e);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 4c0e01f..d7bb523 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -70,10 +70,10 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public void put(final K key, final V value, final long timestamp) {
+    public void put(final K key, final V value, final long windowStartTimestamp) {
         maybeUpdateSeqnumForDups();
 
-        bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes),
serdes.rawValue(value));
+        bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum,
serdes), serdes.rawValue(value));
     }
 
     @Override


Mime
View raw message