kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7414; Out of range errors should never be fatal for follower (#5654)
Date Mon, 17 Sep 2018 19:37:09 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c5ef614  KAFKA-7414; Out of range errors should never be fatal for follower (#5654)
c5ef614 is described below

commit c5ef614bbf133e18bd207e22f2697a2d1d3e8e4e
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Mon Sep 17 12:36:53 2018 -0700

    KAFKA-7414; Out of range errors should never be fatal for follower (#5654)
    
    This patch fixes the inconsistent handling of out of range errors in the replica fetcher.
Previously we would raise a fatal error if the follower's offset is ahead of the leader's
and unclean leader election is not enabled. The behavior was inconsistent depending on the
message format. With KIP-101/KIP-279 and the new message format, upon becoming a follower,
the replica would use leader epoch information to reconcile the end of the log with the leader
and simply truncate. Additionall [...]
    
    With this patch, we simply skip the unclean leader election check and allow the needed
truncation to occur. When the truncation offset is below the high watermark, a warning will
be logged. This makes the behavior consistent for all message formats and removes a scenario
in which an error on one partition can bring the broker down.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
---
 .../scala/kafka/server/AbstractFetcherThread.scala |  17 +--
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |   2 -
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   9 +-
 .../ReplicaFetcherThreadFatalErrorTest.scala       | 146 ---------------------
 .../kafka/server/AbstractFetcherThreadTest.scala   |  18 ++-
 5 files changed, 11 insertions(+), 181 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 44137cf..4a2719e 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -38,9 +38,9 @@ import java.util.concurrent.atomic.AtomicLong
 import com.yammer.metrics.core.Gauge
 import kafka.log.LogAppendInfo
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
+import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, ListOffsetRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
 
 import scala.math._
 
@@ -77,8 +77,6 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]]
 
-  protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean
-
   protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
 
   protected def logEndOffset(topicPartition: TopicPartition): Long
@@ -289,7 +287,6 @@ abstract class AbstractFetcherThread(name: String,
                     info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition
$topicPartition is " +
                       s"out of range, which typically implies a leader change. Reset fetch
offset to $newOffset")
                   } catch {
-                    case e: FatalExitError => throw e
                     case e: Throwable =>
                       error(s"Error getting offset for partition $topicPartition", e)
                       partitionsWithError += topicPartition
@@ -458,16 +455,6 @@ abstract class AbstractFetcherThread(name: String,
      */
     val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition)
     if (leaderEndOffset < replicaEndOffset) {
-      // Prior to truncating the follower's log, ensure that doing so is not disallowed by
the configuration for unclean leader election.
-      // This situation could only happen if the unclean election configuration for a topic
changes while a replica is down. Otherwise,
-      // we should never encounter this situation since a non-ISR leader cannot be elected
if disallowed by the broker configuration.
-      if (!isUncleanLeaderElectionAllowed(topicPartition)) {
-        // Log a fatal error and shutdown the broker to ensure that data loss does not occur
unexpectedly.
-        fatal(s"Exiting because log truncation is not allowed for partition $topicPartition,
current leader's " +
-          s"latest offset $leaderEndOffset is less than replica's latest offset $replicaEndOffset}")
-        throw new FatalExitError
-      }
-
       warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current
" +
         s"leader's latest offset $leaderEndOffset")
       truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset))
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index dc585eb..2244771 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -125,8 +125,6 @@ class ReplicaAlterLogDirsThread(name: String,
     logAppendInfo
   }
 
-  override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition):
Boolean = true
-
   override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long
= {
     replicaMgr.getReplicaOrException(topicPartition).logStartOffset
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5dcd29b..bdbadd9 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -21,9 +21,8 @@ import java.util.Optional
 
 import kafka.api._
 import kafka.cluster.BrokerEndPoint
-import kafka.log.{LogAppendInfo, LogConfig}
+import kafka.log.LogAppendInfo
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
-import kafka.zk.AdminZkClient
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
@@ -173,12 +172,6 @@ class ReplicaFetcherThread(name: String,
         "equal or larger than your settings for max.message.bytes, both at a broker and topic
level.")
   }
 
-  override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition):
Boolean = {
-    val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
-    LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
-      ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable
-  }
-
   override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition,
PD)] = {
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
deleted file mode 100644
index 392c912..0000000
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.cluster.BrokerEndPoint
-import kafka.utils.{Exit, TestUtils}
-import kafka.utils.TestUtils.createBrokerConfigs
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.Records
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import org.apache.kafka.common.utils.Time
-import org.junit.{After, Test}
-
-import scala.collection.Map
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
-
-class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
-
-  private var brokers: Seq[KafkaServer] = null
-  @volatile private var shutdownCompleted = false
-
-  @After
-  override def tearDown() {
-    Exit.resetExitProcedure()
-    TestUtils.shutdownServers(brokers)
-    super.tearDown()
-  }
-
-  /**
-    * Verifies that a follower shuts down if the offset for an `added partition` is out of
range and if a fatal
-    * exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that
there are no deadlocks
-    * when the shutdown hook is invoked and hence this test.
-    */
-  @Test
-  def testFatalErrorInAddPartitions(): Unit = {
-
-    // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the
broker shuts down before
-    // the metadata is propagated.
-    def createTopic(topic: String): Unit = {
-      adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    }
-
-    val props = createBrokerConfigs(2, zkConnect)
-    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params
=>
-      import params._
-      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager,
metrics, time, quotaManager) {
-        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw
new FatalExitError
-        override def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]): Unit
=
-          super.addPartitions(partitionAndOffsets.mapValues(_ => -1))
-      }
-    }))
-    createTopic("topic")
-    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
-  }
-
-  /**
-    * Verifies that a follower shuts down if the offset of a partition in the fetch response
is out of range and if a
-    * fatal exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure
that there are no deadlocks
-    * when the shutdown hook is invoked and hence this test.
-    */
-  @Test
-  def testFatalErrorInProcessFetchRequest(): Unit = {
-    val props = createBrokerConfigs(2, zkConnect)
-    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params
=>
-      import params._
-      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager,
metrics, time, quotaManager) {
-        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw
new FatalExitError
-        override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition,
PD)] = {
-          fetchRequest.fetchData.asScala.keys.toSeq.map { tp =>
-            (tp, new FetchResponse.PartitionData[Records](Errors.OFFSET_OUT_OF_RANGE,
-              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              FetchResponse.INVALID_LOG_START_OFFSET, null, null))
-          }
-        }
-      }
-    }))
-    TestUtils.createTopic(zkClient, "topic", numPartitions = 1, replicationFactor = 2, servers
= brokers)
-    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
-  }
-
-  private case class FetcherThreadParams(threadName: String, fetcherId: Int, sourceBroker:
BrokerEndPoint,
-                                         replicaManager: ReplicaManager, metrics: Metrics,
time: Time,
-                                         quotaManager: ReplicationQuotaManager)
-
-  private def createServer(config: KafkaConfig, fetcherThread: FetcherThreadParams =>
ReplicaFetcherThread): KafkaServer = {
-    val time = Time.SYSTEM
-    val server = new KafkaServer(config, time) {
-
-      override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
{
-        new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown,
-          quotaManagers, new BrokerTopicStats, metadataCache, logDirFailureChannel) {
-
-          override protected def createReplicaFetcherManager(metrics: Metrics, time: Time,
threadNamePrefix: Option[String],
-                                                             quotaManager: ReplicationQuotaManager)
=
-            new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
{
-              override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint):
AbstractFetcherThread = {
-                val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
-                val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
-                fetcherThread(FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager,
metrics,
-                  time, quotaManager))
-              }
-            }
-        }
-      }
-
-    }
-
-    Exit.setExitProcedure { (_, _) =>
-      import scala.concurrent.ExecutionContext.Implicits._
-      // Run in a separate thread like shutdown hooks
-      Future {
-        server.shutdown()
-        shutdownCompleted = true
-      }
-      // Sleep until interrupted to emulate the fact that `System.exit()` never returns
-      Thread.sleep(Long.MaxValue)
-      throw new AssertionError
-    }
-    server.startup()
-    server
-  }
-
-}
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 8c1d95a..7a7aeb3 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -151,10 +151,10 @@ class AbstractFetcherThreadTest {
     assertEquals(leaderState.highWatermark, replicaState.highWatermark)
   }
 
-  @Test(expected = classOf[FatalExitError])
-  def testFollowerFetchOutOfRangeHighUncleanLeaderElectionDisallowed(): Unit = {
+  @Test
+  def testFollowerFetchOutOfRangeHigh(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcher = new MockFetcherThread(isUncleanLeaderElectionAllowed = false)
+    val fetcher = new MockFetcherThread()
 
     val replicaLog = Seq(
       mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
@@ -185,6 +185,10 @@ class AbstractFetcherThreadTest {
     leaderState.highWatermark = 0L
 
     fetcher.doWork()
+
+    assertEquals(0L, replicaState.logEndOffset)
+    assertEquals(0L, replicaState.logStartOffset)
+    assertEquals(0L, replicaState.highWatermark)
   }
 
   @Test
@@ -275,9 +279,7 @@ class AbstractFetcherThreadTest {
     }
   }
 
-  class MockFetcherThread(val replicaId: Int = 0,
-                          val leaderId: Int = 1,
-                          isUncleanLeaderElectionAllowed: Boolean = true)
+  class MockFetcherThread(val replicaId: Int = 0, val leaderId: Int = 1)
     extends AbstractFetcherThread("mock-fetcher",
       clientId = "mock-fetcher",
       sourceBroker = new BrokerEndPoint(leaderId, host = "localhost", port = Random.nextInt()))
{
@@ -380,10 +382,6 @@ class AbstractFetcherThreadTest {
       ResultWithPartitions(Some(fetchRequest), Set.empty)
     }
 
-    override def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean
= {
-      isUncleanLeaderElectionAllowed
-    }
-
     override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
       val state = replicaPartitionState(topicPartition)
       state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))


Mime
View raw message