kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6238; Fix inter-broker protocol message format compatibility check
Date Wed, 21 Feb 2018 09:41:20 GMT
This is an automated email from the ASF dual-hosted git repository.

damianguy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 660c0c0  KAFKA-6238; Fix inter-broker protocol message format compatibility check
660c0c0 is described below

commit 660c0c0aa33ced5307ee70bfdb78ebde4b978d73
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Feb 21 09:38:39 2018 +0000

    KAFKA-6238; Fix inter-broker protocol message format compatibility check
    
    This patch fixes a bug in the validation of the inter-broker protocol and the message format version. We should allow the configured message format api version to be greater than the inter-broker protocol api version as long as the actual message format versions are equal. For example, if the message format version is set to 1.0, it is fine for the inter-broker protocol version to be 0.11.0 because they both use message format v2.
    
    I have added a unit test which checks compatibility for all combinations of the message format version and the inter-broker protocol version.
    
    Author: Jason Gustafson <jason@confluent.io>
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
    
    Closes #4583 from hachikuji/KAFKA-6328-REOPENED
---
 .../apache/kafka/common/record/RecordFormat.java   | 41 +++++++++++++++++++
 core/src/main/scala/kafka/api/ApiVersion.scala     | 46 ++++++++++++++--------
 core/src/main/scala/kafka/log/Log.scala            |  4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  5 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  9 ++++-
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../test/scala/unit/kafka/api/ApiVersionTest.scala | 13 ++++++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 24 +++++++++++
 docs/upgrade.html                                  | 19 +++++----
 9 files changed, 131 insertions(+), 32 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
new file mode 100644
index 0000000..e71ec59
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+public enum RecordFormat {
+    V0(0), V1(1), V2(2);
+
+    public final byte value;
+
+    RecordFormat(int value) {
+        this.value = (byte) value;
+    }
+
+    public static RecordFormat lookup(byte version) {
+        switch (version) {
+            case 0: return V0;
+            case 1: return V1;
+            case 2: return V2;
+            default: throw new IllegalArgumentException("Unknown format version: " + version);
+        }
+    }
+
+    public static RecordFormat current() {
+        return V2;
+    }
+
+}
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index b8329c1..9270a7a 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.RecordFormat
 
 /**
  * This class contains the different Kafka versions.
@@ -90,11 +90,23 @@ object ApiVersion {
 
   def latestVersion = versionNameMap.values.max
 
+  def allVersions: Set[ApiVersion] = {
+    versionNameMap.values.toSet
+  }
+
+  def minVersionForMessageFormat(messageFormatVersion: RecordFormat): String = {
+    messageFormatVersion match {
+      case RecordFormat.V0 => "0.8.0"
+      case RecordFormat.V1 => "0.10.0"
+      case RecordFormat.V2 => "0.11.0"
+      case _ => throw new IllegalArgumentException(s"Invalid message format version $messageFormatVersion")
+    }
+  }
 }
 
 sealed trait ApiVersion extends Ordered[ApiVersion] {
   val version: String
-  val messageFormatVersion: Byte
+  val messageFormatVersion: RecordFormat
   val id: Int
 
   override def compare(that: ApiVersion): Int =
@@ -106,90 +118,90 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
 // Keep the IDs in order of versions
 case object KAFKA_0_8_0 extends ApiVersion {
   val version: String = "0.8.0.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 0
 }
 
 case object KAFKA_0_8_1 extends ApiVersion {
   val version: String = "0.8.1.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 1
 }
 
 case object KAFKA_0_8_2 extends ApiVersion {
   val version: String = "0.8.2.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 2
 }
 
 case object KAFKA_0_9_0 extends ApiVersion {
   val version: String = "0.9.0.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 3
 }
 
 case object KAFKA_0_10_0_IV0 extends ApiVersion {
   val version: String = "0.10.0-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 4
 }
 
 case object KAFKA_0_10_0_IV1 extends ApiVersion {
   val version: String = "0.10.0-IV1"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 5
 }
 
 case object KAFKA_0_10_1_IV0 extends ApiVersion {
   val version: String = "0.10.1-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 6
 }
 
 case object KAFKA_0_10_1_IV1 extends ApiVersion {
   val version: String = "0.10.1-IV1"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 7
 }
 
 case object KAFKA_0_10_1_IV2 extends ApiVersion {
   val version: String = "0.10.1-IV2"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 8
 }
 
 case object KAFKA_0_10_2_IV0 extends ApiVersion {
   val version: String = "0.10.2-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 9
 }
 
 case object KAFKA_0_11_0_IV0 extends ApiVersion {
   val version: String = "0.11.0-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 10
 }
 
 case object KAFKA_0_11_0_IV1 extends ApiVersion {
   val version: String = "0.11.0-IV1"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 11
 }
 
 case object KAFKA_0_11_0_IV2 extends ApiVersion {
   val version: String = "0.11.0-IV2"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 12
 }
 
 case object KAFKA_1_0_IV0 extends ApiVersion {
   val version: String = "1.0-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 13
 }
 
 case object KAFKA_1_1_IV0 extends ApiVersion {
   val version: String = "1.1-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 14
 }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index fec984c..257dd8f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -465,7 +465,7 @@ class Log(@volatile var dir: File,
 
   private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
     checkIfMemoryMappedBufferClosed()
-    val messageFormatVersion = config.messageFormatVersion.messageFormatVersion
+    val messageFormatVersion = config.messageFormatVersion.messageFormatVersion.value
     info(s"Loading producer state from offset $lastOffset for partition $topicPartition with message " +
       s"format version $messageFormatVersion")
 
@@ -663,7 +663,7 @@ class Log(@volatile var dir: File,
               appendInfo.sourceCodec,
               appendInfo.targetCodec,
               config.compact,
-              config.messageFormatVersion.messageFormatVersion,
+              config.messageFormatVersion.messageFormatVersion.value,
               config.messageTimestampType,
               config.messageTimestampDifferenceMaxMs,
               leaderEpoch,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b84587f..9e79afa 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -59,7 +59,7 @@ import DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
 
-import scala.collection.{mutable, _}
+import scala.collection._
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
@@ -1347,7 +1347,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (apiVersionRequest.hasUnsupportedRequestVersion)
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
       else
-        ApiVersionsResponse.apiVersionsResponse(requestThrottleMs, config.interBrokerProtocolVersion.messageFormatVersion)
+        ApiVersionsResponse.apiVersionsResponse(requestThrottleMs,
+          config.interBrokerProtocolVersion.messageFormatVersion.value)
     }
     sendResponseMaybeThrottle(request, createResponseCallback)
   }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 529d0e6..8b2fb10 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1330,8 +1330,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
       s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
       s"Use a routable IP address.")
-    require(interBrokerProtocolVersion >= logMessageFormatVersion,
-      s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
+
+    val messageFormatVersion = logMessageFormatVersion.messageFormatVersion
+    require(interBrokerProtocolVersion.messageFormatVersion.value >= messageFormatVersion.value,
+      s"log.message.format.version $logMessageFormatVersionString can only be used when " +
+        "inter.broker.protocol.version is set to version " +
+        s"${ApiVersion.minVersionForMessageFormat(messageFormatVersion)} or higher")
+
     val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
     require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
       s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f4bfe39..470842e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1002,7 +1002,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getMagic(topicPartition: TopicPartition): Option[Byte] =
-    getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion))
+    getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion.value))
 
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] =  {
     replicaStateChangeLock synchronized {
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 6fc6974..88c9d52 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.api
 
+import org.apache.kafka.common.record.RecordFormat
 import org.junit.Test
 import org.junit.Assert._
 
@@ -74,4 +75,16 @@ class ApiVersionTest {
     assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0.1"))
   }
 
+  @Test
+  def testMinVersionForMessageFormat(): Unit = {
+    assertEquals("0.8.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V0))
+    assertEquals("0.10.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V1))
+    assertEquals("0.11.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V2))
+
+    // Ensure that all message format versions have a defined min version so that we remember
+    // to update the function
+    for (messageFormatVersion <- RecordFormat.values)
+      assertNotNull(ApiVersion.minVersionForMessageFormat(messageFormatVersion))
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 6b26334..0213c12 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -523,6 +523,30 @@ class KafkaConfigTest {
   }
 
   @Test
+  def testInterBrokerVersionMessageFormatCompatibility(): Unit = {
+    def buildConfig(interBrokerProtocol: ApiVersion, messageFormat: ApiVersion): KafkaConfig = {
+      val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+      props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocol.version)
+      props.put(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
+      KafkaConfig.fromProps(props)
+    }
+
+    ApiVersion.allVersions.foreach { interBrokerVersion =>
+      ApiVersion.allVersions.foreach { messageFormatVersion =>
+        if (interBrokerVersion.messageFormatVersion.value >= messageFormatVersion.messageFormatVersion.value) {
+          val config = buildConfig(interBrokerVersion, messageFormatVersion)
+          assertEquals(messageFormatVersion, config.logMessageFormatVersion)
+          assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)
+        } else {
+          intercept[IllegalArgumentException] {
+            buildConfig(interBrokerVersion, messageFormatVersion)
+          }
+        }
+      }
+    }
+  }
+
+  @Test
   def testFromPropsInvalid() {
     def getBaseProperties(): Properties = {
       val validRequiredProperties = new Properties()
diff --git a/docs/upgrade.html b/docs/upgrade.html
index b3ae68f..3ac293d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -36,10 +36,10 @@
             <li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  (See <a href="#upgrade_10_performance_impact">potential performance impact
                 following the upgrade</a> for the details on what this configuration does.)</li>
         </ul>
-        If you are upgrading from 0.11.0.x and you have not overridden the message format, then you only need to override
+        If you are upgrading from 0.11.0.x or 1.0.x and you have not overridden the message format, then you only need to override
         the inter-broker protocol format.
         <ul>
-            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0).</li>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0 or 1.0).</li>
         </ul>
     </li>
     <li> Upgrade the brokers one at a time: shut down the broker, update the code,
and restart it. </li>
@@ -106,10 +106,11 @@
             <li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  (See <a
href="#upgrade_10_performance_impact">potential performance impact
 		following the upgrade</a> for the details on what this configuration does.)</li>
         </ul>
-	If you are upgrading from 0.11.0.x and you have not overridden the message format, then
you only need to override
-	the inter-broker protocol format.
+	If you are upgrading from 0.11.0.x and you have not overridden the message format, you must
set
+	both the message format version and the inter-broker protocol version to 0.11.0.
         <ul>
-            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0,
0.10.0, 0.10.1, 0.10.2, 0.11.0).</li>
+            <li>inter.broker.protocol.version=0.11.0</li>
+            <li>log.message.format.version=0.11.0</li>
         </ul>
     </li>
     <li> Upgrade the brokers one at a time: shut down the broker, update the code,
and restart it. </li>
@@ -117,9 +118,11 @@
     <li> Restart the brokers one by one for the new protocol version to take effect.
</li>
     <li> If you have overridden the message format version as instructed above, then
you need to do one more rolling restart to
         upgrade it to its latest version. Once all (or most) consumers have been upgraded
to 0.11.0 or later,
-        change log.message.format.version to 1.0 on each broker and restart them one by one. Note that the older Scala consumer
-        does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to
-        take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly once semantics</a>), the newer Java consumer must be used.</li>
+        change log.message.format.version to 1.0 on each broker and restart them one by one. If you are upgrading from
+        0.11.0 and log.message.format.version is set to 0.11.0, you can update the config and skip the rolling restart.
+        Note that the older Scala consumer does not support the new message format introduced in 0.11, so to avoid the
+        performance cost of down-conversion (or to take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly once semantics</a>),
+        the newer Java consumer must be used.</li>
 </ol>
 
 <p><b>Additional Upgrade Notes:</b></p>

-- 
To stop receiving notification emails like this one, please contact
damianguy@apache.org.

Mime
View raw message