Return-Path: Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: (qmail 97711 invoked from network); 6 Jan 2011 23:34:06 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 6 Jan 2011 23:34:06 -0000 Received: (qmail 60970 invoked by uid 500); 6 Jan 2011 23:34:06 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 60939 invoked by uid 500); 6 Jan 2011 23:34:06 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 60931 invoked by uid 99); 6 Jan 2011 23:34:06 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Jan 2011 23:34:06 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Jan 2011 23:34:05 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 696B523889B3; Thu, 6 Jan 2011 23:33:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1056121 - /cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Date: Thu, 06 Jan 2011 23:33:45 -0000 To: commits@cassandra.apache.org From: brandonwilliams@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110106233345.696B523889B3@eris.apache.org> Author: brandonwilliams Date: Thu Jan 6 23:33:45 2011 New Revision: 1056121 URL: http://svn.apache.org/viewvc?rev=1056121&view=rev Log: Don't begin buffering a connection until we've determined the type. Patch by Stu Hood, reviewed by gdusbabek for CASSANDRA-1943 Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1056121&r1=1056120&r2=1056121&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Thu Jan 6 23:33:45 2011 @@ -32,40 +32,48 @@ public class IncomingTcpConnection exten { private static Logger logger = Logger.getLogger(IncomingTcpConnection.class); - private final DataInputStream input; private Socket socket; public IncomingTcpConnection(Socket socket) { assert socket != null; this.socket = socket; + } + + /** + * A new connection will either stream or message for its entire lifetime: because streaming + * bypasses the InputStream implementations to use sendFile, we cannot begin buffering until + * we've determined the type of the connection. + */ + @Override + public void run() + { + DataInputStream input; + boolean isStream; try { - input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096)); + // determine the connection type to decide whether to buffer + input = new DataInputStream(socket.getInputStream()); + MessagingService.validateMagic(input.readInt()); + int header = input.readInt(); + isStream = MessagingService.getBits(header, 3, 1) == 1; + if (!isStream) + // we should buffer + input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096)); } catch (IOException e) { close(); throw new IOError(e); } - } - - @Override - public void run() - { while (true) { try { - MessagingService.validateMagic(input.readInt()); - int header = input.readInt(); - int type = MessagingService.getBits(header, 1, 2); - boolean isStream = MessagingService.getBits(header, 3, 1) == 1; - int version = MessagingService.getBits(header, 15, 8); - if (isStream) { new IncomingStreamReader(socket.getChannel()).read(); + break; } else { @@ -76,6 +84,10 @@ public class IncomingTcpConnection exten Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes))); MessagingService.receive(message); } + // prepare to read the next message + MessagingService.validateMagic(input.readInt()); + int header = input.readInt(); + assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream; } catch (EOFException e) {