samza-dev mailing list archives

From Aleksandar Bircakovic <>
Subject RE: Messages lost after broker failure
Date Wed, 05 Apr 2017 15:26:01 GMT
Thank you Jagadish.

Regarding leader selection, we exposed some additional metrics that tell us which broker is
the leader for each partition, and we confirmed your theory: the Samza consumer is fully
aware of who the current leader is.
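For completeness, the check we ran boils down to comparing, per partition, the leader reported by the topic metadata against the broker the consumer actually fetches from. A simplified sketch (the data shapes here are illustrative, not Samza's actual metric names):

```python
def leader_mismatches(metadata_leaders, fetch_targets):
    """Partitions where the consumer is fetching from a broker that is
    no longer the partition leader (e.g. after a leader election)."""
    return sorted(
        p for p, leader in metadata_leaders.items()
        if fetch_targets.get(p) != leader
    )

leaders = {0: "broker-1", 1: "broker-3", 2: "broker-2"}
fetching = {0: "broker-1", 1: "broker-2", 2: "broker-2"}  # partition 1 is stale
print(leader_mismatches(leaders, fetching))  # [1]
```

In our case this comparison came back empty for every partition, which is what confirmed the consumer is leader-aware.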

The entire logs are pretty big, so here are some parts instead. The file container_logs.txt
contains logs from the containers; there we can see kafka.common.UnknownException.
Kafka_logs.txt is the log from one of the brokers, where 'Error processing fetch operation
on partition' appears throughout the whole day. It seems that issue was fixed in the scope of this Kafka ticket (
For the offsets that appear in the Kafka log, we have Samza system logs with INFO severity saying
that Samza is constantly validating those offsets. The Samza system logs are in the file samza_system_logs.txt.
If you need anything else, please say so.

Container log:
Kafka log:
Samza log as JSON:

While analyzing issues reported on the Kafka board, we found out that older versions of Kafka
had some edge cases where broker failure recovery didn't work as expected. Should we switch
to the new Kafka consumer, given that we are running Kafka 0.10.1 but our Samza jobs are using
the old Kafka consumer?

Also, since some messages are appearing even after a few days, is it possible that the producer
gets stuck and sends some messages with really high latency after many unsuccessful retries?
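For reference, a rough upper bound on how long a stuck producer could keep retrying can be estimated from the standard producer settings (`retries`, `retry.backoff.ms`, `request.timeout.ms`). A minimal sketch, assuming the classic retry model where each attempt may take up to the request timeout, followed by a backoff before the next try:

```python
def max_retry_delay_ms(retries, request_timeout_ms=30000, retry_backoff_ms=100):
    """Worst-case time (ms) before the producer gives up on a record:
    every attempt times out, with a backoff pause between attempts."""
    attempts = retries + 1  # the initial send plus each retry
    return attempts * request_timeout_ms + retries * retry_backoff_ms

# Even with aggressive retries the delay is bounded in minutes, not days:
print(max_retry_delay_ms(retries=10))  # 331000 ms, i.e. about 5.5 minutes
```

If that bound holds, messages showing up days later would point more toward reprocessing from stale offsets than toward producer retries.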

Many thanks,
Aleksandar Bircakovic

-----Original Message-----
From: Jagadish Venkatraman [] 
Sent: Tuesday, April 4, 2017 6:31 PM
Subject: Re: Messages lost after broker failure

>> All this leads us to the conclusion that Samza's consumers are somehow
>> not aware of all of the partitions.

We have had a number of broker failures at LinkedIn, and have not run into data loss issues
due to consumers being unaware of partitions. You can use the metrics emitted at a per-partition
level (like messages read, offset lags etc.) to validate this theory.
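The per-partition check described above amounts to comparing each partition's consumed offset against the broker's log-end offset. A hypothetical validation over metric snapshots (the dict shapes are illustrative, not Samza's actual metric names):

```python
def find_stale_partitions(consumed_offsets, log_end_offsets, max_lag=1000):
    """Return partitions whose consumed offset trails the log end by more
    than max_lag, or that the consumer is not reading at all."""
    stale = []
    for partition, log_end in log_end_offsets.items():
        consumed = consumed_offsets.get(partition)
        if consumed is None or log_end - consumed > max_lag:
            stale.append(partition)
    return sorted(stale)

consumed = {0: 500_000, 1: 499_800}              # offsets the job has read
log_end = {0: 500_100, 1: 499_900, 2: 750_000}   # broker high watermarks
print(find_stale_partitions(consumed, log_end))  # [2]: partition 2 is never read
```

A non-empty result here would support the "consumer is unaware of some partitions" theory; an empty one would rule it out.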

>> BrokerProxy [WARN] It appears that we received an invalid or empty
>> offset Some(366399914) for [topic_name,60]. Attempting to use Kafka's
>> auto.offset.reset setting.

Usually, attempting to fetch from an invalid offset will reset the consumer to the upcoming
offset. This causes data loss, since you will only process new messages. It would be interesting
to find out what caused the consumer to receive an invalid offset / why the received offset
was invalid. Also, the entire log would be helpful (assuming there's no sensitive information
that must be redacted).
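To make the failure mode concrete, here is a toy model (not Kafka's actual code) of what resetting an out-of-range offset to the largest available offset does to the messages in between:

```python
def fetch_with_reset(log, requested_offset, reset_policy="largest"):
    """Toy model of auto.offset.reset: if the requested offset is out of
    range, jump to the newest (or oldest) valid offset instead."""
    valid = range(log["start_offset"], log["end_offset"])
    if requested_offset in valid:
        return requested_offset
    return log["end_offset"] if reset_policy == "largest" else log["start_offset"]

log = {"start_offset": 100, "end_offset": 500}
resumed = fetch_with_reset(log, requested_offset=50)  # stale/invalid offset
print(resumed)        # 500: consumption resumes at the log end
print(resumed - 100)  # the 400 messages between the two offsets are skipped
```

With `reset_policy="smallest"` the consumer would instead rewind to the start offset and reprocess, which trades data loss for duplicates.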

On Tue, Apr 4, 2017 at 1:12 AM, Aleksandar Bircakovic <> wrote:

> Hi everyone,
> my team is building real-time system using Samza (version 0.11.0) and 
> we are facing some issues with data loss so we would like to hear your 
> thoughts.
> Due to using some additional tools for monitoring and alerting we 
> exceeded number of allowed open files so TooManyOpenFiles exception 
> caused our brokers to fail.
> After fixing this issue failed brokers and all Samza jobs were restarted.
> Issue was gone but it seems like we are constantly losing almost half 
> or the messages from some of our topics after this incident.
> To keep things as simple as possible I will focus on just a small part
> of the pipeline. In the picture below we can see two topics, both
> with 80 partitions, that are the input and output for one of our Samza
> jobs. The number of messages in those topics should be the same, but we
> see that the output topic has almost half as many messages as the input
> one. There is no bottleneck of any kind, so messages are not kept in
> Kafka for too long and they are not deleted by log retention before processing.
> Another strange thing is that some old messages are appearing after a
> day or two. All this leads us to the conclusion that Samza's
> consumers are somehow not aware of all of the partitions. Is it
> possible that the consumers are not aware of the new partition leaders,
> since new leader elections occurred after the broker failures, and are
> somehow trying to get data from the old brokers that are no longer the
> leaders and have lower offsets, meaning that new messages are
> skipped? Is there some kind of topic metadata caching that could lead
> to this situation? While debugging we discovered a
> KafkaSystemConsumer exception that says there is no leader for a partition.
> Looking at the Kafka Manager, all partitions have their leaders.
> Here are some additional details that might be useful.
> Our Samza jobs are built on top of Samza v 0.11.0.
> Kafka consumers/producers are used in jobs.
> Kafka cluster:
> - 8 brokers
> - Kafka version 0.10.1
> - unclean.leader.election.enable false
> - 10000
> - 7200000
> Input topic:
> - segment.bytes 2147483647
> - 172800000
> Some warnings from logs:
> Error processing fetch operation on partition [topic_name,35], offset
> 232841013 (kafka.server.ReplicaManager)
> java.lang.IllegalStateException: Failed to read complete buffer for 
> targetOffset 241769924 startPosition 2147479938 in
> BrokerProxy [WARN] Got non-recoverable error codes during multifetch.
> Throwing an exception to trigger reconnect. Errors:
> Error([topic_name,47],-1,kafka.common.UnknownException
> BrokerProxy [WARN] It appears that we received an invalid or empty 
> offset
> Some(366399914) for [topic_name,60]. Attempting to use Kafka's 
> auto.offset.reset setting. This can result in data loss if processing 
> continues.
> Any help and suggestions will be appreciated.
> Thanks,
> Aleksandar Bircakovic

Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University