cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r894312 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/net/io/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/db/ test/unit/org/apach...
Date Tue, 29 Dec 2009 06:13:30 GMT
Author: jbellis
Date: Tue Dec 29 06:13:04 2009
New Revision: 894312

URL: http://svn.apache.org/viewvc?rev=894312&view=rev
Log:
replace DataInputBuffer with ByteArrayInputStream/DataInputStream.  patch by Todd Blose; reviewed
by jbellis for CASSANDRA-656

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DataInputBuffer.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/DataInputBufferTest.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/FilterTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Tue
Dec 29 06:13:04 2009
@@ -18,9 +18,11 @@
 
 package org.apache.cassandra.db;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.io.DataInputBuffer;
 
 import org.apache.log4j.Logger;
 
@@ -31,12 +33,11 @@
     public void doVerb(Message message)
     { 
         byte[] bytes = message.getMessageBody();
-        DataInputBuffer buffer = new DataInputBuffer();
-        buffer.reset(bytes, bytes.length);
+        ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
 
         try
         {
-            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
+            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(new DataInputStream(buffer));
             RowMutation rm = rmMsg.getRowMutation();
             rm.applyBinary();
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Tue Dec 29 06:13:04
2009
@@ -20,7 +20,6 @@
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.BufferedRandomAccessFile;
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FileUtils;
@@ -285,8 +284,6 @@
         Set<Table> tablesRecovered = new HashSet<Table>();
         assert StageManager.getStage(StageManager.mutationStage_).getCompletedTasks() ==
0;
         int rows = 0;
-
-        DataInputBuffer bufIn = new DataInputBuffer();
         for (File file : clogs)
         {
             int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
@@ -320,7 +317,8 @@
                     // last CL entry didn't get completely written.  that's ok.
                     break;
                 }
-                bufIn.reset(bytes, bytes.length);
+
+                ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
                 Checksum checksum = new CRC32();
                 checksum.update(bytes, 0, bytes.length);
                 if (claimedCRC32 != checksum.getValue())
@@ -331,7 +329,7 @@
                 }
 
                 /* deserialize the commit log entry */
-                final RowMutation rm = RowMutation.serializer().deserialize(bufIn);
+                final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
                 if (logger_.isDebugEnabled())
                     logger_.debug(String.format("replaying mutation for %s.%s: %s",
                                                 rm.getTable(),

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java Tue Dec
29 06:13:04 2009
@@ -23,9 +23,7 @@
 import java.util.Arrays;
 
 import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.utils.BitSetSerializer;
-import org.apache.cassandra.config.DatabaseDescriptor;
 
 class CommitLogHeader
 {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java Tue Dec 29
06:13:04 2009
@@ -18,6 +18,7 @@
 */
 package org.apache.cassandra.db;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.DataInputStream;
@@ -25,7 +26,6 @@
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -63,9 +63,8 @@
     public static RangeCommand read(Message message) throws IOException
     {
         byte[] bytes = message.getMessageBody();
-        DataInputBuffer dib = new DataInputBuffer();
-        dib.reset(bytes, bytes.length);
-        return serializer.deserialize(new DataInputStream(dib));
+        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        return serializer.deserialize(new DataInputStream(bis));
     }
 
     public String toString()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java Tue Dec 29
06:13:04 2009
@@ -19,11 +19,12 @@
 package org.apache.cassandra.db;
 
 import java.util.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.FBUtilities;
@@ -63,17 +64,17 @@
 
     public static RangeReply read(byte[] body) throws IOException
     {
-        DataInputBuffer bufIn = new DataInputBuffer();
-        boolean rangeCompletedLocally;        
-        bufIn.reset(body, body.length);
-        rangeCompletedLocally = bufIn.readBoolean();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        boolean rangeCompletedLocally;
+        DataInputStream dis = new DataInputStream(bufIn);
+        rangeCompletedLocally = dis.readBoolean();
 
         List<String> keys = new ArrayList<String>();
-        while (bufIn.getPosition() < body.length)
+        while (dis.available() > 0)
         {
-            keys.add(bufIn.readUTF());
+            keys.add(dis.readUTF());
         }
-        
+
         return new RangeReply(keys, rangeCompletedLocally);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Tue
Dec 29 06:13:04 2009
@@ -37,7 +37,6 @@
 package org.apache.cassandra.db;
 
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
@@ -50,6 +49,7 @@
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -105,9 +105,8 @@
     public static RangeSliceCommand read(Message message) throws IOException
     {
         byte[] bytes = message.getMessageBody();
-        DataInputBuffer dib = new DataInputBuffer();
-        dib.reset(bytes, bytes.length);
-        return serializer.deserialize(new DataInputStream(dib));
+        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        return serializer.deserialize(new DataInputStream(bis));
     }
 }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java Tue Dec
29 06:13:04 2009
@@ -20,13 +20,14 @@
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.lang.StringUtils;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -67,14 +68,14 @@
 
     public static RangeSliceReply read(byte[] body) throws IOException
     {
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(body, body.length);
-        boolean completed = bufIn.readBoolean();
-        int rowCount = bufIn.readInt();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        DataInputStream dis = new DataInputStream(bufIn);
+        boolean completed = dis.readBoolean();
+        int rowCount = dis.readInt();
         List<Row> rows = new ArrayList<Row>(rowCount);
         for (int i = 0; i < rowCount; i++)
         {
-            rows.add(Row.serializer().deserialize(bufIn));
+            rows.add(Row.serializer().deserialize(dis));
         }
         return new RangeSliceReply(rows, completed);
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
Tue Dec 29 06:13:04 2009
@@ -20,7 +20,6 @@
 
 import java.io.*;
 
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 
@@ -33,12 +32,11 @@
     public void doVerb(Message message)
     {          
         byte[] body = message.getMessageBody();
-        DataInputBuffer buffer = new DataInputBuffer();
-        buffer.reset(body, body.length);        
+        ByteArrayInputStream buffer = new ByteArrayInputStream(body);
         
         try
         {
-            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
+            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(new DataInputStream(buffer));
             RowMutation rm = rmMsg.getRowMutation();
             rm.apply();                                   
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Tue Dec
29 06:13:04 2009
@@ -18,11 +18,12 @@
 
 package org.apache.cassandra.db;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import java.net.InetAddress;
 import org.apache.cassandra.net.IVerbHandler;
@@ -37,7 +38,7 @@
 {
     protected static class ReadContext
     {
-        protected DataInputBuffer bufIn_ = new DataInputBuffer();
+        protected ByteArrayInputStream bufIn_;
         protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
     }
 
@@ -65,7 +66,7 @@
             readCtx = new ReadContext();
             tls_.set(readCtx);
         }
-        readCtx.bufIn_.reset(body, body.length);
+        readCtx.bufIn_ = new ByteArrayInputStream(body);
 
         try
         {
@@ -74,7 +75,7 @@
                 /* Don't service reads! */
                 throw new RuntimeException("Cannot service reads while bootstrapping!");
             }
-            ReadCommand command = ReadCommand.serializer().deserialize(readCtx.bufIn_);
+            ReadCommand command = ReadCommand.serializer().deserialize(new DataInputStream(readCtx.bufIn_));
             Table table = Table.open(command.table);
             Row row = command.getRow(table);
             ReadResponse readResponse;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Tue Dec 29 06:13:04 2009
@@ -20,7 +20,6 @@
 
 import java.io.*;
 
-import org.apache.cassandra.io.DataInputBuffer;
 import java.net.InetAddress;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
@@ -36,12 +35,11 @@
     public void doVerb(Message message)
     {
         byte[] bytes = message.getMessageBody();
-        DataInputBuffer buffer = new DataInputBuffer();
-        buffer.reset(bytes, bytes.length);
+        ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
 
         try
         {
-            RowMutation rm = RowMutation.serializer().deserialize(buffer);
+            RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(buffer));
             if (logger_.isDebugEnabled())
               logger_.debug("Applying " + rm);
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Tue Dec 29
06:13:04 2009
@@ -92,9 +92,9 @@
         int size = file.readInt();
         byte[] bytes = new byte[size];
         file.readFully(bytes);
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(bytes, bytes.length);
-        return BloomFilter.serializer().deserialize(bufIn);
+        
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
+        return BloomFilter.serializer().deserialize(new DataInputStream(bufIn));
     }
 
     /**

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/StreamRequestVerbHandler.java
Tue Dec 29 06:13:04 2009
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.io;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.IOError;
@@ -53,11 +55,10 @@
             logger_.debug("Received a StreamRequestMessage from " + message.getFrom());
         
         byte[] body = message.getMessageBody();
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(body, body.length);
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
         try
         {
-            StreamRequestMessage streamRequestMessage = StreamRequestMessage.serializer().deserialize(bufIn);
+            StreamRequestMessage streamRequestMessage = StreamRequestMessage.serializer().deserialize(new
DataInputStream(bufIn));
             StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_;
 
             for (StreamRequestMetadata srm : streamRequestMetadata)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Tue Dec 29 06:13:04
2009
@@ -23,6 +23,8 @@
 
 import java.net.InetAddress;
 import java.util.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.File;
 import java.io.IOError;
@@ -147,12 +149,11 @@
         public void doVerb(Message message)
         {
             byte[] body = message.getMessageBody();
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
+            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
 
             try
             {
-                StreamInitiateMessage biMsg = StreamInitiateMessage.serializer().deserialize(bufIn);
+                StreamInitiateMessage biMsg = StreamInitiateMessage.serializer().deserialize(new
DataInputStream(bufIn));
                 StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
 
                 if (streamContexts.length == 0 && StorageService.instance().isBootstrapMode())
@@ -311,12 +312,11 @@
         public void doVerb(Message message)
         {
             byte[] body = message.getMessageBody();
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
+            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
 
             try
             {
-                StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
+                StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(new
DataInputStream(bufIn));
                 StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
 
                 switch (streamStatus.getAction())

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java Tue
Dec 29 06:13:04 2009
@@ -18,9 +18,10 @@
 
 package org.apache.cassandra.net.io;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
 
@@ -35,8 +36,7 @@
     
     public Message deserialize(byte[] bytes) throws IOException
     {
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(bytes, bytes.length);
-        return Message.serializer().deserialize(bufIn);
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
+        return Message.serializer().deserialize(new DataInputStream(bufIn));
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Tue Dec 29 06:13:04 2009
@@ -33,7 +33,6 @@
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.CompactionIterator.CompactedRow;
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.io.SSTableReader;
@@ -707,12 +706,11 @@
         public void doVerb(Message message)
         { 
             byte[] bytes = message.getMessageBody();
-            DataInputBuffer buffer = new DataInputBuffer();
-            buffer.reset(bytes, bytes.length);
-
+            
+            ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
             try
             {
-                CFPair request = this.deserialize(buffer);
+                CFPair request = this.deserialize(new DataInputStream(buffer));
 
                 // trigger readonly-compaction
                 logger.debug("Queueing readonly compaction for request from " + message.getFrom()
+ " for " + request);
@@ -775,13 +773,12 @@
         public void doVerb(Message message)
         { 
             byte[] bytes = message.getMessageBody();
-            DataInputBuffer buffer = new DataInputBuffer();
-            buffer.reset(bytes, bytes.length);
+            ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
 
             try
             {
                 // deserialize the remote tree, and register it
-                Validator rvalidator = this.deserialize(buffer);
+                Validator rvalidator = this.deserialize(new DataInputStream(buffer));
                 AntiEntropyService.instance().rendezvous(rvalidator.cf, message.getFrom(),
rvalidator.tree);
             }
             catch (IOException e)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
Tue Dec 29 06:13:04 2009
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.service;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -27,7 +29,6 @@
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.io.DataInputBuffer;
 import java.net.InetAddress;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
@@ -53,17 +54,16 @@
             if (responses_.size() == ConsistencyManager.this.replicas_.size())
                 handleDigestResponses();
         }
-		
-		private void handleDigestResponses()
-		{
-            DataInputBuffer bufIn = new DataInputBuffer();
+
+        private void handleDigestResponses()
+        {
             for (Message response : responses_)
             {
                 try
                 {
                     byte[] body = response.getMessageBody();
-                    bufIn.reset(body, body.length);
-                    ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+                    ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+                    ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
                     byte[] digest = result.digest();
                     if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
                     {
@@ -77,8 +77,8 @@
                 }
             }
         }
-		
-		private void doReadRepair() throws IOException
+
+        private void doReadRepair() throws IOException
 		{
             replicas_.add(FBUtilities.getLocalAddress());
             IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_,
replicas_.size());

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Tue Dec 29 06:13:04 2009
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.service;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,7 +30,6 @@
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.RowMutationMessage;
-import org.apache.cassandra.io.DataInputBuffer;
 import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.FBUtilities;
@@ -77,12 +78,11 @@
          * query exists then we need to compare the digest with 
          * the digest of the data that is received.
         */
-        DataInputBuffer bufIn = new DataInputBuffer();
 		for (Message response : responses)
 		{					            
             byte[] body = response.getMessageBody();
-            bufIn.reset(body, body.length);
-            ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -168,11 +168,10 @@
         for (Message response : responses)
         {
             byte[] body = response.getMessageBody();
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
+            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
             try
             {
-                ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+                ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
                 if (!result.isDigestQuery())
                 {
                     isDataPresent = true;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue
Dec 29 06:13:04 2009
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.service;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
 import java.util.*;
@@ -31,7 +33,6 @@
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.DataInputBuffer;
 import java.net.InetAddress;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
@@ -346,9 +347,8 @@
         {
             byte[] body;
             body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
-            ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
+            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+            ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
             if (response.row() != null)
                 rows.add(response.row());
         }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyTest.java Tue
Dec 29 06:13:04 2009
@@ -18,13 +18,14 @@
 */
 package org.apache.cassandra.db;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.TreeMap;
 
 import org.junit.Test;
 
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.db.filter.QueryPath;
 import static org.apache.cassandra.Util.column;
@@ -43,9 +44,8 @@
         DataOutputBuffer bufOut = new DataOutputBuffer();
         ColumnFamily.serializer().serialize(cf, bufOut);
 
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(bufOut.getData(), bufOut.getLength());
-        cf = ColumnFamily.serializer().deserialize(bufIn);
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(bufOut.getData(), 0, bufOut.getLength());
+        cf = ColumnFamily.serializer().deserialize(new DataInputStream(bufIn));
         assert cf != null;
         assert cf.name().equals("Standard1");
         assert cf.getSortedColumns().size() == 1;
@@ -72,9 +72,8 @@
         ColumnFamily.serializer().serialize(cf, bufOut);
 
         // verify
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(bufOut.getData(), bufOut.getLength());
-        cf = ColumnFamily.serializer().deserialize(bufIn);
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(bufOut.getData(), 0, bufOut.getLength());
+        cf = ColumnFamily.serializer().deserialize(new DataInputStream(bufIn));
         for (String cName : map.navigableKeySet())
         {
             assert new String(cf.getColumn(cName.getBytes()).value()).equals(map.get(cName));

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java Tue Dec
29 06:13:04 2009
@@ -20,6 +20,8 @@
 
 import static org.junit.Assert.*;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,7 +30,6 @@
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AsciiType;
@@ -61,11 +62,11 @@
     {
         ReadCommandSerializer rms = ReadCommand.serializer();
         DataOutputBuffer dos = new DataOutputBuffer();
-        DataInputBuffer dis = new DataInputBuffer();
+        ByteArrayInputStream bis;
 
         rms.serialize(rm, dos);
-        dis.reset(dos.getData(), dos.getLength());
-        return rms.deserialize(dis);
+        bis = new ByteArrayInputStream(dos.getData(), 0, dos.getLength());
+        return rms.deserialize(new DataInputStream(bis));
     }
     
     @Test

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java Tue
Dec 29 06:13:04 2009
@@ -23,9 +23,10 @@
 
 import static org.junit.Assert.*;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import java.net.InetAddress;
 import org.junit.Test;
@@ -49,9 +50,8 @@
         DataOutputBuffer output = new DataOutputBuffer();
         GossipDigest.serializer().serialize(expected, output);
         
-        DataInputBuffer input = new DataInputBuffer();
-        input.reset(output.getData(), output.getLength());
-        GossipDigest actual = GossipDigest.serializer().deserialize(input);
+        ByteArrayInputStream input = new ByteArrayInputStream(output.getData(), 0, output.getLength());
+        GossipDigest actual = GossipDigest.serializer().deserialize(new DataInputStream(input));
         assertEquals(0, expected.compareTo(actual));
     }
 

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/FilterTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/FilterTest.java?rev=894312&r1=894311&r2=894312&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/FilterTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/utils/FilterTest.java Tue Dec
29 06:13:04 2009
@@ -18,6 +18,8 @@
 */
 package org.apache.cassandra.utils;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -25,7 +27,6 @@
 
 import org.junit.Test;
 
-import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 
 public class FilterTest
@@ -102,9 +103,8 @@
         DataOutputBuffer out = new DataOutputBuffer();
         f.getSerializer().serialize(f, out);
 
-        DataInputBuffer in = new DataInputBuffer();
-        in.reset(out.getData(), out.getLength());
-        Filter f2 = f.getSerializer().deserialize(in);
+        ByteArrayInputStream in = new ByteArrayInputStream(out.getData(), 0, out.getLength());
+        Filter f2 = f.getSerializer().deserialize(new DataInputStream(in));
 
         assert f2.isPresent("a");
         assert !f2.isPresent("b");



Mime
View raw message