kafka-dev mailing list archives

From "Hiroshi Nakahara (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChangingNode
Date Wed, 24 Jul 2019 02:30:00 GMT
Hiroshi Nakahara created KAFKA-8705:
---------------------------------------

             Summary: NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChangingNode
                 Key: KAFKA-8705
                 URL: https://issues.apache.org/jira/browse/KAFKA-8705
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.3.0
            Reporter: Hiroshi Nakahara


A NullPointerException is thrown during topology optimization when two MergeNodes share a common KeyChangingNode.

Kafka Streams version: 2.3.0
h3. Code
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class Main {
    public static void main(String[] args) {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<Integer, Integer> parentStream = streamsBuilder.stream("parentTopic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
                .selectKey(Integer::sum);  // Makes parentStream a key-changing node
        final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1);
        final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v -> v + 2);
        final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3);
        childStream1
                .merge(childStream2)
                .merge(childStream3)
                .to("outputTopic");

        final Properties properties = new Properties();
        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
        streamsBuilder.build(properties);
    }
}
{code}
h3. Expected result

streamsBuilder.build should create the Topology without throwing an exception. The expected topology is:
{code:java}
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [parentTopic])
      --> KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
      --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003, KSTREAM-MAPVALUES-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
      --> KSTREAM-MERGE-0000000006
      <-- KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-MERGE-0000000005 (stores: [])
      --> KSTREAM-MERGE-0000000006
      <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003
    Processor: KSTREAM-MERGE-0000000006 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004
    Sink: KSTREAM-SINK-0000000007 (topic: outputTopic)
      <-- KSTREAM-MERGE-0000000006
{code}
h3. Actual result

A NullPointerException is thrown with the following stack trace:
{code:java}
Exception in thread "main" java.lang.NullPointerException
	at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
	at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
	at Main.main(Main.java:24){code}
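The top frame, AbstractCollection.addAll, is where the null surfaces. As a minimal, Kafka-independent sketch of that failure mode (the class and variable names are hypothetical and only java.util is assumed): Map.get returns null for a key that is absent, and passing that null to addAll throws the same NullPointerException.

{code:java}
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class AddAllNullDemo {
    // Mirrors the top frame of the stack trace: a lookup for an absent key
    // returns null, and Collection.addAll(null) then throws.
    static boolean addAllOfMissingKeyThrows() {
        final Map<String, Set<String>> repartitionNodesByKeyChanger = new HashMap<>();
        final Set<String> target = new LinkedHashSet<>();
        try {
            // get(...) returns null because the key is not in the map
            target.addAll(repartitionNodesByKeyChanger.get("KSTREAM-KEY-SELECT-0000000001"));
            return false;
        } catch (final NullPointerException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(addAllOfMissingKeyThrows()); // prints "true"
    }
}
{code}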
h3. Cause

This exception occurs in InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap.
{code:java}
    private void maybeUpdateKeyChangingRepartitionNodeMap() {
        final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
        for (final StreamsGraphNode mergeNode : mergeNodes) {
            mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
            final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
            for (final StreamsGraphNode key : keys) {
                final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
                if (maybeParentKey != null) {
                    mergeNodesToKeyChangers.get(mergeNode).add(key);
                }
            }
        }

        for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
            final StreamsGraphNode mergeKey = entry.getKey();
            final Collection<StreamsGraphNode> keyChangingParents = entry.getValue();
            final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new LinkedHashSet<>();
            for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
                // get() returns null if an earlier merge node already removed keyChangingParent
                repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
                keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
            }

            keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey, repartitionNodes);
        }
    }{code}
In the example, mergeNodesToKeyChangers has two entries (KSTREAM-MERGE-0000000005 and KSTREAM-MERGE-0000000006), and both entries contain the same key-changing parent (KSTREAM-KEY-SELECT-0000000001).
keyChangingOperationsToOptimizableRepartitionNodes has a single entry, keyed by KSTREAM-KEY-SELECT-0000000001.

When the first entry (KSTREAM-MERGE-0000000005) is processed in the second for loop, KSTREAM-KEY-SELECT-0000000001 is removed from keyChangingOperationsToOptimizableRepartitionNodes and KSTREAM-MERGE-0000000005 is added in its place.
When the second entry (KSTREAM-MERGE-0000000006) is processed, KSTREAM-KEY-SELECT-0000000001 is no longer in keyChangingOperationsToOptimizableRepartitionNodes, so get() returns null and addAll(null) throws the NullPointerException.
Because mergeNodesToKeyChangers is a HashMap, the processing order is not guaranteed, but whichever merge node is processed second fails the same way.
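The sequence above can be reproduced without Kafka. The following is a simplified model, not the actual InternalStreamsBuilder code. Node names are plain strings, the class and method names (MergeRewireModel, rewire) are hypothetical, and a LinkedHashMap fixes the iteration order that the real HashMap leaves unspecified. The guarded flag shows one possible null check that avoids the exception. It is only an illustration, not the upstream fix, and it does not by itself show that the optimized topology is correct when the key-changing node has real repartition nodes.

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class MergeRewireModel {
    // Hypothetical, simplified model of the second loop in
    // maybeUpdateKeyChangingRepartitionNodeMap: strings stand in for graph nodes.
    static Map<String, Set<String>> rewire(final boolean guarded) {
        // One key-changing node (the selectKey) with no repartition nodes yet.
        final Map<String, Set<String>> keyChangerToRepartition = new HashMap<>();
        keyChangerToRepartition.put("KSTREAM-KEY-SELECT-0000000001", new LinkedHashSet<>());

        // Both merge nodes share the same key-changing ancestor. LinkedHashMap fixes
        // the iteration order; the real code uses a HashMap (order unspecified).
        final Map<String, Set<String>> mergeToKeyChangers = new LinkedHashMap<>();
        mergeToKeyChangers.put("KSTREAM-MERGE-0000000005",
                new LinkedHashSet<>(Collections.singletonList("KSTREAM-KEY-SELECT-0000000001")));
        mergeToKeyChangers.put("KSTREAM-MERGE-0000000006",
                new LinkedHashSet<>(Collections.singletonList("KSTREAM-KEY-SELECT-0000000001")));

        for (final Map.Entry<String, Set<String>> entry : mergeToKeyChangers.entrySet()) {
            final Set<String> repartitionNodes = new LinkedHashSet<>();
            for (final String keyChanger : entry.getValue()) {
                final Set<String> existing = keyChangerToRepartition.get(keyChanger);
                if (guarded && existing == null) {
                    continue; // already consumed by an earlier merge node
                }
                // Unguarded: existing is null for the second merge node, so this throws NPE.
                repartitionNodes.addAll(existing);
                keyChangerToRepartition.remove(keyChanger);
            }
            keyChangerToRepartition.put(entry.getKey(), repartitionNodes);
        }
        return keyChangerToRepartition;
    }

    public static void main(final String[] args) {
        try {
            rewire(false);
        } catch (final NullPointerException e) {
            System.out.println("unguarded: NullPointerException");
        }
        System.out.println("guarded: " + rewire(true));
    }
}
{code}

In the guarded run, both merge nodes end up as keys with empty sets and the selectKey entry is gone. This only holds because the model starts with no repartition nodes, as in the reported example.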

 

This is the first issue I have reported for Kafka. Please let me know if further information is required. Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
