spark-issues mailing list archives

From "holdenk (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
Date Mon, 01 Feb 2016 22:43:39 GMT

    [ https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15127179#comment-15127179 ]

holdenk edited comment on SPARK-13048 at 2/1/16 10:43 PM:
----------------------------------------------------------

This sounds useful, although we probably wouldn't want to always leave the last checkpoint
around (and we would also need to provide a way for the user to clean up the last checkpoint).

We could add a getCheckPointedLDAModel method, or offer a param on the current method, and then
add a cleanup function to the resulting LDAModel for the user to call. Any thoughts [~josephkb]
or [~mengxr]? Another interesting question is how to expose something similar in the
pipelines API.
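
A rough sketch of that idea, to make the discussion concrete. This is not Spark code: SketchCheckpointer, SketchModel, getModel, keepLastCheckpoint and deleteCheckpointFiles are hypothetical names used only for illustration, and local temp files stand in for the HDFS checkpoint files. It assumes the optimizer deletes every checkpoint except the newest when the flag is set, and that the returned model remembers which files it still depends on so the user can remove them later.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable

// Hypothetical stand-in for PeriodicCheckpointer: tracks checkpoint files, oldest first.
final class SketchCheckpointer {
  private val checkpointQueue = mutable.Queue.empty[Path]

  /** Record a newly written checkpoint file. */
  def register(file: Path): Unit = checkpointQueue.enqueue(file)

  /** Delete every checkpoint, including the most recent one (the behavior reported in the issue). */
  def deleteAllCheckpoints(): Unit =
    while (checkpointQueue.nonEmpty) {
      Files.deleteIfExists(checkpointQueue.dequeue())
    }

  /** Delete every checkpoint except the most recent one. */
  def deleteAllCheckpointsButLast(): Unit =
    while (checkpointQueue.size > 1) {
      Files.deleteIfExists(checkpointQueue.dequeue())
    }

  /** Checkpoint files that have not been deleted yet. */
  def remainingCheckpoints: Seq[Path] = checkpointQueue.toList
}

// Hypothetical stand-in for DistributedLDAModel: remembers the checkpoints it depends on.
final class SketchModel(val checkpointFiles: Seq[Path]) {
  /** Cleanup hook for the user to call once the model is no longer needed. */
  def deleteCheckpointFiles(): Unit =
    checkpointFiles.foreach(f => Files.deleteIfExists(f))
}

object KeepLastCheckpointSketch {
  /** Plays the role of getLDAModel; the flag decides whether the last checkpoint survives. */
  def getModel(checkpointer: SketchCheckpointer, keepLastCheckpoint: Boolean): SketchModel = {
    if (keepLastCheckpoint) checkpointer.deleteAllCheckpointsButLast()
    else checkpointer.deleteAllCheckpoints()
    new SketchModel(checkpointer.remainingCheckpoints)
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("checkpoints")
    val checkpointer = new SketchCheckpointer
    (1 to 3).foreach(i => checkpointer.register(Files.createFile(dir.resolve(s"rdd-$i"))))

    val model = getModel(checkpointer, keepLastCheckpoint = true)
    println(s"Model still depends on: ${model.checkpointFiles.map(_.getFileName).mkString(", ")}")

    model.deleteCheckpointFiles() // explicit cleanup by the user
    Files.delete(dir)             // the directory is empty at this point
  }
}
```

With the flag off, the sketch behaves like the current code and the model is returned with no checkpoint files to fall back on. Whether the default should keep or delete the last checkpoint is exactly the open question above.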


> EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
> ------------------------------------------------------------------
>
>                 Key: SPARK-13048
>                 URL: https://issues.apache.org/jira/browse/SPARK-13048
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.5.2
>         Environment: Standalone Spark cluster
>            Reporter: Jeff Stein
>
> In EMLDAOptimizer, all checkpoints are deleted before returning the DistributedLDAModel.
> The most recent checkpoint is still necessary for operations on the DistributedLDAModel
> under a couple of scenarios:
> - The graph doesn't fit in memory on the worker nodes (e.g. a very large data set).
> - Late worker failures that require reading the now-dependent checkpoint.
> I ran into this problem running a 10M record LDA model in a memory-starved environment.
> The model consistently failed in either the {{collect at LDAModel.scala:528}} stage (when
> converting to a LocalLDAModel) or in the {{reduce at LDAModel.scala:563}} stage (when calling
> "describeTopics" on the model). In both cases, a FileNotFoundException is thrown while
> attempting to access a checkpoint file.
> I'm not sure what the correct fix is here; it might involve a class signature change.
> An alternative simple fix is to leave the last checkpoint around and expect the user to clean
> the checkpoint directory themselves.
> {noformat}
> java.io.FileNotFoundException: File does not exist: /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
> {noformat}
> Relevant code is included below.
> LDAOptimizer.scala:
> {noformat}
>   override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
>     require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
>     this.graphCheckpointer.deleteAllCheckpoints()
>     // The constructor's default arguments assume gammaShape = 100 to ensure equivalence
>     // in LDAModel.toLocal conversion
>     new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
>       Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
>       iterationTimes)
>   }
> {noformat}
> PeriodicCheckpointer.scala:
> {noformat}
>   /**
>    * Call this at the end to delete any remaining checkpoint files.
>    */
>   def deleteAllCheckpoints(): Unit = {
>     while (checkpointQueue.nonEmpty) {
>       removeCheckpointFile()
>     }
>   }
>   /**
>    * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
>    * This prints a warning but does not fail if the files cannot be removed.
>    */
>   private def removeCheckpointFile(): Unit = {
>     val old = checkpointQueue.dequeue()
>     // Since the old checkpoint is not deleted by Spark, we manually delete it.
>     val fs = FileSystem.get(sc.hadoopConfiguration)
>     getCheckpointFiles(old).foreach { checkpointFile =>
>       try {
>         fs.delete(new Path(checkpointFile), true)
>       } catch {
>         case e: Exception =>
>           logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
>             checkpointFile)
>       }
>     }
>   }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

