Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F022119ACC for ; Tue, 22 Mar 2016 19:39:13 +0000 (UTC) Received: (qmail 2258 invoked by uid 500); 22 Mar 2016 19:39:13 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 2218 invoked by uid 500); 22 Mar 2016 19:39:13 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 2209 invoked by uid 99); 22 Mar 2016 19:39:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Mar 2016 19:39:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ACEE2DFF12; Tue, 22 Mar 2016 19:39:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gwenshap@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-3426; Improve protocol type errors when invalid sizes are received Date: Tue, 22 Mar 2016 19:39:13 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/trunk 73470b028 -> 73c79000e KAFKA-3426; Improve protocol type errors when invalid sizes are received Author: Ismael Juma Reviewers: Grant Henke, Gwen Shapira Closes #1100 from ijuma/kafka-3426-invalid-protocol-type-errors-invalid-sizes Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73c79000 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73c79000 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73c79000 Branch: refs/heads/trunk Commit: 73c79000edddd929cd0af25f4a29fcc682a1c9c0 Parents: 73470b0 Author: Ismael Juma Authored: Tue Mar 22 12:39:04 2016 -0700 Committer: Gwen Shapira Committed: Tue Mar 22 12:39:04 2016 -0700 ---------------------------------------------------------------------- .../kafka/common/protocol/types/ArrayOf.java | 2 + .../kafka/common/protocol/types/Type.java | 22 ++++- .../types/ProtocolSerializationTest.java | 93 +++++++++++++++++++- 3 files changed, 112 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/73c79000/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java index 4a36cb7..a08f876 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java @@ -41,6 +41,8 @@ public class ArrayOf extends Type { @Override public Object read(ByteBuffer buffer) { int size = buffer.getInt(); + if (size < 0) + throw new SchemaException("Array size " + size + " cannot be negative"); if (size > buffer.remaining()) throw new SchemaException("Error reading array of size " + size + ", only " + buffer.remaining() + " bytes available"); Object[] objs = new Object[size]; http://git-wip-us.apache.org/repos/asf/kafka/blob/73c79000/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index c4bcb1e..92c1f7c 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -184,14 +184,19 @@ public abstract class Type { public void write(ByteBuffer buffer, Object o) { byte[] bytes = Utils.utf8((String) o); if (bytes.length > Short.MAX_VALUE) - throw new SchemaException("String is longer than the maximum string length."); + throw new SchemaException("String length " + bytes.length + " is larger than the maximum string length."); buffer.putShort((short) bytes.length); buffer.put(bytes); } @Override public Object read(ByteBuffer buffer) { - int length = buffer.getShort(); + short length = buffer.getShort(); + if (length < 0) + throw new SchemaException("String length " + length + " cannot be negative"); + if (length > buffer.remaining()) + throw new SchemaException("Error reading string of length " + length + ", only " + buffer.remaining() + " bytes available"); + byte[] bytes = new byte[length]; buffer.get(bytes); return Utils.utf8(bytes); @@ -231,16 +236,18 @@ public abstract class Type { byte[] bytes = Utils.utf8((String) o); if (bytes.length > Short.MAX_VALUE) - throw new SchemaException("String is longer than the maximum string length."); + throw new SchemaException("String length " + bytes.length + " is larger than the maximum string length."); buffer.putShort((short) bytes.length); buffer.put(bytes); } @Override public Object read(ByteBuffer buffer) { - int length = buffer.getShort(); + short length = buffer.getShort(); if (length < 0) return null; + if (length > buffer.remaining()) + throw new SchemaException("Error reading string of length " + length + ", only " + buffer.remaining() + " bytes available"); byte[] bytes = new byte[length]; buffer.get(bytes); @@ -285,6 +292,11 @@ public abstract class Type { @Override public Object read(ByteBuffer buffer) { int size = buffer.getInt(); + if (size < 0) + throw new SchemaException("Bytes size " + size + " cannot be negative"); + if (size > buffer.remaining()) + throw new SchemaException("Error reading bytes of size " + size + ", only " + buffer.remaining() + " bytes available"); + ByteBuffer val = buffer.slice(); val.limit(size); buffer.position(buffer.position() + size); @@ -336,6 +348,8 @@ public abstract class Type { int size = buffer.getInt(); if (size < 0) return null; + if (size > buffer.remaining()) + throw new SchemaException("Error reading bytes of size " + size + ", only " + buffer.remaining() + " bytes available"); ByteBuffer val = buffer.slice(); val.limit(size); http://git-wip-us.apache.org/repos/asf/kafka/blob/73c79000/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java index e20aa10..5c34277 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java @@ -117,7 +117,7 @@ public class ProtocolSerializationTest { } @Test - public void testArray() { + public void testReadArraySizeTooLarge() { Type type = new ArrayOf(Type.INT8); int size = 10; ByteBuffer invalidBuffer = ByteBuffer.allocate(4 + size); @@ -133,6 +133,97 @@ public class ProtocolSerializationTest { } } + @Test + public void testReadNegativeArraySize() { + Type type = new ArrayOf(Type.INT8); + int size = 10; + ByteBuffer invalidBuffer = ByteBuffer.allocate(4 + size); + invalidBuffer.putInt(-1); + for (int i = 0; i < size; i++) + invalidBuffer.put((byte) i); + invalidBuffer.rewind(); + try { + type.read(invalidBuffer); + fail("Array size not validated"); + } catch (SchemaException e) { + // Expected exception + } + } + + @Test + public void testReadStringSizeTooLarge() { + byte[] stringBytes = "foo".getBytes(); + ByteBuffer invalidBuffer = ByteBuffer.allocate(2 + stringBytes.length); + invalidBuffer.putShort((short) (stringBytes.length * 5)); + invalidBuffer.put(stringBytes); + invalidBuffer.rewind(); + try { + Type.STRING.read(invalidBuffer); + fail("String size not validated"); + } catch (SchemaException e) { + // Expected exception + } + invalidBuffer.rewind(); + try { + Type.NULLABLE_STRING.read(invalidBuffer); + fail("String size not validated"); + } catch (SchemaException e) { + // Expected exception + } + } + + @Test + public void testReadNegativeStringSize() { + byte[] stringBytes = "foo".getBytes(); + ByteBuffer invalidBuffer = ByteBuffer.allocate(2 + stringBytes.length); + invalidBuffer.putShort((short) -1); + invalidBuffer.put(stringBytes); + invalidBuffer.rewind(); + try { + Type.STRING.read(invalidBuffer); + fail("String size not validated"); + } catch (SchemaException e) { + // Expected exception + } + } + + @Test + public void testReadBytesSizeTooLarge() { + byte[] stringBytes = "foo".getBytes(); + ByteBuffer invalidBuffer = ByteBuffer.allocate(4 + stringBytes.length); + invalidBuffer.putInt(stringBytes.length * 5); + invalidBuffer.put(stringBytes); + invalidBuffer.rewind(); + try { + Type.BYTES.read(invalidBuffer); + fail("Bytes size not validated"); + } catch (SchemaException e) { + // Expected exception + } + invalidBuffer.rewind(); + try { + Type.NULLABLE_BYTES.read(invalidBuffer); + fail("Bytes size not validated"); + } catch (SchemaException e) { + // Expected exception + } + } + + @Test + public void testReadNegativeBytesSize() { + byte[] stringBytes = "foo".getBytes(); + ByteBuffer invalidBuffer = ByteBuffer.allocate(4 + stringBytes.length); + invalidBuffer.putInt(-20); + invalidBuffer.put(stringBytes); + invalidBuffer.rewind(); + try { + Type.BYTES.read(invalidBuffer); + fail("Bytes size not validated"); + } catch (SchemaException e) { + // Expected exception + } + } + private Object roundtrip(Type type, Object obj) { ByteBuffer buffer = ByteBuffer.allocate(type.sizeOf(obj)); type.write(buffer, obj);