Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 321838C79 for ; Tue, 16 Aug 2011 18:27:33 +0000 (UTC) Received: (qmail 61321 invoked by uid 500); 16 Aug 2011 18:27:32 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 60970 invoked by uid 500); 16 Aug 2011 18:27:32 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 60660 invoked by uid 99); 16 Aug 2011 18:27:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Aug 2011 18:27:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Aug 2011 18:27:28 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DF1732388897 for ; Tue, 16 Aug 2011 18:27:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1158410 - in /cassandra/trunk/src/java/org/apache/cassandra: db/ db/commitlog/ gms/ io/util/ net/ service/ streaming/ thrift/ Date: Tue, 16 Aug 2011 18:27:07 -0000 To: commits@cassandra.apache.org From: brandonwilliams@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110816182708.DF1732388897@eris.apache.org> Author: brandonwilliams Date: Tue Aug 16 18:27:05 2011 New Revision: 1158410 URL: http://svn.apache.org/viewvc?rev=1158410&view=rev Log: Re-introduce FastByteArrayInputStream (and Output equivalent) Patch by Paul Loy, reviewed by brandonwilliams for CASSANDRA-2820 Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayInputStream.java (with props) cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java (with props) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/trunk/src/java/org/apache/cassandra/io/util/OutputBuffer.java cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Tue Aug 16 18:27:05 2011 @@ -18,7 +18,6 @@ package org.apache.cassandra.db; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -38,6 +37,7 @@ import org.apache.cassandra.db.filter.Qu import org.apache.cassandra.db.marshal.AbstractCommutativeType; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.Message; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -200,7 +200,7 @@ public class CounterMutation implements public Message makeMutationMessage(int version) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); serializer().serialize(this, dos, version); return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java Tue Aug 16 18:27:05 2011 @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.service.StorageProxy; @@ -42,7 +43,7 @@ public class CounterMutationVerbHandler public void doVerb(Message message, String id) { byte[] bytes = message.getMessageBody(); - ByteArrayInputStream buffer = new ByteArrayInputStream(bytes); + FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes); try { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Tue Aug 16 18:27:05 2011 @@ -25,6 +25,7 @@ import java.util.Arrays; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.io.ICompactSerializer2; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageProducer; import org.apache.cassandra.service.StorageService; @@ -75,7 +76,7 @@ public class IndexScanCommand implements public static IndexScanCommand read(Message message) throws IOException { byte[] bytes = message.getMessageBody(); - ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes); return serializer.deserialize(new DataInputStream(bis)); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Tue Aug 16 18:27:05 2011 @@ -36,7 +36,6 @@ package org.apache.cassandra.db; -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -46,6 +45,7 @@ import java.util.Arrays; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.io.ICompactSerializer; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageProducer; import org.apache.cassandra.service.IReadCommand; @@ -112,7 +112,7 @@ public class RangeSliceCommand implement public static RangeSliceCommand read(Message message) throws IOException { byte[] bytes = message.getMessageBody(); - ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes); return serializer.deserialize(new DataInputStream(bis), message.getVersion()); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java Tue Aug 16 18:27:05 2011 @@ -18,7 +18,6 @@ package org.apache.cassandra.db; -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; @@ -28,6 +27,7 @@ import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.Message; import org.apache.cassandra.utils.FBUtilities; @@ -62,7 +62,7 @@ public class RangeSliceReply public static RangeSliceReply read(byte[] body, int version) throws IOException { - ByteArrayInputStream bufIn = new ByteArrayInputStream(body); + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body); DataInputStream dis = new DataInputStream(bufIn); int rowCount = dis.readInt(); List rows = new ArrayList(rowCount); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Tue Aug 16 18:27:05 2011 @@ -18,7 +18,6 @@ package org.apache.cassandra.db; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -29,6 +28,7 @@ import java.util.Map; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageProducer; import org.apache.cassandra.service.IReadCommand; @@ -50,7 +50,7 @@ public abstract class ReadCommand implem public Message getMessage(Integer version) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); ReadCommand.serializer().serialize(this, dos, version); return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.READ, bos.toByteArray(), version); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java Tue Aug 16 18:27:05 2011 @@ -18,11 +18,11 @@ package org.apache.cassandra.db; -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOError; import java.io.IOException; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; @@ -31,7 +31,7 @@ public class ReadRepairVerbHandler imple public void doVerb(Message message, String id) { byte[] body = message.getMessageBody(); - ByteArrayInputStream buffer = new ByteArrayInputStream(body); + FastByteArrayInputStream buffer = new FastByteArrayInputStream(body); try { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Tue Aug 16 18:27:05 2011 @@ -18,7 +18,6 @@ package org.apache.cassandra.db; -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; @@ -26,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -56,7 +56,7 @@ public class ReadVerbHandler implements try { - ByteArrayInputStream in = new ByteArrayInputStream(message.getMessageBody()); + FastByteArrayInputStream in = new FastByteArrayInputStream(message.getMessageBody()); ReadCommand command = ReadCommand.serializer().deserialize(new DataInputStream(in), message.getVersion()); Table table = Table.open(command.table); Row row = command.getRow(table); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Tue Aug 16 18:27:05 2011 @@ -29,6 +29,8 @@ import org.apache.cassandra.config.Datab import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageProducer; import org.apache.cassandra.net.MessagingService; @@ -272,7 +274,7 @@ public class RowMutation implements IMut byte[] preserializedBuffer = preserializedBuffers.get(version); if (preserializedBuffer == null) { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); + FastByteArrayOutputStream bout = new FastByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(bout); RowMutation.serializer().serialize(this, dout, version); dout.close(); @@ -354,7 +356,7 @@ public class RowMutation implements IMut static RowMutation fromBytes(byte[] raw, int version) throws IOException { - RowMutation rm = serializer_.deserialize(new DataInputStream(new ByteArrayInputStream(raw)), version); + RowMutation rm = serializer_.deserialize(new DataInputStream(new FastByteArrayInputStream(raw)), version); boolean hasCounters = false; for (Map.Entry entry : rm.modifications_.entrySet()) { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Tue Aug 16 18:27:05 2011 @@ -18,7 +18,6 @@ package org.apache.cassandra.db; -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.net.InetAddress; @@ -28,6 +27,7 @@ import java.nio.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -52,7 +52,7 @@ public class RowMutationVerbHandler impl if (hintedBytes != null) { assert hintedBytes.length > 0; - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(hintedBytes)); + DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(hintedBytes)); while (dis.available() > 0) { ByteBuffer addressBytes = ByteBufferUtil.readWithShortLength(dis); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java Tue Aug 16 18:27:05 2011 @@ -18,12 +18,12 @@ package org.apache.cassandra.db; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.Message; import org.apache.cassandra.utils.FBUtilities; @@ -49,7 +49,7 @@ public class TruncateResponse public static Message makeTruncateResponseMessage(Message original, TruncateResponse truncateResponseMessage) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); TruncateResponse.serializer().serialize(truncateResponseMessage, dos, original.getVersion()); return original.getReply(FBUtilities.getBroadcastAddress(), bos.toByteArray(), original.getVersion()); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java Tue Aug 16 18:27:05 2011 @@ -18,7 +18,6 @@ package org.apache.cassandra.db; -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOError; import java.io.IOException; @@ -26,6 +25,7 @@ import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -37,7 +37,7 @@ public class TruncateVerbHandler impleme public void doVerb(Message message, String id) { byte[] bytes = message.getMessageBody(); - ByteArrayInputStream buffer = new ByteArrayInputStream(bytes); + FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes); try { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java Tue Aug 16 18:27:05 2011 @@ -18,12 +18,12 @@ package org.apache.cassandra.db; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageProducer; import org.apache.cassandra.service.StorageService; @@ -66,7 +66,7 @@ public class Truncation implements Messa public Message getMessage(Integer version) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); serializer().serialize(this, dos, version); return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray(), version); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java Tue Aug 16 18:27:05 2011 @@ -18,13 +18,13 @@ package org.apache.cassandra.db; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.Message; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -46,7 +46,7 @@ public class WriteResponse public static Message makeWriteResponseMessage(Message original, WriteResponse writeResponseMessage) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream( bos ); WriteResponse.serializer().serialize(writeResponseMessage, dos, original.getVersion()); return original.getReply(FBUtilities.getBroadcastAddress(), bos.toByteArray(), original.getVersion()); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Tue Aug 16 18:27:05 2011 @@ -43,6 +43,7 @@ import org.apache.cassandra.config.CFMet import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.DeletionService; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; @@ -272,7 +273,7 @@ public class CommitLog implements Commit } /* deserialize the commit log entry */ - ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes, 0, serializedSize); + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(bytes, 0, serializedSize); RowMutation rm = null; try { Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java Tue Aug 16 18:27:05 2011 @@ -21,7 +21,6 @@ package org.apache.cassandra.gms; */ -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.net.InetAddress; @@ -30,6 +29,7 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; @@ -44,7 +44,7 @@ public class GossipDigestAck2VerbHandler logger_.trace("Received a GossipDigestAck2Message from {}", from); byte[] bytes = message.getMessageBody(); - DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); + DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) ); GossipDigestAck2Message gDigestAck2Message; try { Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Tue Aug 16 18:27:05 2011 @@ -21,7 +21,6 @@ package org.apache.cassandra.gms; */ -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.net.InetAddress; @@ -32,6 +31,7 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -53,7 +53,7 @@ public class GossipDigestAckVerbHandler } byte[] bytes = message.getMessageBody(); - DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); + DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) ); try { Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Tue Aug 16 18:27:05 2011 @@ -21,7 +21,6 @@ package org.apache.cassandra.gms; */ -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.net.InetAddress; @@ -31,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -52,7 +52,7 @@ public class GossipDigestSynVerbHandler } byte[] bytes = message.getMessageBody(); - DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); + DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) ); try { Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Aug 16 18:27:05 2011 @@ -18,7 +18,6 @@ package org.apache.cassandra.gms; -import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOError; import java.io.IOException; @@ -29,6 +28,7 @@ import java.util.concurrent.*; import org.apache.cassandra.dht.Token; import org.apache.cassandra.db.SystemTable; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.MessageProducer; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.utils.FBUtilities; @@ -411,7 +411,7 @@ public class Gossiper implements IFailur Message makeGossipDigestSynMessage(List gDigests, int version) throws IOException { GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream( bos ); GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos, version); return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray(), version); @@ -419,7 +419,7 @@ public class Gossiper implements IFailur Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage, int version) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos, version); return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray(), version); @@ -427,7 +427,7 @@ public class Gossiper implements IFailur Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message, int version) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos, version); return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version); Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayInputStream.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayInputStream.java?rev=1158410&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayInputStream.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayInputStream.java Tue Aug 16 18:27:05 2011 @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +/* + * This file has been modified from Apache Harmony's ByteArrayInputStream + * implementation. The synchronized methods of the original have been + * replaced by non-synchronized methods. This makes this certain operations + * FASTer, but also *not thread-safe*. + * + * This file remains formatted the same as the Apache Harmony original to + * make patching easier if any bug fixes are made to the Harmony version. + */ + +/** + * A specialized {@link InputStream } for reading the contents of a byte array. + * + * @see ByteArrayInputStream + */ +public class FastByteArrayInputStream extends InputStream { + /** + * The {@code byte} array containing the bytes to stream over. + */ + protected byte[] buf; + + /** + * The current position within the byte array. + */ + protected int pos; + + /** + * The current mark position. Initially set to 0 or the offset + * parameter within the constructor. + */ + protected int mark; + + /** + * The total number of bytes initially available in the byte array + * {@code buf}. + */ + protected int count; + + /** + * Constructs a new {@code ByteArrayInputStream} on the byte array + * {@code buf}. + * + * @param buf + * the byte array to stream over. + */ + public FastByteArrayInputStream(byte buf[]) { + this.mark = 0; + this.buf = buf; + this.count = buf.length; + } + + /** + * Constructs a new {@code ByteArrayInputStream} on the byte array + * {@code buf} with the initial position set to {@code offset} and the + * number of bytes available set to {@code offset} + {@code length}. + * + * @param buf + * the byte array to stream over. + * @param offset + * the initial position in {@code buf} to start streaming from. + * @param length + * the number of bytes available for streaming. + */ + public FastByteArrayInputStream(byte buf[], int offset, int length) { + this.buf = buf; + pos = offset; + mark = offset; + count = offset + length > buf.length ? buf.length : offset + length; + } + + /** + * Returns the number of bytes that are available before this stream will + * block. This method returns the number of bytes yet to be read from the + * source byte array. + * + * @return the number of bytes available before blocking. + */ + @Override + public int available() { + return count - pos; + } + + /** + * Closes this stream and frees resources associated with this stream. + * + * @throws IOException + * if an I/O error occurs while closing this stream. + */ + @Override + public void close() throws IOException { + // Do nothing on close, this matches JDK behaviour. + } + + /** + * Sets a mark position in this ByteArrayInputStream. The parameter + * {@code readlimit} is ignored. Sending {@code reset()} will reposition the + * stream back to the marked position. + * + * @param readlimit + * ignored. + * @see #markSupported() + * @see #reset() + */ + @Override + public void mark(int readlimit) { + mark = pos; + } + + /** + * Indicates whether this stream supports the {@code mark()} and + * {@code reset()} methods. Returns {@code true} since this class supports + * these methods. + * + * @return always {@code true}. + * @see #mark(int) + * @see #reset() + */ + @Override + public boolean markSupported() { + return true; + } + + /** + * Reads a single byte from the source byte array and returns it as an + * integer in the range from 0 to 255. Returns -1 if the end of the source + * array has been reached. + * + * @return the byte read or -1 if the end of this stream has been reached. + */ + @Override + public int read() { + return pos < count ? buf[pos++] & 0xFF : -1; + } + + /** + * Reads at most {@code len} bytes from this stream and stores + * them in byte array {@code b} starting at {@code offset}. This + * implementation reads bytes from the source byte array. + * + * @param b + * the byte array in which to store the bytes read. + * @param offset + * the initial position in {@code b} to store the bytes read from + * this stream. + * @param length + * the maximum number of bytes to store in {@code b}. + * @return the number of bytes actually read or -1 if no bytes were read and + * the end of the stream was encountered. + * @throws IndexOutOfBoundsException + * if {@code offset < 0} or {@code length < 0}, or if + * {@code offset + length} is greater than the size of + * {@code b}. + * @throws NullPointerException + * if {@code b} is {@code null}. + */ + @Override + public int read(byte b[], int offset, int length) { + if (b == null) { + throw new NullPointerException(); + } + // avoid int overflow + if (offset < 0 || offset > b.length || length < 0 + || length > b.length - offset) { + throw new IndexOutOfBoundsException(); + } + // Are there any bytes available? + if (this.pos >= this.count) { + return -1; + } + if (length == 0) { + return 0; + } + + int copylen = this.count - pos < length ? this.count - pos : length; + System.arraycopy(buf, pos, b, offset, copylen); + pos += copylen; + return copylen; + } + + /** + * Resets this stream to the last marked location. This implementation + * resets the position to either the marked position, the start position + * supplied in the constructor or 0 if neither has been provided. + * + * @see #mark(int) + */ + @Override + public void reset() { + pos = mark; + } + + /** + * Skips {@code count} number of bytes in this InputStream. Subsequent + * {@code read()}s will not return these bytes unless {@code reset()} is + * used. This implementation skips {@code count} number of bytes in the + * target stream. It does nothing and returns 0 if {@code n} is negative. + * + * @param n + * the number of bytes to skip. + * @return the number of bytes actually skipped. + */ + @Override + public long skip(long n) { + if (n <= 0) { + return 0; + } + int temp = pos; + pos = this.count - pos < n ? this.count : (int) (pos + n); + return pos - temp; + } +} Propchange: cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayInputStream.java ------------------------------------------------------------------------------ svn:eol-style = native Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java?rev=1158410&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java Tue Aug 16 18:27:05 2011 @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; + +/* + * This file has been modified from Apache Harmony's ByteArrayOutputStream + * implementation. The synchronized methods of the original have been + * replaced by non-synchronized methods. This makes certain operations + * much FASTer, but also *not thread-safe*. + * + * This file remains formatted the same as the Apache Harmony original to + * make patching easier if any bug fixes are made to the Harmony version. + */ + +/** + * A specialized {@link OutputStream} for class for writing content to an + * (internal) byte array. As bytes are written to this stream, the byte array + * may be expanded to hold more bytes. When the writing is considered to be + * finished, a copy of the byte array can be requested from the class. + * + * @see ByteArrayOutputStream + */ +public class FastByteArrayOutputStream extends OutputStream { + /** + * The byte array containing the bytes written. + */ + protected byte[] buf; + + /** + * The number of bytes written. + */ + protected int count; + + /** + * Constructs a new ByteArrayOutputStream with a default size of 32 bytes. + * If more than 32 bytes are written to this instance, the underlying byte + * array will expand. + */ + public FastByteArrayOutputStream() { + buf = new byte[32]; + } + + /** + * Constructs a new {@code ByteArrayOutputStream} with a default size of + * {@code size} bytes. If more than {@code size} bytes are written to this + * instance, the underlying byte array will expand. + * + * @param size + * initial size for the underlying byte array, must be + * non-negative. + * @throws IllegalArgumentException + * if {@code size} < 0. + */ + public FastByteArrayOutputStream(int size) { + if (size >= 0) { + buf = new byte[size]; + } else { + throw new IllegalArgumentException(); + } + } + + /** + * Closes this stream. This releases system resources used for this stream. + * + * @throws IOException + * if an error occurs while attempting to close this stream. + */ + @Override + public void close() throws IOException { + /** + * Although the spec claims "A closed stream cannot perform output + * operations and cannot be reopened.", this implementation must do + * nothing. + */ + super.close(); + } + + private void expand(int i) { + /* Can the buffer handle @i more bytes, if not expand it */ + if (count + i <= buf.length) { + return; + } + + byte[] newbuf = new byte[(count + i) * 2]; + System.arraycopy(buf, 0, newbuf, 0, count); + buf = newbuf; + } + + /** + * Resets this stream to the beginning of the underlying byte array. All + * subsequent writes will overwrite any bytes previously stored in this + * stream. + */ + public void reset() { + count = 0; + } + + /** + * Returns the total number of bytes written to this stream so far. + * + * @return the number of bytes written to this stream. + */ + public int size() { + return count; + } + + /** + * Returns the contents of this ByteArrayOutputStream as a byte array. Any + * changes made to the receiver after returning will not be reflected in the + * byte array returned to the caller. + * + * @return this stream's current contents as a byte array. + */ + public byte[] toByteArray() { + byte[] newArray = new byte[count]; + System.arraycopy(buf, 0, newArray, 0, count); + return newArray; + } + + /** + * Returns the contents of this ByteArrayOutputStream as a string. Any + * changes made to the receiver after returning will not be reflected in the + * string returned to the caller. + * + * @return this stream's current contents as a string. + */ + + @Override + public String toString() { + return new String(buf, 0, count); + } + + /** + * Returns the contents of this ByteArrayOutputStream as a string. Each byte + * {@code b} in this stream is converted to a character {@code c} using the + * following function: + * {@code c == (char)(((hibyte & 0xff) << 8) | (b & 0xff))}. This method is + * deprecated and either {@link #toString()} or {@link #toString(String)} + * should be used. + * + * @param hibyte + * the high byte of each resulting Unicode character. + * @return this stream's current contents as a string with the high byte set + * to {@code hibyte}. + * @deprecated Use {@link #toString()}. + */ + @Deprecated + public String toString(int hibyte) { + char[] newBuf = new char[size()]; + for (int i = 0; i < newBuf.length; i++) { + newBuf[i] = (char) (((hibyte & 0xff) << 8) | (buf[i] & 0xff)); + } + return new String(newBuf); + } + + /** + * Returns the contents of this ByteArrayOutputStream as a string converted + * according to the encoding declared in {@code enc}. + * + * @param enc + * a string representing the encoding to use when translating + * this stream to a string. + * @return this stream's current contents as an encoded string. + * @throws UnsupportedEncodingException + * if the provided encoding is not supported. + */ + public String toString(String enc) throws UnsupportedEncodingException { + return new String(buf, 0, count, enc); + } + + /** + * Writes {@code count} bytes from the byte array {@code buffer} starting at + * offset {@code index} to this stream. + * + * @param buffer + * the buffer to be written. + * @param offset + * the initial position in {@code buffer} to retrieve bytes. + * @param len + * the number of bytes of {@code buffer} to write. + * @throws NullPointerException + * if {@code buffer} is {@code null}. + * @throws IndexOutOfBoundsException + * if {@code offset < 0} or {@code len < 0}, or if + * {@code offset + len} is greater than the length of + * {@code buffer}. + */ + @Override + public void write(byte[] buffer, int offset, int len) { + // avoid int overflow + if (offset < 0 || offset > buffer.length || len < 0 + || len > buffer.length - offset) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return; + } + + /* Expand if necessary */ + expand(len); + System.arraycopy(buffer, offset, buf, this.count, len); + this.count += len; + } + + /** + * Writes the specified byte {@code oneByte} to the OutputStream. Only the + * low order byte of {@code oneByte} is written. + * + * @param oneByte + * the byte to be written. + */ + @Override + public void write(int oneByte) { + if (count == buf.length) { + expand(1); + } + buf[count++] = (byte) oneByte; + } + + /** + * Takes the contents of this stream and writes it to the output stream + * {@code out}. + * + * @param out + * an OutputStream on which to write the contents of this stream. + * @throws IOException + * if an error occurs while writing to {@code out}. + */ + public void writeTo(OutputStream out) throws IOException { + out.write(buf, 0, count); + } +} Propchange: cassandra/trunk/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/OutputBuffer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/OutputBuffer.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/util/OutputBuffer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/util/OutputBuffer.java Tue Aug 16 18:27:05 2011 @@ -18,14 +18,13 @@ package org.apache.cassandra.io.util; -import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.IOException; /** - * Extends ByteArrayOutputStream to minimize copies. + * Extends FastByteArrayOutputStream to minimize copies. */ -public final class OutputBuffer extends ByteArrayOutputStream +public final class OutputBuffer extends FastByteArrayOutputStream { public OutputBuffer() { Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Aug 16 18:27:05 2011 @@ -25,6 +25,7 @@ import java.io.*; import java.net.Socket; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +73,7 @@ public class IncomingTcpConnection exten int size = input.readInt(); byte[] headerBytes = new byte[size]; input.readFully(headerBytes); - stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes)), version), input); + stream(StreamHeader.serializer().deserialize(new DataInputStream(new FastByteArrayInputStream(headerBytes)), version), input); } else { Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java Tue Aug 16 18:27:05 2011 @@ -21,7 +21,6 @@ package org.apache.cassandra.service; */ -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOError; import java.io.IOException; @@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.db.Row; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.Message; import org.apache.cassandra.utils.FBUtilities; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -58,7 +58,7 @@ public abstract class AbstractRowResolve public void preprocess(Message message) { byte[] body = message.getMessageBody(); - ByteArrayInputStream bufIn = new ByteArrayInputStream(body); + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body); try { ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn), message.getVersion()); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Tue Aug 16 18:27:05 2011 @@ -46,6 +46,8 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.io.ICompactSerializer; import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; @@ -395,7 +397,7 @@ public class AntiEntropyService { try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); SERIALIZER.serialize(request, dos, version); return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version); @@ -437,7 +439,7 @@ public class AntiEntropyService { byte[] bytes = message.getMessageBody(); - DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes)); + DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes)); try { TreeRequest remotereq = this.deserialize(buffer, message.getVersion()); @@ -467,7 +469,7 @@ public class AntiEntropyService { try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); SERIALIZER.serialize(validator, dos, Gossiper.instance.getVersion(validator.request.endpoint)); return new Message(local, @@ -504,7 +506,7 @@ public class AntiEntropyService public void doVerb(Message message, String id) { byte[] bytes = message.getMessageBody(); - DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes)); + DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes)); try { Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Tue Aug 16 18:27:05 2011 @@ -40,6 +40,8 @@ import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.migration.Migration; import org.apache.cassandra.gms.*; +import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.CachingMessageProducer; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageProducer; @@ -195,7 +197,7 @@ public class MigrationManager implements // other half of transformation is in DefinitionsUpdateResponseVerbHandler. private static Message makeMigrationMessage(Collection migrations, int version) throws IOException { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); + FastByteArrayOutputStream bout = new FastByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(bout); dout.writeInt(migrations.size()); // riddle me this: how do we know that these binary values (which contained serialized row mutations) are compatible @@ -219,7 +221,7 @@ public class MigrationManager implements public static Collection makeColumns(Message msg) throws IOException { Collection cols = new ArrayList(); - DataInputStream in = new DataInputStream(new ByteArrayInputStream(msg.getMessageBody())); + DataInputStream in = new DataInputStream(new FastByteArrayInputStream(msg.getMessageBody())); int count = in.readInt(); for (int i = 0; i < count; i++) { Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Aug 16 18:27:05 2011 @@ -48,6 +48,7 @@ import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.utils.*; @@ -319,7 +320,7 @@ public class StorageProxy implements Sto { InetAddress destination = iter.next(); // group all nodes in this DC as forward headers on the primary message - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); // append to older addresses @@ -339,7 +340,7 @@ public class StorageProxy implements Sto private static void addHintHeader(Message message, InetAddress target) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); byte[] previousHints = message.getHeader(RowMutation.HINT); if (previousHints != null) Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java Tue Aug 16 18:27:05 2011 @@ -21,12 +21,12 @@ package org.apache.cassandra.streaming; */ -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageProducer; import org.apache.cassandra.service.StorageService; @@ -56,7 +56,7 @@ class StreamReply implements MessageProd public Message getMessage(Integer version) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream( bos ); serializer.serialize(this, dos, version); return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version); Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java Tue Aug 16 18:27:05 2011 @@ -21,7 +21,6 @@ package org.apache.cassandra.streaming; * */ -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOError; import java.io.IOException; @@ -29,6 +28,7 @@ import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; @@ -39,7 +39,7 @@ public class StreamReplyVerbHandler impl public void doVerb(Message message, String id) { byte[] body = message.getMessageBody(); - ByteArrayInputStream bufIn = new ByteArrayInputStream(body); + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body); try { Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Tue Aug 16 18:27:05 2011 @@ -33,6 +33,7 @@ import org.apache.cassandra.db.Table; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageProducer; @@ -96,7 +97,7 @@ class StreamRequestMessage implements Me public Message getMessage(Integer version) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FastByteArrayOutputStream bos = new FastByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); try { Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Tue Aug 16 18:27:05 2011 @@ -18,7 +18,6 @@ package org.apache.cassandra.streaming; - import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOError; import java.io.IOException; @@ -26,8 +25,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.IVerbHandler; - import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.Message; /** * This verb handler handles the StreamRequestMessage that is sent by @@ -43,7 +43,7 @@ public class StreamRequestVerbHandler im logger.debug("Received a StreamRequestMessage from {}", message.getFrom()); byte[] body = message.getMessageBody(); - ByteArrayInputStream bufIn = new ByteArrayInputStream(body); + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body); try { StreamRequestMessage srm = StreamRequestMessage.serializer().deserialize(new DataInputStream(bufIn), message.getVersion()); Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1158410&r1=1158409&r2=1158410&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Aug 16 18:27:05 2011 @@ -49,6 +49,7 @@ import org.apache.cassandra.db.marshal.M import org.apache.cassandra.db.migration.*; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.dht.*; +import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.locator.*; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.service.ClientState; @@ -1158,7 +1159,7 @@ public class CassandraServer implements switch (compression) { case GZIP: - ByteArrayOutputStream byteArray = new ByteArrayOutputStream(); + FastByteArrayOutputStream byteArray = new FastByteArrayOutputStream(); byte[] outBuffer = new byte[1024], inBuffer = new byte[1024]; Inflater decompressor = new Inflater();