kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1733; Producer.send will block indeterminately when broker is unavailable; patched by Marc Chung; reviewed by Jun Rao
Date Thu, 13 Nov 2014 20:07:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 383ac3e19 -> 91f8ce7ef


kafka-1733; Producer.send will block indeterminately when broker is unavailable; patched by
Marc Chung; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/91f8ce7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/91f8ce7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/91f8ce7e

Branch: refs/heads/0.8.2
Commit: 91f8ce7eff5c62c619454ad9d67415878805f600
Parents: 383ac3e
Author: Marc Chung <mchungWgmail.com>
Authored: Thu Nov 13 12:06:54 2014 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Nov 13 12:06:54 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/network/BlockingChannel.scala | 11 +++++++----
 core/src/main/scala/kafka/producer/SyncProducer.scala   |  2 +-
 2 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/91f8ce7e/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index eb7bb14..6e2a38e 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -42,7 +42,8 @@ class BlockingChannel( val host: String,
   private var readChannel: ReadableByteChannel = null
   private var writeChannel: GatheringByteChannel = null
   private val lock = new Object()
-  
+  private val connectTimeoutMs = readTimeoutMs
+
   def connect() = lock synchronized  {
     if(!connected) {
       try {
@@ -55,19 +56,21 @@ class BlockingChannel( val host: String,
         channel.socket.setSoTimeout(readTimeoutMs)
         channel.socket.setKeepAlive(true)
         channel.socket.setTcpNoDelay(true)
-        channel.connect(new InetSocketAddress(host, port))
+        channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
 
         writeChannel = channel
         readChannel = Channels.newChannel(channel.socket().getInputStream)
         connected = true
         // settings may not match what we requested above
-        val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested
%d), SO_SNDBUF = %d (requested %d)."
+        val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested
%d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d."
         debug(msg.format(channel.socket.getSoTimeout,
                          readTimeoutMs,
                          channel.socket.getReceiveBufferSize, 
                          readBufferSize,
                          channel.socket.getSendBufferSize,
-                         writeBufferSize))
+                         writeBufferSize,
+                         connectTimeoutMs))
+
       } catch {
         case e: Throwable => disconnect()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/91f8ce7e/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 42c9503..35e9e8c 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -42,7 +42,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
   val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
 
-  trace("Instantiating Scala Sync Producer")
+  trace("Instantiating Scala Sync Producer with properties: %s".format(config.props))
 
   private def verifyRequest(request: RequestOrResponse) = {
     /**


Mime
View raw message