kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [44/50] [abbrv] kafka git commit: KAFKA-3521: validate null keys in Streams DSL implementations
Date Mon, 11 Apr 2016 23:10:00 GMT
KAFKA-3521: validate null keys in Streams DSL implementations

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1197 from guozhangwang/K3521


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c595657
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c595657
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c595657

Branch: refs/heads/0.10.0
Commit: 8c59565761a42984335294683c3501df8427ce62
Parents: 2a8fa28
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Apr 8 13:30:46 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Apr 8 13:30:46 2016 -0700

----------------------------------------------------------------------
 .../kstream/internals/ChangedSerializer.java    | 16 +++++++++++++++-
 .../kstream/internals/KStreamJoinWindow.java    |  8 ++++++--
 .../kstream/internals/KStreamKStreamJoin.java   |  5 +++++
 .../internals/KStreamKTableLeftJoin.java        |  6 +++++-
 .../kstream/internals/KStreamTransform.java     | 10 +++++-----
 .../internals/KStreamWindowAggregate.java       |  5 +++++
 .../kstream/internals/KStreamWindowReduce.java  |  5 +++++
 .../kstream/internals/KTableKTableJoin.java     |  5 +++++
 .../kstream/internals/KTableKTableLeftJoin.java |  5 +++++
 .../internals/KTableKTableOuterJoin.java        |  5 +++++
 .../internals/KTableKTableRightJoin.java        |  5 +++++
 .../kstream/internals/KTableRepartitionMap.java |  5 +++++
 .../processor/internals/StandbyContextImpl.java | 20 ++++++++++----------
 13 files changed, 81 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index e9b7cad..5ea0791 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.StreamsException;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
@@ -39,8 +40,21 @@ public class ChangedSerializer<T> implements Serializer<Change<T>>
{
 
     @Override
     public byte[] serialize(String topic, Change<T> data) {
+        byte[] serializedKey;
+
         // only one of the old / new values would be not null
-        byte[] serializedKey = inner.serialize(topic, data.newValue != null ? data.newValue
: data.oldValue);
+        if (data.newValue != null) {
+            if (data.oldValue != null)
+                throw new StreamsException("Both old and new values are not null (" + data.oldValue
+                        + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
+
+            serializedKey = inner.serialize(topic, data.newValue);
+        } else {
+            if (data.oldValue == null)
+                throw new StreamsException("Both old and new values are null in ChangeSerializer,
which is not allowed.");
+
+            serializedKey = inner.serialize(topic, data.oldValue);
+        }
 
         ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
         buf.put(serializedKey);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 5b83b28..94e0b88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -55,8 +55,12 @@ class KStreamJoinWindow<K, V> implements ProcessorSupplier<K,
V> {
 
         @Override
         public void process(K key, V value) {
-            context().forward(key, value);
-            window.put(key, value);
+            // if the key is null, we do not need to put the record into window store
+            // since it will never be considered for join operations
+            if (key != null) {
+                context().forward(key, value);
+                window.put(key, value);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index a4ac9b3..d8caf3c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -63,6 +64,10 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K,
V1> {
 
         @Override
         public void process(K key, V1 value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KStream-KStream join operator
with other window state store " + otherWindowName + " should not be null.");
+
             boolean needOuterJoin = KStreamKStreamJoin.this.outer;
 
             long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
index dfca019..92b9b59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
@@ -55,7 +55,11 @@ class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K,
V1> {
 
         @Override
         public void process(K key, V1 value) {
-            context().forward(key, joiner.apply(value, valueGetter.get(key)));
+            // if the key is null, we do not need proceed joining
+            // the record with the table
+            if (key != null) {
+                context().forward(key, joiner.apply(value, valueGetter.get(key)));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 4299c66..09dddfe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -34,13 +35,12 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K,
V> {
 
     @Override
     public Processor<K, V> get() {
-        return new KStreamTransformProcessor(transformerSupplier.get());
+        return new KStreamTransformProcessor<>(transformerSupplier.get());
     }
 
-    public static class KStreamTransformProcessor<K1, V1, K2, V2> implements Processor<K1,
V1> {
+    public static class KStreamTransformProcessor<K1, V1, K2, V2> extends AbstractProcessor<K1,
V1> {
 
         private final Transformer<K1, V1, KeyValue<K2, V2>> transformer;
-        private ProcessorContext context;
 
         public KStreamTransformProcessor(Transformer<K1, V1, KeyValue<K2, V2>>
transformer) {
             this.transformer = transformer;
@@ -48,14 +48,14 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K,
V> {
 
         @Override
         public void init(ProcessorContext context) {
+            super.init(context);
             transformer.init(context);
-            this.context = context;
         }
 
         @Override
         public void process(K1 key, V1 value) {
             KeyValue<K2, V2> pair = transformer.transform(key, value);
-            context.forward(pair.key, pair.value);
+            context().forward(pair.key, pair.value);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 76964f9..f36cc8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -72,6 +72,11 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements
KStrea
 
         @Override
         public void process(K key, V value) {
+            // if the key is null, we do not need proceed aggregating the record
+            // the record with the table
+            if (key == null)
+                return;
+
             // first get the matching windows
             long timestamp = context().timestamp();
             Map<Long, W> matchedWindows = windows.windowsFor(timestamp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index d532e79..6c05ce3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -69,6 +69,11 @@ public class KStreamWindowReduce<K, V, W extends Window> implements
KStreamAggPr
 
         @Override
         public void process(K key, V value) {
+            // if the key is null, we do not need proceed aggregating the record
+            // the record with the table
+            if (key == null)
+                return;
+
             // first get the matching windows
             long timestamp = context().timestamp();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 6eb27b6..24c8da6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R, V1,
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable join operator should not
be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 00e872e..4bf45ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable left-join operator should
not be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 6ab0ae9..49eed53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable outer-join operator should
not be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = valueGetter.get(key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index a6a13fc..7443d4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -62,6 +63,10 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R,
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable right-join operator should
not be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = valueGetter.get(key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index ff69c37..142a279 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -77,6 +78,10 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
         public void process(K key, Change<V> change) {
             KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
 
+            // the selected repartition key should never be null
+            if (newPair.key == null)
+                throw new StreamsException("Record key for KTable repartition operator should
not be null.");
+
             context().forward(newPair.key, new Change<>(newPair.value, null));
 
             if (change.oldValue != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8c595657/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index ea008b8..468fe74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -108,51 +108,51 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
 
     @Override
     public StateStore getStateStore(String name) {
-        throw new UnsupportedOperationException("getStateStore() not supported.");
+        throw new UnsupportedOperationException("this should not happen: getStateStore()
not supported in standby tasks.");
     }
 
     @Override
     public String topic() {
-        throw new UnsupportedOperationException("topic() not supported.");
+        throw new UnsupportedOperationException("this should not happen: topic() not supported
in standby tasks.");
     }
 
     @Override
     public int partition() {
-        throw new UnsupportedOperationException("partition() not supported.");
+        throw new UnsupportedOperationException("this should not happen: partition() not
supported in standby tasks.");
     }
 
     @Override
     public long offset() {
-        throw new UnsupportedOperationException("offset() not supported.");
+        throw new UnsupportedOperationException("this should not happen: offset() not supported
in standby tasks.");
     }
 
     @Override
     public long timestamp() {
-        throw new UnsupportedOperationException("timestamp() not supported.");
+        throw new UnsupportedOperationException("this should not happen: timestamp() not
supported in standby tasks.");
     }
 
     @Override
     public <K, V> void forward(K key, V value) {
-        throw new UnsupportedOperationException("forward() not supported.");
+        throw new UnsupportedOperationException("this should not happen: forward() not supported
in standby tasks.");
     }
 
     @Override
     public <K, V> void forward(K key, V value, int childIndex) {
-        throw new UnsupportedOperationException("forward() not supported.");
+        throw new UnsupportedOperationException("this should not happen: forward() not supported
in standby tasks.");
     }
 
     @Override
     public <K, V> void forward(K key, V value, String childName) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: forward() not supported
in standby tasks.");
     }
 
     @Override
     public void commit() {
-        throw new UnsupportedOperationException("commit() not supported.");
+        throw new UnsupportedOperationException("this should not happen: commit() not supported
in standby tasks.");
     }
 
     @Override
     public void schedule(long interval) {
-        throw new UnsupportedOperationException("schedule() not supported.");
+        throw new UnsupportedOperationException("this should not happen: schedule() not supported
in standby tasks.");
     }
 }


Mime
View raw message