kafka-jira mailing list archives

From "Evan Pollan (JIRA)" <j...@apache.org>
Subject [jira] [Created] (KAFKA-5875) Consumer group repeatedly fails to join, even across JVM restarts: BufferUnderflowException reading the {{version}} field in the consumer protocol header
Date Tue, 12 Sep 2017 15:05:00 GMT
Evan Pollan created KAFKA-5875:
----------------------------------

             Summary: Consumer group repeatedly fails to join, even across JVM restarts: BufferUnderflowException reading the {{version}} field in the consumer protocol header
                 Key: KAFKA-5875
                 URL: https://issues.apache.org/jira/browse/KAFKA-5875
             Project: Kafka
          Issue Type: Bug
            Reporter: Evan Pollan


I've seen this maybe once a month in our large cluster of Kubernetes-based Kafka consumers &
producers.  Every once in a while, a consumer in a Kubernetes "pod" gets this error trying to
join a consumer group:

{code}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka
version : 0.11.0.0","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka
commitId : cb8625948210849f","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","message":"Revoking
previously assigned partitions [] for group conv-fetch-jobs-runner-for-internal","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"(Re-)joining
group conv-fetch-jobs-runner-for-internal","exception":""}
{"level":"INFO","@timestamp":"2017-09-12T13:45:43.588+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"Successfully
joined group conv-fetch-jobs-runner-for-internal with generation 17297","exception":""}
{"errorType":"Error reading field 'version': java.nio.BufferUnderflowException","level":"ERROR","message":"Died!","operation":"Died!","stacktrace":"org.apache.kafka.common.protocol.types.SchemaException:
Error reading field 'version': java.nio.BufferUnderflowException\n\tat org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)\n\tat
org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)\n\tat
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)\n\tat
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)\n\tat
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)\n\tat
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)\n\tat
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)\n\tat
com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)\n\tat
com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)\n\tat
com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)\n\tat
com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)\n\tat java.lang.Thread.run(Thread.java:745)\n","trackingId":"dead-consumer","logger":"com.spredfast.common.kafka.consumer.RunnableConsumer","loggingVersion":"UNKNOWN","component":"MetricsFetch","pid":25,"host":"fetch-2420457278-sh4f5","@timestamp":"2017-09-12T13:45:43.613Z"}
{code}

Pardon the log format -- these get sucked into logstash, thus the JSON.

Here's the raw stacktrace: 
{code}
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
	at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
	at com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)
	at com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)
	at com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)
	at com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)
	at java.lang.Thread.run(Thread.java:745)
{code}
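
For what it's worth, the failure looks like what you'd get if the member assignment handed back by the coordinator were empty or truncated: the consumer protocol header starts with a 2-byte {{version}} field, and reading a short from a zero-length buffer throws {{BufferUnderflowException}}. Here's a minimal sketch of that failure mode using plain {{java.nio}} -- the class below is purely illustrative and is not the actual Kafka code path (which, per the stacktrace, is {{ConsumerProtocol.deserializeAssignment}}):

{code}
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

// Purely illustrative: mimics reading the 2-byte 'version' field of the consumer
// protocol header from an assignment buffer that came back empty.
public class EmptyAssignmentRepro {
    public static void main(String[] args) {
        ByteBuffer emptyAssignment = ByteBuffer.allocate(0); // e.g. an empty member assignment
        try {
            short version = emptyAssignment.getShort(); // the same 2-byte read the 'version' field needs
            System.out.println("version = " + version);
        } catch (BufferUnderflowException e) {
            // This is the underlying exception that SchemaException wraps in the stacktrace above.
            System.out.println("BufferUnderflowException reading 'version' from a "
                    + emptyAssignment.capacity() + "-byte buffer");
        }
    }
}
{code}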

What's fascinating about this is:
* We have a liveness probe (the Kubernetes term for a healthcheck whose failure causes the container backing the "pod" to be killed and restarted) attached to the existence of dead consumers.  When this situation happens, it _never_ resolves itself. Today I found a pod that had been restarted 1023 times due to this error.
* The only way to make it go away is to _delete_ the Kubernetes pod.  This causes it to be replaced by a pod on another Kubernetes host ("minion") using the same Docker image and configuration.  Invariably, the new pod comes up and all of its consumers join just fine.
* We have _not_ tried restarting the brokers when this happens.
* There must be something about the pod, container, or Kubernetes host that is consistent across pod crash loops and that factors into the consumer group join process -- MAC address?  hostname?  It can't be anything that is recomputed on JVM restart, though. (See the sketch right after this list for a quick way to dump these for comparison.)
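
In case it helps anyone compare a crash-looping pod with a freshly rescheduled one, here's a quick sketch for dumping the identity bits I'm speculating about (hostname, IP, MACs), using only JDK APIs. To be clear, I'm not claiming the Kafka client actually derives its group member identity from any of these; this is just a way to see which inputs stay constant across JVM restarts in the same pod:

{code}
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Collections;

// Dump hostname / IP / MAC so they can be compared between a crash-looping pod
// (same values across JVM restarts) and a freshly rescheduled pod (new values).
public class PodIdentityDump {
    public static void main(String[] args) throws Exception {
        InetAddress local = InetAddress.getLocalHost();
        System.out.println("hostname = " + local.getHostName());
        System.out.println("address  = " + local.getHostAddress());
        for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
            byte[] mac = nic.getHardwareAddress();
            if (mac == null || mac.length == 0) continue; // skip loopback/virtual interfaces
            StringBuilder sb = new StringBuilder();
            for (byte b : mac) sb.append(String.format("%02x:", b));
            System.out.println(nic.getName() + " mac = " + sb.substring(0, sb.length() - 1));
        }
    }
}
{code}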

Seems like there's either:
# a bug in the client, i.e. in its assumption that it can deserialize a protocol header on successful return of that join future. Maybe there's a flavor of broker response that doesn't include this header? (See the sketch after this list for the kind of defensive check I mean.)
# a bug in the broker, in that it's sending an empty or undersized response to a group join command in some situations.
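
To illustrate the client-side assumption in #1: a defensive check along these lines (a sketch of the idea only, not a patch against the real {{ConsumerProtocol}} internals) would at least turn the underflow into a clear error about an empty or undersized assignment instead of a permanently dead consumer:

{code}
import java.nio.ByteBuffer;

// Sketch only: the real deserialization lives in
// org.apache.kafka.clients.consumer.internals.ConsumerProtocol. This just shows the
// kind of guard that would surface an empty or undersized assignment explicitly.
public class AssignmentGuard {
    private static final int VERSION_FIELD_SIZE = 2; // the 'version' field is a 16-bit short

    public static short readVersionOrFail(ByteBuffer assignment) {
        if (assignment == null || assignment.remaining() < VERSION_FIELD_SIZE) {
            throw new IllegalStateException("Empty or undersized member assignment ("
                    + (assignment == null ? "null" : assignment.remaining() + " bytes remaining")
                    + "); expected at least " + VERSION_FIELD_SIZE + " bytes for the 'version' field");
        }
        return assignment.getShort();
    }
}
{code}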

It's worth noting that the severity of this issue is magnified by the fact that it requires manual intervention.  It wouldn't be so bad if our healthcheck failure tripped a pod restart and the new JVM's consumers joined OK.  But the fact that even a JVM restart doesn't fix it means most resiliency plays won't work.

BTW, I see a similar schema read failure in https://issues.apache.org/jira/browse/KAFKA-4349, although the client code is completely different (the admin {{ConsumerGroupCommand}}).



