Date: Tue, 7 Aug 2012 23:45:52 +0100
Subject: Non-blocking socket when queue has no new data
From: Lorenzo Alberton
To: kafka-dev@incubator.apache.org

Hi,

I have a question about non-blocking connections.

From the code I see the request is handled by an acceptor thread that passes it on to a Processor thread. The connection by default is blocking, i.e. the consumer sends the fetch request and then blocks until there's data available. I understand that this is exactly the intended behaviour, although I'm wondering if it's possible to establish a non-blocking connection that returns immediately if there's no data available at the given offset (assuming the offset is valid and points at the end of the queue).

We've been load-testing kafka with a few thousand topics and many short-lived consumers that fetch a limited number of messages before closing the connection. We have a socket timeout on the client side to close the socket if there's no data available, but the kafka server doesn't close the socket at its end until new data becomes available and a write() call is attempted. When this happens, we can see the following stack trace in the logs:

==============================================
kafka: INFO [kafka.network.Processor] (kafka-processor-7) Closing socket connection to .
kafka: ERROR [kafka.network.Processor] (kafka-processor-5) Closing socket for because of error
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
    at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:102)
    at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:53)
    at kafka.network.Processor.write(SocketServer.scala:339)
    at kafka.network.Processor.run(SocketServer.scala:216)
    at java.lang.Thread.run(Thread.java:662)
==============================================

Apart from the exception in the logs, the file descriptor for the socket is not released until a write() call is attempted, which limits the number of connections that can be established.

I do realise that Kafka was designed to work with long-running processes that maintain a persistent connection, but I see a lot of interest on the mailing list in having a REST interface in front of it, or otherwise consuming data in chunks with short-lived processes, and we're very interested in this scenario as well for one of our use-cases.

So my question really is: is there a plan to have a non-blocking connection? In non-blocking mode, the Response could be immediate and could consist of the header alone (int32 for the response length, set to 0, followed by int16 for the error code, which could be set to 0 too or to a new value indicating that the operation would normally block).

We can probably contribute this feature if it's not yet available and others find it useful.

Another option (which I don't like as much) could be setting a request timeout (server-side) or providing a way of closing the connection gracefully from the client side.

Thoughts?

Best regards,
-- 
Lorenzo Alberton
Chief Tech Architect
DataSift, Inc.
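
The header-only response proposed above can be sketched roughly as follows. This is only an illustration, not Kafka code: it follows the description in this message literally (an int32 length set to 0, then an int16 error code), assumes big-endian byte order on the wire, and uses hypothetical names (`WOULD_BLOCK`, `encode_empty_response`, `decode_header`). The value 100 is a placeholder, not an error code that Kafka defines, and whether the length field should count the error-code bytes would need checking against the actual protocol.

```python
import struct
from typing import Tuple

# Hypothetical error code meaning "no data yet; the fetch would normally block".
# Placeholder value for illustration only; not defined by Kafka.
WOULD_BLOCK = 100

# Assumed layout: big-endian int32 length followed by big-endian int16 error code.
HEADER_FORMAT = ">ih"
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 6 bytes


def encode_empty_response(error_code: int = 0) -> bytes:
    """Build a header-only response: length 0 plus an error code, no payload."""
    return struct.pack(HEADER_FORMAT, 0, error_code)


def decode_header(data: bytes) -> Tuple[int, int]:
    """Return (length, error_code) parsed from the first 6 bytes of a response."""
    return struct.unpack(HEADER_FORMAT, data[:HEADER_SIZE])


def has_no_data(data: bytes) -> bool:
    """True when the response carries no messages (length field is 0)."""
    length, _ = decode_header(data)
    return length == 0
```

A short-lived consumer reading such a response could call `has_no_data()` on the first six bytes and close the connection straight away, instead of waiting for its own socket timeout to fire.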