kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject [16/37] git commit: KAFKA-1512 Add per-ip connection limits.
Date Tue, 05 Aug 2014 23:00:13 GMT
KAFKA-1512 Add per-ip connection limits.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e444a35
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e444a35
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e444a35

Branch: refs/heads/transactional_messaging
Commit: 8e444a3562d6723b9f33cbdaa6a461409c84c98b
Parents: b428d8c
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Mon Jun 30 21:28:04 2014 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Wed Jul 16 09:53:15 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/network/SocketServer.scala | 146 +++++++++++++------
 .../main/scala/kafka/server/KafkaConfig.scala   |   6 +
 .../main/scala/kafka/server/KafkaServer.scala   |   3 +-
 .../unit/kafka/network/SocketServerTest.scala   |  20 ++-
 4 files changed, 129 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8e444a35/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 4976d9c..8e99de0 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -23,6 +23,8 @@ import java.net._
 import java.io._
 import java.nio.channels._
 
+import scala.collection._
+
 import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
@@ -41,7 +43,9 @@ class SocketServer(val brokerId: Int,
                    val maxQueuedRequests: Int,
                    val sendBufferSize: Int,
                    val recvBufferSize: Int,
-                   val maxRequestSize: Int = Int.MaxValue) extends Logging with KafkaMetricsGroup {
+                   val maxRequestSize: Int = Int.MaxValue,
+                   val maxConnectionsPerIp: Int = Int.MaxValue,
+                   val maxConnectionsPerIpOverrides: Map[String, Int] = Map[String, Int]()) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
@@ -55,17 +59,23 @@ class SocketServer(val brokerId: Int,
    * Start the socket server
    */
   def startup() {
+    val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
     for(i <- 0 until numProcessorThreads) {
-      processors(i) = new Processor(i, time, maxRequestSize, aggregateIdleMeter,
-        newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
-        numProcessorThreads, requestChannel)
+      processors(i) = new Processor(i, 
+                                    time, 
+                                    maxRequestSize, 
+                                    aggregateIdleMeter,
+                                    newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS),
+                                    numProcessorThreads, 
+                                    requestChannel,
+                                    quotas)
       Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
     }
     // register the processor threads for notification of responses
     requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
    
     // start accepting connections
-    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize)
+    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
     Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
     acceptor.awaitStartup
     info("Started")
@@ -87,7 +97,7 @@ class SocketServer(val brokerId: Int,
 /**
  * A base class with some helper variables and methods
  */
-private[kafka] abstract class AbstractServerThread extends Runnable with Logging {
+private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
 
   protected val selector = Selector.open();
   private val startupLatch = new CountDownLatch(1)
@@ -131,13 +141,48 @@ private[kafka] abstract class AbstractServerThread extends Runnable with Logging
    */
   def wakeup() = selector.wakeup()
   
+  /**
+   * Close the given key and associated socket
+   */
+  def close(key: SelectionKey) {
+    if(key != null) {
+      key.attach(null)
+      close(key.channel.asInstanceOf[SocketChannel])
+      swallowError(key.cancel())
+    }
+  }
+  
+  def close(channel: SocketChannel) {
+    if(channel != null) {
+      debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
+      connectionQuotas.dec(channel.socket.getInetAddress)
+      swallowError(channel.socket().close())
+      swallowError(channel.close())
+    }
+  }
+  
+  /**
+   * Close all open connections
+   */
+  def closeAll() {
+    val iter = this.selector.keys().iterator()
+    while (iter.hasNext) {
+      val key = iter.next()
+      close(key)
+    }
+  }
+  
 }
 
 /**
  * Thread that accepts and configures new connections. There is only need for one of these
  */
-private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor],
-                              val sendBufferSize: Int, val recvBufferSize: Int) extends AbstractServerThread {
+private[kafka] class Acceptor(val host: String, 
+                              val port: Int, 
+                              private val processors: Array[Processor],
+                              val sendBufferSize: Int, 
+                              val recvBufferSize: Int,
+                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {
   val serverChannel = openServerSocket(host, port)
 
   /**
@@ -158,14 +203,14 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce
             key = iter.next
             iter.remove()
             if(key.isAcceptable)
-                accept(key, processors(currentProcessor))
-              else
-                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
+               accept(key, processors(currentProcessor))
+            else
+               throw new IllegalStateException("Unrecognized key state for acceptor thread.")
 
-              // round robin to the next processor thread
-              currentProcessor = (currentProcessor + 1) % processors.length
+            // round robin to the next processor thread
+            currentProcessor = (currentProcessor + 1) % processors.length
           } catch {
-            case e: Throwable => error("Error in acceptor", e)
+            case e: Throwable => error("Error while accepting connection", e)
           }
         }
       }
@@ -187,6 +232,7 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce
         new InetSocketAddress(host, port)
     val serverChannel = ServerSocketChannel.open()
     serverChannel.configureBlocking(false)
+    serverChannel.socket().setReceiveBufferSize(recvBufferSize)
     try {
       serverChannel.socket.bind(socketAddress)
       info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port))
@@ -202,19 +248,24 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce
    */
   def accept(key: SelectionKey, processor: Processor) {
     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
-    serverSocketChannel.socket().setReceiveBufferSize(recvBufferSize)
-
     val socketChannel = serverSocketChannel.accept()
-    socketChannel.configureBlocking(false)
-    socketChannel.socket().setTcpNoDelay(true)
-    socketChannel.socket().setSendBufferSize(sendBufferSize)
+    try {
+      connectionQuotas.inc(socketChannel.socket().getInetAddress)
+      socketChannel.configureBlocking(false)
+      socketChannel.socket().setTcpNoDelay(true)
+      socketChannel.socket().setSendBufferSize(sendBufferSize)
 
-    debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
-          .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,
+      debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
+            .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,
                   socketChannel.socket.getSendBufferSize, sendBufferSize,
                   socketChannel.socket.getReceiveBufferSize, recvBufferSize))
 
-    processor.accept(socketChannel)
+      processor.accept(socketChannel)
+    } catch {
+      case e: TooManyConnectionsException =>
+        info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
+        close(socketChannel)
+    }
   }
 
 }
@@ -229,7 +280,8 @@ private[kafka] class Processor(val id: Int,
                                val aggregateIdleMeter: Meter,
                                val idleMeter: Meter,
                                val totalProcessorThreads: Int,
-                               val requestChannel: RequestChannel) extends AbstractServerThread {
+                               val requestChannel: RequestChannel,
+                               connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {
   
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
 
@@ -324,26 +376,6 @@ private[kafka] class Processor(val id: Int,
       }
     }
   }
-  
-  private def close(key: SelectionKey) {
-    val channel = key.channel.asInstanceOf[SocketChannel]
-    debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
-    swallowError(channel.socket().close())
-    swallowError(channel.close())
-    key.attach(null)
-    swallowError(key.cancel())
-  }
-  
-  /*
-   * Close all open connections
-   */
-  private def closeAll() {
-    val iter = this.selector.keys().iterator()
-    while (iter.hasNext) {
-      val key = iter.next()
-      close(key)
-    }
-  }
 
   /**
    * Queue up a new connection for reading
@@ -419,3 +451,31 @@ private[kafka] class Processor(val id: Int,
   private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel]
 
 }
+
+class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
+  private val overrides = overrideQuotas.map(entry => (InetAddress.getByName(entry._1), entry._2))
+  private val counts = mutable.Map[InetAddress, Int]()
+  
+  def inc(addr: InetAddress) {
+    counts synchronized {
+      val count = counts.getOrElse(addr, 0)
+      counts.put(addr, count + 1)
+      val max = overrides.getOrElse(addr, defaultMax)
+      if(count >= max)
+        throw new TooManyConnectionsException(addr, max)
+    }
+  }
+  
+  def dec(addr: InetAddress) {
+    counts synchronized {
+      val count = counts.get(addr).get
+      if(count == 1)
+        counts.remove(addr)
+      else
+        counts.put(addr, count - 1)
+    }
+  }
+  
+}
+
+class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException("Too many connections from %s (maximum = %d)".format(ip, count))

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e444a35/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index bb2e654..50b09ed 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -106,6 +106,12 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /* the maximum number of bytes in a socket request */
   val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue))
+  
+  /* the maximum number of connections we allow from each ip address */
+  val maxConnectionsPerIp: Int = props.getIntInRange("max.connections.per.ip", Int.MaxValue, (1, Int.MaxValue))
+  
+  /* per-ip or hostname overrides to the default maximum number of connections */
+  val maxConnectionsPerIpOverrides = props.getMap("max.connections.per.ip.overrides").map(entry => (entry._1, entry._2.toInt))
 
   /*********** Log Configuration ***********/
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e444a35/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5a56f57..def1dc2 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -91,7 +91,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                     config.queuedMaxRequests,
                                     config.socketSendBufferBytes,
                                     config.socketReceiveBufferBytes,
-                                    config.socketRequestMaxBytes)
+                                    config.socketRequestMaxBytes,
+                                    config.maxConnectionsPerIp)
     socketServer.startup()
 
     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e444a35/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 1c492de..3b83a86 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -40,7 +40,8 @@ class SocketServerTest extends JUnitSuite {
                                               maxQueuedRequests = 50,
                                               sendBufferSize = 300000,
                                               recvBufferSize = 300000,
-                                              maxRequestSize = 50)
+                                              maxRequestSize = 50,
+                                              maxConnectionsPerIp = 5)
   server.startup()
 
   def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
@@ -75,7 +76,7 @@ class SocketServerTest extends JUnitSuite {
   def cleanup() {
     server.shutdown()
   }
-
+  
   @Test
   def simpleRequest() {
     val socket = connect()
@@ -139,4 +140,19 @@ class SocketServerTest extends JUnitSuite {
     // doing a subsequent send should throw an exception as the connection should be closed.
     sendRequest(socket, 0, bytes)
   }
+  
+  @Test
+  def testMaxConnectionsPerIp() {
+    // make the maximum allowable number of connections and then leak them
+    val conns = (0 until server.maxConnectionsPerIp).map(i => connect())
+    
+    // now try one more (should fail)
+    try {
+      val conn = connect()
+      sendRequest(conn, 100, "hello".getBytes)
+      assertEquals(-1, conn.getInputStream().read())
+    } catch {
+      case e: IOException => // this is good
+    }
+  }
 }


Mime
View raw message