mahout-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (MAHOUT-1817) Implement caching in Flink Bindings
Date Sun, 27 Mar 2016 05:05:25 GMT

    [ https://issues.apache.org/jira/browse/MAHOUT-1817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15213339#comment-15213339 ]

ASF GitHub Bot commented on MAHOUT-1817:
----------------------------------------

Github user andrewpalumbo commented on a diff in the pull request:

    https://github.com/apache/mahout/pull/203#discussion_r57523491
  
    --- Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---
    @@ -76,20 +94,31 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     
       override val keyClassTag: ClassTag[K] = classTag[K]
     
    +  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
    +    * the dataset to the filesystem and read it back when cache is called */
       def cache() = {
         if (!isCached) {
    -      cacheFileName = System.nanoTime().toString
    +      cacheFileName = persistanceRootDir + System.nanoTime().toString
           parallelismDeg = ds.getParallelism
           isCached = true
    +      persist(ds, cacheFileName)
         }
    -    implicit val typeInformation = createTypeInformation[(K,Vector)]
    +    val _ds = readPersistedDataSet(cacheFileName, ds)
     
    -    val _ds = persist(ds, persistanceRootDir + cacheFileName)
    +    // We may want to look more closely at this:
    +    // since we've cached a drm, triggering a computation
    +    // it may not make sense to keep the same parallelism degree
    +    if (!(parallelismDeg == _ds.getParallelism)) {
    +      _ds.setParallelism(parallelismDeg).rebalance()
    --- End diff --
    
    Yes, @smarthi, so I will likely remove this check tomorrow and then push the rest as is.
    I guess we need to go through and make sure that all operations set the parallelism
    if they need to.
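
    For illustration, the file-backed approach in the diff above (persist once on the
    first cache() call, then read the persisted copy back on every call) can be sketched
    without Flink. This is a minimal sketch under assumptions, not the Mahout
    implementation: FileCachedRows, its row type, and the tab-separated file format are
    hypothetical, and only java.nio.file and the Scala 2.13 standard library are used.

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for a checkpointed DRM: rows of (key, values).
// None of these names are Mahout or Flink APIs.
final class FileCachedRows(rows: Seq[(Int, Vector[Double])], rootDir: Path) {

  // Set on the first cache() call; plays the role of cacheFileName/isCached.
  private var cacheFile: Option[Path] = None

  def isCached: Boolean = cacheFile.isDefined

  /** Persist the rows once, then always read them back from the persisted copy. */
  def cache(): Seq[(Int, Vector[Double])] = {
    if (cacheFile.isEmpty) {
      // A nanoTime-based file name under the root directory, as in the diff.
      val file = rootDir.resolve(System.nanoTime().toString)
      val lines = rows.map { case (k, v) => s"$k\t${v.mkString(",")}" }
      Files.write(file, lines.asJava)
      cacheFile = Some(file)
    }
    // Every call, including the first, returns data read from the file.
    Files.readAllLines(cacheFile.get).asScala.toSeq.map { line =>
      val Array(k, vs) = line.split("\t", 2)
      val values =
        if (vs.isEmpty) Vector.empty[Double]
        else vs.split(",").map(_.toDouble).toVector
      (k.toInt, values)
    }
  }
}
```

    The sketch leaves out the parallelism question entirely; it only shows that the
    write happens once while each cache() call re-reads the persisted data.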


> Implement caching in Flink Bindings
> -----------------------------------
>
>                 Key: MAHOUT-1817
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1817
>             Project: Mahout
>          Issue Type: New Feature
>          Components: Flink
>    Affects Versions: 0.11.2
>            Reporter: Andrew Palumbo
>            Assignee: Andrew Palumbo
>            Priority: Blocker
>             Fix For: 0.12.0
>
>
> Flink does not have in-memory caching analogous to that of Spark.  We need to find a way to honour the {{checkpoint()}} contract in Flink Bindings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
