Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E205311A01 for ; Tue, 5 Aug 2014 22:59:59 +0000 (UTC) Received: (qmail 19105 invoked by uid 500); 5 Aug 2014 22:59:59 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 19036 invoked by uid 500); 5 Aug 2014 22:59:59 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 18654 invoked by uid 99); 5 Aug 2014 22:59:59 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Aug 2014 22:59:59 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2E2CE9C095D; Tue, 5 Aug 2014 22:59:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jjkoshy@apache.org To: commits@kafka.apache.org Date: Tue, 05 Aug 2014 23:00:13 -0000 Message-Id: <1acf49afecca494ba1d17afd2867e39c@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/37] git commit: KAFKA-1512 Add per-ip connection limits. KAFKA-1512 Add per-ip connection limits. Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e444a35 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e444a35 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e444a35 Branch: refs/heads/transactional_messaging Commit: 8e444a3562d6723b9f33cbdaa6a461409c84c98b Parents: b428d8c Author: Jay Kreps Authored: Mon Jun 30 21:28:04 2014 -0700 Committer: Jay Kreps Committed: Wed Jul 16 09:53:15 2014 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/network/SocketServer.scala | 146 +++++++++++++------ .../main/scala/kafka/server/KafkaConfig.scala | 6 + .../main/scala/kafka/server/KafkaServer.scala | 3 +- .../unit/kafka/network/SocketServerTest.scala | 20 ++- 4 files changed, 129 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8e444a35/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 4976d9c..8e99de0 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -23,6 +23,8 @@ import java.net._ import java.io._ import java.nio.channels._ +import scala.collection._ + import kafka.common.KafkaException import kafka.metrics.KafkaMetricsGroup import kafka.utils._ @@ -41,7 +43,9 @@ class SocketServer(val brokerId: Int, val maxQueuedRequests: Int, val sendBufferSize: Int, val recvBufferSize: Int, - val maxRequestSize: Int = Int.MaxValue) extends Logging with KafkaMetricsGroup { + val maxRequestSize: Int = Int.MaxValue, + val maxConnectionsPerIp: Int = Int.MaxValue, + val maxConnectionsPerIpOverrides: Map[String, Int] = Map[String, Int]()) extends Logging with KafkaMetricsGroup { this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) @@ -55,17 +59,23 @@ class SocketServer(val brokerId: Int, * Start the socket server */ def startup() { + val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) for(i <- 0 until numProcessorThreads) { - processors(i) = new Processor(i, time, maxRequestSize, aggregateIdleMeter, - newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS), - numProcessorThreads, requestChannel) + processors(i) = new Processor(i, + time, + maxRequestSize, + aggregateIdleMeter, + newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS), + numProcessorThreads, + requestChannel, + quotas) Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start() } // register the processor threads for notification of responses requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) // start accepting connections - this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize) + this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas) Utils.newThread("kafka-socket-acceptor", acceptor, false).start() acceptor.awaitStartup info("Started") @@ -87,7 +97,7 @@ class SocketServer(val brokerId: Int, /** * A base class with some helper variables and methods */ -private[kafka] abstract class AbstractServerThread extends Runnable with Logging { +private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging { protected val selector = Selector.open(); private val startupLatch = new CountDownLatch(1) @@ -131,13 +141,48 @@ private[kafka] abstract class AbstractServerThread extends Runnable with Logging */ def wakeup() = selector.wakeup() + /** + * Close the given key and associated socket + */ + def close(key: SelectionKey) { + if(key != null) { + key.attach(null) + close(key.channel.asInstanceOf[SocketChannel]) + swallowError(key.cancel()) + } + } + + def close(channel: SocketChannel) { + if(channel != null) { + debug("Closing connection from " + channel.socket.getRemoteSocketAddress()) + connectionQuotas.dec(channel.socket.getInetAddress) + swallowError(channel.socket().close()) + swallowError(channel.close()) + } + } + + /** + * Close all open connections + */ + def closeAll() { + val iter = this.selector.keys().iterator() + while (iter.hasNext) { + val key = iter.next() + close(key) + } + } + } /** * Thread that accepts and configures new connections. There is only need for one of these */ -private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor], - val sendBufferSize: Int, val recvBufferSize: Int) extends AbstractServerThread { +private[kafka] class Acceptor(val host: String, + val port: Int, + private val processors: Array[Processor], + val sendBufferSize: Int, + val recvBufferSize: Int, + connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) { val serverChannel = openServerSocket(host, port) /** @@ -158,14 +203,14 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce key = iter.next iter.remove() if(key.isAcceptable) - accept(key, processors(currentProcessor)) - else - throw new IllegalStateException("Unrecognized key state for acceptor thread.") + accept(key, processors(currentProcessor)) + else + throw new IllegalStateException("Unrecognized key state for acceptor thread.") - // round robin to the next processor thread - currentProcessor = (currentProcessor + 1) % processors.length + // round robin to the next processor thread + currentProcessor = (currentProcessor + 1) % processors.length } catch { - case e: Throwable => error("Error in acceptor", e) + case e: Throwable => error("Error while accepting connection", e) } } } @@ -187,6 +232,7 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce new InetSocketAddress(host, port) val serverChannel = ServerSocketChannel.open() serverChannel.configureBlocking(false) + serverChannel.socket().setReceiveBufferSize(recvBufferSize) try { serverChannel.socket.bind(socketAddress) info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port)) @@ -202,19 +248,24 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce */ def accept(key: SelectionKey, processor: Processor) { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] - serverSocketChannel.socket().setReceiveBufferSize(recvBufferSize) - val socketChannel = serverSocketChannel.accept() - socketChannel.configureBlocking(false) - socketChannel.socket().setTcpNoDelay(true) - socketChannel.socket().setSendBufferSize(sendBufferSize) + try { + connectionQuotas.inc(socketChannel.socket().getInetAddress) + socketChannel.configureBlocking(false) + socketChannel.socket().setTcpNoDelay(true) + socketChannel.socket().setSendBufferSize(sendBufferSize) - debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]" - .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress, + debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]" + .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress, socketChannel.socket.getSendBufferSize, sendBufferSize, socketChannel.socket.getReceiveBufferSize, recvBufferSize)) - processor.accept(socketChannel) + processor.accept(socketChannel) + } catch { + case e: TooManyConnectionsException => + info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count)) + close(socketChannel) + } } } @@ -229,7 +280,8 @@ private[kafka] class Processor(val id: Int, val aggregateIdleMeter: Meter, val idleMeter: Meter, val totalProcessorThreads: Int, - val requestChannel: RequestChannel) extends AbstractServerThread { + val requestChannel: RequestChannel, + connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) { private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() @@ -324,26 +376,6 @@ private[kafka] class Processor(val id: Int, } } } - - private def close(key: SelectionKey) { - val channel = key.channel.asInstanceOf[SocketChannel] - debug("Closing connection from " + channel.socket.getRemoteSocketAddress()) - swallowError(channel.socket().close()) - swallowError(channel.close()) - key.attach(null) - swallowError(key.cancel()) - } - - /* - * Close all open connections - */ - private def closeAll() { - val iter = this.selector.keys().iterator() - while (iter.hasNext) { - val key = iter.next() - close(key) - } - } /** * Queue up a new connection for reading @@ -419,3 +451,31 @@ private[kafka] class Processor(val id: Int, private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel] } + +class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) { + private val overrides = overrideQuotas.map(entry => (InetAddress.getByName(entry._1), entry._2)) + private val counts = mutable.Map[InetAddress, Int]() + + def inc(addr: InetAddress) { + counts synchronized { + val count = counts.getOrElse(addr, 0) + counts.put(addr, count + 1) + val max = overrides.getOrElse(addr, defaultMax) + if(count >= max) + throw new TooManyConnectionsException(addr, max) + } + } + + def dec(addr: InetAddress) { + counts synchronized { + val count = counts.get(addr).get + if(count == 1) + counts.remove(addr) + else + counts.put(addr, count - 1) + } + } + +} + +class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException("Too many connections from %s (maximum = %d)".format(ip, count)) http://git-wip-us.apache.org/repos/asf/kafka/blob/8e444a35/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index bb2e654..50b09ed 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -106,6 +106,12 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the maximum number of bytes in a socket request */ val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue)) + + /* the maximum number of connections we allow from each ip address */ + val maxConnectionsPerIp: Int = props.getIntInRange("max.connections.per.ip", Int.MaxValue, (1, Int.MaxValue)) + + /* per-ip or hostname overrides to the default maximum number of connections */ + val maxConnectionsPerIpOverrides = props.getMap("max.connections.per.ip.overrides").map(entry => (entry._1, entry._2.toInt)) /*********** Log Configuration ***********/ http://git-wip-us.apache.org/repos/asf/kafka/blob/8e444a35/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 5a56f57..def1dc2 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -91,7 +91,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.queuedMaxRequests, config.socketSendBufferBytes, config.socketReceiveBufferBytes, - config.socketRequestMaxBytes) + config.socketRequestMaxBytes, + config.maxConnectionsPerIp) socketServer.startup() replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) http://git-wip-us.apache.org/repos/asf/kafka/blob/8e444a35/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 1c492de..3b83a86 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -40,7 +40,8 @@ class SocketServerTest extends JUnitSuite { maxQueuedRequests = 50, sendBufferSize = 300000, recvBufferSize = 300000, - maxRequestSize = 50) + maxRequestSize = 50, + maxConnectionsPerIp = 5) server.startup() def sendRequest(socket: Socket, id: Short, request: Array[Byte]) { @@ -75,7 +76,7 @@ class SocketServerTest extends JUnitSuite { def cleanup() { server.shutdown() } - + @Test def simpleRequest() { val socket = connect() @@ -139,4 +140,19 @@ class SocketServerTest extends JUnitSuite { // doing a subsequent send should throw an exception as the connection should be closed. sendRequest(socket, 0, bytes) } + + @Test + def testMaxConnectionsPerIp() { + // make the maximum allowable number of connections and then leak them + val conns = (0 until server.maxConnectionsPerIp).map(i => connect()) + + // now try one more (should fail) + try { + val conn = connect() + sendRequest(conn, 100, "hello".getBytes) + assertEquals(-1, conn.getInputStream().read()) + } catch { + case e: IOException => // this is good + } + } }