mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smar...@apache.org
Subject [1/2] mahout git commit: WIP, migrating to Flink 0.10 and the Flink Scala API
Date Thu, 12 Nov 2015 06:15:49 GMT
Repository: mahout
Updated Branches:
  refs/heads/flink-binding 54f51de82 -> af015ece7


WIP, migrating to Flink 0.10 and the Flink Scala API


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e2ab67f2
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e2ab67f2
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e2ab67f2

Branch: refs/heads/flink-binding
Commit: e2ab67f2629a6c49c4d8e911274b409c2b57101c
Parents: 54f51de
Author: smarthi <smarthi@apache.org>
Authored: Tue Nov 10 19:51:20 2015 -0500
Committer: smarthi <smarthi@apache.org>
Committed: Tue Nov 10 19:51:20 2015 -0500

----------------------------------------------------------------------
 .../mahout/flinkbindings/DataSetOps.scala       | 89 +++++++++-----------
 .../mahout/flinkbindings/FlinkByteBCast.scala   |  4 +-
 .../flinkbindings/FlinkDistributedContext.scala |  2 +-
 .../mahout/flinkbindings/FlinkEngine.scala      | 32 +++----
 .../mahout/flinkbindings/blas/FlinkOpAewB.scala |  8 +-
 .../flinkbindings/blas/FlinkOpAewScalar.scala   | 16 ++--
 .../mahout/flinkbindings/blas/FlinkOpAt.scala   | 10 +--
 .../mahout/flinkbindings/blas/FlinkOpAtA.scala  |  6 +-
 .../mahout/flinkbindings/blas/FlinkOpAtB.scala  | 10 +--
 .../mahout/flinkbindings/blas/FlinkOpAx.scala   | 15 ++--
 .../flinkbindings/blas/FlinkOpCBind.scala       | 11 +--
 .../flinkbindings/blas/FlinkOpRBind.scala       |  2 +-
 .../blas/FlinkOpTimesRightMatrix.scala          |  4 +-
 .../drm/CheckpointedFlinkDrm.scala              | 45 ++++------
 .../mahout/flinkbindings/drm/FlinkDrm.scala     | 24 ++----
 .../apache/mahout/flinkbindings/package.scala   | 13 +--
 .../flinkbindings/DistributedFlinkSuite.scala   |  3 +-
 .../flinkbindings/examples/ReadCsvExample.scala |  2 +-
 18 files changed, 129 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
index 4f437ae..2387d4b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala
@@ -18,61 +18,50 @@
  */
 package org.apache.mahout.flinkbindings
 
-import java.lang.Iterable
-import java.util.Collections
-import java.util.Comparator
-import scala.collection.JavaConverters._
-import org.apache.flink.util.Collector
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.flink.api.common.functions.RichMapPartitionFunction
-import org.apache.flink.configuration.Configuration
-import scala.reflect.ClassTag
-
-
-class DataSetOps[K: ClassTag](val ds: DataSet[K]) {
+//@Deprecated
+//class DataSetOps[K: ClassTag](val ds: DataSet[K]) {
 
   /**
    * Implementation taken from http://stackoverflow.com/questions/30596556/zipwithindex-on-apache-flink
    * 
    * TODO: remove when FLINK-2152 is committed and released 
    */
-  def zipWithIndex(): DataSet[(Int, K)] = {
-
-    // first for each partition count the number of elements - to calculate the offsets
-    val counts = ds.mapPartition(new RichMapPartitionFunction[K, (Int, Int)] {
-      override def mapPartition(values: Iterable[K], out: Collector[(Int, Int)]): Unit = {
-        val cnt: Int = values.asScala.count(_ => true)
-        val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask
-        out.collect((subtaskIdx, cnt))
-      }
-    })
+//  def zipWithIndex(): DataSet[(Int, K)] = {
+//
+//     first for each partition count the number of elements - to calculate the offsets
+//    val counts = ds.mapPartition(new RichMapPartitionFunction[K, (Int, Int)] {
+//      override def mapPartition(values: Iterable[K], out: Collector[(Int, Int)]): Unit = {
+//        val cnt: Int = values.asScala.count(_ => true)
+//        val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask
+//        out.collect((subtaskIdx, cnt))
+//      }
+//    })
 
     // then use the offsets to index items of each partition
-    val zipped = ds.mapPartition(new RichMapPartitionFunction[K, (Int, K)] {
-        var offset: Int = 0
-
-        override def open(parameters: Configuration): Unit = {
-          val offsetsJava: java.util.List[(Int, Int)] = 
-                  getRuntimeContext.getBroadcastVariable("counts")
-          val offsets = offsetsJava.asScala
-
-          val sortedOffsets = 
-            offsets sortBy { case (id, _) => id } map { case (_, cnt) => cnt }
-
-          val subtaskId = getRuntimeContext.getIndexOfThisSubtask
-          offset = sortedOffsets.take(subtaskId).sum.toInt
-        }
-
-        override def mapPartition(values: Iterable[K], out: Collector[(Int, K)]): Unit = {
-          val it = values.asScala
-          it.zipWithIndex.foreach { case (value, idx) =>
-            out.collect((idx + offset, value))
-          }
-        }
-    }).withBroadcastSet(counts, "counts");
-
-    zipped
-  }
-
-}
\ No newline at end of file
+//    val zipped = ds.mapPartition(new RichMapPartitionFunction[K, (Int, K)] {
+//        var offset: Int = 0
+//
+//        override def open(parameters: Configuration): Unit = {
+//          val offsetsJava: java.util.List[(Int, Int)] =
+//                  getRuntimeContext.getBroadcastVariable("counts")
+//          val offsets = offsetsJava.asScala
+//
+//          val sortedOffsets =
+//            offsets sortBy { case (id, _) => id } map { case (_, cnt) => cnt }
+//
+//          val subtaskId = getRuntimeContext.getIndexOfThisSubtask
+//          offset = sortedOffsets.take(subtaskId).sum
+//        }
+//
+//        override def mapPartition(values: Iterable[K], out: Collector[(Int, K)]): Unit = {
+//          val it = values.asScala
+//          it.zipWithIndex.foreach { case (value, idx) =>
+//            out.collect((idx + offset, value))
+//          }
+//        }
+//    }).withBroadcastSet(counts, "counts")
+//
+//    zipped
+//  }
+//
+//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
index 1024452..8544db0 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
@@ -72,7 +72,7 @@ object FlinkByteBCast {
     dataOutput.writeInt(StreamTypeVector)
     writeable.write(dataOutput)
     val array = dataOutput.toByteArray()
-    return new FlinkByteBCast[Vector](array)
+    new FlinkByteBCast[Vector](array)
   }
 
   def wrap(m: Matrix): FlinkByteBCast[Matrix] = {
@@ -81,7 +81,7 @@ object FlinkByteBCast {
     dataOutput.writeInt(StreamTypeMatrix)
     writeable.write(dataOutput)
     val array = dataOutput.toByteArray()
-    return new FlinkByteBCast[Matrix](array)
+    new FlinkByteBCast[Matrix](array)
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
index ebe473f..c818030 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.mahout.flinkbindings
 
-import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.mahout.math.drm.DistributedContext
 import org.apache.mahout.math.drm.DistributedEngine
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 6b12d11..d03aef7 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -18,17 +18,12 @@
  */
 package org.apache.mahout.flinkbindings
 
-import java.util.Collection
-
-import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.SequenceFileInputFormat
 import org.apache.mahout.flinkbindings.blas._
 import org.apache.mahout.flinkbindings.drm._
 import org.apache.mahout.flinkbindings.io.HDFSUtil
@@ -42,6 +37,8 @@ import org.apache.mahout.math.indexeddataset.Schema
 import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.scalabindings.RLikeOps._
 
+import org.apache.flink.api.scala._
+
 
 object FlinkEngine extends DistributedEngine {
 
@@ -65,13 +62,13 @@ object FlinkEngine extends DistributedEngine {
 
     val metadata = hdfsUtils.readDrmHeader(path)
 
-    val unwrapKey = metadata.unwrapKeyFunction
+    val unwrapKey  = metadata.unwrapKeyFunction
 
-    val dataset = env.readSequenceFile(classOf[Writable], classOf[VectorWritable], path)
+    val ds = env.readSequenceFile(classOf[Writable], classOf[VectorWritable], path)
 
-    val res = dataset.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] {
-      def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = {
-        (unwrapKey(tuple.f0), tuple.f1)
+    val res = ds.map(new MapFunction[(Writable, VectorWritable), (Any, Vector)] {
+      def map(tuple: (Writable, VectorWritable)): (Any, Vector) = {
+        (unwrapKey(tuple._1), tuple._2)
       }
     })
 
@@ -159,7 +156,7 @@ object FlinkEngine extends DistributedEngine {
       def reduce(v1: Vector, v2: Vector) = v1 + v2
     })
 
-    val list = sum.collect.asScala.toList
+    val list = sum.collect
     list.head
   }
 
@@ -180,7 +177,7 @@ object FlinkEngine extends DistributedEngine {
       def reduce(v1: Vector, v2: Vector) = v1 + v2
     })
 
-    val list = result.collect.asScala.toList
+    val list = result.collect
     list.head
   }
 
@@ -203,7 +200,7 @@ object FlinkEngine extends DistributedEngine {
       def reduce(v1: Double, v2: Double) = v1 + v2
     })
 
-    val list = sumOfSquares.collect.asScala.toList
+    val list = sumOfSquares.collect
     list.head
   }
 
@@ -229,10 +226,8 @@ object FlinkEngine extends DistributedEngine {
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
       (implicit dc: DistributedContext): DrmDataSet[Int] = {
     val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
-    val rowsJava: Collection[DrmTuple[Int]]  = rows.asJava
-
     val dataSetType = TypeExtractor.getForObject(rows.head)
-    dc.env.fromCollection(rowsJava, dataSetType).setParallelism(parallelismDegree)
+    dc.env.fromCollection(rows).setParallelism(parallelismDegree)
   }
 
   /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
@@ -251,10 +246,7 @@ object FlinkEngine extends DistributedEngine {
 
      for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
     }
-
-    val dataSetType = TypeExtractor.getForObject(nonParallelResult.head)
-    val result = dc.env.fromCollection(nonParallelResult.asJava, dataSetType)
-
+    val result = dc.env.fromCollection(nonParallelResult)
     new CheckpointedFlinkDrm(ds=result, _nrow=nrow, _ncol=ncol)
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
index 38fe312..f879e86 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
@@ -6,7 +6,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.flink.api.common.functions.CoGroupFunction
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.scala.DataSet
 import org.apache.flink.util.Collector
 import org.apache.mahout.flinkbindings._
 import org.apache.mahout.flinkbindings.drm.FlinkDrm
@@ -40,13 +40,13 @@ object FlinkOpAewB {
         val it1 = Lists.newArrayList(it1java).asScala
         val it2 = Lists.newArrayList(it2java).asScala
 
-        if (!it1.isEmpty && !it2.isEmpty) {
+        if (it1.nonEmpty && it2.nonEmpty) {
           val (idx, a) = it1.head
           val (_, b) = it2.head
           out.collect((idx, function(a, b)))
-        } else if (it1.isEmpty && !it2.isEmpty) {
+        } else if (it1.isEmpty && it2.nonEmpty) {
           out.collect(it2.head)
-        } else if (!it1.isEmpty && it2.isEmpty) {
+        } else if (it1.nonEmpty && it2.isEmpty) {
           out.collect(it1.head)
         }
       }

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
index ab434bb..67d710b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
@@ -18,18 +18,16 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
 import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm.logical.OpAewScalar
-import org.apache.mahout.math.drm.logical.OpAewUnaryFunc
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewScalar, TEwFunc}
 import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.drm.logical.AbstractUnaryOp
-import org.apache.mahout.math.drm.logical.TEwFunc
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.scala._
 
 /**
  * Implementation is inspired by Spark-binding's OpAewScalar

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
index 274b1ca..e515b34 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.shaded.com.google.common.collect.Lists
 import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings._
 import org.apache.mahout.flinkbindings.drm.FlinkDrm
 import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
 import org.apache.mahout.math.Matrix
@@ -37,6 +36,8 @@ import org.apache.mahout.math.drm.DrmTuple
 import org.apache.mahout.math.drm.logical.OpAt
 import org.apache.mahout.math.scalabindings.RLikeOps._
 
+import org.apache.flink.api.scala._
+
 /**
  * Implementation is taken from Spark's At
  * https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
@@ -53,7 +54,7 @@ object FlinkOpAt {
 
    val sparseParts = A.asBlockified.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] {
      def flatMap(typle: (Array[Int], Matrix), out: Collector[DrmTuple[Int]]): Unit = typle match {
-        case (keys, block) => {
+        case (keys, block) =>
           (0 until block.ncol).map(columnIdx => {
             val columnVector: Vector = new SequentialAccessSparseVector(ncol)
 
@@ -61,9 +62,8 @@ object FlinkOpAt {
               columnVector(key) = block(idx, columnIdx)
             }
 
-            out.collect(new Tuple2(columnIdx, columnVector))
+            out.collect((columnIdx, columnVector))
           })
-        }
       }
     })
 
@@ -73,7 +73,7 @@ object FlinkOpAt {
      def reduce(values: Iterable[(Int, Vector)], out: Collector[DrmTuple[Int]]): Unit = {
         val it = Lists.newArrayList(values).asScala
         val (idx, _) = it.head
-        val vector = it map { case (idx, vec) => vec } reduce (_ + _)
+        val vector = (it map { case (idx, vec) => vec }).sum
         out.collect((idx, vector))
       }
     })

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
index 0e30eff..629857a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
@@ -5,7 +5,7 @@ import java.lang.Iterable
 import scala.collection.JavaConverters._
 
 import org.apache.flink.api.common.functions._
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.scala.DataSet
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.shaded.com.google.common.collect.Lists
 import org.apache.flink.util.Collector
@@ -56,7 +56,7 @@ object FlinkOpAtA {
       def reduce(m1: Matrix, m2: Matrix) = m1 + m2
     }).collect()
 
-    res.asScala.head
+    res.head
   }
 
   def fat(op: OpAtA[Any], A: FlinkDrm[Any]): FlinkDrm[Int] = {
@@ -155,7 +155,7 @@ object FlinkOpAtA {
     val offsets = (0 to numSplits).map(i => i * (baseSplit + 1) - (0 max i - slack))
     // And then we connect the ranges using gaps between offsets:
 
-    val ranges = offsets.sliding(2).map { offs => (offs(0) until offs(1)) }
+    val ranges = offsets.sliding(2).map { offs => offs(0) until offs(1) }
     ranges.toArray
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
index 0dd0dd2..b514868 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
@@ -25,9 +25,7 @@ import scala.reflect.ClassTag
 
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.scala.DataSet
 import org.apache.flink.util.Collector
 import org.apache.mahout.flinkbindings._
 import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
@@ -40,7 +38,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
 
 import com.google.common.collect.Lists
 
-
+import org.apache.flink.api.scala._
 
 /**
  * Implementation is taken from Spark's AtB
@@ -65,8 +63,8 @@ object FlinkOpAtB {
             joined.flatMap(new FlatMapFunction[Tuple2[(_, Vector), (_, Vector)], (Int, Matrix)] {
       def flatMap(in: Tuple2[(_, Vector), (_, Vector)],
                   out: Collector[(Int, Matrix)]): Unit = {
-        val avec = in.f0._2
-        val bvec = in.f1._2
+        val avec = in._1._2
+        val bvec = in._1._2
 
         0.until(blockCount) map { blockKey =>
           val blockStart = blockKey * blockHeight

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index 503ab17..4302457 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -20,17 +20,17 @@ package org.apache.mahout.flinkbindings.blas
 
 import java.util.List
 
-import scala.reflect.ClassTag
-
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.configuration.Configuration
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.Vector
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
+import org.apache.mahout.math.{Matrix, Vector}
 import org.apache.mahout.math.drm.logical.OpAx
 import org.apache.mahout.math.scalabindings.RLikeOps._
 
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.scala._
+
 
 /**
  * Implementation is taken from Spark's Ax
@@ -40,7 +40,6 @@ object FlinkOpAx {
 
   def blockifiedBroadcastAx[K: ClassTag](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = {
     implicit val ctx = A.context
-    //    val x = drmBroadcast(op.x)
 
     val singletonDataSetX = ctx.env.fromElements(op.x)
 
@@ -48,7 +47,7 @@ object FlinkOpAx {
       var x: Vector = null
 
       override def open(params: Configuration): Unit = {
-        val runtime = this.getRuntimeContext()
+        val runtime = this.getRuntimeContext
         val dsX: List[Vector] = runtime.getBroadcastVariable("vector")
         x = dsX.get(0)
       }

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
index 49ca7d5..6cf5e5c 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.functions.CoGroupFunction
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.scala.DataSet
 import org.apache.flink.util.Collector
 import org.apache.mahout.flinkbindings._
 import org.apache.mahout.flinkbindings.drm._
@@ -35,6 +35,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
 import com.google.common.collect.Lists
 import org.apache.mahout.flinkbindings.DrmDataSet
 
+import org.apache.mahout.math.scalabindings._
 
 /**
  * Implementation is taken from Spark's cbind
@@ -61,11 +62,11 @@ object FlinkOpCBind {
         val it1 = Lists.newArrayList(it1java).asScala
         val it2 = Lists.newArrayList(it2java).asScala
 
-        if (!it1.isEmpty && !it2.isEmpty) {
+        if (it1.nonEmpty && it2.nonEmpty) {
           val (idx, a) = it1.head
           val (_, b) = it2.head
 
-          val result: Vector = if (a.isDense && b.isDense) { 
+          val result: Vector = if (a.isDense && b.isDense) {
             new DenseVector(n) 
           } else {
             new SequentialAccessSparseVector(n)
@@ -75,7 +76,7 @@ object FlinkOpCBind {
           result(n1 until n) := b
 
           out.collect((idx, result))
-        } else if (it1.isEmpty && !it2.isEmpty) {
+        } else if (it1.isEmpty && it2.nonEmpty) {
           val (idx, b) = it2.head
           val result: Vector = if (b.isDense) { 
             new DenseVector(n)
@@ -84,7 +85,7 @@ object FlinkOpCBind {
           }
           result(n1 until n) := b
           out.collect((idx, result))
-        } else if (!it1.isEmpty && it2.isEmpty) {
+        } else if (it1.nonEmpty && it2.isEmpty) {
           val (idx, a) = it1.head
           val result: Vector = if (a.isDense) {
             new DenseVector(n)

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
index 9ebff51..83beaa1 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
@@ -20,7 +20,7 @@ package org.apache.mahout.flinkbindings.blas
 
 import scala.reflect.ClassTag
 
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.scala.DataSet
 import org.apache.mahout.flinkbindings.drm.FlinkDrm
 import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
 import org.apache.mahout.math.Vector

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
index af3854d..989fad1 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
@@ -28,6 +28,8 @@ import org.apache.mahout.math.Matrix
 import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
 import org.apache.mahout.math.scalabindings.RLikeOps._
 
+import org.apache.flink.api.scala._
+
 /**
  * Implementation is taken from Spark's OpTimesRightMatrix:
  * https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -43,7 +45,7 @@ object FlinkOpTimesRightMatrix {
       var inCoreB: Matrix = null
 
       override def open(params: Configuration): Unit = {
-        val runtime = this.getRuntimeContext()
+        val runtime = this.getRuntimeContext
         val dsB: java.util.List[Matrix] = runtime.getBroadcastVariable("matrix")
         inCoreB = dsB.get(0)
       }

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index b6e6211..96d57d2 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -18,34 +18,21 @@
  */
 package org.apache.mahout.flinkbindings.drm
 
-import scala.collection.JavaConverters._
-import scala.util.Random
-import scala.reflect.{ClassTag, classTag}
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat
+import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
 import org.apache.flink.api.java.tuple.Tuple2
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.DrmDataSet
-import org.apache.mahout.math.DenseMatrix
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.SparseMatrix
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.VectorWritable
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.CacheHint
-import org.apache.mahout.math.drm.CheckpointedDrm
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.math.drm.DrmTuple
-import org.apache.mahout.math.scalabindings._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
+import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
+import org.apache.mahout.flinkbindings.{DrmDataSet, _}
+import org.apache.mahout.math.{DenseMatrix, Matrix, SparseMatrix, Vector, VectorWritable}
+import org.apache.mahout.math.drm.{CacheHint, CheckpointedDrm, DistributedContext, DrmTuple, _}
 import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+
+import scala.collection.JavaConverters._
+import scala.reflect.{ClassTag, classTag}
+import scala.util.Random
 
 class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
       private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
@@ -71,7 +58,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
       }
     })
 
-    val list = res.collect().asScala.toList
+    val list = res.collect()
     list.head
   }
 
@@ -95,7 +82,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
   def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this
 
   def collect: Matrix = {
-    val data = ds.collect().asScala.toList
+    val data = ds.collect()
     val isDense = data.forall(_._2.isDense)
 
     val cols = ncol
@@ -139,7 +126,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
     val keyTag = implicitly[ClassTag[K]]
     val convertKey = keyToWritableFunc(keyTag)
 
-    val writableDataset = ds.map(new MapFunction[(K, Vector), Tuple2[Writable, VectorWritable]] {
+    val writableDataset = ds.map(new MapFunction[(K, Vector), (Writable, VectorWritable)] {
       def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] = tuple match {
         case (idx, vec) => new Tuple2(convertKey(idx), new VectorWritable(vec))
       }

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index dbc6b11..c9c1b2c 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -20,23 +20,17 @@ package org.apache.mahout.flinkbindings.drm
 
 import java.lang.Iterable
 
-import scala.collection.JavaConverters.iterableAsScalaIterableConverter
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.functions.MapPartitionFunction
-import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.flink.api.common.functions.{FlatMapFunction, MapPartitionFunction}
+import org.apache.flink.api.scala._
 import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings.BlockifiedDrmDataSet
-import org.apache.mahout.flinkbindings.DrmDataSet
-import org.apache.mahout.flinkbindings.FlinkDistributedContext
-import org.apache.mahout.flinkbindings.wrapContext
-import org.apache.mahout.math.DenseMatrix
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.SparseRowMatrix
+import org.apache.mahout.flinkbindings.{BlockifiedDrmDataSet, DrmDataSet, FlinkDistributedContext, wrapContext}
 import org.apache.mahout.math.drm.DrmTuple
-import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.{DenseMatrix, Matrix, SparseRowMatrix}
+
+import scala.collection.JavaConverters.iterableAsScalaIterableConverter
+import scala.reflect.ClassTag
 
 trait FlinkDrm[K] {
   def executionEnvironment: ExecutionEnvironment
@@ -100,7 +94,7 @@ class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol:
 
   def asRowWise = {
     val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] {
-      def flatMap(typle: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = typle match {
+      def flatMap(tuple: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = tuple match {
         case (keys, block) => keys.view.zipWithIndex.foreach {
           case (key, idx) =>
             out.collect((key, block(idx, ::)))

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 656b8de..6b8f2ae 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -19,18 +19,21 @@
 package org.apache.mahout
 
 import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
 import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrmOps, CheckpointedFlinkDrm, FlinkDrm, RowsFlinkDrm}
 import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable}
 import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext, DrmTuple, _}
 import org.slf4j.LoggerFactory
 
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.utils._
+
 import scala.Array._
 import scala.reflect.ClassTag
 
 package object flinkbindings {
 
-  private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbindings")
+  private[flinkbindings] val log = LoggerFactory.getLogger("org.apache.mahout.flinkbindings")
 
   /** Row-wise organized DRM dataset type */
   type DrmDataSet[K] = DataSet[DrmTuple[K]]
@@ -78,7 +81,7 @@ package object flinkbindings {
 
 
   def readCsv(file: String, delim: String = ",", comment: String = "#")
-             (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
+             (implicit dc: DistributedContext): CheckpointedDrm[Long] = {
     val vectors = dc.env.readTextFile(file)
       .filter(new FilterFunction[String] {
         def filter(in: String): Boolean = {
@@ -94,8 +97,8 @@ package object flinkbindings {
     datasetToDrm(vectors)
   }
 
-  def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Int] = {
-    val zipped = new DataSetOps(ds).zipWithIndex
+  def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Long] = {
+    val zipped = ds.zipWithIndex
     datasetWrap(zipped)
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
index dd76ff4..6fb71ea 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
@@ -18,8 +18,7 @@
  */
 package org.apache.mahout.flinkbindings
 
-import org.apache.flink.api.java.ExecutionEnvironment
-import org.apache.mahout.flinkbindings._
+import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.mahout.math.drm.DistributedContext
 import org.apache.mahout.test.DistributedMahoutSuite
 import org.scalatest.Suite

http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
index a9e8436..4e713c7 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.mahout.flinkbindings.examples
 
-import org.apache.flink.api.java.ExecutionEnvironment
+import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.RLikeDrmOps._
 import org.apache.mahout.flinkbindings._


Mime
View raw message