kafka-dev mailing list archives

From "Arnaud Villevieille (Jira)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-9005) Kafka stream: “TopicAuthorizationException: Not authorized to access topics” for an internal state store
Date Wed, 09 Oct 2019 08:44:00 GMT
Arnaud Villevieille created KAFKA-9005:
------------------------------------------

             Summary: Kafka stream: “TopicAuthorizationException: Not authorized to access
topics” for an internal state store
                 Key: KAFKA-9005
                 URL: https://issues.apache.org/jira/browse/KAFKA-9005
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.3.0
            Reporter: Arnaud Villevieille


Java: OpenJdk 11
 Kafka server: 2.2.0
 Kafka streams lib: 2.3.0

I have also posted this as a Stack Overflow question [here|https://stackoverflow.com/questions/58299827/kafka-stream-topicauthorizationexception-not-authorized-to-access-topics-for].

 

I am trying to deploy my Kafka Streams application in a *docker* container, and it fails with
a TopicAuthorizationException while trying to create an internal state store. It works well
locally. The main difference between the local setup and the server is that on the server the
application connects to a server-deployed Kafka cluster and authenticates using the usual
*Kerberos* auth. I fail to understand the link between authentication and the *local stores*.

My stream topology looks like this:
{code:java}
StreamsBuilder builder = new StreamsBuilder();

        //We stream from the source topic
        KStream<String, EnrichedMessagePayload> sourceMessagesStream = builder.stream(sourceTopic,
                Consumed.with(Serdes.serdeFrom(String.class), INPUT_SERDE));

        //We group per room and window
        TimeWindowedKStream<String, EnrichedMessagePayload> windowed = sourceMessagesStream
                .groupByKey().windowedBy(TimeWindows.of(Duration.ofMillis(windowSize)).grace(Duration.ZERO));

        //We make them a list
        KStream<Windowed<String>, WindowedMessages> grouped = windowed
                .aggregate(WindowedMessages::new,
                        (key, value, aggregate) -> aggregate.add(value),
                        Materialized.with(Serdes.String(),
                                Serdes.serdeFrom(windowSerializer, windowSerializer)))
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream();

        //Filter
        KStream<Windowed<String>, FilterResult> filtered = grouped
                .mapValues((readOnlyKey, value) -> filterWindow(value.getMessages()));

        //Re map to its original form
        KStream<String, OutputPayload> reduced = filtered
                .flatMap((KeyValueMapper<Windowed<String>, WindowedMessages,
                        Iterable<KeyValue<String, OutputPayload>>>) (key, value) -> value
                        .getMessages()
                        .stream().map(payload -> new KeyValue<>(key.key(), payload))
                        .collect(toList()));


        //Target topic
        reduced.to(sinkTopic, Produced
                .with(Serdes.serdeFrom(String.class), SERDE));

        return builder.build();
{code}
It receives a stream of messages, windows it, aggregates all the messages per window, keeps
only the final version of each window's list by means of 'Suppressed', and then flatMaps the
result to forward it to another topic.
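
For readers less familiar with the DSL, the window/aggregate/flatten flow described above can be approximated in plain Java. This is only a rough sketch and not the reporter's code: it does not use Kafka Streams at all, the class and method names (TumblingWindowSketch, windowStart, aggregate, flatten) are hypothetical, messages are reduced to strings for a single key, and it assumes epoch-aligned tumbling windows with non-negative timestamps. Because it processes a whole batch at once, only the final list per window is ever visible, which loosely stands in for the suppress step.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Kafka Streams classes are involved.
final class TumblingWindowSketch {

    // Start of the epoch-aligned tumbling window that contains timestampMs.
    // Assumes timestampMs >= 0 and windowSizeMs > 0.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Collects the messages of one key into per-window lists, keyed by window start.
    static Map<Long, List<String>> aggregate(long[] timestampsMs, String[] messages, long windowSizeMs) {
        Map<Long, List<String>> windows = new LinkedHashMap<>();
        for (int i = 0; i < timestampsMs.length; i++) {
            long start = windowStart(timestampsMs[i], windowSizeMs);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(messages[i]);
        }
        return windows;
    }

    // Flattens the final per-window lists back into individual messages.
    static List<String> flatten(Map<Long, List<String>> windows) {
        List<String> out = new ArrayList<>();
        for (List<String> batch : windows.values()) {
            out.addAll(batch);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] timestamps = {100L, 900L, 1250L};
        String[] messages = {"a", "b", "c"};
        Map<Long, List<String>> windows = aggregate(timestamps, messages, 1000L);
        System.out.println(windows);
        System.out.println(flatten(windows));
    }
}
```

In the real topology the per-window lists live in state stores, which is where the internal changelog topics come from.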

Every time, I get this exception:

 
{quote}org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
> Error message was: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
2019-10-09 06:44:03.255 +0000 ERROR [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] [StreamThread.java:777] - stream-thread [filterer-d83f2f60-b2bd-40b2-a314-4b20f32918f7-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: - [rapid_r-live-message-filterer-0-0-1-snapshot-10.1e842f1a-ea60-11e9-9c7d-024298932744] - [] - []
org.apache.kafka.streams.errors.StreamsException: Could not create topic filterer-KTABLE-SUPPRESS-STATE-STORE-0000000005-changelog.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:212)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:226)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:104)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:971)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:618)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:622)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:107)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
{quote}
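
As a reading aid for the trace above: Kafka Streams names a state store's changelog topic "<application.id>-<store name>-changelog", so the topic in the StreamsException is an internal topic on the broker, not a local file. The sketch below just reproduces that naming. The class and method names are hypothetical, and the application id "filterer" is an inference from the topic and thread names in the log, not something the report states.

```java
// Hypothetical helper; it only illustrates the internal topic naming pattern.
final class ChangelogTopicNameSketch {

    // Changelog topics are named "<application.id>-<store name>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // "filterer" is assumed to be the application.id (inferred from the log above).
        System.out.println(changelogTopic("filterer", "KTABLE-SUPPRESS-STATE-STORE-0000000005"));
    }
}
```

With that assumption, the helper yields the same topic name as in the exception, which suggests the authenticated principal would need permissions on topics carrying the application id prefix.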



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
