kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3718; propagate all KafkaConfig __consumer_offsets configs to OffsetConfig instantiation
Date Thu, 26 May 2016 08:17:54 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 cb8b6c071 -> 5f9446498


KAFKA-3718; propagate all KafkaConfig __consumer_offsets configs to OffsetConfig instantiation

Kafka has two configurable compression codecs: the one used by the client (source codec) and
the one finally used when storing into the log (target codec). The target codec defaults to
KafkaConfig.compressionType and can be dynamically configured through zookeeper.

The GroupCoordinator appends group membership information into the __consumer_offsets topic
by:
1. making a message with group membership information
2. making a MessageSet with the single message compressed with the source codec
3. doing a log.append on the MessageSet

Without this patch, KafkaConfig.offsetsTopicCompressionCodec doesn't get propagated to OffsetConfig
instantiation, so GroupMetadataManager uses a source codec of NoCompressionCodec when making
the MessageSet. Suppose we have enough group information that the resulting message exceeds
KafkaConfig.messageMaxBytes before compression but would fall below that threshold after compression
with our source codec. Even if we had dynamically configured __consumer_offsets with our
favorite compression codec, log.append would still throw RecordTooLargeException during
analyzeAndValidateMessageSet, because the message was left uncompressed instead of being compressed
with the source codec defined by KafkaConfig.offsetsTopicCompressionCodec.

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>,
Ismael Juma <ismael@juma.me.uk>

Closes #1394 from onurkaraman/KAFKA-3718

(cherry picked from commit 62dc1afb69369c64207991ba59bcd203505d37ea)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5f944649
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5f944649
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5f944649

Branch: refs/heads/0.10.0
Commit: 5f9446498e24afdb1149af53583c0ab23345d965
Parents: cb8b6c0
Author: Onur Karaman <okaraman@linkedin.com>
Authored: Thu May 26 09:17:31 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu May 26 09:17:49 2016 +0100

----------------------------------------------------------------------
 .../kafka/coordinator/GroupCoordinator.scala    | 10 ++--
 .../main/scala/kafka/server/KafkaServer.scala   | 14 ++---
 .../kafka/api/AuthorizerIntegrationTest.scala   |  2 +-
 .../api/GroupCoordinatorIntegrationTest.scala   | 63 ++++++++++++++++++++
 .../kafka/api/IntegrationTestHarness.scala      |  2 +-
 5 files changed, 78 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5f944649/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index fb71254..f445764 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
 import kafka.log.LogConfig
-import kafka.message.UncompressedCodec
+import kafka.message.ProducerCompressionCodec
 import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -65,7 +65,7 @@ class GroupCoordinator(val brokerId: Int,
     val props = new Properties
     props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
-    props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
+    props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name)
     props
   }
 
@@ -744,14 +744,16 @@ object GroupCoordinator {
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
       offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
       offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+      offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
       offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+      offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
       offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
     val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
 
-    val groupManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time)
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager, heartbeatPurgatory, joinPurgatory, time)
+    val groupMetadataManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time)
+    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f944649/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2832ebc..de3054a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -123,7 +123,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
   var dynamicConfigManager: DynamicConfigManager = null
 
-  var consumerCoordinator: GroupCoordinator = null
+  var groupCoordinator: GroupCoordinator = null
 
   var kafkaController: KafkaController = null
 
@@ -199,9 +199,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
         kafkaController.startup()
 
-        /* start kafka coordinator */
-        consumerCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
-        consumerCoordinator.startup()
+        /* start group coordinator */
+        groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
+        groupCoordinator.startup()
 
         /* Get the authorizer and initialize it if one is specified.*/
         authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
@@ -211,7 +211,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         }
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
           kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
         brokerState.newState(RunningAsBroker)
@@ -555,8 +555,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           CoreUtils.swallow(replicaManager.shutdown())
         if(logManager != null)
           CoreUtils.swallow(logManager.shutdown())
-        if(consumerCoordinator != null)
-          CoreUtils.swallow(consumerCoordinator.shutdown())
+        if(groupCoordinator != null)
+          CoreUtils.swallow(groupCoordinator.shutdown())
         if(kafkaController != null)
           CoreUtils.swallow(kafkaController.shutdown())
         if(zkUtils != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f944649/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index bc705f1..2d5900f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -148,7 +148,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       1,
       1,
       servers,
-      servers.head.consumerCoordinator.offsetsTopicConfigs)
+      servers.head.groupCoordinator.offsetsTopicConfigs)
     // create the test topic with all the brokers as replicas
     TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f944649/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
new file mode 100644
index 0000000..9183d0f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.common.TopicAndPartition
+import kafka.integration.KafkaServerTestHarness
+import kafka.log.Log
+import kafka.message.GZIPCompressionCodec
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.TopicConstants
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.Test
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+import java.util.Properties
+
+class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
+  val offsetsTopicCompressionCodec = GZIPCompressionCodec
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.codec.toString)
+
+  override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map {
+    KafkaConfig.fromProps(_, overridingProps)
+  }
+
+  @Test
+  def testGroupCoordinatorPropagatesOfffsetsTopicCompressionCodec() {
+    val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+                                               securityProtocol = SecurityProtocol.PLAINTEXT)
+    val offsetMap = Map(
+      new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "")
+    ).asJava
+    consumer.commitSync(offsetMap)
+    val logManager = servers.head.getLogManager
+
+    def getGroupMetadataLogOpt: Option[Log] =
+      logManager.getLog(TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0))
+
+    TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.nonEmpty)),
+                            "Commit message not appended in time")
+
+    val logSegments = getGroupMetadataLogOpt.get.logSegments
+    val incorrectCompressionCodecs = logSegments.flatMap(_.log.map(_.message.compressionCodec)).filter(_ != offsetsTopicCompressionCodec)
+    assertEquals("Incorrect compression codecs should be empty", Seq.empty, incorrectCompressionCodecs)
+
+    consumer.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f944649/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index de05c9c..6e76f90 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -81,7 +81,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
       serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       servers,
-      servers(0).consumerCoordinator.offsetsTopicConfigs)
+      servers(0).groupCoordinator.offsetsTopicConfigs)
   }
 
   @After


Mime
View raw message