kafka-dev mailing list archives

From "Simon Dean (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-8689) Cannot Name Join State Store Topics
Date Fri, 19 Jul 2019 13:54:00 GMT
Simon Dean created KAFKA-8689:
---------------------------------

             Summary: Cannot Name Join State Store Topics
                 Key: KAFKA-8689
                 URL: https://issues.apache.org/jira/browse/KAFKA-8689
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.3.0
            Reporter: Simon Dean


Performing a join on two KStreams produces two state store topics.  Currently the names of these state store topics are auto-generated and cannot be overridden.

Example code:

 
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class JoinTopicNamesExample {

    public static void main(final String[] args) throws InterruptedException {
        // Use start() (not run()) so the producer and the streams app each run on their own thread.
        new Thread(() -> produce(args)).start();

        new Thread(() -> {
            try {
                streams(args);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

    private static void produce(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

        // Close the producer (which also flushes) so all records are actually delivered.
        try (KafkaProducer<String, Long> producer = new KafkaProducer<>(props)) {
            for (long i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("left", Long.toString(i), i));
            }

            for (long i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("right", Long.toString(i), i));
            }
        }
    }

    private static void streams(String[] args) throws InterruptedException {
        final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
        final Properties streamsConfiguration = new Properties();
        // Give the Streams application a unique name.  The name must be unique
        // in the Kafka cluster against which the application is run.
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-topic-names-example");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "user-region-lambda-example-client");
        // Where to find Kafka broker(s).
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Specify default (de)serializers for record keys and for record values.
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        // Records should be flushed every 10 seconds. This is less than the default
        // in order to keep this example interactive.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, Long> left = builder.stream("left", Consumed.with(stringSerde, longSerde));
        final KStream<String, Long> right = builder.stream("right", Consumed.with(stringSerde, longSerde));

        left.join(
                right,
                (value1, value2) -> value1 + value2,
                JoinWindows.of(Duration.ofHours(1)), 
                Joined.as("sum"));

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.start();

        // Run for a minute so the internal topics get created, then shut down.
        Thread.sleep(TimeUnit.MINUTES.toMillis(1));

        streams.close();
    }

}
{code}

Here are the topics produced by the example code:
 * join-topic-names-example-KSTREAM-JOINOTHER-0000000005-store-changelog
 * join-topic-names-example-KSTREAM-JOINTHIS-0000000004-store-changelog
 * left
 * right

In the example code above, a name is passed to the join via Joined.as("sum").
The "sum" name is ignored when Kafka Streams decides on the state store topic names.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
