From: Patrice Chalcol
To: dev@kafka.apache.org
Date: Wed, 6 Dec 2017 17:04:50 +0100
Subject: Re: Flaky healthcheck when trying to check Kafka Stream processing app status

Hi Bill,

Thanks, I understand. Let me know if you need further information.

Regards,
Patrice

2017-12-06 16:03 GMT+01:00 Bill Bejeck:

> Hi Patrice,
>
> I haven't forgotten, just sidetracked with other things. I'll get back to
> you by the end of the week.
>
> Thanks,
> Bill
>
> On Wed, Nov 29, 2017 at 10:36 AM, Bill Bejeck wrote:
>
> > Patrice,
> >
> > Thanks for reporting this. I'll have a look at what you've posted on
> > GitHub.
> >
> > Thanks,
> > Bill
> >
> > On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol wrote:
> >
> >> Hello,
> >>
> >> I have implemented a basic application which uses Kafka Streams stores
> >> and interactive queries, available here:
> >> https://github.com/pchalcol/kstreams-healthcheck
> >>
> >> The healthcheck implementation is based on the Kafka Streams metadata
> >> and the stream state, as illustrated below:
> >> ```
> >> String healthcheck() {
> >>   Collection<StreamsMetadata> stores = streams.allMetadata();
> >>   long storescount = stores.stream()
> >>     .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
> >>     .count();
> >>
> >>   State state = streams.state();
> >>
> >>   System.out.println(String.format("Application State: (%d, %s)",
> >>     storescount, state.toString()));
> >>
> >>   // KO if the current node is down or is in a 'not running' state
> >>   if (storescount == 0 || !state.isRunning()) return "KO";
> >>   return "OK";
> >> }
> >> ```
> >>
> >> I have created the topics with 4 partitions:
> >> `kafka-topics --create --topic events --zookeeper localhost:2181
> >> --partitions 4 --replication-factor 1`
> >> `kafka-topics --create --topic library --zookeeper localhost:2181
> >> --partitions 4
> >> --replication-factor 1`
> >>
> >> What I had expected was the healthcheck returning an error whenever the
> >> broker is shut down, which is not the case.
> >>
> >> When I check the application status using
> >> `curl -XGET http://localhost:4567/healthcheck`
> >> the server always returns a SUCCESS response, even if the Kafka cluster
> >> is down.
> >>
> >> You will find below the different test cases I've done.
> >>
> >> 1/ The stream state is not changed after shutting down the Kafka cluster
> >>
> >> - start kafka:
> >> `cd docker && docker-compose up -d`
> >>
> >> - start producer:
> >> `sbt runMain com.example.streams.Producer`
> >>
> >> - start streams and http server:
> >> `sbt runMain com.example.streams.Producer`
> >>
> >> - healthcheck:
> >> `curl -XGET http://localhost:4567/healthcheck`
> >> => response = {"status": "SUCCESS"}
> >>
> >> - shutdown kafka: `docker-compose stop`
> >>
> >> - healthcheck:
> >> `curl -XGET http://localhost:4567/healthcheck`
> >> => response = {"status": "SUCCESS"}, while the expected one should be
> >> {"status": "ERROR"}
> >>
> >> 2/ Sometimes I also encounter this behaviour: no data seems to be
> >> available when querying the stores
> >>
> >> - Start kafka
> >> - Start producer
> >> - Start streams and http server
> >>
> >> - Request data: `curl -XGET http://localhost:4567/titles`
> >> This http request calls a service which in turn queries the key-value store.
> >> => received response
> >> ```
> >> {
> >>   "data": [
> >>     {
> >>       "key": 1,
> >>       "value": "Fresh Fruit For Rotting Vegetables"
> >>     },
> >>     ...
> >>     {
> >>       "key": 10,
> >>       "value": "Fear Of A Black Planet"
> >>     }
> >>   ],
> >>   "status": "SUCCESS"
> >> }
> >> ```
> >>
> >> - Request data: `curl -XGET http://localhost:4567/titles/counts`
> >> => received response
> >> ```
> >> {
> >>   "data": [
> >>     {
> >>       "key": "fear of a black planet",
> >>       "value": 414
> >>     },
> >>     ...
> >>     {
> >>       "key": "curtain call - the hits",
> >>       "value": 431
> >>     }
> >>   ],
> >>   "status": "SUCCESS"
> >> }
> >> ```
> >>
> >> - shutdown kafka
> >>
> >> - Request data: `curl -XGET http://localhost:4567/titles`
> >> => received response, same as before, which seems to be ok as we are
> >> querying the local store
> >> ```
> >> {
> >>   "data": [
> >>     {
> >>       "key": 1,
> >>       "value": "Fresh Fruit For Rotting Vegetables"
> >>     },
> >>     ...
> >>     {
> >>       "key": 10,
> >>       "value": "Fear Of A Black Planet"
> >>     }
> >>   ],
> >>   "status": "SUCCESS"
> >> }
> >> ```
> >>
> >> - Request data: `curl -XGET http://localhost:4567/titles/counts`
> >> => received response, still understandable
> >> ```
> >> {
> >>   "data": [
> >>     {
> >>       "key": "fear of a black planet",
> >>       "value": 414
> >>     },
> >>     ...
> >>     {
> >>       "key": "curtain call - the hits",
> >>       "value": 431
> >>     }
> >>   ],
> >>   "status": "SUCCESS"
> >> }
> >> ```
> >>
> >> - restart kafka
> >>
> >> - Request data: `curl -XGET http://localhost:4567/titles`
> >> => received response
> >> ```
> >> {
> >>   "data": [],
> >>   "status": "SUCCESS"
> >> }
> >> ```
> >>
> >> - Request data: `curl -XGET http://localhost:4567/titles/counts`
> >> => same here, received response
> >> ```
> >> {
> >>   "data": [],
> >>   "status": "SUCCESS"
> >> }
> >> ```
> >>
> >> I also see this entry in the Streams application logs:
> >> ```
> >> [error] (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1)
> >> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> >> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> >>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
> >>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:144)
> >>   at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:283)
> >>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
> >>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>   at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
> >>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
> >>   at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> >> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1
> >> record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms has
> >> passed since batch creation plus linger time
> >> [trace] Stack trace suppressed: run last compile:runMain for the full output.
> >> the state store, events-counts, may have migrated to another instance.
> >> ```
> >>
> >> Even if a rebalance occurred after restarting my cluster, as I have only
> >> one consumer I thought it would still see all partitions, so the store
> >> should remain available.
> >> What am I missing here?
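One avenue for the producer TimeoutException in the log above, sketched here as an assumption rather than a confirmed fix, is to let the embedded producer retry longer instead of expiring batches while the broker is restarting. The sketch below forwards producer-level settings through `StreamsConfig.producerPrefix`; the application id is taken from the log excerpt, `localhost:9092` and the numeric values are illustrative only:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {
    public static Properties build() {
        Properties props = new Properties();
        // Application id as seen in the thread's log excerpt.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-test");
        // Assumed local broker address; adjust to the docker-compose setup.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Forward producer-level settings with the producer prefix so the
        // embedded producer retries rather than expiring batches after the
        // default timeout (illustrative values, not a recommendation):
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
        return props;
    }
}
```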
> >>
> >> Thank you for your answers.
> >>
> >> --
> >> Regards,
> >> Patrice

--
Cordialement
Patrice Chalcol
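The RUNNING check discussed in this thread can be kept current by registering a callback via `KafkaStreams.setStateListener(...)` and caching the last reported transition, then combining it with the metadata count. The sketch below models that pattern with plain-Java stand-ins (`StreamState` and `HealthTracker` are hypothetical names, not the Kafka API). Note that, as the thread shows, the state can remain RUNNING while brokers are unreachable, so a state listener alone does not make the healthcheck fail; a separate broker-reachability probe would still be needed.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for KafkaStreams.State (not the real enum).
enum StreamState { CREATED, RUNNING, REBALANCING, NOT_RUNNING, ERROR }

// Caches the last transition, as a KafkaStreams.StateListener callback would.
class HealthTracker {
    private final AtomicReference<StreamState> last =
            new AtomicReference<>(StreamState.CREATED);

    // Same signature shape as StateListener.onChange(newState, oldState).
    void onChange(StreamState newState, StreamState oldState) {
        last.set(newState);
    }

    // Mirrors the thread's check: KO when no local store is visible
    // or the cached state is neither RUNNING nor REBALANCING.
    String healthcheck(long localStoreCount) {
        StreamState s = last.get();
        boolean running = (s == StreamState.RUNNING || s == StreamState.REBALANCING);
        return (localStoreCount == 0 || !running) ? "KO" : "OK";
    }
}

public class StateListenerSketch {
    public static void main(String[] args) {
        HealthTracker tracker = new HealthTracker();
        tracker.onChange(StreamState.RUNNING, StreamState.CREATED);
        System.out.println(tracker.healthcheck(1)); // prints OK
        tracker.onChange(StreamState.ERROR, StreamState.RUNNING);
        System.out.println(tracker.healthcheck(1)); // prints KO
    }
}
```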