kafka-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (KAFKA-6401) InvalidStateStoreException immediately after starting streams
Date Tue, 26 Dec 2017 18:33:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Matthias J. Sax resolved KAFKA-6401.
------------------------------------
    Resolution: Not A Problem

> InvalidStateStoreException immediately after starting streams
> -------------------------------------------------------------
>
>                 Key: KAFKA-6401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6401
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>         Environment: ubuntu 14.04
>            Reporter: Mostafa Asgari
>            Priority: Minor
>         Attachments: Test.java
>
>
> Hi
> I wrote a simple kafka streams application. After I start the stream, if I call KafkaStreams.store
immediately, I will get InvalidStateStoreException:
> {code:java}
> org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-table,
may have migrated to another instance.
> {code}
> Here is the complete code :
> {code:java}
>  final StreamsBuilder builder = new StreamsBuilder();
>         final KTable<String, Integer> table = builder.table(TOPIC_NAME , Consumed.with(Serdes.String(),
Serdes.Integer(),
>                 new FailOnInvalidTimestamp(), Topology.AutoOffsetReset.EARLIEST), Materialized.as("my-table"));
>         Topology topology = builder.build();
>         Properties props = new Properties();
>         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,"my-streams-app");
>         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
>         props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,"10000");
>         final KafkaStreams streams = new KafkaStreams( topology , props );
>         Runtime.getRuntime().addShutdownHook(new Thread(){
>             @Override
>             public void run() {
>                 streams.close();
>             }
>         });
>         streams.start();
>         ReadOnlyKeyValueStore<String, Integer> store streams.store(table.queryableStoreName(),
QueryableStoreTypes.keyValueStore());
> {code}
> However if after start() method, I write Thread.sleep( SOME_AMOUNT ) I will not get the
exception any more.
> I wonder if it is a bug or I did something wrong.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message