mahout-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (MAHOUT-1817) Implement caching in Flink Bindings
Date Sun, 27 Mar 2016 19:19:25 GMT

    [ https://issues.apache.org/jira/browse/MAHOUT-1817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15213589#comment-15213589 ]

ASF GitHub Bot commented on MAHOUT-1817:
----------------------------------------

Github user andrewpalumbo commented on a diff in the pull request:

    https://github.com/apache/mahout/pull/203#discussion_r57533408
  
    --- Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---
    @@ -76,20 +94,38 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     
       override val keyClassTag: ClassTag[K] = classTag[K]
     
    +  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
    +    * the dataset to the filesystem and read it back when cache is called */
       def cache() = {
         if (!isCached) {
    -      cacheFileName = System.nanoTime().toString
    +      cacheFileName = persistanceRootDir + System.nanoTime().toString
           parallelismDeg = ds.getParallelism
           isCached = true
    +      persist(ds, cacheFileName)
         }
    -    implicit val typeInformation = createTypeInformation[(K,Vector)]
    +    val _ds = readPersistedDataSet(cacheFileName, ds)
    +
    +    /** Leave the parallelism degree to be set by the operators
    +      * TODO: find out a way to set the parallelism degree based on the
    +      * final drm after computation is actually triggered
    +      *
    +      *  // We may want to look more closely at this:
    +      *  // since we've cached a drm, triggering a computation
    +      *  // it may not make sense to keep the same parallelism degree
    +      *  if (!(parallelismDeg == _ds.getParallelism)) {
    +      *    _ds.setParallelism(parallelismDeg).rebalance()
    +      *  }
    +      *
    +      */
     
    --- End diff --
    
    Commented out the parallelism-degree reset (the setParallelism/rebalance block) in cache() for now. Will push soon if there are no objections.
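The caching strategy in the diff (persist the dataset on the first cache() call, then read the persisted copy back on every call) can be illustrated outside Flink with a minimal sketch. Everything here is a hypothetical assumption rather than a Mahout or Flink API: the class name FileBackedCheckpoint, the compute thunk, the persistenceRootDir parameter, and the binary row format are all illustrative. The sketch also swaps Flink's persist/read operators for plain java.io file streams and uses Int keys with Array[Double] rows instead of a DrmDataSet. As with the commented-out block, the parallelism recorded at cache time is only stored, not re-applied on read-back.

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream}
import java.io.{File, FileInputStream, FileOutputStream}

/** Hypothetical sketch: materializes rows of (Int key, Array[Double] vector) to a file
  * on the first cache() call, then serves every later cache() call from that file
  * instead of re-running `compute`. Not Mahout's or Flink's actual implementation. */
final class FileBackedCheckpoint(compute: () => Seq[(Int, Array[Double])],
                                 persistenceRootDir: File,
                                 parallelism: Int) {
  private var isCached = false
  private var cacheFile: Option[File] = None
  // Parallelism recorded at cache time (mirrors parallelismDeg in the diff).
  // It is stored only; read-back does not restore it.
  var parallelismDeg: Int = -1

  def cache(): Seq[(Int, Array[Double])] = {
    if (!isCached) {
      // Unique file name under the root dir, as in the diff's nanoTime-based name.
      val f = new File(persistenceRootDir, System.nanoTime().toString)
      parallelismDeg = parallelism
      persist(compute(), f)
      cacheFile = Some(f)
      isCached = true
    }
    // Every call, including the first, reads the persisted copy back.
    readPersisted(cacheFile.get)
  }

  // Hypothetical format: row count, then per row: key, vector length, entries.
  private def persist(rows: Seq[(Int, Array[Double])], f: File): Unit = {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f)))
    try {
      out.writeInt(rows.size)
      for ((key, vec) <- rows) {
        out.writeInt(key)
        out.writeInt(vec.length)
        vec.foreach(v => out.writeDouble(v))
      }
    } finally out.close()
  }

  private def readPersisted(f: File): Seq[(Int, Array[Double])] = {
    val in = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))
    try {
      val n = in.readInt()
      Vector.fill(n) {
        val key = in.readInt()
        val len = in.readInt()
        (key, Array.fill(len)(in.readDouble()))
      }
    } finally in.close()
  }
}
```

The isCached guard makes cache() idempotent: the upstream computation runs once, and each later call pays only the cost of reading the file. This is the trade-off the issue describes when no in-memory cache is available. Whether to rebalance to the recorded parallelism after read-back is left open here, just as it is in the TODO above.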


> Implement caching in Flink Bindings
> -----------------------------------
>
>                 Key: MAHOUT-1817
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1817
>             Project: Mahout
>          Issue Type: New Feature
>          Components: Flink
>    Affects Versions: 0.11.2
>            Reporter: Andrew Palumbo
>            Assignee: Andrew Palumbo
>            Priority: Blocker
>             Fix For: 0.12.0
>
>
> Flink does not have in-memory caching analogous to that of Spark. We need to find a way to honour the {{checkpoint()}} contract in Flink Bindings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
