kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-4897) LogCleaner#cleanSegments should not ignore failures to delete files
Date Thu, 25 Jan 2018 20:40:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-4897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339844#comment-16339844 ]

ASF GitHub Bot commented on KAFKA-4897:
---------------------------------------

hachikuji closed pull request #4393: KAFKA-4897: Add pause method to ShutdownableThread
URL: https://github.com/apache/kafka/pull/4393

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below
for the sake of provenance:

diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 0a6b82e5fc1..23f53569a6b 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -83,7 +83,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         }
       } catch {
         case t: Throwable => {
-            if (!isRunning.get())
+            if (!isRunning)
              throw t /* If this thread is stopped, propagate this exception to kill the thread. */
             else
               warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
@@ -98,7 +98,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         )
       } catch {
         case t: Throwable =>
-          if (!isRunning.get())
+          if (!isRunning)
            throw t /* If this thread is stopped, propagate this exception to kill the thread. */
           else {
             warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")),
t)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e389821ad3b..d5456bea9a8 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -17,7 +17,7 @@
 package kafka.controller
 
 import java.net.SocketTimeoutException
-import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
@@ -213,13 +213,13 @@ class RequestSendThread(val controllerId: Int,
 
   override def doWork(): Unit = {
 
-    def backoff(): Unit = CoreUtils.swallow(Thread.sleep(100), this, Level.TRACE)
+    def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
 
     val QueueItem(apiKey, requestBuilder, callback) = queue.take()
     var clientResponse: ClientResponse = null
     try {
       var isSendSuccessful = false
-      while (isRunning.get() && !isSendSuccessful) {
+      while (isRunning && !isSendSuccessful) {
        // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
        // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
         try {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c5c0d497318..637e24cb01d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
 import java.nio._
 import java.nio.file.Files
 import java.util.Date
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Gauge
 import kafka.common._
@@ -233,10 +233,9 @@ class LogCleaner(val config: CleanerConfig,
                               checkDone = checkDone)
 
     @volatile var lastStats: CleanerStats = new CleanerStats()
-    private val backOffWaitLatch = new CountDownLatch(1)
 
     private def checkDone(topicPartition: TopicPartition) {
-      if (!isRunning.get())
+      if (!isRunning)
         throw new ThreadShutdownException
       cleanerManager.checkCleaningAborted(topicPartition)
     }
@@ -248,12 +247,6 @@ class LogCleaner(val config: CleanerConfig,
       cleanOrSleep()
     }
 
-    override def shutdown() = {
-    	 initiateShutdown()
-    	 backOffWaitLatch.countDown()
-    	 awaitShutdown()
-     }
-
     /**
      * Clean a log if there is a dirty log available, otherwise sleep for a bit
      */
@@ -289,7 +282,7 @@ class LogCleaner(val config: CleanerConfig,
           }
       }
       if (!cleaned)
-        backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
+        pause(config.backOffMs, TimeUnit.MILLISECONDS)
     }
 
     /**
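
For context on the LogCleaner change above: the per-cleaner backOffWaitLatch
becomes redundant because the new pause() (added to ShutdownableThread in a
later file of this diff) waits on a shutdown-initiated latch, so a backing-off
cleaner wakes up as soon as shutdown begins. Below is a minimal standalone
sketch of that latch-backed pause semantics; PauseDemo and its timings are
illustrative, not part of the PR:

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    object PauseDemo extends App {
      val shutdownInitiated = new CountDownLatch(1)

      // Waits up to the given timeout, but returns immediately once the
      // latch has been counted down (i.e. shutdown has been initiated).
      def pause(timeout: Long, unit: TimeUnit): Unit =
        shutdownInitiated.await(timeout, unit)

      // Simulate shutdown being initiated from another thread after ~200 ms.
      new Thread(new Runnable {
        def run(): Unit = { Thread.sleep(200); shutdownInitiated.countDown() }
      }).start()

      val start = System.nanoTime()
      pause(10, TimeUnit.SECONDS) // wakes after ~200 ms, not after 10 s
      println(s"paused for ${(System.nanoTime() - start) / 1000000} ms")
    }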
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b078073ba03..925c33095a2 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -148,7 +148,7 @@ abstract class AbstractFetcherThread(name: String,
       responseData = fetch(fetchRequest)
     } catch {
       case t: Throwable =>
-        if (isRunning.get) {
+        if (isRunning) {
           warn(s"Error in fetch to broker ${sourceBroker.id}, request $fetchRequest", t)
           inLock(partitionMapLock) {
             partitionsWithError ++= partitionStates.partitionSet.asScala
@@ -218,7 +218,7 @@ abstract class AbstractFetcherThread(name: String,
                       partitionsWithError += topicPartition
                   }
                 case _ =>
-                  if (isRunning.get) {
+                  if (isRunning) {
                     error(s"Error for partition $topicPartition from broker ${sourceBroker.id}",
partitionData.exception.get)
                     partitionsWithError += topicPartition
                   }
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 71f33683dbe..0408e9212a3 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -368,7 +368,7 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn
       response = simpleConsumer.fetch(fetchRequest)
     } catch {
       case t: Throwable =>
-        if (!isRunning.get)
+        if (!isRunning)
           throw t
     }
 
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index 0922d15e93a..13bbc90f2a5 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -17,8 +17,7 @@
 
 package kafka.utils
 
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import org.apache.kafka.common.internals.FatalExitError
 
@@ -26,8 +25,8 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
         extends Thread(name) with Logging {
   this.setDaemon(false)
   this.logIdent = "[" + name + "]: "
-  val isRunning: AtomicBoolean = new AtomicBoolean(true)
-  private val shutdownLatch = new CountDownLatch(1)
+  private val shutdownInitiated = new CountDownLatch(1)
+  private val shutdownComplete = new CountDownLatch(1)
 
   def shutdown(): Unit = {
     initiateShutdown()
@@ -35,27 +34,42 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
   }
 
   def isShutdownComplete: Boolean = {
-    shutdownLatch.getCount == 0
+    shutdownComplete.getCount == 0
   }
 
   def initiateShutdown(): Boolean = {
-    if (isRunning.compareAndSet(true, false)) {
-      info("Shutting down")
-      if (isInterruptible)
-        interrupt()
-      true
-    } else
-      false
+    this.synchronized {
+      if (isRunning) {
+        info("Shutting down")
+        shutdownInitiated.countDown()
+        if (isInterruptible)
+          interrupt()
+        true
+      } else
+        false
+    }
   }
 
-    /**
+  /**
    * After calling initiateShutdown(), use this API to wait until the shutdown is complete
    */
   def awaitShutdown(): Unit = {
-    shutdownLatch.await()
+    shutdownComplete.await()
     info("Shutdown completed")
   }
 
+  /**
+   *  Causes the current thread to wait until the shutdown is initiated,
+   *  or the specified waiting time elapses.
+   *
+   * @param timeout the maximum time to wait
+   * @param unit the time unit of the timeout argument
+   */
+  def pause(timeout: Long, unit: TimeUnit): Unit = {
+    if (shutdownInitiated.await(timeout, unit))
+      trace("shutdownInitiated latch count reached zero. Shutdown called.")
+  }
+
   /**
   * This method is repeatedly invoked until the thread shuts down or this method throws an exception
    */
@@ -64,19 +78,24 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
   override def run(): Unit = {
     info("Starting")
     try {
-      while (isRunning.get)
+      while (isRunning)
         doWork()
     } catch {
       case e: FatalExitError =>
-        isRunning.set(false)
-        shutdownLatch.countDown()
+        shutdownInitiated.countDown()
+        shutdownComplete.countDown()
         info("Stopped")
         Exit.exit(e.statusCode())
       case e: Throwable =>
-        if (isRunning.get())
+        if (isRunning)
           error("Error due to", e)
+    } finally {
+       shutdownComplete.countDown()
     }
-    shutdownLatch.countDown()
     info("Stopped")
   }
+
+  def isRunning: Boolean = {
+    shutdownInitiated.getCount() != 0
+  }
 }
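
To show how a worker uses the refactored class: a hypothetical subclass
(HeartbeatThread and its 1-second interval are illustrative, not part of this
PR) that stays responsive to shutdown by sleeping via pause() rather than
Thread.sleep():

    import java.util.concurrent.TimeUnit
    import kafka.utils.ShutdownableThread

    class HeartbeatThread extends ShutdownableThread("heartbeat-thread") {
      override def doWork(): Unit = {
        println("heartbeat")
        // Sleeps up to 1 s, but returns immediately once initiateShutdown()
        // counts down the shutdownInitiated latch, so shutdown is not delayed.
        pause(1, TimeUnit.SECONDS)
      }
    }

run() loops doWork() while isRunning is true; calling shutdown() both
initiates the shutdown (waking any pause() in progress) and awaits completion:

    val t = new HeartbeatThread
    t.start()
    // ... later ...
    t.shutdown()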
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 8917921d85f..58d1be9ee5c 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -104,7 +104,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     val scheduler = new BounceBrokerScheduler(numIters)
     scheduler.start()
 
-    while (scheduler.isRunning.get()) {
+    while (scheduler.isRunning) {
       val records = consumer.poll(100).asScala
       assertEquals(Set(tp), consumer.assignment.asScala)
 
@@ -146,7 +146,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     val scheduler = new BounceBrokerScheduler(numIters)
     scheduler.start()
 
-    while(scheduler.isRunning.get()) {
+    while(scheduler.isRunning) {
       val coin = TestUtils.random.nextInt(3)
       if (coin == 0) {
         info("Seeking to end of log")
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index fc06b731034..a760d7d1d06 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -439,7 +439,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     @volatile var sent = 0
     override def doWork(): Unit = {
         try {
-            while (isRunning.get) {
+            while (isRunning) {
                 sent += 1
                 val record = new ProducerRecord(topic, s"key$sent", s"value$sent")
                 producer.send(record).get(10, TimeUnit.SECONDS)
@@ -456,7 +456,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     var received = 0
     override def doWork(): Unit = {
       try {
-        while (isRunning.get || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
+        while (isRunning || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
           received += consumer.poll(50).count
         }
       } finally {
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 3bc3d39b339..66b4874e3f9 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -88,21 +88,6 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc
         </Or>
     </Match>
 
-    <Match>
-        <!-- Add a suppression for KAFKA-4897: LogCleaner#cleanSegments should not ignore failures to delete files.
-            TODO: remove this suppression when KAFKA-4897 is fixed. -->
-        <Package name="kafka.log"/>
-        <Source name="LogCleaner.scala"/>
-        <Bug pattern="RV_RETURN_VALUE_IGNORED,RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
-    </Match>
-
-    <Match>
-        <!-- Add a suppression for ignoring the return value of CountDownLatch#await. -->
-        <Class name="kafka.log.Cleaner"/>
-        <Method name="cleanOrSleep"/>
-        <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
-    </Match>
-
     <Match>
        <!-- Add a suppression for having the thread start in the constructor of the old, deprecated consumer. -->
         <Class name="kafka.producer.Producer"/>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> LogCleaner#cleanSegments should not ignore failures to delete files
> -------------------------------------------------------------------
>
>                 Key: KAFKA-4897
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4897
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Colin P. McCabe
>            Assignee: Manikumar
>            Priority: Major
>
> LogCleaner#cleanSegments should not ignore failures to delete files. Currently, it ignores
> the failure and does not even log an error message.
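
The fix the issue asks for is separate from this PR, but the pattern is
straightforward. A sketch (illustrative only, not the committed fix): surface
delete failures by using java.nio.file.Files.delete, which throws on failure,
instead of java.io.File#delete, whose boolean result is easy to drop:

    import java.io.{File, IOException}
    import java.nio.file.Files

    // Illustrative helper, not from the Kafka code base.
    object SafeDelete {
      def deleteOrThrow(file: File): Unit = {
        try {
          // Unlike java.io.File#delete, which just returns false on failure,
          // Files.delete throws an IOException describing the reason.
          Files.delete(file.toPath)
        } catch {
          case e: IOException =>
            throw new IOException(s"Failed to delete ${file.getAbsolutePath}", e)
        }
      }
    }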



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
