kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Manikumar (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (KAFKA-1138) Remote producer uses the hostname defined in broker
Date Thu, 31 Aug 2017 18:30:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Manikumar resolved KAFKA-1138.
------------------------------
    Resolution: Fixed

> Remote producer uses the hostname defined in broker
> ---------------------------------------------------
>
>                 Key: KAFKA-1138
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1138
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.0
>            Reporter: Hyun-Gul Roh
>            Assignee: Jun Rao
>
> When the producer API in the node which is not the broker sends message to a broker,
only TopicMetadataRequest is sent, but ProducerRequest is not by observing the log of "kafka-request.log"
> According to my analysis, when the producer api sends ProducerRequest, it seems to use
the hostname defined in the broker. So, if the hostname is not the one registered in DNS,
the producer cannot send the ProducerRequest. 
> I am attaching the log:
> [2013-11-21 15:28:49,464] ERROR Failed to collate messages by topic, partition due to:
fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)]
failed (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:49,465] INFO Back off for 100 ms before retrying send. Remaining retries
= 1 (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:49,566] INFO Fetching metadata from broker id:0,host:111.111.111.111,port:9092
with correlation id 6 for 1 topic(s) Set(test) (kafka.client.ClientUtils$)
> [2013-11-21 15:28:49,819] ERROR Producer connection to 111.111.111.111:9092 unsuccessful
(kafka.producer.SyncProducer)
> java.net.ConnectException: 연결이 거부됨
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> 	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> 	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:49,821] WARN Fetching topic metadata with correlation id 6 for topics
[Set(test)] from broker [id:0,host:111.111.111.111,port:9092] failed (kafka.client.ClientUtils$)
> java.net.ConnectException: 연결이 거부됨
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> 	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> 	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:49,822] ERROR fetching topic metadata for topics [Set(test)] from broker
[ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker
[ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> 	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> 	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> Caused by: java.net.ConnectException: 연결이 거부됨
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	... 12 more
> [2013-11-21 15:28:49,825] INFO Fetching metadata from broker id:0,host:111.111.111.111,port:9092
with correlation id 7 for 1 topic(s) Set(test) (kafka.client.ClientUtils$)
> [2013-11-21 15:28:50,021] ERROR Producer connection to 111.111.111.111:9092 unsuccessful
(kafka.producer.SyncProducer)
> java.net.ConnectException: 연결이 거부됨
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> 	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> 	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> 	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> 	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> 	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> 	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,024] WARN Fetching topic metadata with correlation id 7 for topics
[Set(test)] from broker [id:0,host:111.111.111.111,port:9092] failed (kafka.client.ClientUtils$)
> java.net.ConnectException: 연결이 거부됨
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> 	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> 	at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> 	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> 	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> 	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> 	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,025] ERROR Failed to collate messages by topic, partition due to:
fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)]
failed (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:50,026] INFO Back off for 100 ms before retrying send. Remaining retries
= 0 (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:50,127] INFO Fetching metadata from broker id:0,host:111.111.111.111,port:9092
with correlation id 8 for 1 topic(s) Set(test) (kafka.client.ClientUtils$)
> [2013-11-21 15:28:50,324] ERROR Producer connection to 111.111.111.111:9092 unsuccessful
(kafka.producer.SyncProducer)
> java.net.ConnectException: 연결이 거부됨
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> 	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> 	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,326] WARN Fetching topic metadata with correlation id 8 for topics
[Set(test)] from broker [id:0,host:111.111.111.111,port:9092] failed (kafka.client.ClientUtils$)
> java.net.ConnectException: 연결이 거부됨
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> 	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> 	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,328] ERROR fetching topic metadata for topics [Set(test)] from broker
[ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker
[ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> 	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> 	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> 	at kafka.utils.Utils$.swallow(Utils.scala:186)
> 	at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> 	at kafka.utils.Utils$.swallowError(Utils.scala:45)
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> 	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> 	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> 	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> Caused by: java.net.ConnectException: 연결이 거부됨
> 	at sun.nio.ch.Net.connect0(Native Method)
> 	at sun.nio.ch.Net.connect(Net.java:465)
> 	at sun.nio.ch.Net.connect(Net.java:457)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> 	at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> 	at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> 	at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> 	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> 	at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> 	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> 	... 12 more
> [2013-11-21 15:28:50,332] ERROR Failed to send requests for topics test with correlation
ids in [0,8] (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:50,333] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> 	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> 	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> 	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> 	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> 	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>        



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message