kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3426; Improve protocol type errors when invalid sizes are received
Date Tue, 22 Mar 2016 19:39:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 73470b028 -> 73c79000e


KAFKA-3426; Improve protocol type errors when invalid sizes are received

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Grant Henke, Gwen Shapira

Closes #1100 from ijuma/kafka-3426-invalid-protocol-type-errors-invalid-sizes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73c79000
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73c79000
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73c79000

Branch: refs/heads/trunk
Commit: 73c79000edddd929cd0af25f4a29fcc682a1c9c0
Parents: 73470b0
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Mar 22 12:39:04 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Mar 22 12:39:04 2016 -0700

----------------------------------------------------------------------
 .../kafka/common/protocol/types/ArrayOf.java    |  2 +
 .../kafka/common/protocol/types/Type.java       | 22 ++++-
 .../types/ProtocolSerializationTest.java        | 93 +++++++++++++++++++-
 3 files changed, 112 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/73c79000/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
index 4a36cb7..a08f876 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java
@@ -41,6 +41,8 @@ public class ArrayOf extends Type {
     @Override
     public Object read(ByteBuffer buffer) {
         int size = buffer.getInt();
+        if (size < 0)
+            throw new SchemaException("Array size " + size + " cannot be negative");
         if (size > buffer.remaining())
             throw new SchemaException("Error reading array of size " + size + ", only " +
buffer.remaining() + " bytes available");
         Object[] objs = new Object[size];

http://git-wip-us.apache.org/repos/asf/kafka/blob/73c79000/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index c4bcb1e..92c1f7c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -184,14 +184,19 @@ public abstract class Type {
         public void write(ByteBuffer buffer, Object o) {
             byte[] bytes = Utils.utf8((String) o);
             if (bytes.length > Short.MAX_VALUE)
-                throw new SchemaException("String is longer than the maximum string length.");
+                throw new SchemaException("String length " + bytes.length + " is larger than
the maximum string length.");
             buffer.putShort((short) bytes.length);
             buffer.put(bytes);
         }
 
         @Override
         public Object read(ByteBuffer buffer) {
-            int length = buffer.getShort();
+            short length = buffer.getShort();
+            if (length < 0)
+                throw new SchemaException("String length " + length + " cannot be negative");
+            if (length > buffer.remaining())
+                throw new SchemaException("Error reading string of length " + length + ",
only " + buffer.remaining() + " bytes available");
+            
             byte[] bytes = new byte[length];
             buffer.get(bytes);
             return Utils.utf8(bytes);
@@ -231,16 +236,18 @@ public abstract class Type {
 
             byte[] bytes = Utils.utf8((String) o);
             if (bytes.length > Short.MAX_VALUE)
-                throw new SchemaException("String is longer than the maximum string length.");
+                throw new SchemaException("String length " + bytes.length + " is larger than
the maximum string length.");
             buffer.putShort((short) bytes.length);
             buffer.put(bytes);
         }
 
         @Override
         public Object read(ByteBuffer buffer) {
-            int length = buffer.getShort();
+            short length = buffer.getShort();
             if (length < 0)
                 return null;
+            if (length > buffer.remaining())
+                throw new SchemaException("Error reading string of length " + length + ",
only " + buffer.remaining() + " bytes available");
 
             byte[] bytes = new byte[length];
             buffer.get(bytes);
@@ -285,6 +292,11 @@ public abstract class Type {
         @Override
         public Object read(ByteBuffer buffer) {
             int size = buffer.getInt();
+            if (size < 0)
+                throw new SchemaException("Bytes size " + size + " cannot be negative");
+            if (size > buffer.remaining())
+                throw new SchemaException("Error reading bytes of size " + size + ", only
" + buffer.remaining() + " bytes available");
+
             ByteBuffer val = buffer.slice();
             val.limit(size);
             buffer.position(buffer.position() + size);
@@ -336,6 +348,8 @@ public abstract class Type {
             int size = buffer.getInt();
             if (size < 0)
                 return null;
+            if (size > buffer.remaining())
+                throw new SchemaException("Error reading bytes of size " + size + ", only
" + buffer.remaining() + " bytes available");
 
             ByteBuffer val = buffer.slice();
             val.limit(size);

http://git-wip-us.apache.org/repos/asf/kafka/blob/73c79000/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index e20aa10..5c34277 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -117,7 +117,7 @@ public class ProtocolSerializationTest {
     }
 
     @Test
-    public void testArray() {
+    public void testReadArraySizeTooLarge() {
         Type type = new ArrayOf(Type.INT8);
         int size = 10;
         ByteBuffer invalidBuffer = ByteBuffer.allocate(4 + size);
@@ -133,6 +133,97 @@ public class ProtocolSerializationTest {
         }
     }
 
+    @Test
+    public void testReadNegativeArraySize() {
+        Type type = new ArrayOf(Type.INT8);
+        int size = 10;
+        ByteBuffer invalidBuffer = ByteBuffer.allocate(4 + size);
+        invalidBuffer.putInt(-1);
+        for (int i = 0; i < size; i++)
+            invalidBuffer.put((byte) i);
+        invalidBuffer.rewind();
+        try {
+            type.read(invalidBuffer);
+            fail("Array size not validated");
+        } catch (SchemaException e) {
+            // Expected exception
+        }
+    }
+
+    @Test
+    public void testReadStringSizeTooLarge() {
+        byte[] stringBytes = "foo".getBytes();
+        ByteBuffer invalidBuffer = ByteBuffer.allocate(2 + stringBytes.length);
+        invalidBuffer.putShort((short) (stringBytes.length * 5));
+        invalidBuffer.put(stringBytes);
+        invalidBuffer.rewind();
+        try {
+            Type.STRING.read(invalidBuffer);
+            fail("String size not validated");
+        } catch (SchemaException e) {
+            // Expected exception
+        }
+        invalidBuffer.rewind();
+        try {
+            Type.NULLABLE_STRING.read(invalidBuffer);
+            fail("String size not validated");
+        } catch (SchemaException e) {
+            // Expected exception
+        }
+    }
+
+    @Test
+    public void testReadNegativeStringSize() {
+        byte[] stringBytes = "foo".getBytes();
+        ByteBuffer invalidBuffer = ByteBuffer.allocate(2 + stringBytes.length);
+        invalidBuffer.putShort((short) -1);
+        invalidBuffer.put(stringBytes);
+        invalidBuffer.rewind();
+        try {
+            Type.STRING.read(invalidBuffer);
+            fail("String size not validated");
+        } catch (SchemaException e) {
+            // Expected exception
+        }
+    }
+
+    @Test
+    public void testReadBytesSizeTooLarge() {
+        byte[] stringBytes = "foo".getBytes();
+        ByteBuffer invalidBuffer = ByteBuffer.allocate(4 + stringBytes.length);
+        invalidBuffer.putInt(stringBytes.length * 5);
+        invalidBuffer.put(stringBytes);
+        invalidBuffer.rewind();
+        try {
+            Type.BYTES.read(invalidBuffer);
+            fail("Bytes size not validated");
+        } catch (SchemaException e) {
+            // Expected exception
+        }
+        invalidBuffer.rewind();
+        try {
+            Type.NULLABLE_BYTES.read(invalidBuffer);
+            fail("Bytes size not validated");
+        } catch (SchemaException e) {
+            // Expected exception
+        }
+    }
+
+    @Test
+    public void testReadNegativeBytesSize() {
+        byte[] stringBytes = "foo".getBytes();
+        ByteBuffer invalidBuffer = ByteBuffer.allocate(4 + stringBytes.length);
+        invalidBuffer.putInt(-20);
+        invalidBuffer.put(stringBytes);
+        invalidBuffer.rewind();
+        try {
+            Type.BYTES.read(invalidBuffer);
+            fail("Bytes size not validated");
+        } catch (SchemaException e) {
+            // Expected exception
+        }
+    }
+
     private Object roundtrip(Type type, Object obj) {
         ByteBuffer buffer = ByteBuffer.allocate(type.sizeOf(obj));
         type.write(buffer, obj);


Mime
View raw message