kafka-dev mailing list archives

From "Jian Lin (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-5432) producer and consumer SocketTimeoutException
Date Tue, 13 Jun 2017 05:10:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-5432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jian Lin updated KAFKA-5432:
----------------------------
    Description: 
Hey all, I ran into a strange problem and hope someone can help me.
The program ran normally for a week and I did not change anything, but today it suddenly
started reporting errors.

Producer error log:
```
2017-06-12 10:46:01[qtp958382397-80:591423838]-[WARN] Failed to send producer request with
correlation id 234645 to broker 176 with data for partitions [sms,3]
java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
	at kafka.utils.Utils$.read(Utils.scala:380)
	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
	at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
	at scala.collection.Iterator$class.foreach(Iterator.scala:772)
	at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
	at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
	at kafka.producer.Producer.send(Producer.scala:77)
	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
```
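
For reference, here is a minimal sketch (not my actual code; the broker list, serializer, and acks setting are assumptions for illustration) of how the old Scala/Java producer in this stack trace is typically configured. In this producer, `request.timeout.ms` is the socket read timeout for synchronous requests, and the `SocketTimeoutException` above means the broker did not answer within that window:
```
// Minimal sketch, NOT taken from the reporter's application.
// Broker list, topic, serializer, and acks are assumptions for illustration only.
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker addresses based on the cluster described below (brokers 175/176/177).
        props.put("metadata.broker.list",
                  "10.17.24.175:9092,10.17.24.176:9092,10.17.24.177:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        // Socket read timeout for synchronous requests in the old producer (default 10000 ms).
        // If the broker does not respond within this window, the send fails with
        // java.net.SocketTimeoutException, as in the log above.
        props.put("request.timeout.ms", "10000");

        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
        producer.send(new KeyedMessage<>("sms", "test message"));
        producer.close();
    }
}
```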
Consumer error log:
```
2017-06-12 10:52:52[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:603234738]-[WARN]
Fetching topic metadata with correlation id 7 for topics [Set(sms)] from broker [id:176,host:10.17.24.176,port:9092]
failed
java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
	at kafka.utils.Utils$.read(Utils.scala:380)
	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
	at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
```
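
Similarly, a minimal sketch (the ZooKeeper address and group id are placeholders, not from my setup) of the old high-level consumer behind this stack trace; its leader-finder thread fetches topic metadata using the `socket.timeout.ms` read timeout, which is the limit being hit above:
```
// Minimal sketch, NOT taken from the reporter's application.
// zookeeper.connect and group.id are placeholders for illustration only.
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk:2181");      // placeholder ZooKeeper address
        props.put("group.id", "sms-consumer-group1");   // group name guessed from the log's thread name
        // Socket read timeout used by the consumer's network requests, including the
        // topic-metadata fetch done by the leader-finder thread (default 30000 ms).
        // Exceeding it produces the SocketTimeoutException shown above.
        props.put("socket.timeout.ms", "30000");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // ... create message streams for topic "sms" and consume them here ...
        connector.shutdown();
    }
}
```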

I checked the Kafka server.log, and it only printed a single message: *[2017-06-12 10:45:51,425] INFO Rolled
new log segment for 'sms-3' in 2 ms. (kafka.log.Log)*



----

The Kafka version I use on the broker side is *2.10-0.10.2.0*, and the client side uses *kafka_2.9.2-0.8.2.1.jar*.

The attached file is my Kafka config; my cluster has three brokers: 175, 176, and 177.

----update at 2017-06-13
I found that broker 176 has too many TCP connections in the CLOSE_WAIT state:
```
Active Internet connections (w/o servers)
Proto Recv-Q Send-Q Local Address               Foreign Address             State      
tcp        0      0 10.17.24.176:ssh            10.10.52.171:54275          ESTABLISHED 
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:54189    CLOSE_WAIT  
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:48538    CLOSE_WAIT  
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:55449    CLOSE_WAIT  
tcp        0      0 ::ffff:10.17.24.176:45856   ::ffff:10.17.24.17:eforward ESTABLISHED 
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:7817     CLOSE_WAIT  
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:26435    CLOSE_WAIT  
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:46790    CLOSE_WAIT  
tcp       36      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:54199    CLOSE_WAIT  
tcp       36      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:42725    CLOSE_WAIT  
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:50994    CLOSE_WAIT  
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:37867    CLOSE_WAIT  
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:12582    CLOSE_WAIT  
tcp       36      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:23577    CLOSE_WAIT  
tcp        0      0 ::ffff:10.17.24.17:eforward ::ffff:10.17.24.175:32890   ESTABLISHED 
tcp       36      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:41288    CLOSE_WAIT  
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:25335    CLOSE_WAIT  
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:17124    CLOSE_WAIT  
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:41103    CLOSE_WAIT  
tcp       48      0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:58923    CLOSE_WAIT  
```
TIME_WAIT 1
CLOSE_WAIT 389
ESTABLISHED 8

10.10.78.64 is my consumer's IP.
In addition, on this server I run `bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
-zookeeper zk -topic topic -group group` every minute to monitor the topic offset values.

I do not know whether the timeouts are related to this.


When I restart the Kafka cluster, it recovers: the TCP CLOSE_WAIT connections disappear, and I
can produce and consume normally again.




> producer and consumer  SocketTimeoutException
> ---------------------------------------------
>
>                 Key: KAFKA-5432
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5432
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect, network
>    Affects Versions: 0.10.2.0
>         Environment: os:Red Hat 4.4.7-17
>            Reporter: Jian Lin
>         Attachments: server.properties
>
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
