kafka-dev mailing list archives

From "Jun Rao (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
Date Thu, 17 Jul 2014 15:37:05 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14065034#comment-14065034 ]

Jun Rao commented on KAFKA-1414:
--------------------------------

Thanks for patch v3. A few more comments.

30. No need to pass in recoverThreads and shutdownThreads to LogConfig.
31. LogManagerTest: Could we move createLogManager to TestUtils? Then we can reuse it in other places like ReplicaManagerTest as well.
32. LogManager
32.1 loadLogs(): I overlooked this in the previous review. Similar to shutdown, the parallelization here needs to be done at the log level instead of the log dir level. This will help the case where a single log dir is built on a multi-disk volume.
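To illustrate the suggestion in 32.1, here is a minimal sketch of log-level parallelization over a shared thread pool. It is not the patch under review; `recoverLog` is a hypothetical stand-in for the per-log recovery work, and only JDK classes are assumed:

```scala
import java.io.File
import java.util.concurrent.Executors

// Sketch: one recovery task per log directory (topic-partition), not per
// data directory, so a single data dir striped over many disks still
// recovers in parallel. Returns the number of logs loaded.
def loadLogsInParallel(dataDirs: Seq[File], ioThreads: Int)(recoverLog: File => Unit): Int = {
  val pool = Executors.newFixedThreadPool(ioThreads)
  try {
    val futures = for {
      dataDir <- dataDirs
      logDir  <- Option(dataDir.listFiles).getOrElse(Array.empty[File]).toSeq
      if logDir.isDirectory
    } yield pool.submit(new Runnable {
      def run(): Unit = recoverLog(logDir)
    })
    futures.foreach(_.get()) // wait for all; rethrows the first failure
    futures.size
  } finally pool.shutdown()
}
```

With this shape, the degree of parallelism is bounded by the pool size rather than by the number of data dirs.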
33. shutdown(): The patch waits until all logs in each log dir are closed before moving to the next log dir. Would it be better to submit the jobs for all logs in all log dirs first and then do the wait? This will allow us to use all log dirs in parallel. The downside is that if we hit an IOException in one of the log dirs, we won't be able to write the recovery checkpoint and the clean shutdown file in the other log dirs. However, that's the current behavior anyway.
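A sketch of the ordering suggested in 33: submit close jobs for every log in every dir up front, wait for all of them, and only then write the per-dir markers. `closeLog` and `writeCheckpoints` are hypothetical stand-ins for the real shutdown work; only JDK classes are assumed:

```scala
import java.io.File
import java.util.concurrent.Executors

def shutdownAllDirsInParallel(logsByDir: Map[File, Seq[File]], ioThreads: Int)
                             (closeLog: File => Unit)
                             (writeCheckpoints: File => Unit): Unit = {
  val pool = Executors.newFixedThreadPool(ioThreads)
  try {
    // Phase 1: enqueue everything first, so all log dirs flush concurrently
    // instead of being drained one dir at a time.
    val futures = for {
      (_, logs) <- logsByDir.toSeq
      log       <- logs
    } yield pool.submit(new Runnable { def run(): Unit = closeLog(log) })
    // Phase 2: wait; a failure in any dir aborts before any checkpoint or
    // clean shutdown marker is written, matching the current behavior.
    futures.foreach(_.get())
    // Phase 3: all logs are closed, so the per-dir markers can be written.
    logsByDir.keys.foreach(writeCheckpoints)
  } finally pool.shutdown()
}
```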
34. For Jay's comment, I suspect that both startup and shutdown time are dominated by I/O.
So, it probably makes sense to use a single config for both startup and shutdown. 
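If a single config is adopted for both paths, the broker configuration might look like the following fragment; the property name is illustrative only, since nothing in this thread fixes the final name:

{code}
# I/O threads per data directory, used both for log recovery at startup
# and for flushing/closing logs at shutdown (illustrative name only).
num.recovery.threads.per.data.dir=4
{code}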

> Speedup broker startup after hard reset
> ---------------------------------------
>
>                 Key: KAFKA-1414
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1414
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>    Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
>            Reporter: Dmitry Bugaychenko
>            Assignee: Jay Kreps
>         Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch,
KAFKA-1414-rev1.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch,
parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch
>
>
> After a hard reset due to power failure, the broker takes way too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:
> {code}
>   /**
>    * Recover and load all logs in the given data directories,
>    * using one recovery thread per data directory.
>    */
>   private def loadLogs(dirs: Seq[File]) {
>     val threads = for (dir <- dirs) yield {
>       val thread = new Thread(new Runnable {
>         def run() {
>           val recoveryPoints = recoveryPointCheckpoints(dir).read
>           /* load the logs */
>           val subDirs = dir.listFiles()
>           if (subDirs != null) {
>             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>             if (cleanShutDownFile.exists())
>               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
>             for (logDir <- subDirs if logDir.isDirectory) {
>               info("Loading log '" + logDir.getName + "'")
>               val topicPartition = Log.parseTopicPartitionName(logDir.getName)
>               val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
>               val log = new Log(logDir,
>                 config,
>                 recoveryPoints.getOrElse(topicPartition, 0L),
>                 scheduler,
>                 time)
>               val previous = addLogWithLock(topicPartition, log)
>               if (previous != null)
>                 throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
>             }
>             cleanShutDownFile.delete()
>           }
>         }
>       })
>       thread.start()
>       thread
>     }
>     for (thread <- threads)
>       thread.join()
>   }
>
>   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
>     logCreationOrDeletionLock synchronized {
>       this.logs.put(topicPartition, log)
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)
