mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlyubi...@apache.org
Subject [17/32] mahout git commit: MAHOUT-1702: Flink: AewScalar replaced with OpAewUnaryFunc
Date Tue, 20 Oct 2015 05:37:00 GMT
MAHOUT-1702: Flink: AewScalar replaced with OpAewUnaryFunc


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f66477fc
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f66477fc
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f66477fc

Branch: refs/heads/flink-binding
Commit: f66477fc53581796035e1dfef06d57e12719f1c5
Parents: 8de8b79
Author: Alexey Grigorev <alexey.s.grigoriev@gmail.com>
Authored: Wed Jun 24 14:56:08 2015 +0200
Committer: Alexey Grigorev <alexey.s.grigoriev@gmail.com>
Committed: Fri Sep 25 17:41:54 2015 +0200

----------------------------------------------------------------------
 .../flinkbindings/blas/FlinkOpAewScalar.scala   | 44 +++++++++++++++++---
 1 file changed, 39 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/f66477fc/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
index a1e1ab1..bf388b9 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
@@ -18,21 +18,28 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
+import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
-import org.apache.mahout.math.drm.logical.OpAewScalar
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
 import org.apache.mahout.flinkbindings.drm.FlinkDrm
 import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.drm.logical.OpAewScalar
+import org.apache.mahout.math.drm.logical.OpAewUnaryFunc
 import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
+import org.apache.mahout.math.scalabindings.RLikeOps._
 
 /**
  * Implementation is inspired by Spark-binding's OpAewScalar
- * (see https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala)

+ * (see https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala)
  */
 object FlinkOpAewScalar {
 
+  final val PROPERTY_AEWB_INPLACE = "mahout.math.AewB.inplace"
+  private def isInplace = System.getProperty(PROPERTY_AEWB_INPLACE, "false").toBoolean
+
+  @Deprecated
   def opScalarNoSideEffect[K: ClassTag](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double):
FlinkDrm[K] = {
     val function = EWOpsCloning.strToFunction(op.op)
 
@@ -45,8 +52,35 @@ object FlinkOpAewScalar {
     new BlockifiedFlinkDrm(res, op.ncol)
   }
 
+  def opUnaryFunction[K: ClassTag](op: OpAewUnaryFunc[K], A: FlinkDrm[K], f: (Double) =>
Double): FlinkDrm[K] = {
+    val inplace = isInplace
+
+    val res = if (op.evalZeros) {
+      A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+        def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
+          val (keys, block) = tuple
+          val newBlock = if (inplace) block else block.cloned
+          newBlock := ((_, _, x) => f(x))
+          (keys, newBlock)
+        }
+      })
+    } else {
+      A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
+        def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = {
+          val (keys, block) = tuple
+          val newBlock = if (inplace) block else block.cloned
+          for (row <- newBlock; el <- row.nonZeroes) el := f(el.get)
+          (keys, newBlock)
+        }
+      })
+    }
+
+    new BlockifiedFlinkDrm(res, op.ncol)
+  }
+
 }
 
+@Deprecated
 object EWOpsCloning {
 
   type MatrixScalarFunc = (Matrix, Double) => Matrix


Mime
View raw message