mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apalu...@apache.org
Subject [05/10] mahout git commit: wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml
Date Sun, 27 Mar 2016 19:36:44 GMT
wip: use properties from /home/andy/sandbox/mahout/conf/flink-config.yaml


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/100d343e
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/100d343e
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/100d343e

Branch: refs/heads/flink-binding
Commit: 100d343e4b6e66b1a7c581455cd1faab7bbdb538
Parents: ad22252
Author: Andrew Palumbo <apalumbo@apache.org>
Authored: Fri Mar 25 20:26:41 2016 -0400
Committer: Andrew Palumbo <apalumbo@apache.org>
Committed: Fri Mar 25 20:29:47 2016 -0400

----------------------------------------------------------------------
 .../drm/CheckpointedFlinkDrm.scala              | 22 +++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/100d343e/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index 0b3d13e..a5bbbb5 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.core.fs.Path
 import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
+import org.apache.flink.configuration.GlobalConfiguration
 import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
 import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil
@@ -56,8 +57,18 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   var parallelismDeg: Int = -1
 
   // need to make sure that this is actually getting the correct properties for {{taskmanager.tmp.dirs}}
-  val persistanceRootDir = ds.getExecutionEnvironment.
-    getConfig.getGlobalJobParameters.toMap.getOrDefault("taskmanager.tmp.dirs", "/tmp/")
+  val mahoutHome = System.getProperty("MAHOUT_HOME")
+
+  GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
+
+  val conf = GlobalConfiguration.getConfiguration()
+
+  if (!(conf == null )) {
+    val persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
+  } else {
+    val persistanceRootDir = "/tmp/"
+  }
+
 
   private lazy val dim: (Long, Int) = {
     // combine computation of ncol and nrow in one pass
@@ -81,8 +92,7 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
   override val keyClassTag: ClassTag[K] = classTag[K]
 
   def cache() = {
-    implicit val typeInformation = createTypeInformation[(K,Vector)]
-    implicit val inputFormat = (ds.getType)
+//    implicit val typeInformation = createTypeInformation[(K,Vector)]
     if (!isCached) {
       cacheFileName = persistanceRootDir + System.nanoTime().toString
       parallelismDeg = ds.getParallelism
@@ -91,7 +101,9 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
     }
 
     val _ds = readPersistedDataSet(cacheFileName, ds)
-
+    if (!(parallelismDeg == _ds.getParallelism)) {
+      _ds.setParallelism(parallelismDeg).rebalance()
+    }
     datasetWrap(_ds)
   }
 


Mime
View raw message