kafka-jira mailing list archives

From "Xinyang Gao (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (KAFKA-6189) Loosing messages on OFFSET_OUT_OF_RANGE error in consumer
Date Mon, 13 Nov 2017 11:58:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16249419#comment-16249419 ] 

Xinyang Gao edited comment on KAFKA-6189 at 11/13/17 11:57 AM:
---------------------------------------------------------------

I have seen similar behavior that also led to message loss. The setup is as follows:

topic gao31 has 3 partitions and replication factor 3
min.insync.replicas=2
consumer uses the default "auto.offset.reset=latest"
consumer commits offsets manually with consumer.commitSync() after handling messages
unclean leader election is disabled
kafka cluster has 3 brokers: kafka-foo-0, kafka-foo-1 and kafka-foo-2
consumer group ID is uklonvd826214
session.timeout.ms and max.poll.interval.ms use their default values
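
For reference, the consumer side of this setup can be sketched as plain properties. This is only an illustration: the short bootstrap host names and `enable.auto.commit=false` are my assumptions (the latter follows from committing manually), and a real `KafkaConsumer` would also need `key.deserializer` and `value.deserializer`, which are left out here.

```java
import java.util.Properties;

final class ConsumerSetupSketch {
    // Builds the consumer settings listed above as plain key/value pairs.
    static Properties consumerProps() {
        Properties props = new Properties();
        // Hypothetical bootstrap address built from the short broker names.
        props.setProperty("bootstrap.servers",
                "kafka-foo-0:9092,kafka-foo-1:9092,kafka-foo-2:9092");
        props.setProperty("group.id", "uklonvd826214");
        // Default value, spelled out because it decides what happens
        // when no committed offset is found.
        props.setProperty("auto.offset.reset", "latest");
        // Assumption: auto-commit is off, since offsets are committed
        // manually with commitSync() after handling messages.
        props.setProperty("enable.auto.commit", "false");
        // session.timeout.ms and max.poll.interval.ms are deliberately not set,
        // so the client defaults apply.
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```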

Initially, kafka-foo-1 is the group coordinator.

To reproduce:
1. Kill kafka-foo-1.
2. The following logs show the consumer being disconnected from kafka-foo-1 and trying to discover a new group coordinator:


{code:java}
2017-11-10 11:27:45,097 DEBUG org.apache.kafka.clients.NetworkClient                        - Node 2147483646 disconnected. [kafka-consumer]
2017-11-10 11:27:45,097 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483646 rack: null) dead for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0 [kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 1 [kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient                        - Found least loaded node kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,098 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending GroupCoordinator request for group uklonvd826214 to broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 1 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Found least loaded node kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,098 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending GroupCoordinator request for group uklonvd826214 to broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 1 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Found least loaded node kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,099 DEBUG org.apache.kafka.clients.NetworkClient                        - Sending metadata request (type=MetadataRequest, topics=gao31) to node 0 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Sending METADATA {topics=[gao31],allow_auto_topic_creation=true} to node 0. [kafka-consumer]
2017-11-10 11:27:45,100 TRACE org.apache.kafka.clients.NetworkClient                        - Sending FIND_COORDINATOR {coordinator_key=uklonvd826214,coordinator_type=0} to node 0. [kafka-consumer]
2017-11-10 11:27:45,276 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2, for key 1, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=1631,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1630, timestamp=1510313264997, key=1 bytes, value=102 bytes))]},{partition_header={partition=0,error_code=0,high_watermark=1223,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[]}]}]} [kafka-consumer]
2017-11-10 11:27:45,276 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch READ_UNCOMMITTED at offset 1630 for partition gao31-2 returned fetch data (error=NONE, highWaterMark=1631, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,276 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch READ_UNCOMMITTED at offset 1223 for partition gao31-0 returned fetch data (error=NONE, highWaterMark=1223, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=0) [kafka-consumer]
2017-11-10 11:27:45,341 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 1, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=1224,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1223, timestamp=1510313265277, key=1 bytes, value=102 bytes))]}]}]} [kafka-consumer]
2017-11-10 11:27:45,341 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch READ_UNCOMMITTED at offset 1223 for partition gao31-1 returned fetch data (error=NONE, highWaterMark=1224, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,342 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 3, received {throttle_time_ms=0,brokers=[{node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null},{node_id=1,host=kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null},{node_id=0,host=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null}],cluster_id=Y8zodxM7TNi-19dFcgErpw,controller_id=0,topic_metadata=[{topic_error_code=0,topic=gao31,is_internal=false,partition_metadata=[{partition_error_code=0,partition_id=2,leader=2,replicas=[1,2,0],isr=[0,2]},{partition_error_code=0,partition_id=1,leader=0,replicas=[0,1,2],isr=[0,2]},{partition_error_code=0,partition_id=0,leader=2,replicas=[2,0,1],isr=[2,0]}]}]} [kafka-consumer]
2017-11-10 11:27:45,342 DEBUG org.apache.kafka.clients.Metadata                             - Updated cluster metadata version 4 to Cluster(id = Y8zodxM7TNi-19dFcgErpw, nodes = [kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null), kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null), kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null)], partitions = [Partition(topic = gao31, partition = 0, leader = 2, replicas = [2,0,1], isr = [2,0]), Partition(topic = gao31, partition = 1, leader = 0, replicas = [0,1,2], isr = [0,2]), Partition(topic = gao31, partition = 2, leader = 2, replicas = [1,2,0], isr = [0,2])]) [kafka-consumer]
2017-11-10 11:27:45,343 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 10, received {throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092}} [kafka-consumer]
2017-11-10 11:27:45,343 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received GroupCoordinator response ClientResponse(receivedTimeMs=1510313265343, latencyMs=245, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=7610,client_id=consumer-2}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,346 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483645 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:27:45,346 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 2147483645 at kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092. [kafka-consumer]
2017-11-10 11:27:45,350 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to read 188 bytes of data for partition gao31-2 with offset 1630 [kafka-consumer]
2017-11-10 11:27:45,350 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high watermark for partition gao31-2 to 1631 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 1630 for assigned partition gao31-2 and update position to 1631 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to read 0 bytes of data for partition gao31-0 with offset 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high watermark for partition gao31-0 to 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 1223 for assigned partition gao31-0 and update position to 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to read 188 bytes of data for partition gao31-1 with offset 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high watermark for partition gao31-1 to 1224 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 1223 for assigned partition gao31-1 and update position to 1224 [kafka-consumer]
{code}
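
A note for reading these logs: the very large node ids (2147483646, 2147483645 and, further down, 2147483647) are not extra brokers. As far as I know, the Java client keeps a separate connection to the group coordinator and labels it `Integer.MAX_VALUE - brokerId`, which matches every id/host pair in the logs here. A small sketch of that mapping (the class and method names are mine, not Kafka's):

```java
final class CoordinatorNodeId {
    // Connection id the consumer uses for a coordinator hosted on the given broker.
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // Inverse mapping: recover the broker id from a coordinator id seen in a log line.
    static int brokerIdOf(int coordinatorConnectionId) {
        return Integer.MAX_VALUE - coordinatorConnectionId;
    }

    public static void main(String[] args) {
        // Coordinator ids that appear in the log excerpts of this comment.
        int[] idsFromLogs = {2147483646, 2147483645, 2147483647};
        for (int id : idsFromLogs) {
            System.out.println(id + " -> broker " + brokerIdOf(id));
        }
    }
}
```

So "Node 2147483646 disconnected" refers to the coordinator connection to kafka-foo-1 (id 1), and 2147483645 is the coordinator connection to kafka-foo-2 (id 2).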



3. The consumer then keeps retrying against kafka-foo-0 and kafka-foo-2 in a round-robin way, and its offset commits fail:



{code:java}
2017-11-10 11:27:45,470 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2147483645, for key 8, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=16},{partition=1,error_code=16},{partition=2,error_code=16}]}]} [kafka-consumer]
2017-11-10 11:27:45,470 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Offset commit for group uklonvd826214 failed: This is not the correct coordinator. [kafka-consumer]
2017-11-10 11:27:45,470 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483645 rack: null) dead for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,573 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 1, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=1225,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1224, timestamp=1510313265499, key=1 bytes, value=102 bytes))]}]}]} [kafka-consumer]
2017-11-10 11:27:45,573 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch READ_UNCOMMITTED at offset 1224 for partition gao31-1 returned fetch data (error=NONE, highWaterMark=1225, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,575 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2, for key 10, received {throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092}} [kafka-consumer]
2017-11-10 11:27:45,575 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received GroupCoordinator response ClientResponse(receivedTimeMs=1510313265575, latencyMs=4, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=7616,client_id=consumer-2}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,575 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483645 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:27:45,576 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending OffsetCommit request with {gao31-1=OffsetAndMetadata{offset=1224, metadata='no metadata'}, gao31-2=OffsetAndMetadata{offset=1631, metadata='no metadata'}, gao31-0=OffsetAndMetadata{offset=1223, metadata='no metadata'}} to coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483645 rack: null) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,576 TRACE org.apache.kafka.clients.NetworkClient                        - Sending OFFSET_COMMIT {group_id=uklonvd826214,group_generation_id=7,member_id=consumer-2-82264aac-077f-469a-9783-2d8949bd61c3,retention_time=-1,topics=[{topic=gao31,partitions=[{partition=0,offset=1223,metadata=no metadata},{partition=1,offset=1224,metadata=no metadata},{partition=2,offset=1631,metadata=no metadata}]}]} to node 2147483645. [kafka-consumer]
2017-11-10 11:27:45,581 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2147483645, for key 8, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=25},{partition=1,error_code=25},{partition=2,error_code=25}]}]} [kafka-consumer]
2017-11-10 11:27:45,581 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Offset commit for group uklonvd826214 failed: The coordinator is not aware of this member. [kafka-consumer]
2017-11-10 11:27:45,581 DEBUG com.db.gm.cto.core.tests.ContinuousConsumer                   - unable to commit offsets [kafka-consumer]
2017-11-10 11:27:45,582 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Revoking previously assigned partitions [gao31-1, gao31-2, gao31-0] for group uklonvd826214 [kafka-consumer]
......
{code}


This lasts for about 3 minutes.
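
To make the TRACE lines easier to follow, here is a small lookup for the numeric codes that show up in this comment. It is a reading aid only: the `for key N` values are Kafka protocol API keys, and the two error codes match the messages printed next to them in the logs (16: "This is not the correct coordinator", 25: "The coordinator is not aware of this member"). The class and method names are hypothetical.

```java
final class KafkaLogCodes {
    // Maps the "for key N" value in NetworkClient TRACE lines to the request type.
    static String apiName(int apiKey) {
        switch (apiKey) {
            case 1:  return "FETCH";
            case 2:  return "LIST_OFFSETS";
            case 3:  return "METADATA";
            case 8:  return "OFFSET_COMMIT";
            case 9:  return "OFFSET_FETCH";
            case 10: return "FIND_COORDINATOR";
            case 11: return "JOIN_GROUP";
            case 14: return "SYNC_GROUP";
            default: return "UNKNOWN(" + apiKey + ")";
        }
    }

    // Maps the error_code values seen in the responses to their protocol names.
    static String errorName(int errorCode) {
        switch (errorCode) {
            case 0:  return "NONE";
            case 16: return "NOT_COORDINATOR";
            case 25: return "UNKNOWN_MEMBER_ID";
            default: return "UNKNOWN(" + errorCode + ")";
        }
    }

    public static void main(String[] args) {
        // The two failed offset commits in step 3.
        System.out.println(apiName(8) + " failed with " + errorName(16));
        System.out.println(apiName(8) + " failed with " + errorName(25));
    }
}
```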
4. Finally it discovers kafka-foo-0 and uses it as the group coordinator. However, the log lines marked in red show that the new group coordinator does not know the last committed offset for this consumer group (the OFFSET_FETCH response returns offset=-1 for every partition), so the consumer resets its position to "latest":




{code:java}
2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient                        - Found least loaded node kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) connected with no in-flight requests [kafka-consumer]
2017-11-10 11:30:05,817 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending GroupCoordinator request for group uklonvd826214 to broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient                        - Sending FIND_COORDINATOR {coordinator_key=uklonvd826214,coordinator_type=0} to node 0. [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 10, received {throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=0,host=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com,port=9092}} [kafka-consumer]
2017-11-10 11:30:05,820 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received GroupCoordinator response ClientResponse(receivedTimeMs=1510313405820, latencyMs=3, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=10942,client_id=consumer-2}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483647 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Disabling heartbeat thread for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - (Re-)joining group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending JoinGroup ((type: JoinGroupRequest, groupId=uklonvd826214, sessionTimeout=10000, rebalanceTimeout=300000, memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@161cb9a1)) to coordinator kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483647 rack: null) [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient                        - Sending JOIN_GROUP {group_id=uklonvd826214,session_timeout=10000,rebalance_timeout=300000,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=17 cap=17]}]} to node 2147483647. [kafka-consumer]
2017-11-10 11:30:05,823 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2147483647, for key 11, received {throttle_time_ms=0,error_code=0,generation_id=1113,group_protocol=range,leader_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,members=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=17 cap=17]}]} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received successful JoinGroup response for group uklonvd826214: org.apache.kafka.common.requests.JoinGroupResponse@6f13c4b9 [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Performing assignment for group uklonvd826214 using strategy range with subscriptions {consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Subscription(topics=[gao31])} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Finished assignment for group uklonvd826214: {consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Assignment(partitions=[gao31-0, gao31-1, gao31-2])} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending leader SyncGroup for group uklonvd826214 to coordinator kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483647 rack: null): (type=SyncGroupRequest, groupId=uklonvd826214, generationId=1113, memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, groupAssignment=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5) [kafka-consumer]
2017-11-10 11:30:05,824 TRACE org.apache.kafka.clients.NetworkClient                        - Sending SYNC_GROUP {group_id=uklonvd826214,generation_id=1113,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,group_assignment=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]}]} to node 2147483647. [kafka-consumer]
2017-11-10 11:30:06,051 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2147483647, for key 14, received {throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]} [kafka-consumer]
2017-11-10 11:30:06,051 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Successfully joined group uklonvd826214 with generation 1113 [kafka-consumer]
2017-11-10 11:30:06,051 TRACE org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Enabling heartbeat thread for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:06,051 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Setting newly assigned partitions [gao31-1, gao31-2, gao31-0] for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:06,052 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group uklonvd826214 fetching committed offsets for partitions: [gao31-1, gao31-2, gao31-0] [kafka-consumer]
2017-11-10 11:30:06,052 TRACE org.apache.kafka.clients.NetworkClient                        - Sending OFFSET_FETCH {group_id=uklonvd826214,topics=[{topic=gao31,partitions=[{partition=1},{partition=2},{partition=0}]}]} to node 2147483647. [kafka-consumer]
*{color:#d04437}2017-11-10 11:30:06,059 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2147483647, for key 9, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,offset=-1,metadata=,error_code=0},{partition=1,offset=-1,metadata=,error_code=0},{partition=2,offset=-1,metadata=,error_code=0}]}],error_code=0} [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group uklonvd826214 has no committed offset for partition gao31-1 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group uklonvd826214 has no committed offset for partition gao31-2 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group uklonvd826214 has no committed offset for partition gao31-0 [kafka-consumer]{color}*
2017-11-10 11:30:06,059 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={gao31-1=-1, gao31-2=-1, gao31-0=-1}, minVersion=0) to broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:30:06,060 TRACE org.apache.kafka.clients.NetworkClient                        - Sending LIST_OFFSETS {replica_id=-1,isolation_level=0,topics=[{topic=gao31,partitions=[{partition=0,timestamp=-1},{partition=1,timestamp=-1},{partition=2,timestamp=-1}]}]} to node 0. [kafka-consumer]
2017-11-10 11:30:06,062 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 2, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=0,timestamp=-1,offset=1288},{partition=1,error_code=0,timestamp=-1,offset=1290},{partition=2,error_code=0,timestamp=-1,offset=1718}]}]} [kafka-consumer]
2017-11-10 11:30:06,062 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse org.apache.kafka.common.requests.ListOffsetResponse@492e9c44 from broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for gao31-1. Fetched offset 1290, timestamp -1 [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for gao31-2. Fetched offset 1718, timestamp -1 [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for gao31-0. Fetched offset 1288, timestamp -1 [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition gao31-1 to offset 1290. [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition gao31-2 to offset 1718. [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition gao31-0 to offset 1288. [kafka-consumer]
{code}
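
To put numbers on the gap, here is a simplified model of what the consumer appears to do in the logs above, not the actual Fetcher code: if OFFSET_FETCH returns -1 for a partition, the position comes from the auto.offset.reset policy instead of a committed offset. The "last known position" values are taken from the failed OffsetCommit request in step 3, and I am assuming the consumer did not advance further during the elided part of the logs, so the result is an estimate.

```java
final class OffsetResetSketch {
    // Value returned by OFFSET_FETCH when the group has no committed offset.
    static final long NO_COMMITTED_OFFSET = -1L;

    // Simplified model of how the starting position is chosen after a rejoin.
    static long startingPosition(long committed, String resetPolicy,
                                 long logStartOffset, long logEndOffset) {
        if (committed != NO_COMMITTED_OFFSET) {
            return committed;
        }
        switch (resetPolicy) {
            case "earliest": return logStartOffset;
            case "latest":   return logEndOffset;
            default:
                throw new IllegalStateException("no committed offset and no reset policy");
        }
    }

    // Records skipped when the position jumps from lastPosition to resetPosition.
    static long skipped(long lastPosition, long resetPosition) {
        return Math.max(0L, resetPosition - lastPosition);
    }

    public static void main(String[] args) {
        // Index = partition of gao31.
        long[] lastPositions = {1223L, 1224L, 1631L}; // from the failed OffsetCommit
        long[] logEndOffsets = {1288L, 1290L, 1718L}; // from the ListOffsets response
        long total = 0L;
        for (int p = 0; p < lastPositions.length; p++) {
            long start = startingPosition(NO_COMMITTED_OFFSET, "latest", 0L, logEndOffsets[p]);
            long gap = skipped(lastPositions[p], start);
            total += gap;
            System.out.println("gao31-" + p + ": reset to " + start + ", skipped " + gap);
        }
        System.out.println("total skipped: " + total);
    }
}
```

With the numbers from the excerpts this gives 65, 66 and 87 skipped records for partitions 0, 1 and 2, 218 in total.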



Why is the new group coordinator unable to find the committed offsets for the consumer group? The group ID never changed; the consumer only left and rejoined, so I expected it to pick up the last committed offset and continue from there. Otherwise we lose messages on the consumer side (those produced while the group coordinator was down).

Any theories about this?

Thanks







was (Author: gaoxinyang):
I have seen similar behavior that also led to message loss. The setup is as follows:

topic gao31 has 3 partitions and replication factor 3
min.insync.replicas=2
consumer uses the default "auto.offset.reset=latest"
consumer commits offsets manually with consumer.commitSync() after handling messages
unclean leader election is disabled
kafka cluster has 3 brokers: kafka-foo-0, kafka-foo-1 and kafka-foo-2
consumer group ID is uklonvd826214
session.timeout.ms and max.poll.interval.ms use their default values

Initially, kafka-foo-1 is the group coordinator.

To reproduce:
1. Kill kafka-foo-1.
2. The following logs show the consumer being disconnected from kafka-foo-1 and trying to discover a new group coordinator:


{code:java}
2017-11-10 11:27:45,097 DEBUG org.apache.kafka.clients.NetworkClient                        - Node 2147483646 disconnected. [kafka-consumer]
2017-11-10 11:27:45,097 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483646 rack: null) dead for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0 [kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 1 [kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient                        - Found least loaded node kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,098 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending GroupCoordinator request for group uklonvd826214 to broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 1 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Found least loaded node kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,098 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending GroupCoordinator request for group uklonvd826214 to broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Removing node kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 1 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Found least loaded node kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:27:45,099 DEBUG org.apache.kafka.clients.NetworkClient                        - Sending metadata request (type=MetadataRequest, topics=gao31) to node 0 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient                        - Sending METADATA {topics=[gao31],allow_auto_topic_creation=true} to node 0. [kafka-consumer]
2017-11-10 11:27:45,100 TRACE org.apache.kafka.clients.NetworkClient                        - Sending FIND_COORDINATOR {coordinator_key=uklonvd826214,coordinator_type=0} to node 0. [kafka-consumer]
2017-11-10 11:27:45,276 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2, for key 1, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=1631,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1630, timestamp=1510313264997, key=1 bytes, value=102 bytes))]},{partition_header={partition=0,error_code=0,high_watermark=1223,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[]}]}]} [kafka-consumer]
2017-11-10 11:27:45,276 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch READ_UNCOMMITTED at offset 1630 for partition gao31-2 returned fetch data (error=NONE, highWaterMark=1631, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,276 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch READ_UNCOMMITTED at offset 1223 for partition gao31-0 returned fetch data (error=NONE, highWaterMark=1223, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=0) [kafka-consumer]
2017-11-10 11:27:45,341 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 1, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=1224,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1223, timestamp=1510313265277, key=1 bytes, value=102 bytes))]}]}]} [kafka-consumer]
2017-11-10 11:27:45,341 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch READ_UNCOMMITTED at offset 1223 for partition gao31-1 returned fetch data (error=NONE, highWaterMark=1224, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,342 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 3, received {throttle_time_ms=0,brokers=[{node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null},{node_id=1,host=kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null},{node_id=0,host=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null}],cluster_id=Y8zodxM7TNi-19dFcgErpw,controller_id=0,topic_metadata=[{topic_error_code=0,topic=gao31,is_internal=false,partition_metadata=[{partition_error_code=0,partition_id=2,leader=2,replicas=[1,2,0],isr=[0,2]},{partition_error_code=0,partition_id=1,leader=0,replicas=[0,1,2],isr=[0,2]},{partition_error_code=0,partition_id=0,leader=2,replicas=[2,0,1],isr=[2,0]}]}]} [kafka-consumer]
2017-11-10 11:27:45,342 DEBUG org.apache.kafka.clients.Metadata                             - Updated cluster metadata version 4 to Cluster(id = Y8zodxM7TNi-19dFcgErpw, nodes = [kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null), kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null), kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null)], partitions = [Partition(topic = gao31, partition = 0, leader = 2, replicas = [2,0,1], isr = [2,0]), Partition(topic = gao31, partition = 1, leader = 0, replicas = [0,1,2], isr = [0,2]), Partition(topic = gao31, partition = 2, leader = 2, replicas = [1,2,0], isr = [0,2])]) [kafka-consumer]
2017-11-10 11:27:45,343 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 10, received {throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092}} [kafka-consumer]
2017-11-10 11:27:45,343 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received GroupCoordinator response ClientResponse(receivedTimeMs=1510313265343, latencyMs=245, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=7610,client_id=consumer-2}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,346 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483645 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:27:45,346 DEBUG org.apache.kafka.clients.NetworkClient                        - Initiating connection to node 2147483645 at kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092. [kafka-consumer]
2017-11-10 11:27:45,350 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to read 188 bytes of data for partition gao31-2 with offset 1630 [kafka-consumer]
2017-11-10 11:27:45,350 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high watermark for partition gao31-2 to 1631 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 1630 for assigned partition gao31-2 and update position to 1631 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to read 0 bytes of data for partition gao31-0 with offset 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high watermark for partition gao31-0 to 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 1223 for assigned partition gao31-0 and update position to 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to read 188 bytes of data for partition gao31-1 with offset 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high watermark for partition gao31-1 to 1224 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Returning fetched records at offset 1223 for assigned partition gao31-1 and update position to 1224 [kafka-consumer]
{code}
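A note for reading these logs: the consumer talks to the group coordinator over a separate connection that carries a synthetic node id. The ids in this comment's logs are consistent with Integer.MAX_VALUE minus the broker id (2147483646 for kafka-foo-1, 2147483645 for kafka-foo-2). Below is a minimal sketch that decodes them. The class and method names are hypothetical and are not part of the Kafka client API; the mapping itself is an assumption inferred from the log lines.

```java
// Hypothetical helper for reading the logs; not part of the Kafka client API.
final class CoordinatorNodeIds {
    private CoordinatorNodeIds() {}

    // Assumption inferred from the logs:
    // coordinator connection id = Integer.MAX_VALUE - broker id.
    static int toBrokerId(int coordinatorNodeId) {
        return Integer.MAX_VALUE - coordinatorNodeId;
    }

    // Inverse mapping: broker id -> id used for the coordinator connection.
    static int toCoordinatorNodeId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    public static void main(String[] args) {
        // Coordinator node ids that appear in the consumer logs of this comment.
        int[] idsSeenInLogs = {2147483646, 2147483645, 2147483647};
        for (int id : idsSeenInLogs) {
            System.out.println(id + " -> kafka-foo-" + toBrokerId(id));
        }
    }
}
```

With this mapping, "Node 2147483646 disconnected" refers to the coordinator connection to kafka-foo-1, the broker that was killed.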



3. The consumer then tries kafka-foo-0 and kafka-foo-2 in turn, but its offset commits keep failing:



{code:java}
2017-11-10 11:27:45,470 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2147483645, for key 8, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=16},{partition=1,error_code=16},{partition=2,error_code=16}]}]} [kafka-consumer]
2017-11-10 11:27:45,470 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Offset commit for group uklonvd826214 failed: This is not the correct coordinator. [kafka-consumer]
2017-11-10 11:27:45,470 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483645 rack: null) dead for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,573 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 1, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=1225,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1224, timestamp=1510313265499, key=1 bytes, value=102 bytes))]}]}]} [kafka-consumer]
2017-11-10 11:27:45,573 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch READ_UNCOMMITTED at offset 1224 for partition gao31-1 returned fetch data (error=NONE, highWaterMark=1225, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,575 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2, for key 10, received {throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092}} [kafka-consumer]
2017-11-10 11:27:45,575 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received GroupCoordinator response ClientResponse(receivedTimeMs=1510313265575, latencyMs=4, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=7616,client_id=consumer-2}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,575 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483645 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:27:45,576 TRACE org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending OffsetCommit request with {gao31-1=OffsetAndMetadata{offset=1224, metadata='no metadata'}, gao31-2=OffsetAndMetadata{offset=1631, metadata='no metadata'}, gao31-0=OffsetAndMetadata{offset=1223, metadata='no metadata'}} to coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483645 rack: null) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,576 TRACE org.apache.kafka.clients.NetworkClient                        - Sending OFFSET_COMMIT {group_id=uklonvd826214,group_generation_id=7,member_id=consumer-2-82264aac-077f-469a-9783-2d8949bd61c3,retention_time=-1,topics=[{topic=gao31,partitions=[{partition=0,offset=1223,metadata=no metadata},{partition=1,offset=1224,metadata=no metadata},{partition=2,offset=1631,metadata=no metadata}]}]} to node 2147483645. [kafka-consumer]
2017-11-10 11:27:45,581 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2147483645, for key 8, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=25},{partition=1,error_code=25},{partition=2,error_code=25}]}]} [kafka-consumer]
2017-11-10 11:27:45,581 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Offset commit for group uklonvd826214 failed: The coordinator is not aware of this member. [kafka-consumer]
2017-11-10 11:27:45,581 DEBUG com.db.gm.cto.core.tests.ContinuousConsumer                   - unable to commit offsets [kafka-consumer]
2017-11-10 11:27:45,582 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Revoking previously assigned partitions [gao31-1, gao31-2, gao31-0] for group uklonvd826214 [kafka-consumer]
......
{code}
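The numeric fields in the TRACE lines above can be decoded by hand. The sketch below covers only the API keys ("for key N") and error codes that appear in this comment's logs. The class and method names are hypothetical; the error names for codes 16 and 25 are taken from the Kafka protocol error table and match the messages the ConsumerCoordinator prints next to them.

```java
// Hypothetical lookup helper for reading the logs; not part of the Kafka client API.
final class LogCodes {
    private LogCodes() {}

    // Request type behind the "for key N" field in NetworkClient TRACE lines.
    static String apiName(int apiKey) {
        switch (apiKey) {
            case 1:  return "Fetch";
            case 2:  return "ListOffsets";
            case 3:  return "Metadata";
            case 8:  return "OffsetCommit";
            case 9:  return "OffsetFetch";
            case 10: return "FindCoordinator";
            case 11: return "JoinGroup";
            case 14: return "SyncGroup";
            default: return "UNKNOWN(" + apiKey + ")";
        }
    }

    // Only the error codes that occur in these logs are listed.
    static String errorName(int errorCode) {
        switch (errorCode) {
            case 0:  return "NONE";
            case 16: return "NOT_COORDINATOR";   // "This is not the correct coordinator."
            case 25: return "UNKNOWN_MEMBER_ID"; // "The coordinator is not aware of this member."
            default: return "UNKNOWN(" + errorCode + ")";
        }
    }

    public static void main(String[] args) {
        // The two failed offset commits in step 3: key 8 with error 16, then error 25.
        System.out.println(apiName(8) + " failed with " + errorName(16));
        System.out.println(apiName(8) + " failed with " + errorName(25));
    }
}
```

Read this way, the first commit is rejected because kafka-foo-2 does not yet consider itself the coordinator, and the second is rejected because the coordinator no longer recognises the consumer's member id, which triggers the partition revocation.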


This lasts for about 3 minutes.
4. Finally, the consumer discovers kafka-foo-0 and uses it as the group coordinator. However, the log lines in red show that the new group coordinator does not know the last committed offset for this consumer group, so the consumer resets its offsets to "latest".




{code:java}
2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient                        - Found least loaded node kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) connected with no in-flight requests [kafka-consumer]
2017-11-10 11:30:05,817 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending GroupCoordinator request for group uklonvd826214 to broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient                        - Sending FIND_COORDINATOR {coordinator_key=uklonvd826214,coordinator_type=0} to node 0. [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 10, received {throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=0,host=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com,port=9092}} [kafka-consumer]
2017-11-10 11:30:05,820 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received GroupCoordinator response ClientResponse(receivedTimeMs=1510313405820, latencyMs=3, disconnected=false, requestHeader={api_key=10,api_version=1,correlation_id=10942,client_id=consumer-2}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483647 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Disabling heartbeat thread for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - (Re-)joining group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending JoinGroup ((type: JoinGroupRequest, groupId=uklonvd826214, sessionTimeout=10000, rebalanceTimeout=300000, memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@161cb9a1)) to coordinator kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483647 rack: null) [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient                        - Sending JOIN_GROUP {group_id=uklonvd826214,session_timeout=10000,rebalance_timeout=300000,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=17 cap=17]}]} to node 2147483647. [kafka-consumer]
2017-11-10 11:30:05,823 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2147483647, for key 11, received {throttle_time_ms=0,error_code=0,generation_id=1113,group_protocol=range,leader_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,members=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=17 cap=17]}]} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received successful JoinGroup response for group uklonvd826214: org.apache.kafka.common.requests.JoinGroupResponse@6f13c4b9 [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Performing assignment for group uklonvd826214 using strategy range with subscriptions {consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Subscription(topics=[gao31])} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Finished assignment for group uklonvd826214: {consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Assignment(partitions=[gao31-0, gao31-1, gao31-2])} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending leader SyncGroup for group uklonvd826214 to coordinator kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483647 rack: null): (type=SyncGroupRequest, groupId=uklonvd826214, generationId=1113, memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, groupAssignment=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5) [kafka-consumer]
2017-11-10 11:30:05,824 TRACE org.apache.kafka.clients.NetworkClient                        - Sending SYNC_GROUP {group_id=uklonvd826214,generation_id=1113,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,group_assignment=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]}]} to node 2147483647. [kafka-consumer]
2017-11-10 11:30:06,051 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2147483647, for key 14, received {throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]} [kafka-consumer]
2017-11-10 11:30:06,051 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Successfully joined group uklonvd826214 with generation 1113 [kafka-consumer]
2017-11-10 11:30:06,051 TRACE org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Enabling heartbeat thread for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:06,051 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Setting newly assigned partitions [gao31-1, gao31-2, gao31-0] for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:06,052 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group uklonvd826214 fetching committed offsets for partitions: [gao31-1, gao31-2, gao31-0] [kafka-consumer]
2017-11-10 11:30:06,052 TRACE org.apache.kafka.clients.NetworkClient                        - Sending OFFSET_FETCH {group_id=uklonvd826214,topics=[{topic=gao31,partitions=[{partition=1},{partition=2},{partition=0}]}]} to node 2147483647. [kafka-consumer]
{color:#d04437}2017-11-10 11:30:06,059 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 2147483647, for key 9, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,offset=-1,metadata=,error_code=0},{partition=1,offset=-1,metadata=,error_code=0},{partition=2,offset=-1,metadata=,error_code=0}]}],error_code=0} [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group uklonvd826214 has no committed offset for partition gao31-1 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group uklonvd826214 has no committed offset for partition gao31-2 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group uklonvd826214 has no committed offset for partition gao31-0 [kafka-consumer]{color}
2017-11-10 11:30:06,059 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={gao31-1=-1, gao31-2=-1, gao31-0=-1}, minVersion=0) to broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:30:06,060 TRACE org.apache.kafka.clients.NetworkClient                        - Sending LIST_OFFSETS {replica_id=-1,isolation_level=0,topics=[{topic=gao31,partitions=[{partition=0,timestamp=-1},{partition=1,timestamp=-1},{partition=2,timestamp=-1}]}]} to node 0. [kafka-consumer]
2017-11-10 11:30:06,062 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 0, for key 2, received {throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=0,timestamp=-1,offset=1288},{partition=1,error_code=0,timestamp=-1,offset=1290},{partition=2,error_code=0,timestamp=-1,offset=1718}]}]} [kafka-consumer]
2017-11-10 11:30:06,062 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse org.apache.kafka.common.requests.ListOffsetResponse@492e9c44 from broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for gao31-1. Fetched offset 1290, timestamp -1 [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for gao31-2. Fetched offset 1718, timestamp -1 [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for gao31-0. Fetched offset 1288, timestamp -1 [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition gao31-1 to offset 1290. [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition gao31-2 to offset 1718. [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting offset for partition gao31-0 to offset 1288. [kafka-consumer]
{code}
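To put a number on the gap, the offsets in the last OffsetCommit attempt (step 3) can be compared with the offsets the consumer was reset to (step 4). This is a rough sketch under one assumption: the consumer processed nothing further in the elided part of the log, so the result is an upper bound on the records skipped. The class and method names are hypothetical.

```java
// Hypothetical helper; estimates how many records the reset to "latest" skipped.
final class SkippedOffsets {
    private SkippedOffsets() {}

    // Records skipped on one partition: reset offset minus last known position.
    static long gap(long lastPosition, long resetOffset) {
        return Math.max(0L, resetOffset - lastPosition);
    }

    // Sum of the per-partition gaps; both arrays are indexed by partition number.
    static long totalGap(long[] lastPositions, long[] resetOffsets) {
        long total = 0L;
        for (int i = 0; i < lastPositions.length; i++) {
            total += gap(lastPositions[i], resetOffsets[i]);
        }
        return total;
    }

    public static void main(String[] args) {
        // Offsets from the last OffsetCommit attempt: gao31-0, gao31-1, gao31-2.
        long[] lastPositions = {1223L, 1224L, 1631L};
        // Offsets from the "Resetting offset for partition ..." lines.
        long[] resetOffsets = {1288L, 1290L, 1718L};

        for (int p = 0; p < lastPositions.length; p++) {
            System.out.println("gao31-" + p + ": " + gap(lastPositions[p], resetOffsets[p]));
        }
        // Prints 65, 66 and 87 for the three partitions, then a total of 218.
        System.out.println("total: " + totalGap(lastPositions, resetOffsets));
    }
}
```

Under that assumption, up to 218 records across the three partitions were never delivered to the consumer.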



I am wondering why the new group coordinator does not know the committed offsets for the consumer group. The group ID never changed; the consumer merely left and rejoined, so I would expect it to pick up the last committed offsets and continue from there. Otherwise we lose messages on the consumer side (the messages produced while the group coordinator was down).

Any theories about this?

Thanks






> Loosing messages on OFFSET_OUT_OF_RANGE error in consumer
> ---------------------------------------------------------
>
>                 Key: KAFKA-6189
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6189
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.11.0.0
>            Reporter: Andrey
>         Attachments: kafkaLossingMessages.png
>
>
> Steps to reproduce:
> * Setup test:
> ** producer sends messages constantly. If cluster not available, then it will retry
> ** consumer polling
> ** topic has 3 partitions and replication factor 3. 
> ** min.insync.replicas=2
> ** producer has "acks=all"
> ** consumer has default "auto.offset.reset=latest"
> ** consumer manually commitSync offsets after handling messages.
> **  unclean leader election = false
> ** kafka cluster has 3 brokers
> * Kill broker 0
> * In consumer's logs:
> {code}
> 2017-11-08 11:36:33,967 INFO  org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch offset 10706682 is out of range for partition mytopic-2, resetting offset [kafka-consumer]
> 2017-11-08 11:36:33,968 INFO  org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch offset 8024431 is out of range for partition mytopic-1, resetting offset [kafka-consumer]
> 2017-11-08 11:36:34,045 INFO  org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch offset 8029505 is out of range for partition mytopic-0, resetting offset [kafka-consumer]
> {code}
> After that, consumer lost several messages on each partition.
> Expected:
> * return upper bound of range
> * consumer should resume from that offset instead of "auto.offset.reset".
> Workaround:
> * set "auto.offset.reset=earliest"
> * get a lot of duplicate messages instead of lost ones
> This appears to be what happens during recovery from a broker failure (see attachment)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
