kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Simon Dean (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-8689) Cannot Name Join State Store Topics
Date Fri, 19 Jul 2019 13:54:00 GMT
Simon Dean created KAFKA-8689:

             Summary: Cannot Name Join State Store Topics
                 Key: KAFKA-8689
                 URL: https://issues.apache.org/jira/browse/KAFKA-8689
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.3.0
            Reporter: Simon Dean

Performing a join on two KStreams produces two state store changelog topics. Currently, the names of these
state store topics are auto-generated and cannot be overridden.

Example code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class JoinTopicNamesExample {

    public static void main(final String[] args) throws InterruptedException {
        new Thread(() -> {

        new Thread(() -> {
            try {
            } catch (InterruptedException e) {

    private static void produce(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

        KafkaProducer<String, Long> producer = new KafkaProducer<>(props);

        for (long i = 0; i < 10; i++) {
            producer.send(new ProducerRecord("left", Long.toString(i), i));

        for (long i = 0; i < 10; i++) {
            producer.send(new ProducerRecord("right", Long.toString(i), i));

    private static void streams(String[] args) throws InterruptedException {
        final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
        final Properties streamsConfiguration = new Properties();
        // Give the Streams application a unique name.  The name must be unique in the Kafka
        // against which the application is run.
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-topic-names-example");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "user-region-lambda-example-client");
        // Where to find Kafka broker(s).
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Specify default (de)serializers for record keys and for record values.
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        // Records should be flushed every 10 seconds. This is less than the default
        // in order to keep this example interactive.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, Long> left = builder.stream("left", Consumed.with(stringSerde,
        final KStream<String, Long> right = builder.stream("right", Consumed.with(stringSerde,

                (value1, value2) -> value1 + value2,

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);



        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));



Here are the topics produced by the example code:
 * join-topic-names-example-KSTREAM-JOINOTHER-0000000005-store-changelog
 * join-topic-names-example-KSTREAM-JOINTHIS-0000000004-store-changelog
 * left
 * right

In the example code above, a name is passed to the join via Joined.as("sum").
However, the "sum" name is ignored when Kafka Streams decides on the state store topic names.

This message was sent by Atlassian JIRA

View raw message