Date: Mon, 2 Oct 2017 19:13:00 +0000 (UTC)
From: "Bart Vercammen (JIRA)"
To: jira@kafka.apache.org
Subject: [jira] [Commented] (KAFKA-6000) streams 0.10.2.1 - kafka 0.11.0.1 state restore not working

    [ https://issues.apache.org/jira/browse/KAFKA-6000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188651#comment-16188651 ]

Bart Vercammen commented on KAFKA-6000:
---------------------------------------

Ok, I'll try to collect all client- and server-logs from the platform and post them here.
Some partitions are indeed successfully restored. I have not seen a clear pattern yet, but at first glance it looks like partitions that contain fewer than roughly 100,000 records recover successfully, while partitions with (a lot) more records fail.
As said, this is only what I notice at first glance; I still need to investigate and test further to be sure this really is the pattern. I'm also trying to reproduce the issue in a fully controlled unit-test, but currently this is still work in progress ... Once I catch the issue in a unit-test, I'll also post it here as a reference.

> streams 0.10.2.1 - kafka 0.11.0.1 state restore not working
> -----------------------------------------------------------
>
>                  Key: KAFKA-6000
>                  URL: https://issues.apache.org/jira/browse/KAFKA-6000
>              Project: Kafka
>           Issue Type: Bug
>           Components: core, streams
>     Affects Versions: 0.10.2.1, 0.11.0.0
>             Reporter: Bart Vercammen
>             Priority: Blocker
>
> Potential interop issue between Kafka Streams (0.10.2.1) and Kafka (0.11.0.1)
> {noformat}
> 11:24:16.416 [StreamThread-3] DEBUG rocessorStateManager - task [0_2] Registering state store lateststate to its state manager
> 11:24:16.472 [StreamThread-3] TRACE rocessorStateManager - task [0_2] Restoring state store lateststate from changelog topic scratch.lateststate.dsh
> 11:24:16.472 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Resetting offset for partition scratch.lateststate.dsh-2 to latest offset.
> 11:24:16.472 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Partition scratch.lateststate.dsh-2 is unknown for fetching offset, wait for metadata refresh
> 11:24:16.474 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={scratch.lateststate.dsh-2=-1}, minVersion=0) to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.476 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received ListOffsetResponse {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=1773763}]}]} from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.476 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Handling ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 1773763, timestamp -1
> 11:24:16.477 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Resetting offset for partition scratch.lateststate.dsh-2 to earliest offset.
> 11:24:16.478 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={scratch.lateststate.dsh-2=-2}, minVersion=0) to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.480 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received ListOffsetResponse {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=0}]}]} from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.481 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Handling ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 0, timestamp -1
> 11:24:16.483 [StreamThread-3] DEBUG rocessorStateManager - restoring partition scratch.lateststate.dsh-2 from offset 0 to endOffset 1773763
> 11:24:16.484 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 0 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.485 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.486 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.490 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 0 to buffered record list
> 11:24:16.492 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received 3 records in fetch response for partition scratch.lateststate.dsh-2 with offset 0
> 11:24:16.493 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Returning fetched records at offset 0 for assigned partition scratch.lateststate.dsh-2 and update position to 1586527
> 11:24:16.494 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Ignoring fetched records for scratch.lateststate.dsh-2 at offset 0 since the current position is 1586527
> 11:24:16.496 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.496 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.498 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.499 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.500 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> 11:24:16.501 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.502 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.511 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.512 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.512 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> 11:24:16.513 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.515 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.517 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.518 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.519 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> 11:24:16.520 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.520 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.522 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.523 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.523 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> {noformat}
> In this setup, I have 5 Kafka brokers running 0.11.0.1 (with SSL) and a KafkaStreams application running version 0.10.2.1. The streams application uses an underlying state store (`scratch.lateststate.dsh`). The problem I've seen is that when the Kafka Streams application (re)starts while quite a lot of data is already present in the state stores, it does not restore its state. KafkaStreams remains in {{REBALANCING}} state and never exits the {{restoreActiveState}} function in {{ProcessorStateManager}}.
> What I also noticed is that the state restore sometimes does work when the number of records in the changelog topic is below 100K (or something like that). I've seen a successful restore when the restore-consumer lag was below 100K records.
> When running the exact same application against a 0.10.2.1 Kafka cluster the issue never occurs. It only happens when I run the 0.10.2.1 KafkaStreams application against a 0.11 Kafka cluster.
> The logs above are a snippet from restoring the changelog that 'hangs'.
> They also show FetchResponses returning 0 records all the time, which looks suspicious to me.
> From what I can tell, in KafkaStreams the code is stuck in this loop in {{restoreActiveState}} because the offset does not increment anymore:
> {code}
> while (true) {
>     long offset = 0L;
>     for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
>         offset = record.offset();
>         if (offset >= limit) break;
>         stateRestoreCallback.restore(record.key(), record.value());
>     }
>     if (offset >= limit) {
>         break;
>     } else if (restoreConsumer.position(storePartition) == endOffset) {
>         break;
>     } else if (restoreConsumer.position(storePartition) > endOffset) {
>         // For a logging enabled changelog (no offset limit),
>         // the log end offset should not change while restoring since it is only written by this thread.
>         throw new IllegalStateException(String.format("%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
>             logPrefix, storePartition, endOffset, restoreConsumer.position(storePartition)));
>     }
> }
> {code}
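
For reference, a minimal standalone check of the same restore pattern, outside of Streams, could look like the sketch below. This is only an illustration of the unit-test idea mentioned in the comment above, not the actual test: the bootstrap address, topic and partition are placeholders copied from the logs, and the SSL settings used on the real cluster are omitted. If the consumer position stops advancing before it reaches the end offset, this loop spins forever, which mirrors the behaviour of the quoted {{restoreActiveState}} loop.

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ChangelogRestoreCheck {

    public static void main(String[] args) {
        // Placeholder values taken from the logs above; adjust to the actual cluster (SSL config omitted).
        final String bootstrap = "broker-1.tt.kafka.marathon.mesos:9091";
        final TopicPartition changelog = new TopicPartition("scratch.lateststate.dsh", 2);

        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(changelog));

            // Determine the end offset, then rewind to the beginning, like the restore consumer does.
            consumer.seekToEnd(Collections.singletonList(changelog));
            final long endOffset = consumer.position(changelog);
            consumer.seekToBeginning(Collections.singletonList(changelog));

            long restored = 0;
            long lastOffset = -1;
            while (consumer.position(changelog) < endOffset) {
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(100).records(changelog)) {
                    lastOffset = record.offset();   // a real test would feed key/value into the state store here
                    restored++;
                }
                // If position no longer advances while still below endOffset, this never terminates,
                // matching the hang reported for restoreActiveState.
                System.out.printf("position=%d endOffset=%d lastOffset=%d restored=%d%n",
                        consumer.position(changelog), endOffset, lastOffset, restored);
            }
        }
    }
}
{code}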