kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null
Date Tue, 16 Jan 2018 00:25:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326652#comment-16326652 ]

ASF GitHub Bot commented on KAFKA-6378:

andybryant opened a new pull request #4424: KAFKA-6378 KStream-GlobalKTable null KeyValueMapper
URL: https://github.com/apache/kafka/pull/4424
   For KStream-GlobalKTable joins let `null` `KeyValueMapper` results indicate no match
   For KStream-GlobalKTable joins, a `KeyValueMapper` derives the `GlobalKTable` lookup key
from each stream record. Some stream records have no valid reference into the table. This
patch lets developers return `null` from the mapper to indicate that no match is possible.
This is safe because `null` is never a valid key for a `GlobalKTable`.
   Without this patch, a `null` result from the `KeyValueMapper` crashes the stream with a
`NullPointerException` on Kafka 1.0.
   I added unit tests for KStream-GlobalKTable left and inner joins, since they were missing.
I also covered the additional scenario where `KeyValueMapper` returns `null` to ensure it
is handled correctly.
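
   The intended semantics can be illustrated with a minimal plain-Java sketch. This is not the code from the pull request and does not use the Kafka Streams API: the class `NullKeyJoinSketch`, its methods, and the sample data are all hypothetical. A `Map` stands in for the `GlobalKTable`, and `storeGet` mimics the `Objects.requireNonNull` check seen in the stack trace of the issue. The join methods guard the lookup so that a `null` key means "no match".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of the join semantics; not Kafka Streams code.
public class NullKeyJoinSketch {

    // Stand-in for a state store lookup that rejects null keys.
    static <K, T> T storeGet(Map<K, T> table, K key) {
        Objects.requireNonNull(key);
        return table.get(key);
    }

    // Left join: every stream value yields an output. The table value is
    // null when the mapper returns null or the key is not in the table.
    static <V, K, T, R> List<R> leftJoin(List<V> stream, Map<K, T> table,
                                         Function<V, K> keyMapper,
                                         BiFunction<V, T, R> joiner) {
        List<R> out = new ArrayList<>();
        for (V value : stream) {
            K key = keyMapper.apply(value);
            // Guard: skip the lookup entirely for a null key.
            T tableValue = (key == null) ? null : storeGet(table, key);
            out.add(joiner.apply(value, tableValue));
        }
        return out;
    }

    // Inner join: stream values without a matching table entry are dropped.
    static <V, K, T, R> List<R> innerJoin(List<V> stream, Map<K, T> table,
                                          Function<V, K> keyMapper,
                                          BiFunction<V, T, R> joiner) {
        List<R> out = new ArrayList<>();
        for (V value : stream) {
            K key = keyMapper.apply(value);
            T tableValue = (key == null) ? null : storeGet(table, key);
            if (tableValue != null) {
                out.add(joiner.apply(value, tableValue));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> countries = new HashMap<>();
        countries.put("NZ", "New Zealand");

        // The second record has no country code, so the mapper returns null.
        List<String> orders = Arrays.asList("order1:NZ", "order2:");
        Function<String, String> countryCode = v -> {
            String[] parts = v.split(":");
            return parts.length > 1 ? parts[1] : null;
        };
        BiFunction<String, String, String> joiner = (v, c) -> v + " -> " + c;

        System.out.println(leftJoin(orders, countries, countryCode, joiner));
        System.out.println(innerJoin(orders, countries, countryCode, joiner));
    }
}
```

   In this model the left join still emits the second record with a `null` table value, while the inner join drops it. Calling `storeGet` directly with a `null` key throws a `NullPointerException`, which corresponds to the failure reported in the issue.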
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null
> --------------------------------------------------------------------------------------
>                 Key: KAFKA-6378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6378
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andy Bryant
>            Priority: Major
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the stream
fails with a NullPointerException (see stacktrace below). On Kafka the stream processes
this successfully, calling the ValueJoiner with the table value set to null.
> The use case for this is joining a stream to a table containing reference data where
the stream foreign key may be null. There is no straightforward workaround in Kafka 1.0.0
other than generating a key that will never match or branching the stream for records
that lack the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" java.lang.NullPointerException
> 	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
> 	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
> 	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
> 	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

This message was sent by Atlassian JIRA