kafka-dev mailing list archives

From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source
Date Thu, 23 Feb 2017 22:39:44 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881456#comment-15881456 ]

Matthias J. Sax commented on KAFKA-4791:
----------------------------------------

Ack. My bad. I thought [~clouTrix] was calling {{connectStateStoreNameToSourceTopics}} directly, but he isn't...
Calling {{addStateStore}} is of course fine. [~bbejeck], feel free to start working on this.
:)

> Kafka Streams - unable to add state stores when using wildcard topics on the source
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-4791
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4791
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1
>         Environment: Java 8
>            Reporter: Bart Vercammen
>            Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using {{TopologyBuilder}}) with the following components:
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, everything works fine, but it does not seem possible to attach state stores when using wildcard topics on the sources.
> Inside {{addStateStore}}, the processor is connected to the state store via {{connectProcessorAndStateStore}}, which in turn tries to connect the state store to the processor's source topics via {{connectStateStoreNameToSourceTopics}}.
> Here lies the problem: 
> {code}
>     private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
>         final Set<String> sourceTopics = new HashSet<>();
>         for (String parent : parents) {
>             NodeFactory nodeFactory = nodeFactories.get(parent);
>             if (nodeFactory instanceof SourceNodeFactory) {
>                 // For a Pattern-based source, getTopics() returns null, so Arrays.asList(...) throws a NullPointerException
>                 sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()));
>             } else if (nodeFactory instanceof ProcessorNodeFactory) {
>                 // Walk further up the topology until source nodes are reached
>                 sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
>             }
>         }
>         return sourceTopics;
>     }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()))}} fails because a pattern-based {{SourceNodeFactory}} holds only a pattern and no topic names, so {{getTopics()}} returns null.
> I also searched the Kafka Streams project for unit tests covering this scenario but, alas, could not find any.
> There are only tests for state stores with exact topic names and tests for wildcard topics, but none that combine the two.
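
For context on the report above: a wildcard source is defined by a {{Pattern}}, and the concrete topics it reads only become known once that pattern is matched against the topics that actually exist in the cluster. That is why a pattern-based source has no topic list yet when {{addStateStore}} runs. The sketch below illustrates the matching step with plain {{java.util.regex}}; the {{matchingTopics}} helper and the topic names are hypothetical, and full-match ({{Matcher.matches()}}) semantics are an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class PatternSourceSketch {
    // Hypothetical helper: returns the topics from `available` that a wildcard
    // source would read, assuming the pattern must match the whole topic name.
    static List<String> matchingTopics(Pattern pattern, List<String> available) {
        List<String> matched = new ArrayList<>();
        for (String topic : available) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Made-up topic names; in a real cluster they come from metadata at runtime.
        List<String> available = Arrays.asList("ingest-a", "ingest-b", "audit");
        Pattern pattern = Pattern.compile("ingest-.*");
        System.out.println(matchingTopics(pattern, available)); // [ingest-a, ingest-b]
    }
}
```

The result depends entirely on which topics exist when the match runs, so it cannot be computed while the topology is still being built.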
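
The failure described in the report can be reproduced without Kafka: {{Arrays.asList}} called with a null array throws a {{NullPointerException}}. In this minimal sketch, {{patternSourceTopics()}} and {{addAllThrowsNpe}} are hypothetical stand-ins for a pattern-based {{SourceNodeFactory.getTopics()}} and the quoted {{addAll}} line.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class NullTopicsDemo {
    // Hypothetical stand-in for getTopics() on a pattern-based source,
    // which per the report returns null.
    static String[] patternSourceTopics() {
        return null;
    }

    // Returns true if adding the topics failed with a NullPointerException.
    static boolean addAllThrowsNpe(String[] topics) {
        Set<String> sourceTopics = new HashSet<>();
        try {
            // Same shape as the failing line in findSourceTopicsForProcessorParents
            sourceTopics.addAll(Arrays.asList(topics));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(addAllThrowsNpe(new String[] {"topic-a"})); // false
        System.out.println(addAllThrowsNpe(patternSourceTopics()));     // true
    }
}
```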
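
One possible direction, sketched here as a hypothetical and not as the change actually made for KAFKA-4791, is to have the traversal record source patterns alongside explicit topic names instead of assuming {{getTopics()}} is non-null. The classes below are simplified stand-ins, not Kafka's {{TopologyBuilder}} internals, and {{main}} exercises the combination the report found untested: a state store's processor whose only source is a wildcard source.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

public class WildcardStoreSketch {
    // Hypothetical, simplified stand-ins for the builder's node factories.
    interface NodeFactory {}

    static final class SourceNodeFactory implements NodeFactory {
        final String[] topics;  // null when the source was added with a Pattern
        final Pattern pattern;  // null when the source was added with topic names

        SourceNodeFactory(String[] topics, Pattern pattern) {
            this.topics = topics;
            this.pattern = pattern;
        }
    }

    static final class ProcessorNodeFactory implements NodeFactory {
        final String[] parents;

        ProcessorNodeFactory(String... parents) {
            this.parents = parents;
        }
    }

    final Map<String, NodeFactory> nodeFactories = new HashMap<>();

    // Null-safe variant of the quoted traversal: explicit topic names and
    // source patterns are collected into separate sets.
    void collectSources(String[] parents, Set<String> topics, Set<String> patterns) {
        for (String parent : parents) {
            NodeFactory nodeFactory = nodeFactories.get(parent);
            if (nodeFactory instanceof SourceNodeFactory) {
                SourceNodeFactory source = (SourceNodeFactory) nodeFactory;
                if (source.topics != null) {
                    topics.addAll(Arrays.asList(source.topics));
                } else if (source.pattern != null) {
                    patterns.add(source.pattern.pattern());
                }
            } else if (nodeFactory instanceof ProcessorNodeFactory) {
                collectSources(((ProcessorNodeFactory) nodeFactory).parents, topics, patterns);
            }
        }
    }

    public static void main(String[] args) {
        WildcardStoreSketch builder = new WildcardStoreSketch();
        builder.nodeFactories.put("ingest", new SourceNodeFactory(null, Pattern.compile("ingest-.*")));
        ProcessorNodeFactory processor = new ProcessorNodeFactory("ingest");
        builder.nodeFactories.put("myprocessor", processor);

        Set<String> topics = new HashSet<>();
        Set<String> patterns = new HashSet<>();
        // As in the quoted code, the traversal starts from the processor's parents.
        builder.collectSources(processor.parents, topics, patterns);
        System.out.println(topics + " " + patterns); // [] [ingest-.*]
    }
}
```

A complete fix would still have to map each recorded pattern to the concrete topics it matches once those are known at runtime; this sketch stops at recording the pattern instead of failing.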



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
