From: Jason Gustafson
Date: Wed, 31 Aug 2016 19:56:17 -0700
Subject: Re: Kafka consumers unable to process message
To: users@kafka.apache.org
Cc: dev@kafka.apache.org

The exceptions show one of the replica fetcher threads on the broker failing, which makes perfect sense since some of the partitions were bound to have leaders in the failed datacenter. I'd actually like to see the consumer logs at DEBUG level if possible.

Thanks,
Jason
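For reference, a minimal way to capture the consumer logs Jason is asking for is to raise the log level for the Kafka client packages in the consuming application. This is only a sketch and assumes the application is configured through a log4j.properties file; the client logs via SLF4J, so adjust accordingly if a different logging backend is bound:

    # log4j.properties of the consuming application (sketch)
    # DEBUG for all Kafka client classes, including the consumer coordinator and fetcher
    log4j.logger.org.apache.kafka.clients=DEBUG

The interesting loggers during a stuck poll() are the coordinator and fetcher classes under org.apache.kafka.clients.consumer.internals, which this setting covers.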
On Wed, Aug 31, 2016 at 7:48 PM, Ghosh, Achintya (Contractor) <Achintya_Ghosh@comcast.com> wrote:

> Hi Jason,
>
> No, I didn't bring down any zookeeper server. I even tried with 3
> zookeeper servers, one as an 'Observer', but saw the same issue.
>
> Here is the server log from one of the nodes of my other datacenter:
>
> [2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0. (kafka.log.Log)
> [2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4] Added fetcher for partitions List([[TEST3,0], initOffset 0 to broker BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)]) (kafka.server.ReplicaFetcherManager)
> [2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 3 was disconnected before the response was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
>         at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
>         at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5adea8fb (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
>         at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:20,223] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@4c159cc3 (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
>         at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:22,246] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@611ed1e9 (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
>         at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:22,736] INFO [ReplicaFetcherManager on broker 4] Removed fetcher for partitions [TEST3,0] (kafka.server.ReplicaFetcherManager)
> [2016-09-01 01:26:22,752] INFO [ReplicaFetcherThread-0-3], Shutting down (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:22,755] INFO [ReplicaFetcherThread-0-3], Stopped (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:22,756] INFO [ReplicaFetcherThread-0-3], Shutdown completed (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:48,025] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
> [2016-09-01 01:26:48,034] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> [2016-09-01 01:26:48,035] INFO 4 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
> [2016-09-01 01:26:48,726] INFO New leader is 4 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2016-09-01 01:26:54,837] INFO Partition [TEST3,0] on broker 4: Shrinking ISR for partition [TEST3,0] from 4,5,6,1 to 4,5,6 (kafka.cluster.Partition)
> [2016-09-01 01:33:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
> [2016-09-01 01:43:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
> [2016-09-01 01:53:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
> [2016-09-01 02:03:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
> [2016-09-01 02:13:04,928] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
> [2016-09-01 02:23:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
> [2016-09-01 02:33:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
>
> Why is it trying to connect to node 3 of my local datacenter, and why is it throwing an IOException?
>
> Thanks
> Achintya
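The "Added fetcher ... to broker BrokerEndPoint(3,psaq3-wc...)" line in the log above is the likely answer to that question: broker 4 is a follower for TEST3-0 and the partition's leader appears to be broker 3, which sits in the datacenter that was taken down, so the replica fetcher keeps retrying that connection. One way to confirm which broker leads each partition is the topic describe tool that ships with Kafka; a sketch, with the zookeeper address as a placeholder:

    bin/kafka-topics.sh --describe --zookeeper <zookeeper-host>:2181 --topic TEST3

The Leader, Replicas, and Isr columns in its output show exactly which broker each follower's replica fetcher will try to reach.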
> -----Original Message-----
> From: Jason Gustafson [mailto:jason@confluent.io]
> Sent: Wednesday, August 31, 2016 10:26 PM
> To: users@kafka.apache.org
> Cc: dev@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> Hi Achintya,
>
> Just to clarify, you did not take down either of the zookeepers in this test, right? Having only two zookeepers in the ensemble would mean that if either one of them failed, zookeeper wouldn't be able to reach quorum.
>
> I'm not entirely sure why this would happen. One possibility is that the consumer is failing to find the new coordinator, which might happen if all the replicas for one of the __consumer_offsets partitions were located in the "failed" datacenter. Perhaps you can enable DEBUG logging and post some logs so we can see what it's actually doing during poll().
>
> By the way, I noticed that your consumer configuration settings seem a little mixed up. The new consumer doesn't actually communicate with Zookeeper, so there's no need for those settings. And you don't need to include the "offsets.storage" option since Kafka is the only choice. Also, I don't think "consumer.timeout.ms" is an option.
>
> -Jason
>
> On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) <Achintya_Ghosh@comcast.com> wrote:
>
> > Hi Jason,
> >
> > Thanks for your response.
> >
> > I know that is a known issue, and I resolved it by calling the wakeup method from another thread. But here my problem is different; let me explain, it's very basic.
> >
> > I created one cluster with 6 nodes (3 from one datacenter and 3 from another, remote, datacenter) and kept replication factor 6, with 2 zookeeper servers, one from each datacenter. Now I brought down all 3 nodes of my local datacenter and produced a few messages, and I see the producer is working fine even though my local datacenter nodes are down. It successfully writes the messages to the other datacenter's nodes. But when I try to consume the messages, the consumer.poll method gets stuck because my local datacenter is down, even though the other datacenter's nodes are up.
> >
> > My question is: as the data has been written successfully to the other datacenter, why is the consumer part not working?
> >
> > Here are my producer settings:
> >
> > props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > props.put("acks", "1");
> > props.put("max.block.ms", 1000);
> > props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> > props.put("value.serializer", "com.comcast.ps.kafka.object.CustomMessageSer");
> >
> > and here are my consumer settings:
> >
> > props.put("group.id", "app-consumer");
> > props.put("enable.auto.commit", "false");
> > props.put("auto.offset.reset", "earliest");
> > props.put("auto.commit.interval.ms", "500");
> > props.put("session.timeout.ms", "120000");
> > props.put("consumer.timeout.ms", "10000");
> > props.put("zookeeper.session.timeout.ms", "120000");
> > props.put("zookeeper.connection.timeout.ms", "60000");
> > props.put("offsets.storage", "kafka");
> > props.put("request.timeout.ms", "150000");
> > props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> > props.put("value.deserializer", "com.comcast.ps.kafka.object.CustomMessageDeSer");
> >
> > Is it because the consumer is not able to get the broker metadata if it is trying to connect to the other datacenter's zookeeper server? I tried to increase the zookeeper session timeout and connection timeout, but no luck.
> >
> > Please help with this.
> >
> > Thanks
> > Achintya
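Following Jason's note earlier in this message about the mixed-up configuration, this is roughly what a trimmed-down version of the consumer settings quoted above could look like once the zookeeper.*, offsets.storage, and consumer.timeout.ms entries (which the new Java consumer does not read) are dropped. A sketch only, assuming the same java.util.Properties setup as the quoted snippets; the broker list and the custom deserializer class are taken from the thread as-is:

    Properties props = new Properties();
    props.put("bootstrap.servers",
            "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,"
          + "psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
    props.put("group.id", "app-consumer");
    props.put("enable.auto.commit", "false");      // auto.commit.interval.ms is ignored while this is false
    props.put("auto.offset.reset", "earliest");
    props.put("session.timeout.ms", "120000");
    props.put("request.timeout.ms", "150000");     // kept larger than session.timeout.ms
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "com.comcast.ps.kafka.object.CustomMessageDeSer");
    // dropped: zookeeper.session.timeout.ms, zookeeper.connection.timeout.ms,
    // offsets.storage, consumer.timeout.ms -- none of these are read by the new consumer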
> > -----Original Message-----
> > From: Jason Gustafson [mailto:jason@confluent.io]
> > Sent: Wednesday, August 31, 2016 4:05 PM
> > To: users@kafka.apache.org
> > Cc: dev@kafka.apache.org
> > Subject: Re: Kafka consumers unable to process message
> >
> > Hi Achintya,
> >
> > We have a JIRA for this problem: https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the client to raise an exception in this case, or do you just want to keep it from blocking indefinitely? If the latter, you could escape the poll from another thread using wakeup().
> >
> > Thanks,
> > Jason
> >
> > On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <Achintya_Ghosh@comcast.com> wrote:
> >
> > > Hi there,
> > >
> > > The Kafka consumer gets stuck in the consumer.poll() method if my current datacenter is down and the replicated messages are in the remote datacenter.
> > >
> > > How do I solve that issue?
> > >
> > > Thanks
> > > Achintya
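To make the wakeup() suggestion from Jason's first reply concrete, here is a minimal sketch of a poll loop that a second thread can interrupt, so that poll() cannot block forever when the brokers it needs are unreachable. The topic name, broker address, group id, and 30-second timeout are illustrative placeholders, and a plain StringDeserializer stands in for the custom deserializer used in this thread:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class PollWithWakeup {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616"); // placeholder broker list
            props.put("group.id", "app-consumer");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("TEST3"));

            // Watchdog thread: after 30 seconds, wake the consumer up so a stuck poll()
            // cannot block forever. wakeup() is the one KafkaConsumer method that is
            // safe to call from another thread.
            Thread watchdog = new Thread(() -> {
                try {
                    Thread.sleep(30_000);
                } catch (InterruptedException e) {
                    return; // main loop finished normally; nothing to do
                }
                consumer.wakeup();
            });
            watchdog.start();

            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records)
                        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            } catch (WakeupException e) {
                // poll() was interrupted by wakeup(); fall through and shut down cleanly
            } finally {
                watchdog.interrupt();
                consumer.close();
            }
        }
    }

In a real application the watchdog would typically be a shutdown hook or a health-check timer rather than a one-shot sleep, but the interaction is the same: the blocked poll() throws WakeupException and the consumer can be closed instead of hanging.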