Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 90EB5200CA3 for ; Thu, 18 May 2017 03:54:52 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8F517160BCB; Thu, 18 May 2017 01:54:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 60399160BBA for ; Thu, 18 May 2017 03:54:51 +0200 (CEST) Received: (qmail 83192 invoked by uid 500); 18 May 2017 01:54:50 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 83181 invoked by uid 99); 18 May 2017 01:54:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 May 2017 01:54:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3D918DFF93; Thu, 18 May 2017 01:54:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: junrao@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: =?utf-8?q?kafka_git_commit=3A_KAFKA-5036=3B_hold_onto_the_leader_l?= =?utf-8?q?ock_in_Partition_while_serving_an_O=E2=80=A6?= Date: Thu, 18 May 2017 01:54:50 +0000 (UTC) archived-at: Thu, 18 May 2017 01:54:52 -0000 Repository: kafka Updated Branches: refs/heads/trunk c64cfd2e2 -> 249152062 KAFKA-5036; hold onto the leader lock in Partition while serving an O… …ffsetForLeaderEpoch request Author: Jun Rao Reviewers: Ismael Juma , Ben Stopford Closes #3074 from junrao/kafka-5036 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/24915206 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/24915206 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/24915206 Branch: refs/heads/trunk Commit: 24915206260c33cd7118db5359f3927d3be1ff60 Parents: c64cfd2 Author: Jun Rao Authored: Wed May 17 18:54:46 2017 -0700 Committer: Jun Rao Committed: Wed May 17 18:54:46 2017 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/cluster/Partition.scala | 45 +++++++---- core/src/main/scala/kafka/log/Log.scala | 2 +- .../src/main/scala/kafka/server/KafkaApis.scala | 6 +- .../scala/kafka/server/ReplicaManager.scala | 48 +++++------ .../server/epoch/LeaderEpochFileCache.scala | 2 +- .../epoch/OffsetsForLeaderEpochTest.scala | 83 +++++++++++--------- 6 files changed, 100 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1d13689..f123a16 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -16,28 +16,30 @@ */ package kafka.cluster -import kafka.common._ -import kafka.utils._ -import kafka.utils.CoreUtils.{inReadLock, inWriteLock} -import kafka.admin.AdminUtils -import kafka.api.LeaderAndIsr -import kafka.log.LogConfig -import kafka.server._ -import kafka.metrics.KafkaMetricsGroup -import kafka.controller.KafkaController import java.io.IOException import java.util.concurrent.locks.ReentrantReadWriteLock -import org.apache.kafka.common.errors.{PolicyViolationException, NotEnoughReplicasException, NotLeaderForPartitionException} -import org.apache.kafka.common.protocol.Errors - -import scala.collection.JavaConverters._ import com.yammer.metrics.core.Gauge +import kafka.admin.AdminUtils +import kafka.api.LeaderAndIsr +import kafka.common.NotAssignedReplicaException +import kafka.controller.KafkaController +import kafka.log.LogConfig +import kafka.metrics.KafkaMetricsGroup +import kafka.server._ +import kafka.utils.CoreUtils.{inReadLock, inWriteLock} +import kafka.utils._ import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.PartitionState +import org.apache.kafka.common.requests.EpochEndOffset._ +import org.apache.kafka.common.requests.{EpochEndOffset, PartitionState} import org.apache.kafka.common.utils.Time +import scala.collection.JavaConverters._ + /** * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR */ @@ -510,6 +512,21 @@ class Partition(val topic: String, } } + /** + * @param leaderEpoch Requested leader epoch + * @return The last offset of messages published under this leader epoch. + */ + def lastOffsetForLeaderEpoch(leaderEpoch: Int): EpochEndOffset = { + inReadLock(leaderIsrUpdateLock) { + leaderReplicaIfLocal match { + case Some(leaderReplica) => + new EpochEndOffset(NONE, leaderReplica.epochs.get.endOffsetFor(leaderEpoch)) + case None => + new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET) + } + } + } + private def updateIsr(newIsr: Set[Replica]) { val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion) val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId, http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index dd22a26..7a47657 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -513,7 +513,7 @@ class Log(@volatile var dir: File, * @throws KafkaStorageException If the append fails due to an I/O error. * @return Information about the appended messages including the first and last offset. */ - private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean = true, leaderEpoch: Int): LogAppendInfo = { + private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = { val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient) // return if we have no valid messages or if this is a duplicate of the last appended entry http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 33b696a..c3c37c1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1656,12 +1656,12 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = { - val offsetForEpoch = request.body[OffsetsForLeaderEpochRequest] - val requestInfo = offsetForEpoch.epochsByTopicPartition() + val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest] + val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition() authorizeClusterAction(request) val responseBody = new OffsetsForLeaderEpochResponse( - replicaManager.getResponseFor(requestInfo) + replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava ) sendResponseExemptThrottle(request, new RequestChannel.Response(request, responseBody)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index eec9ab2..47b6d69 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -20,36 +20,32 @@ import java.io.{File, IOException} import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import org.apache.kafka.common.errors._ import com.yammer.metrics.core.Gauge import kafka.api._ import kafka.cluster.{Partition, Replica} +import kafka.common.KafkaStorageException import kafka.controller.KafkaController import kafka.log.{Log, LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup import kafka.server.QuotaFactory.UnboundedQuota import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils._ -import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _} +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse, LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest} -import org.apache.kafka.common.requests._ -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse, LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest, _} +import org.apache.kafka.common.utils.Time -import scala.collection._ import scala.collection.JavaConverters._ -import java.util.{Map => JMap} - -import kafka.common.KafkaStorageException -import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.protocol.Errors._ -import org.apache.kafka.common.requests.EpochEndOffset._ +import scala.collection._ /* * Result metadata of a log append operation on the log @@ -1108,22 +1104,16 @@ class ReplicaManager(val config: KafkaConfig, new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) } - def getResponseFor(requestedEpochInfo: JMap[TopicPartition, Integer]): JMap[TopicPartition, EpochEndOffset] = { - OffsetsForLeaderEpoch.getResponseFor(this, requestedEpochInfo) - } -} - -object OffsetsForLeaderEpoch extends Logging { - def getResponseFor(replicaManager: ReplicaManager, requestedEpochInfo: JMap[TopicPartition, Integer]): JMap[TopicPartition, EpochEndOffset] = { - debug(s"Processing OffsetForEpochRequest: $requestedEpochInfo") - requestedEpochInfo.asScala.map { case (tp, epoch) => - val offset = try { - new EpochEndOffset(NONE, replicaManager.getLeaderReplicaIfLocal(tp).epochs.get.endOffsetFor(epoch)) - } catch { - case _: NotLeaderForPartitionException => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET) - case _: UnknownTopicOrPartitionException => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET) + def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, Integer]): Map[TopicPartition, EpochEndOffset] = { + requestedEpochInfo.map { case (tp, leaderEpoch) => + val epochEndOffset = getPartition(tp) match { + case Some(partition) => + partition.lastOffsetForLeaderEpoch(leaderEpoch) + case None => + new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET) } - (tp, offset) - }.toMap.asJava + tp -> epochEndOffset + } } } + http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index 2b1ecc7..ffca900 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -180,7 +180,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM checkpoint.write(epochs) } - def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Latest: {epoch:$latestEpoch, offset$latestOffset} for Partition: $topicPartition" + def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Current: {epoch:$latestEpoch, offset$latestOffset} for Partition: $topicPartition" def validateAndMaybeWarn(epoch: Int, offset: Long) = { assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}") http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index 77b9068..d004641 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ -16,83 +16,90 @@ */ package kafka.server.epoch -import kafka.server.OffsetsForLeaderEpoch +import java.util.concurrent.atomic.AtomicBoolean + +import kafka.cluster.Replica +import kafka.server._ +import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException} +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.requests.EpochEndOffset +import org.apache.kafka.common.requests.EpochEndOffset._ import org.easymock.EasyMock._ -import org.junit.Test import org.junit.Assert._ - -import scala.collection.JavaConverters._ -import scala.collection.mutable +import org.junit.Test class OffsetsForLeaderEpochTest { + private val config = TestUtils.createBrokerConfigs(1, TestUtils.MockZkConnect).map(KafkaConfig.fromProps).head + private val time = new MockTime + private val metrics = new Metrics + private val tp = new TopicPartition("topic", 1) @Test def shouldGetEpochsFromReplica(): Unit = { - val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager]) - val replica = createNiceMock(classOf[kafka.cluster.Replica]) - val cache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache]) - //Given - val tp = new TopicPartition("topic", 1) val offset = 42 val epochRequested: Integer = 5 - val request = mutable.Map(tp -> epochRequested).asJava + val request = Map(tp -> epochRequested) //Stubs - expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica) - expect(replica.epochs).andReturn(Some(cache)) - expect(cache.endOffsetFor(epochRequested)).andReturn(offset) - replay(replica, replicaManager, cache) + val mockLog = createNiceMock(classOf[kafka.log.Log]) + val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache]) + expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset) + expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes() + replay(mockCache, mockLog) + + // create a replica manager with 1 partition that has 1 replica + val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false), + QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + new MetadataCache(config.brokerId)) + val partition = replicaManager.getOrCreatePartition(tp) + val leaderReplica = new Replica(config.brokerId, partition, time, 0, Some(mockLog)) + partition.addReplicaIfNotExists(leaderReplica) + partition.leaderReplicaIdOpt = Some(config.brokerId) //When - val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request) + val response = replicaManager.lastOffsetForLeaderEpoch(request) //Then - assertEquals(new EpochEndOffset(Errors.NONE, offset), response.get(tp)) + assertEquals(new EpochEndOffset(Errors.NONE, offset), response(tp)) } @Test - def shonuldReturnNoLeaderForPartitionIfThrown(): Unit = { - val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager]) + def shouldReturnNoLeaderForPartitionIfThrown(): Unit = { + //create a replica manager with 1 partition that has 0 replica + val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false), + QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + new MetadataCache(config.brokerId)) + replicaManager.getOrCreatePartition(tp) //Given - val tp = new TopicPartition("topic", 1) val epochRequested: Integer = 5 - val request = mutable.Map(tp -> epochRequested).asJava - - //Stubs - expect(replicaManager.getLeaderReplicaIfLocal(tp)).andThrow(new NotLeaderForPartitionException()) - replay(replicaManager) + val request = Map(tp -> epochRequested) //When - val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request) + val response = replicaManager.lastOffsetForLeaderEpoch(request) //Then - assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET), response.get(tp)) + assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET), response(tp)) } @Test def shouldReturnUnknownTopicOrPartitionIfThrown(): Unit = { - val replicaManager = createNiceMock(classOf[kafka.server.ReplicaManager]) + //create a replica manager with 0 partition + val replicaManager = new ReplicaManager(config, metrics, time, null, null, null, new AtomicBoolean(false), + QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats, + new MetadataCache(config.brokerId)) //Given - val tp = new TopicPartition("topic", 1) val epochRequested: Integer = 5 - val request = mutable.Map(tp -> epochRequested).asJava - - //Stubs - expect(replicaManager.getLeaderReplicaIfLocal(tp)).andThrow(new UnknownTopicOrPartitionException()) - replay(replicaManager) + val request = Map(tp -> epochRequested) //When - val response = OffsetsForLeaderEpoch.getResponseFor(replicaManager, request) + val response = replicaManager.lastOffsetForLeaderEpoch(request) //Then - assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET), response.get(tp)) + assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET), response(tp)) } } \ No newline at end of file