kafka-dev mailing list archives

From "Boquan Tang (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-8228) Exactly once semantics break during server restart for kafka-streams application
Date Sat, 13 Apr 2019 09:12:00 GMT
Boquan Tang created KAFKA-8228:
----------------------------------

             Summary: Exactly once semantics break during server restart for kafka-streams application
                 Key: KAFKA-8228
                 URL: https://issues.apache.org/jira/browse/KAFKA-8228
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.2.0
            Reporter: Boquan Tang


We are using Kafka Streams client version 2.2.0, and our brokers run version 2.0.1.

We have a simple kafka-streams application that has the following topology:
{code:java}
Source: KSTREAM-SOURCE-0000000004 (topics: [deduped-adclick])
  --> KSTREAM-TRANSFORM-0000000005
Processor: KSTREAM-TRANSFORM-0000000005 (stores: [uid-offset-store])
  --> KSTREAM-TRANSFORM-0000000006
  <-- KSTREAM-SOURCE-0000000004
Source: KSTREAM-SOURCE-0000000000 (topics: [advertiser-budget])
  --> KTABLE-SOURCE-0000000001
Source: KSTREAM-SOURCE-0000000002 (topics: [advertisement-budget])
  --> KTABLE-SOURCE-0000000003
Processor: KSTREAM-TRANSFORM-0000000006 (stores: [advertiser-budget-store, advertisement-budget-store])
  --> KSTREAM-SINK-0000000007
  <-- KSTREAM-TRANSFORM-0000000005
Sink: KSTREAM-SINK-0000000007 (topic: budget-adclick)
  <-- KSTREAM-TRANSFORM-0000000006
Processor: KTABLE-SOURCE-0000000001 (stores: [advertiser-budget-store])
  --> none
  <-- KSTREAM-SOURCE-0000000000
Processor: KTABLE-SOURCE-0000000003 (stores: [advertisement-budget-store])
  --> none
  <-- KSTREAM-SOURCE-0000000002
{code}
The *Processor: KSTREAM-TRANSFORM-0000000005 (stores: [uid-offset-store])* node was added specifically to investigate this EOS breakage. Its init() and transform() look like this (the concrete K and V class names have been removed):
{code:java}
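// uidStore and context are instance fields; uidStoreName, LOGGER and statsEmitter are
// defined elsewhere in the class. clickTimestamp is not defined in the snippet as reported.
@Override
public void init(final ProcessorContext context) {
    uidStore = (WindowStore<String, Long>) context.getStateStore(uidStoreName);
    this.context = context;
}

@Override
public KeyValue<K, V> transform(final K key, final V value) {
    // Offset of the current input record
    final long offset = context.offset();
    final String uid = value.getUid();
    // Window start: beginning of the hour containing the click, in epoch milliseconds
    final long beginningOfHour = Instant.ofEpochMilli(clickTimestamp)
            .atZone(ZoneId.systemDefault())
            .withMinute(0)
            .withSecond(0)
            .toEpochSecond() * 1000;
    // Offset last recorded for this uid in this hour window, or null if none
    final Long maybeLastOffset = uidStore.fetch(uid, beginningOfHour);
    // Seeing the same offset again means the same input record is being processed twice
    final boolean dupe = null != maybeLastOffset && offset == maybeLastOffset;
    uidStore.put(uid, offset, beginningOfHour);
    if (dupe) {
        LOGGER.warn("Find duplication in partition {}, uid is {}, current offset is {}, last offset is {}",
                context.partition(),
                uid,
                offset,
                maybeLastOffset);
        statsEmitter.count("duplication");
    }
    // Returning null drops the duplicate record
    return dupe ? null : new KeyValue<>(key, value);
}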
{code}
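To make explicit what this check can and cannot detect, here is a JDK-only sketch of the same logic. It is an illustration under assumptions, not the production code: a HashMap keyed by uid plus window start stands in for uid-offset-store, the class and method names are hypothetical, and the hour bucket is computed in UTC (the real code uses ZoneId.systemDefault()) so the result does not depend on the machine's time zone.

{code:java}
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;

// Hypothetical JDK-only model of the uid-offset check; a HashMap stands in for uid-offset-store.
final class UidOffsetCheck {
    private final Map<String, Long> store = new HashMap<>(); // "uid@windowStart" -> last offset

    // Start of the hour containing timestampMs, as UTC epoch milliseconds.
    static long beginningOfHour(long timestampMs) {
        return Instant.ofEpochMilli(timestampMs).truncatedTo(ChronoUnit.HOURS).toEpochMilli();
    }

    // True only if this (uid, hour) was last recorded at exactly this input offset.
    boolean isDuplicate(String uid, long clickTimestampMs, long offset) {
        String key = uid + "@" + beginningOfHour(clickTimestampMs);
        Long lastOffset = store.get(key);
        boolean dupe = lastOffset != null && lastOffset == offset;
        store.put(key, offset); // always overwrite, like uidStore.put(...)
        return dupe;
    }

    public static void main(String[] args) {
        UidOffsetCheck check = new UidOffsetCheck();
        long ts = Instant.parse("2019-04-12T07:30:28Z").toEpochMilli(); // illustrative timestamp
        System.out.println(check.isDuplicate("uid-1", ts, 100L)); // false: first sighting
        System.out.println(check.isDuplicate("uid-1", ts, 100L)); // true: same record replayed
        System.out.println(check.isDuplicate("uid-1", ts, 101L)); // false: new record, same uid and hour
    }
}
{code}

A record is flagged only when the stored offset for its (uid, hour) equals its own offset, i.e. when the very same input record is processed again after its store update survived. Two different records with the same uid in the same hour simply overwrite each other without being flagged.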
Although the issue is not 100% reproducible, we found that after restarting one or more brokers on the cluster side, duplicates were detected:
{code:java}
2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread
| adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer]
[Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer,
transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2 (*:9092) could
not be established. Broker may not be available.
2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread
| adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer]
[Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer,
transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2 (*:9092) could
not be established. Broker may not be available.
2019-04-12T07:14:02Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread
| adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer]
[Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer,
transactionalId=adclick-budget-decorator-streams-0_12] Connection to node 2 (*:9092) could
not be established. Broker may not be available.
2019-04-12T07:27:39Z WARN [org.apache.kafka.streams.processor.internals.StreamThread] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1]
stream-thread [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1]
Detected task 0_9 that got migrated to another thread. This implies that this thread missed
a rebalance and dropped out of the consumer group. Will try to rejoin the consumer group.
Below is the detailed description of the task:
TaskId: 0_9
  ProcessorTopology:
    KSTREAM-SOURCE-0000000000:
      topics: [advertiser-budget]
      children: [KTABLE-SOURCE-0000000001]
    KTABLE-SOURCE-0000000001:
      states: [advertiser-budget-store]
    KSTREAM-SOURCE-0000000004:
      topics: [deduped-adclick]
      children: [KSTREAM-TRANSFORM-0000000005]
    KSTREAM-TRANSFORM-0000000005:
      states: [uid-offset-store]
      children: [KSTREAM-TRANSFORM-0000000006]
    KSTREAM-TRANSFORM-0000000006:
      states: [advertiser-budget-store, advertisement-budget-store]
      children: [KSTREAM-SINK-0000000007]
    KSTREAM-SINK-0000000007:
      topic: StaticTopicNameExtractor(budget-adclick)
    KSTREAM-SOURCE-0000000002:
      topics: [advertisement-budget]
      children: [KTABLE-SOURCE-0000000003]
    KTABLE-SOURCE-0000000003:
      states: [advertisement-budget-store]
  Partitions [advertiser-budget-9, deduped-adclick-9, advertisement-budget-9]
2019-04-12T07:27:40Z WARN [org.apache.kafka.common.utils.AppInfoParser] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1]
Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_18-producer
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:424)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
    at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createProducer(StreamThread.java:457)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.lambda$createTask$0(StreamThread.java:447)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:192)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:172)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:448)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:399)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:384)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:148)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:107)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:281)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:292)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:342)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
2019-04-12T07:30:28Z WARN [com.indeed.adclick.budget.decorator.streams.transformers.UidDedupeTransformer]
[adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Find
duplication in partition 18, uid is 1d8868ce40umu002, current offset is 212770034, last offset
is 212770034
2019-04-12T07:30:28Z WARN [com.indeed.adclick.budget.decorator.streams.transformers.UidDedupeTransformer]
[adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] Find
duplication in partition 18, uid is 1d8868du40u1u001, current offset is 212770036, last offset
is 212770036{code}
Our Kafka Streams application is configured as follows:
{code:java}
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaStreamsApplicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 1000);
{code}

As I understand it, with exactly-once enabled the updates to uid-offset-store (written to its changelog topic) should be committed in the same transaction as the output records to budget-adclick and the consumer offsets of the input topic deduped-adclick. In theory, then, the duplication check should never fire. Is my understanding correct?
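The expectation in this question can be illustrated with a toy model in plain Java. No Kafka is involved and every name in it is hypothetical. It assumes that after a restart a task sees only committed data: the store contents as restored from the changelog, the output, and the committed input offset. With atomic = true those three are published together, as EOS intends; with atomic = false the store update is assumed to become visible immediately, the kind of partial commit that would let the check fire. Each input offset carries a distinct uid.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one stream task; "committed*" fields are what a restarted task would see.
// All names are hypothetical and nothing here talks to Kafka.
final class EosModel {
    private final boolean atomic;          // true: store, output and offset commit together
    long committedOffset = 0;              // next input offset to read after a restart
    final Map<String, Long> committedStore = new HashMap<>(); // uid -> last offset seen
    final List<Long> committedOutput = new ArrayList<>();     // offsets forwarded to the sink
    int dupesDetected = 0;

    EosModel(boolean atomic) {
        this.atomic = atomic;
    }

    // Processes input offsets [committedOffset, end); if crash is true the task dies before commit.
    void runBatch(long end, boolean crash) {
        // Atomic: work on a private copy. Non-atomic: writes go straight to the visible store.
        Map<String, Long> store = atomic ? new HashMap<>(committedStore) : committedStore;
        List<Long> output = new ArrayList<>();
        for (long offset = committedOffset; offset < end; offset++) {
            String uid = "uid-" + offset;  // assumption: one distinct uid per input record
            Long last = store.get(uid);
            boolean dupe = last != null && last == offset;
            store.put(uid, offset);
            if (dupe) {
                dupesDetected++;           // same condition as the transformer's check
            } else {
                output.add(offset);
            }
        }
        if (crash) {
            return;                        // uncommitted output and input offset are lost
        }
        if (atomic) {
            committedStore.clear();
            committedStore.putAll(store);
        }
        committedOutput.addAll(output);
        committedOffset = end;
    }

    public static void main(String[] args) {
        for (boolean atomic : new boolean[] {true, false}) {
            EosModel task = new EosModel(atomic);
            task.runBatch(5, true);        // crash before the transaction commits
            task.runBatch(5, false);       // restart and replay from the committed offset
            System.out.println("atomic=" + atomic + " dupes=" + task.dupesDetected
                    + " output=" + task.committedOutput);
        }
    }
}
{code}

Running main prints atomic=true dupes=0 output=[0, 1, 2, 3, 4] followed by atomic=false dupes=5 output=[]. Under this model, a hit in the real check suggests that a store update became visible without the matching input offset commit.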

One unusual thing I noticed on the server side is that the group coordinator takes a long time to initialize when a broker restarts, which causes a long pause between the loss of the node and the eventual task migration.

Please let me know if there are particular server-side logs I should look into. I can share any findings, or run more destructive tests to reproduce the issue so we can investigate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
