kafka-jira mailing list archives

From "Guozhang Wang (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-6063) StreamsException is thrown after the changing `partitions`
Date Mon, 16 Oct 2017 23:37:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-6063:
---------------------------------
    Labels: user-experience  (was: )

> StreamsException is thrown after the changing `partitions`
> ----------------------------------------------------------
>
>                 Key: KAFKA-6063
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6063
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0
>         Environment: macOS 10.12
> kafka 0.11.0.1
>            Reporter: Akihito Nakano
>              Labels: user-experience
>
> Hi.
> "org.apache.kafka.streams.errors.StreamsException" is thrown in the following case.
> h3. Create topic
> {code:java}
> $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic word-count-input
> {code}
> h3. Create Kafka Streams Application
> {code:java}
> public class WordCountApp {
>     public static void main(String[] args) {
>         Properties config = new Properties();
>         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
> ...
> ...
> {code}
> h3.  Ensure that it works fine
> {code:java}
> $ java -jar wordcount.jar
> KafkaStreams processID: b4a559cb-7075-4ece-a718-5043a432900b
>         StreamsThread appId: wordcount-application
> ...
> ...
> {code}
> h3.  Change "partitions"
> {code:java}
> $ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 8 --topic word-count-input
> Adding partitions succeeded!
> {code}
> h3.  When I start the application, StreamsException is thrown
> {code:java}
> $ java -jar wordcount.jar
> KafkaStreams processID: 8a9cbf03-b841-4cb2-9d44-6456b4520522
>         StreamsThread appId: wordcount-applicationn
>                 StreamsThread clientId: wordcount-applicationn-8a9cbf03-b841-4cb2-9d44-6456b4520522
>                 StreamsThread threadId: wordcount-applicationn-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1
>                 Active tasks:
>                         Running:
>                         Suspended:
>                         Restoring:
>                         New:
>                 Standby tasks:
>                         Running:
>                         Suspended:
>                         Restoring:
>                         New:
> Exception in thread "wordcount-application-8a9cbf03-b841-4cb2-9d44-6456b4520522-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics.
> 	at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:82)
> 	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:660)
> 	at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:398)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:365)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:522)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> {code}
> If I change the application ID, the application works again.
> Thank you.
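
One possible reading of the report, offered as an assumption and not confirmed anywhere in this message: Kafka Streams prefixes its internal topic names with the application ID, so internal topics created during the first run (with 6 input partitions) would still exist when the application restarts against 8 partitions, while a new application ID would produce topic names that do not exist yet. The sketch below only models that naming and partition-count comparison in plain Java. It does not call any Kafka API, and the class name, the method names, and the "Counts" operator name are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class InternalTopicCheck {

    // Internal topic names are prefixed with the application ID.
    // The "name" and "suffix" values used below are illustrative only.
    static String internalTopicName(String applicationId, String name, String suffix) {
        return applicationId + "-" + name + "-" + suffix;
    }

    // True when the topic does not exist yet (it could be created fresh)
    // or already exists with the expected partition count.
    static boolean isUsable(Map<String, Integer> existingTopics, String topic, int expectedPartitions) {
        Integer actual = existingTopics.get(topic);
        return actual == null || actual == expectedPartitions;
    }

    public static void main(String[] args) {
        // Hypothetical broker state left behind by the first run (6 input partitions).
        Map<String, Integer> existingTopics = new HashMap<>();
        String oldTopic = internalTopicName("wordcount-application", "Counts", "repartition");
        existingTopics.put(oldTopic, 6);

        // Same application ID after the input topic was altered to 8 partitions.
        System.out.println(oldTopic + " usable: " + isUsable(existingTopics, oldTopic, 8));

        // A different application ID maps to a topic name that does not exist yet.
        String newTopic = internalTopicName("wordcount-application-v2", "Counts", "repartition");
        System.out.println(newTopic + " usable: " + isUsable(existingTopics, newTopic, 8));
    }
}
```

Under this assumption, the mismatch for the original application ID and the clean start for a new one would be consistent with the behaviour the reporter describes.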



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
