kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6739) Broker receives error when handling request with java.lang.IllegalArgumentException: Magic v0 does not support record headers
Date Tue, 03 Apr 2018 17:40:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424336#comment-16424336 ]

ASF GitHub Bot commented on KAFKA-6739:
---------------------------------------

hachikuji closed pull request #4813: KAFKA-6739: Ignore the presence of headers when down-converting from V2 to V1/V0
URL: https://github.com/apache/kafka/pull/4813

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 2452798d485..89a5413e00c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -130,8 +130,13 @@ private MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, R
 
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
                 timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
-        for (Record record : recordBatchAndRecords.records)
-            builder.append(record);
+        for (Record record : recordBatchAndRecords.records) {
+            // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
+            if (magic > RecordBatch.MAGIC_VALUE_V1)
+                builder.append(record);
+            else
+            builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
+        }
 
         builder.close();
         return builder;
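
For context, the failure and the fix come down to which MemoryRecordsBuilder method is used: append(Record) copies the record's headers into the target batch, and the v0/v1 writers reject headers, while the appendWithOffset overload taking only offset, timestamp, key, and value never sees them. A minimal standalone sketch of the patched loop (not part of the PR; the class name and buffer size are illustrative):

{code:java}
import java.nio.ByteBuffer;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.*;

public class DownConvertSketch {
    public static void main(String[] args) {
        // A v2 (current-magic) batch holding one record with a header.
        Header[] headers = {new RecordHeader("headerKey", "headerValue".getBytes())};
        MemoryRecords v2 = MemoryRecords.withRecords(CompressionType.NONE,
                new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes(), headers));

        // A v0 builder, as created when down-converting for an old fetch request.
        MemoryRecordsBuilder v0 = MemoryRecords.builder(ByteBuffer.allocate(1024),
                RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE,
                TimestampType.CREATE_TIME, 0L);

        for (Record record : v2.records()) {
            // v0.append(record) would throw "Magic v0 does not support record
            // headers" because append(Record) carries the headers along; the
            // overload below drops them, which is what the patched loop does.
            v0.appendWithOffset(record.offset(), record.timestamp(),
                    record.key(), record.value());
        }
        v0.close();
    }
}
{code}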
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 53ac2003586..fdd3ede16cc 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -40,6 +42,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertArrayEquals;
 
 public class FileRecordsTest {
 
@@ -358,6 +361,11 @@ public void testConversion() throws IOException {
 
     private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
         List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
+
+        Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
+                            new RecordHeader("headerKey2", "headerValue2".getBytes()),
+                            new RecordHeader("headerKey3", "headerValue3".getBytes())};
+
         List<SimpleRecord> records = asList(
                 new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
                 new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
@@ -366,9 +374,10 @@ private void doTestConversion(CompressionType compressionType, byte toMagic) thr
                 new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
                 new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
                 new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
-                new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes()),
+                new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
                 new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
-                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes()));
+                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
+        assertEquals("incorrect test setup", offsets.size(), records.size());
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
@@ -452,6 +461,7 @@ private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
                     assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(),
record.timestamp());
                     assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
                     assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+                    assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers());
                 }
                 i += 1;
             }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Broker receives error when handling request with java.lang.IllegalArgumentException: Magic v0 does not support record headers
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6739
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6739
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.0.0
>            Reporter: Koelli Mungee
>            Assignee: Dhruvil Shah
>            Priority: Critical
>             Fix For: 1.2.0, 1.1.1
>
>
> A broker running 1.0.0 with the following properties
>  
> {code:java}
> log.message.format.version=1.0
> inter.broker.protocol.version=1.0
> {code}
> receives this ERROR while handling a fetch request for a message with a header:
> {code:java}
> [2018-03-23 01:48:03,093] ERROR [KafkaApi-1] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=test=[{partition=11,fetch_offset=20645,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: Magic v0 does not support record headers
> at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
> at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
> at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
> at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
> at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
> at scala.Option.map(Option.scala:146)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
> at scala.Option.flatMap(Option.scala:171)
> at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
> at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
> at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
> at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
> at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
> at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
> at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
> at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
> at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  
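
As a reference for reproducing the report, a minimal producer sketch that gets a header onto a topic (not from the original reporter; the bootstrap address is an assumption, and the topic name mirrors the fetch request in the log above). Headers exist only in message format v2, so when a broker has to down-convert such a record to v0/v1 while serving an older fetch request, an unpatched build hits the exception above:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

public class HeaderProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test", "k1", "hello");
            // The header below is what a v0/v1 target cannot represent.
            record.headers().add(new RecordHeader("headerKey", "headerValue".getBytes()));
            producer.send(record);
        }
    }
}
{code}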



