cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject cassandra git commit: follow up to CASSANDRA-8670: providing small improvements to performance of writeUTF; and improving safety of DataOutputBuffer when size is known upfront
Date Fri, 03 Apr 2015 11:50:42 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 4e29b7a9a -> c2ecfe7b7


follow up to CASSANDRA-8670:
providing small improvements to performance of writeUTF; and
improving safety of DataOutputBuffer when size is known upfront

patch by ariel and benedict for CASSANDRA-8670


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c2ecfe7b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c2ecfe7b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c2ecfe7b

Branch: refs/heads/trunk
Commit: c2ecfe7b7bffbced652b4da9dcf4ca263d345695
Parents: 4e29b7a
Author: Ariel Weisberg <ariel.wesiberg@datastax.com>
Authored: Fri Apr 3 12:29:17 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Fri Apr 3 12:33:29 2015 +0100

----------------------------------------------------------------------
 .../cassandra/db/commitlog/CommitLog.java       |  5 +-
 .../cassandra/db/marshal/CompositeType.java     |  3 +-
 .../io/util/BufferedDataOutputStreamPlus.java   |  4 +-
 .../io/util/DataOutputBufferFixed.java          | 65 ++++++++++++++++++++
 .../cassandra/service/pager/PagingState.java    |  3 +-
 .../streaming/messages/StreamInitMessage.java   |  3 +-
 .../org/apache/cassandra/utils/FBUtilities.java |  3 +-
 .../io/util/BufferedDataOutputStreamTest.java   | 39 ++++++++++++
 8 files changed, 117 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2ecfe7b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 7fa7575..cf38d44 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -29,10 +29,10 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.commons.lang3.StringUtils;
 
 import com.github.tjake.ICRC32;
+
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
@@ -41,6 +41,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -251,7 +252,7 @@ public class CommitLog implements CommitLogMBean
         {
             ICRC32 checksum = CRC32Factory.instance.create();
             final ByteBuffer buffer = alloc.getBuffer();
-            BufferedDataOutputStreamPlus dos = new BufferedDataOutputStreamPlus(null, buffer);
+            BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer);
 
             // checksummed length
             dos.writeInt((int) size);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2ecfe7b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 9ee9fb3..1bc772d 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -403,7 +404,7 @@ public class CompositeType extends AbstractCompositeType
         {
             try
             {
-                DataOutputBuffer out = new DataOutputBuffer(serializedSize);
+                DataOutputBuffer out = new DataOutputBufferFixed(serializedSize);
                 if (isStatic)
                     out.writeShort(STATIC_MARKER);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2ecfe7b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index f4f46a1..5669a8d 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -75,13 +75,13 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
         Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accommodate a long/double");
     }
 
-    public BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer)
+    protected BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer)
     {
         super(channel);
         this.buffer = buffer;
     }
 
-    public BufferedDataOutputStreamPlus(ByteBuffer buffer)
+    protected BufferedDataOutputStreamPlus(ByteBuffer buffer)
     {
         super();
         this.buffer = buffer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2ecfe7b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
new file mode 100644
index 0000000..fb8d671
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+
+/**
+ * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing
+ * its buffer so copies can be avoided. This version does not expand if it runs out of capacity and
+ * throws BufferOverflowException instead.
+ *
+ * This class is completely thread unsafe.
+ */
+public class DataOutputBufferFixed extends DataOutputBuffer
+{
+    public DataOutputBufferFixed()
+    {
+        this(128);
+    }
+
+    public DataOutputBufferFixed(int size)
+    {
+        super(ByteBuffer.allocate(size));
+    }
+
+    public DataOutputBufferFixed(ByteBuffer buffer)
+    {
+        super(buffer);
+    }
+
+    @Override
+    protected void doFlush() throws IOException
+    {
+        throw new BufferOverflowException();
+    }
+
+    /*
+     * Not currently reachable (all paths hit doFLush first), but in the spirit of things this should throw
+     * if it is called.
+     * @see org.apache.cassandra.io.util.DataOutputBuffer#reallocate(long)
+     */
+    @Override
+    protected void reallocate(long newSize)
+    {
+        throw new BufferOverflowException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2ecfe7b/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index 43d3cb8..ff461ab 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.transport.ProtocolException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -61,7 +62,7 @@ public class PagingState
     {
         try
         {
-            DataOutputBuffer out = new DataOutputBuffer(serializedSize());
+            DataOutputBuffer out = new DataOutputBufferFixed(serializedSize());
             ByteBufferUtil.writeWithShortLength(partitionKey, out);
             ByteBufferUtil.writeWithShortLength(cellName, out);
             out.writeInt(remaining);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2ecfe7b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 0937f71..03ac944 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
@@ -80,7 +81,7 @@ public class StreamInitMessage
         try
         {
             int size = (int)StreamInitMessage.serializer.serializedSize(this, version);
-            DataOutputBuffer buffer = new DataOutputBuffer(size);
+            DataOutputBuffer buffer = new DataOutputBufferFixed(size);
             StreamInitMessage.serializer.serialize(this, buffer, version);
             bytes = buffer.getData();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2ecfe7b/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index be7cbd8..dd54ae1 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.net.AsyncOneResponse;
 import org.apache.thrift.*;
@@ -719,7 +720,7 @@ public class FBUtilities
         try
         {
             int size = (int) serializer.serializedSize(object, version);
-            DataOutputBuffer buffer = new DataOutputBuffer(size);
+            DataOutputBuffer buffer = new DataOutputBufferFixed(size);
             serializer.serialize(object, buffer, version);
             assert buffer.getLength() == size && buffer.getData().length == size
                 : String.format("Final buffer length %s to accommodate data size of %s (predicted %s) for %s",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2ecfe7b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
index 8eaea31..f5b239c 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -6,16 +6,55 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
 import java.lang.reflect.Field;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.Random;
 
 import org.junit.Test;
 
+import com.google.common.base.Throwables;
+
 import static org.junit.Assert.*;
 
 public class BufferedDataOutputStreamTest
 {
+
+    @Test(expected = BufferOverflowException.class)
+    public void testDataOutputBufferFixedByes() throws Exception
+    {
+        try (DataOutputBufferFixed dob = new DataOutputBufferFixed())
+        {
+            try
+            {
+                for (int ii = 0; ii < 128; ii++)
+                    dob.write(0);
+            }
+            catch (BufferOverflowException e)
+            {
+                fail("Should not throw BufferOverflowException yet");
+            }
+            dob.write(0);
+        }
+    }
+
+    @Test(expected = BufferOverflowException.class)
+    public void testDataOutputBufferFixedByteBuffer() throws Exception
+    {
+        try (DataOutputBufferFixed dob = new DataOutputBufferFixed())
+        {
+            try
+            {
+                dob.write(ByteBuffer.allocateDirect(128));
+            }
+            catch (BufferOverflowException e)
+            {
+                fail("Should not throw BufferOverflowException yet");
+            }
+            dob.write(ByteBuffer.allocateDirect(1));
+        }
+    }
+
     WritableByteChannel adapter = new WritableByteChannel()
     {
 


Mime
View raw message