kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6451: Simplifying KStreamReduce and KStreamAggregate
Date Tue, 30 Jan 2018 01:38:12 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 05e42e1  KAFKA-6451: Simplifying KStreamReduce and KStreamAggregate
05e42e1 is described below

commit 05e42e1df12d87a29c8127dec23d7a9c290901d6
Author: Tanvi Jaywant <tanvijaywant@Tanvis-Air.home>
AuthorDate: Mon Jan 29 17:28:49 2018 -0800

    KAFKA-6451: Simplifying KStreamReduce and KStreamAggregate
    
    [KAFKA-6451](https://issues.apache.org/jira/browse/KAFKA-6451)
    
    Simplified KStreamReduce and KStreamAggregate.
    Updated comments in KStreamAggregate.
    
    Author: Tanvi Jaywant <tanvijaywant@Tanvis-Air.home>
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>
    
    Closes #4477 from tanvijaywant31/KAFKA-6451
---
 .../streams/kstream/internals/KStreamAggregate.java      | 11 ++++++-----
 .../kafka/streams/kstream/internals/KStreamReduce.java   | 16 ++++++++--------
 .../kstream/internals/KGroupedStreamImplTest.java        |  1 +
 3 files changed, 15 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index b1abdc2..95ad78e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -64,20 +64,21 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
 
         @Override
         public void process(K key, V value) {
-            if (key == null)
+            // If the key or value is null we don't need to proceed
+            if (key == null || value == null) {
                 return;
+            }
 
             T oldAgg = store.get(key);
 
-            if (oldAgg == null)
+            if (oldAgg == null) {
                 oldAgg = initializer.apply();
+            }
 
             T newAgg = oldAgg;
 
             // try to add the new value
-            if (value != null) {
-                newAgg = aggregator.apply(key, value, newAgg);
-            }
+            newAgg = aggregator.apply(key, value, newAgg);
 
             // update the store with the new value
             store.put(key, newAgg);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index d339624..0fd8f75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -61,21 +61,21 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
 
         @Override
         public void process(K key, V value) {
-            // If the key is null we don't need to proceed
-            if (key == null)
+            // If the key or value is null we don't need to proceed
+            if (key == null || value == null) {
                 return;
+            }
 
             V oldAgg = store.get(key);
             V newAgg = oldAgg;
 
             // try to add the new value
-            if (value != null) {
-                if (newAgg == null) {
-                    newAgg = value;
-                } else {
-                    newAgg = reducer.apply(newAgg, value);
-                }
+            if (newAgg == null) {
+                newAgg = value;
+            } else {
+                newAgg = reducer.apply(newAgg, value);
             }
+            
             // update the store with the new value
             store.put(key, newAgg);
             tupleForwarder.maybeForward(key, newAgg, oldAgg);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index c8b7c18..9d8b479 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -621,6 +621,7 @@ public class KGroupedStreamImplTest {
         driver.process(TOPIC, "1", "D");
         driver.process(TOPIC, "3", "E");
         driver.process(TOPIC, "3", "F");
+        driver.process(TOPIC, "3", null);
         driver.flushState();
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message