flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [5/7] flink git commit: [FLINK-2050] [ml] Ports existing ML algorithms to new pipeline mechanism
Date Fri, 22 May 2015 08:43:37 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
index 5c6de55..d8efdaf 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.core.memory.{DataOutputView, DataInputView}
 import org.apache.flink.ml.common._
-import org.apache.flink.ml.recommendation.ALS.Factors
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
 import org.apache.flink.types.Value
 import org.apache.flink.util.Collector
 import org.apache.flink.api.common.functions.{Partitioner => FlinkPartitioner, GroupReduceFunction, CoGroupFunction}
@@ -73,55 +73,58 @@ import scala.util.Random
   *               .setIterations(10)
   *               .setNumFactors(10)
   *
-  *             val model = als.fit(inputDS)
+  *             als.fit(inputDS)
   *
   *             val data2Predict: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)
   *
-  *             model.transform(data2Predict)
+  *             als.predict(data2Predict)
   *          }}}
   *
   * =Parameters=
   *
-  *  - [[ALS.NumFactors]]:
+  *  - [[org.apache.flink.ml.recommendation.ALS.NumFactors]]:
   *  The number of latent factors. It is the dimension of the calculated user and item vectors.
   *  (Default value: '''10''')
   *
-  *  - [[ALS.Lambda]]:
+  *  - [[org.apache.flink.ml.recommendation.ALS.Lambda]]:
   *  Regularization factor. Tune this value in order to avoid overfitting/generalization.
   *  (Default value: '''1''')
   *
-  *  - [[ALS.Iterations]]: The number of iterations to perform. (Default value: '''10''')
+  *  - [[org.apache.flink.ml.recommendation.ALS.Iterations]]:
+  *  The number of iterations to perform. (Default value: '''10''')
   *
-  *  - [[ALS.Blocks]]:
+  *  - [[org.apache.flink.ml.recommendation.ALS.Blocks]]:
   *  The number of blocks into which the user and item matrix a grouped. The fewer
   *  blocks one uses, the less data is sent redundantly. However, bigger blocks entail bigger
   *  update messages which have to be stored on the Heap. If the algorithm fails because of
   *  an OutOfMemoryException, then try to increase the number of blocks. (Default value: '''None''')
   *
-  *  - [[ALS.Seed]]:
+  *  - [[org.apache.flink.ml.recommendation.ALS.Seed]]:
   *  Random seed used to generate the initial item matrix for the algorithm.
   *  (Default value: '''0''')
   *
-  *  - [[ALS.TemporaryPath]]:
+  *  - [[org.apache.flink.ml.recommendation.ALS.TemporaryPath]]:
   *  Path to a temporary directory into which intermediate results are stored. If
   *  this value is set, then the algorithm is split into two preprocessing steps, the ALS iteration
   *  and a post-processing step which calculates a last ALS half-step. The preprocessing steps
-  *  calculate the [[org.apache.flink.ml.recommendation.ALS.OutBlockInformation]] and [[org.apache
-  *  .flink.ml.recommendation.ALS.InBlockInformation]] for the given rating matrix. The result of
-  *  the individual steps are stored in the specified directory. By splitting the algorithm
-  *  into multiple smaller steps, Flink does not have to split the available memory amongst too many
-  *  operators. This allows the system to process bigger individual messasges and improves the
-  *  overall performance. (Default value: '''None''')
+  *  calculate the [[org.apache.flink.ml.recommendation.ALS.OutBlockInformation]] and
+  *  [[org.apache.flink.ml.recommendation.ALS.InBlockInformation]] for the given rating matrix.
+  *  The results of the individual steps are stored in the specified directory. By splitting the
+  *  algorithm into multiple smaller steps, Flink does not have to split the available memory
+  *  amongst too many operators. This allows the system to process bigger individual messages and
+  *  improves the overall performance. (Default value: '''None''')
   *
   * The ALS implementation is based on Spark's MLLib implementation of ALS which you can find
   * [[https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/
   * recommendation/ALS.scala here]].
   */
-class
-ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
+class ALS extends Predictor[ALS] {
 
   import ALS._
 
+  // Stores the matrix factorization after the fitting phase
+  var factorsOption: Option[(DataSet[Factors], DataSet[Factors])] = None
+
   /** Sets the number of latent factors/row dimension of the latent model
     *
     * @param numFactors
@@ -183,91 +186,334 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
     this
   }
 
-  /** Calculates the matrix factorization for the given ratings. A rating is defined as
-    * a tuple of user ID, item ID and the corresponding rating.
+  /** Empirical risk of the trained model (matrix factorization).
     *
-    * @param input Set of user/item ratings for which the factorization has to be calculated
-    * @return Factorization containing the user and item matrix
+    * @param labeledData Reference data
+    * @param riskParameters Additional parameters for the empirical risk calculation
+    * @return Sum of the squared residuals plus the lambda-weighted squared factor norms
     */
-  def fit(input: DataSet[(Int, Int, Double)], fitParameters: ParameterMap): ALSModel = {
-    val resultParameters = this.parameters ++ fitParameters
-
-    val userBlocks = resultParameters.get(Blocks).getOrElse(input.count.toInt)
-    val itemBlocks = userBlocks
-    val persistencePath = resultParameters.get(TemporaryPath)
-    val seed = resultParameters(Seed)
-    val factors = resultParameters(NumFactors)
-    val iterations = resultParameters(Iterations)
-    val lambda = resultParameters(Lambda)
-
-    val ratings = input.map {
-      entry => {
-        val (userID, itemID, rating) = entry
-        Rating(userID, itemID, rating)
+  def empiricalRisk(
+      labeledData: DataSet[(Int, Int, Double)],
+      riskParameters: ParameterMap = ParameterMap.Empty)
+    : DataSet[Double] = {
+    val resultingParameters = parameters ++ riskParameters
+
+    val lambda = resultingParameters(Lambda)
+
+    val data = labeledData map {
+      x => (x._1, x._2)
+    }
+
+    factorsOption match {
+      case Some((userFactors, itemFactors)) => {
+        val predictions = data.join(userFactors).where(0).equalTo(0)
+          .join(itemFactors).where("_1._2").equalTo(0).map {
+          triple => {
+            val (((uID, iID), uFactors), iFactors) = triple
+
+            val uFactorsVector = uFactors.factors
+            val iFactorsVector = iFactors.factors
+
+            val squaredUNorm2 = blas.ddot(
+              uFactorsVector.length,
+              uFactorsVector,
+              1,
+              uFactorsVector,
+              1)
+            val squaredINorm2 = blas.ddot(
+              iFactorsVector.length,
+              iFactorsVector,
+              1,
+              iFactorsVector,
+              1)
+
+            val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1)
+
+            (uID, iID, prediction, squaredUNorm2, squaredINorm2)
+          }
+        }
+
+        labeledData.join(predictions).where(0,1).equalTo(0,1) {
+          (left, right) => {
+            val (_, _, expected) = left
+            val (_, _, predicted, squaredUNorm2, squaredINorm2) = right
+
+            val residual = expected - predicted
+
+            residual * residual + lambda * (squaredUNorm2 + squaredINorm2)
+          }
+        } reduce {
+          _ + _
+        }
       }
+
+      case None => throw new RuntimeException("The ALS model has not been fitted to data. " +
+        "Prior to predicting values, it has to be trained on data.")
     }
+  }
+}
 
-    val blockIDPartitioner = new BlockIDPartitioner()
+object ALS {
+  val USER_FACTORS_FILE = "userFactorsFile"
+  val ITEM_FACTORS_FILE = "itemFactorsFile"
 
-    val ratingsByUserBlock = ratings.map{
-      rating =>
-        val blockID = rating.user % userBlocks
-        (blockID, rating)
-    } partitionCustom(blockIDPartitioner, 0)
+  // ========================================= Parameters ==========================================
 
-    val ratingsByItemBlock = ratings map {
-      rating =>
-        val blockID = rating.item % itemBlocks
-        (blockID, new Rating(rating.item, rating.user, rating.rating))
-    } partitionCustom(blockIDPartitioner, 0)
+  case object NumFactors extends Parameter[Int] {
+    val defaultValue: Option[Int] = Some(10)
+  }
 
-    val (uIn, uOut) = createBlockInformation(userBlocks, itemBlocks, ratingsByUserBlock,
-      blockIDPartitioner)
-    val (iIn, iOut) = createBlockInformation(itemBlocks, userBlocks, ratingsByItemBlock,
-      blockIDPartitioner)
+  case object Lambda extends Parameter[Double] {
+    val defaultValue: Option[Double] = Some(1.0)
+  }
+
+  case object Iterations extends Parameter[Int] {
+    val defaultValue: Option[Int] = Some(10)
+  }
+
+  case object Blocks extends Parameter[Int] {
+    val defaultValue: Option[Int] = None
+  }
+
+  case object Seed extends Parameter[Long] {
+    val defaultValue: Option[Long] = Some(0L)
+  }
 
-    val (userIn, userOut) = persistencePath match {
-      case Some(path) => FlinkTools.persist(uIn, uOut, path + "userIn", path + "userOut")
-      case None => (uIn, uOut)
+  case object TemporaryPath extends Parameter[String] {
+    val defaultValue: Option[String] = None
+  }
+
+  // ==================================== ALS type definitions =====================================
+
+  /** Representation of a user-item rating
+    *
+    * @param user User ID of the rating user
+    * @param item Item ID of the rated item
+    * @param rating Rating value
+    */
+  case class Rating(user: Int, item: Int, rating: Double)
+
+  /** Latent factor model vector
+    *
+    * @param id
+    * @param factors
+    */
+  case class Factors(id: Int, factors: Array[Double]) {
+    override def toString = s"($id, ${factors.mkString(",")})"
+  }
+
+  case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors])
+
+  case class OutBlockInformation(elementIDs: Array[Int], outLinks: OutLinks) {
+    override def toString: String = {
+      s"OutBlockInformation:((${elementIDs.mkString(",")}), ($outLinks))"
     }
+  }
+
+  class OutLinks(var links: Array[scala.collection.mutable.BitSet]) extends Value {
+    def this() = this(null)
 
-    val (itemIn, itemOut) = persistencePath match {
-      case Some(path) => FlinkTools.persist(iIn, iOut, path + "itemIn", path + "itemOut")
-      case None => (iIn, iOut)
+    override def toString: String = {
+      s"${links.mkString("\n")}"
     }
 
-    val initialItems = itemOut.partitionCustom(blockIDPartitioner, 0).map{
-      outInfos =>
-        val blockID = outInfos._1
-        val infos = outInfos._2
+    override def write(out: DataOutputView): Unit = {
+      out.writeInt(links.length)
+      links foreach {
+        link => {
+          val bitMask = link.toBitMask
+          out.writeInt(bitMask.length)
+          for (element <- bitMask) {
+            out.writeLong(element)
+          }
+        }
+      }
+    }
 
-        (blockID, infos.elementIDs.map{
-          id =>
-            val random = new Random(id ^ seed)
-            randomFactors(factors, random)
-        })
-    }.withForwardedFields("0")
+    override def read(in: DataInputView): Unit = {
+      val length = in.readInt()
+      links = new Array[scala.collection.mutable.BitSet](length)
 
-    // iteration to calculate the item matrix
-    val items = initialItems.iterate(iterations) {
-      items => {
-        val users = updateFactors(userBlocks, items, itemOut, userIn, factors, lambda,
-          blockIDPartitioner)
-        updateFactors(itemBlocks, users, userOut, itemIn, factors, lambda, blockIDPartitioner)
+      for (i <- 0 until length) {
+        val bitMaskLength = in.readInt()
+        val bitMask = new Array[Long](bitMaskLength)
+        for (j <- 0 until bitMaskLength) {
+          bitMask(j) = in.readLong()
+        }
+        links(i) = mutable.BitSet.fromBitMask(bitMask)
       }
     }
 
-    val pItems = persistencePath match {
-      case Some(path) => FlinkTools.persist(items, path + "items")
-      case None => items
+    def apply(idx: Int) = links(idx)
+  }
+
+  case class InBlockInformation(elementIDs: Array[Int], ratingsForBlock: Array[BlockRating]) {
+
+    override def toString: String = {
+      s"InBlockInformation:((${elementIDs.mkString(",")}), (${ratingsForBlock.mkString("\n")}))"
+    }
+  }
+
+  case class BlockRating(var ratings: Array[(Array[Int], Array[Double])]) {
+    def apply(idx: Int) = ratings(idx)
+
+    override def toString: String = {
+      ratings.map {
+        case (left, right) => s"((${left.mkString(",")}),(${right.mkString(",")}))"
+      }.mkString(",")
+    }
+  }
+
+  case class BlockedFactorization(userFactors: DataSet[(Int, Array[Array[Double]])],
+                                  itemFactors: DataSet[(Int, Array[Array[Double]])])
+
+  class BlockIDPartitioner extends FlinkPartitioner[Int] {
+    override def partition(blockID: Int, numberOfPartitions: Int): Int = {
+      blockID % numberOfPartitions
     }
+  }
+
+  class BlockIDGenerator(blocks: Int) extends Serializable {
+    def apply(id: Int): Int = {
+      id % blocks
+    }
+  }
+
+  // ================================= Factory methods =============================================
+
+  def apply(): ALS = {
+    new ALS()
+  }
+
+  // ===================================== Operations ==============================================
+
+  /** Predict operation which calculates the matrix entry for the given indices  */
+  implicit val predictRating = new PredictOperation[ALS, (Int, Int), (Int ,Int, Double)] {
+    override def predict(
+        instance: ALS,
+        predictParameters: ParameterMap,
+        input: DataSet[(Int, Int)])
+      : DataSet[(Int, Int, Double)] = {
+
+      instance.factorsOption match {
+        case Some((userFactors, itemFactors)) => {
+          input.join(userFactors).where(0).equalTo(0)
+            .join(itemFactors).where("_1._2").equalTo(0).map {
+            triple => {
+              val (((uID, iID), uFactors), iFactors) = triple
+
+              val uFactorsVector = uFactors.factors
+              val iFactorsVector = iFactors.factors
+
+              val prediction = blas.ddot(
+                uFactorsVector.length,
+                uFactorsVector,
+                1,
+                iFactorsVector,
+                1)
+
+              (uID, iID, prediction)
+            }
+          }
+        }
+
+        case None => throw new RuntimeException("The ALS model has not been fitted to data. " +
+          "Prior to predicting values, it has to be trained on data.")
+      }
+    }
+  }
+
+  /** Calculates the matrix factorization for the given ratings. A rating is defined as
+    * a tuple of user ID, item ID and the corresponding rating.
+    *
+    * The computed user and item factors are stored in the instance's factorsOption.
+    */
+  implicit val fitALS =  new FitOperation[ALS, (Int, Int, Double)] {
+    override def fit(
+        instance: ALS,
+        fitParameters: ParameterMap,
+        input: DataSet[(Int, Int, Double)])
+      : Unit = {
+      val resultParameters = instance.parameters ++ fitParameters
+
+      val userBlocks = resultParameters.get(Blocks).getOrElse(input.count.toInt)
+      val itemBlocks = userBlocks
+      val persistencePath = resultParameters.get(TemporaryPath)
+      val seed = resultParameters(Seed)
+      val factors = resultParameters(NumFactors)
+      val iterations = resultParameters(Iterations)
+      val lambda = resultParameters(Lambda)
+
+      val ratings = input.map {
+        entry => {
+          val (userID, itemID, rating) = entry
+          Rating(userID, itemID, rating)
+        }
+      }
+
+      val blockIDPartitioner = new BlockIDPartitioner()
+
+      val ratingsByUserBlock = ratings.map{
+        rating =>
+          val blockID = rating.user % userBlocks
+          (blockID, rating)
+      } partitionCustom(blockIDPartitioner, 0)
+
+      val ratingsByItemBlock = ratings map {
+        rating =>
+          val blockID = rating.item % itemBlocks
+          (blockID, new Rating(rating.item, rating.user, rating.rating))
+      } partitionCustom(blockIDPartitioner, 0)
+
+      val (uIn, uOut) = createBlockInformation(userBlocks, itemBlocks, ratingsByUserBlock,
+        blockIDPartitioner)
+      val (iIn, iOut) = createBlockInformation(itemBlocks, userBlocks, ratingsByItemBlock,
+        blockIDPartitioner)
+
+      val (userIn, userOut) = persistencePath match {
+        case Some(path) => FlinkTools.persist(uIn, uOut, path + "userIn", path + "userOut")
+        case None => (uIn, uOut)
+      }
+
+      val (itemIn, itemOut) = persistencePath match {
+        case Some(path) => FlinkTools.persist(iIn, iOut, path + "itemIn", path + "itemOut")
+        case None => (iIn, iOut)
+      }
 
-    // perform last half-step to calculate the user matrix
-    val users = updateFactors(userBlocks, pItems, itemOut, userIn, factors, lambda,
-      blockIDPartitioner)
+      val initialItems = itemOut.partitionCustom(blockIDPartitioner, 0).map{
+        outInfos =>
+          val blockID = outInfos._1
+          val infos = outInfos._2
+
+          (blockID, infos.elementIDs.map{
+            id =>
+              val random = new Random(id ^ seed)
+              randomFactors(factors, random)
+          })
+      }.withForwardedFields("0")
+
+      // iteration to calculate the item matrix
+      val items = initialItems.iterate(iterations) {
+        items => {
+          val users = updateFactors(userBlocks, items, itemOut, userIn, factors, lambda,
+            blockIDPartitioner)
+          updateFactors(itemBlocks, users, userOut, itemIn, factors, lambda, blockIDPartitioner)
+        }
+      }
+
+      val pItems = persistencePath match {
+        case Some(path) => FlinkTools.persist(items, path + "items")
+        case None => items
+      }
+
+      // perform last half-step to calculate the user matrix
+      val users = updateFactors(userBlocks, pItems, itemOut, userIn, factors, lambda,
+        blockIDPartitioner)
 
-    new ALSModel(unblock(users, userOut, blockIDPartitioner), unblock(pItems, itemOut,
-      blockIDPartitioner), lambda)
+      instance.factorsOption = Some((
+        unblock(users, userOut, blockIDPartitioner),
+        unblock(pItems, itemOut, blockIDPartitioner)))
+    }
   }
 
   /** Calculates a single half step of the ALS optimization. The result is the new value for
@@ -283,11 +529,11 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
     * @return New value for the optimized matrix (either user or item)
     */
   def updateFactors(numUserBlocks: Int,
-                    items: DataSet[(Int, Array[Array[Double]])],
-                    itemOut: DataSet[(Int, OutBlockInformation)],
-                    userIn: DataSet[(Int, InBlockInformation)],
-                    factors: Int,
-                    lambda: Double, blockIDPartitioner: FlinkPartitioner[Int]):
+    items: DataSet[(Int, Array[Array[Double]])],
+    itemOut: DataSet[(Int, OutBlockInformation)],
+    userIn: DataSet[(Int, InBlockInformation)],
+    factors: Int,
+    lambda: Double, blockIDPartitioner: FlinkPartitioner[Int]):
   DataSet[(Int, Array[Array[Double]])] = {
     // send the item vectors to the blocks whose users have rated the items
     val partialBlockMsgs = itemOut.join(items).where(0).equalTo(0).
@@ -334,8 +580,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
         val numRatings = new ArrayBuffer[Int]()
 
         override def coGroup(left: lang.Iterable[(Int, Int, Array[Array[Double]])],
-                             right: lang.Iterable[(Int, InBlockInformation)],
-                             collector: Collector[(Int, Array[Array[Double]])]): Unit = {
+          right: lang.Iterable[(Int, InBlockInformation)],
+          collector: Collector[(Int, Array[Array[Double]])]): Unit = {
           // there is only one InBlockInformation per user block
           val inInfo = right.iterator().next()._2
           val updates = left.iterator()
@@ -439,7 +685,7 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
     * @return
     */
   def createBlockInformation(userBlocks: Int, itemBlocks: Int, ratings: DataSet[(Int, Rating)],
-                             blockIDPartitioner: BlockIDPartitioner):
+    blockIDPartitioner: BlockIDPartitioner):
   (DataSet[(Int, InBlockInformation)], DataSet[(Int, OutBlockInformation)]) = {
     val blockIDGenerator = new BlockIDGenerator(itemBlocks)
 
@@ -498,8 +744,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
     * @return
     */
   def createOutBlockInformation(ratings: DataSet[(Int, Rating)],
-                                usersPerBlock: DataSet[(Int, Array[Int])],
-                                itemBlocks: Int, blockIDGenerator: BlockIDGenerator):
+    usersPerBlock: DataSet[(Int, Array[Int])],
+    itemBlocks: Int, blockIDGenerator: BlockIDGenerator):
   DataSet[(Int, OutBlockInformation)] = {
     ratings.coGroup(usersPerBlock).where(0).equalTo(0).apply {
       (ratings, users) =>
@@ -547,8 +793,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
     * @return
     */
   def createInBlockInformation(ratings: DataSet[(Int, Rating)],
-                               usersPerBlock: DataSet[(Int, Array[Int])],
-                               blockIDGenerator: BlockIDGenerator):
+    usersPerBlock: DataSet[(Int, Array[Int])],
+    blockIDGenerator: BlockIDGenerator):
   DataSet[(Int, InBlockInformation)] = {
     // Group for every user block the users which have rated the same item and collect their ratings
     val partialInInfos = ratings.map { x => (x._1, x._2.item, x._2.user, x._2.rating)}
@@ -628,9 +874,9 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
         val buffer = ArrayBuffer[BlockRating]()
 
         override def coGroup(partialInfosIterable:
-                             lang.Iterable[(Int, Int,  Array[(Array[Int], Array[Double])])],
-                             userIterable: lang.Iterable[(Int, Array[Int])],
-                             collector: Collector[(Int, InBlockInformation)]): Unit = {
+        lang.Iterable[(Int, Int,  Array[(Array[Int], Array[Double])])],
+          userIterable: lang.Iterable[(Int, Array[Int])],
+          collector: Collector[(Int, InBlockInformation)]): Unit = {
 
           val users = userIterable.iterator()
           val partialInfos = partialInfosIterable.iterator()
@@ -691,8 +937,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
     * @return
     */
   def unblock(users: DataSet[(Int, Array[Array[Double]])],
-              outInfo: DataSet[(Int, OutBlockInformation)],
-              blockIDPartitioner: BlockIDPartitioner): DataSet[Factors] = {
+    outInfo: DataSet[(Int, OutBlockInformation)],
+    blockIDPartitioner: BlockIDPartitioner): DataSet[Factors] = {
     users.join(outInfo).where(0).equalTo(0).withPartitioner(blockIDPartitioner).apply {
       (left, right, col: Collector[Factors]) => {
         val outInfo = right._2
@@ -725,7 +971,7 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
   }
 
   def generateFullMatrix(triangularMatrix: Array[Double], fullMatrix: Array[Double],
-                         factors: Int): Unit = {
+    factors: Int): Unit = {
     var row = 0
     var pos = 0
 
@@ -759,206 +1005,3 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
     Array.fill(factors)(random.nextDouble())
   }
 }
-
-object ALS {
-  val USER_FACTORS_FILE = "userFactorsFile"
-  val ITEM_FACTORS_FILE = "itemFactorsFile"
-
-  case object NumFactors extends Parameter[Int] {
-    val defaultValue: Option[Int] = Some(10)
-  }
-
-  case object Lambda extends Parameter[Double] {
-    val defaultValue: Option[Double] = Some(1.0)
-  }
-
-  case object Iterations extends Parameter[Int] {
-    val defaultValue: Option[Int] = Some(10)
-  }
-
-  case object Blocks extends Parameter[Int] {
-    val defaultValue: Option[Int] = None
-  }
-
-  case object Seed extends Parameter[Long] {
-    val defaultValue: Option[Long] = Some(0L)
-  }
-
-  case object TemporaryPath extends Parameter[String] {
-    val defaultValue: Option[String] = None
-  }
-
-  // ==================================== ALS type definitions =====================================
-
-  /** Representation of a user-item rating
-    *
-    * @param user User ID of the rating user
-    * @param item Item iD of the rated item
-    * @param rating Rating value
-    */
-  case class Rating(user: Int, item: Int, rating: Double)
-
-  /** Latent factor model vector
-    *
-    * @param id
-    * @param factors
-    */
-  case class Factors(id: Int, factors: Array[Double]) {
-    override def toString = s"($id, ${factors.mkString(",")})"
-  }
-
-  case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors])
-
-  case class OutBlockInformation(elementIDs: Array[Int], outLinks: OutLinks) {
-    override def toString: String = {
-      s"OutBlockInformation:((${elementIDs.mkString(",")}), ($outLinks))"
-    }
-  }
-
-  class OutLinks(var links: Array[scala.collection.mutable.BitSet]) extends Value {
-    def this() = this(null)
-
-    override def toString: String = {
-      s"${links.mkString("\n")}"
-    }
-
-    override def write(out: DataOutputView): Unit = {
-      out.writeInt(links.length)
-      links foreach {
-        link => {
-          val bitMask = link.toBitMask
-          out.writeInt(bitMask.length)
-          for (element <- bitMask) {
-            out.writeLong(element)
-          }
-        }
-      }
-    }
-
-    override def read(in: DataInputView): Unit = {
-      val length = in.readInt()
-      links = new Array[scala.collection.mutable.BitSet](length)
-
-      for (i <- 0 until length) {
-        val bitMaskLength = in.readInt()
-        val bitMask = new Array[Long](bitMaskLength)
-        for (j <- 0 until bitMaskLength) {
-          bitMask(j) = in.readLong()
-        }
-        links(i) = mutable.BitSet.fromBitMask(bitMask)
-      }
-    }
-
-    def apply(idx: Int) = links(idx)
-  }
-
-  case class InBlockInformation(elementIDs: Array[Int], ratingsForBlock: Array[BlockRating]) {
-
-    override def toString: String = {
-      s"InBlockInformation:((${elementIDs.mkString(",")}), (${ratingsForBlock.mkString("\n")}))"
-    }
-  }
-
-  case class BlockRating(var ratings: Array[(Array[Int], Array[Double])]) {
-    def apply(idx: Int) = ratings(idx)
-
-    override def toString: String = {
-      ratings.map {
-        case (left, right) => s"((${left.mkString(",")}),(${right.mkString(",")}))"
-      }.mkString(",")
-    }
-  }
-
-  case class BlockedFactorization(userFactors: DataSet[(Int, Array[Array[Double]])],
-                                  itemFactors: DataSet[(Int, Array[Array[Double]])])
-
-  class BlockIDPartitioner extends FlinkPartitioner[Int] {
-    override def partition(blockID: Int, numberOfPartitions: Int): Int = {
-      blockID % numberOfPartitions
-    }
-  }
-
-  class BlockIDGenerator(blocks: Int) extends Serializable {
-    def apply(id: Int): Int = {
-      id % blocks
-    }
-  }
-
-  // ========================= Factory methods =====================================
-
-  def apply(): ALS = {
-    new ALS()
-  }
-}
-
-/** Resulting model of the ALS algorithm.
-  *
-  * It contains the calculated factors, user and item matrix, of the given
-  * ratings matrix. Additionally it stores the used regularization value lambda in order to
-  * calculate the empirical risk of the model.
-  *
-  * @param userFactors Calculated user matrix
-  * @param itemFactors Calcualted item matrix
-  * @param lambda Regularization value used to calculate the model
-  */
-class ALSModel(
-    @transient val userFactors: DataSet[Factors],
-    @transient val itemFactors: DataSet[Factors],
-    val lambda: Double)
-  extends Transformer[(Int, Int), (Int, Int, Double)]
-  with Serializable {
-
-  override def transform(input: DataSet[(Int, Int)], parameters: ParameterMap): DataSet[(Int,
-    Int, Double)] = {
-
-    input.join(userFactors).where(0).equalTo(0)
-      .join(itemFactors).where("_1._2").equalTo(0).map {
-      triple => {
-        val (((uID, iID), uFactors), iFactors) = triple
-
-        val uFactorsVector = uFactors.factors
-        val iFactorsVector = iFactors.factors
-
-        val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1)
-
-        (uID, iID, prediction)
-      }
-    }
-  }
-
-  def empiricalRisk(labeledData: DataSet[(Int, Int, Double)]): DataSet[Double] = {
-    val data = labeledData map {
-      x => (x._1, x._2)
-    }
-
-    val predictions = data.join(userFactors).where(0).equalTo(0)
-      .join(itemFactors).where("_1._2").equalTo(0).map {
-      triple => {
-        val (((uID, iID), uFactors), iFactors) = triple
-
-        val uFactorsVector = uFactors.factors
-        val iFactorsVector = iFactors.factors
-
-        val squaredUNorm2 = blas.ddot(uFactorsVector.length, uFactorsVector, 1, uFactorsVector, 1)
-        val squaredINorm2 = blas.ddot(iFactorsVector.length, iFactorsVector, 1, iFactorsVector, 1)
-
-        val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1)
-
-        (uID, iID, prediction, squaredUNorm2, squaredINorm2)
-      }
-    }
-
-    labeledData.join(predictions).where(0,1).equalTo(0,1) {
-      (left, right) => {
-        val (_, _, expected) = left
-        val (_, _, predicted, squaredUNorm2, squaredINorm2) = right
-
-        val residual = expected - predicted
-
-        residual * residual + lambda * (squaredUNorm2 + squaredINorm2)
-      }
-    } reduce {
-      _ + _
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
index 87352fa..64b24dc 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
@@ -29,6 +29,7 @@ import org.apache.flink.ml.math.vector2Array
 import org.apache.flink.api.scala._
 
 import com.github.fommil.netlib.BLAS.{ getInstance => blas }
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
 
 /** Multiple linear regression using the ordinary least squares (OLS) estimator.
   *
@@ -67,30 +68,33 @@ import com.github.fommil.netlib.BLAS.{ getInstance => blas }
   *             val trainingDS: DataSet[LabeledVector] = ...
   *             val testingDS: DataSet[Vector] = ...
   *
-  *             val model = mlr.fit(trainingDS)
+  *             mlr.fit(trainingDS)
   *
-  *             val predictions = model.transform(testingDS)
+  *             val predictions = mlr.predict(testingDS)
   *          }}}
   *
   * =Parameters=
   *
-  *  - [[MultipleLinearRegression.Iterations]]: Maximum number of iterations.
+  *  - [[org.apache.flink.ml.regression.MultipleLinearRegression.Iterations]]:
+  *  Maximum number of iterations.
   *
-  *  - [[MultipleLinearRegression.Stepsize]]:
+  *  - [[org.apache.flink.ml.regression.MultipleLinearRegression.Stepsize]]:
   *  Initial step size for the gradient descent method.
   *  This value controls how far the gradient descent method moves in the opposite direction of the
   *  gradient. Tuning this parameter might be crucial to make it stable and to obtain a better
   *  performance.
   *
-  *  - [[MultipleLinearRegression.ConvergenceThreshold]]:
+  *  - [[org.apache.flink.ml.regression.MultipleLinearRegression.ConvergenceThreshold]]:
   *  Threshold for relative change of sum of squared residuals until convergence.
   *
   */
-class 
-MultipleLinearRegression extends Learner[LabeledVector, MultipleLinearRegressionModel]
-with Serializable {
+class MultipleLinearRegression extends Predictor[MultipleLinearRegression] {
+
   import MultipleLinearRegression._
 
+  // Stores the weights of the linear model after the fitting phase
+  var weightsOption: Option[DataSet[(Array[Double], Double)]] = None
+
   def setIterations(iterations: Int): MultipleLinearRegression = {
     parameters.add(Iterations, iterations)
     this
@@ -106,116 +110,174 @@ with Serializable {
     this
   }
 
-  override def fit(input: DataSet[LabeledVector], fitParameters: ParameterMap):
-  MultipleLinearRegressionModel = {
-    val map = this.parameters ++ fitParameters
-
-    // retrieve parameters of the algorithm
-    val numberOfIterations = map(Iterations)
-    val stepsize = map(Stepsize)
-    val convergenceThreshold = map.get(ConvergenceThreshold)
+  def squaredResidualSum(input: DataSet[LabeledVector]): DataSet[Double] = {
+    weightsOption match {
+      case Some(weights) => {
+        input.map {
+          new SquaredResiduals
+        }.withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST).reduce {
+          _ + _
+        }
+      }
 
-    // calculate dimension of the feature vectors
-    val dimension = input.map{_.vector.size}.reduce {
-      (a, b) =>
-        require(a == b, "All input vector must have the same dimension.")
-        a
+      case None => {
+        throw new RuntimeException("The MultipleLinearRegression has not been fitted to the " +
+          "data. This is necessary to learn the weight vector of the linear function.")
+      }
     }
 
-    // initial weight vector is set to 0
-    val initialWeightVector = createInitialWeightVector(dimension)
+  }
+}
+
+object MultipleLinearRegression {
+  val WEIGHTVECTOR_BROADCAST = "weights_broadcast"
 
-    // check if a convergence threshold has been set
-    val resultingWeightVector = convergenceThreshold match {
-      case Some(convergence) =>
+  // ====================================== Parameters =============================================
 
-        // we have to calculate for each weight vector the sum of squared residuals
-        val initialSquaredResidualSum = input.map {
-          new SquaredResiduals
-        }.withBroadcastSet(initialWeightVector, WEIGHTVECTOR_BROADCAST).reduce {
-          _ + _
-        }
+  case object Stepsize extends Parameter[Double] {
+    val defaultValue = Some(0.1)
+  }
+
+  case object Iterations extends Parameter[Int] {
+    val defaultValue = Some(10)
+  }
+
+  case object ConvergenceThreshold extends Parameter[Double] {
+    val defaultValue = None
+  }
+
+  // ======================================== Factory methods ======================================
+
+  def apply(): MultipleLinearRegression = {
+    new MultipleLinearRegression()
+  }
+
+  // ====================================== Operations =============================================
 
-        // combine weight vector with current sum of squared residuals
-        val initialWeightVectorWithSquaredResidualSum = initialWeightVector.
-          crossWithTiny(initialSquaredResidualSum).setParallelism(1)
+  /** Trains the linear model to fit the training data. The resulting weight vector is stored in
+    * the [[MultipleLinearRegression]] instance.
+    *
+    */
+  implicit val fitMLR = new FitOperation[MultipleLinearRegression, LabeledVector] {
+    override def fit(
+        instance: MultipleLinearRegression,
+        fitParameters: ParameterMap,
+        input: DataSet[LabeledVector])
+      : Unit = {
+      val map = instance.parameters ++ fitParameters
+
+      // retrieve parameters of the algorithm
+      val numberOfIterations = map(Iterations)
+      val stepsize = map(Stepsize)
+      val convergenceThreshold = map.get(ConvergenceThreshold)
+
+      // calculate dimension of the feature vectors
+      val dimension = input.map{_.vector.size}.reduce {
+        (a, b) =>
+          require(a == b, "All input vector must have the same dimension.")
+          a
+      }
+
+      input.flatMap{
+        t =>
+          Seq(t)
+      }
+
+      // initial weight vector is set to 0
+      val initialWeightVector = createInitialWeightVector(dimension)
+
+      // check if a convergence threshold has been set
+      val resultingWeightVector = convergenceThreshold match {
+        case Some(convergence) =>
+
+          // we have to calculate for each weight vector the sum of squared residuals
+          val initialSquaredResidualSum = input.map {
+            new SquaredResiduals
+          }.withBroadcastSet(initialWeightVector, WEIGHTVECTOR_BROADCAST).reduce {
+            _ + _
+          }
+
+          // combine weight vector with current sum of squared residuals
+          val initialWeightVectorWithSquaredResidualSum = initialWeightVector.
+            crossWithTiny(initialSquaredResidualSum).setParallelism(1)
 
-        // start SGD iteration
-        val resultWithResidual = initialWeightVectorWithSquaredResidualSum.
-          iterateWithTermination(numberOfIterations) {
-          weightVectorSquaredResidualDS =>
+          // start SGD iteration
+          val resultWithResidual = initialWeightVectorWithSquaredResidualSum.
+            iterateWithTermination(numberOfIterations) {
+            weightVectorSquaredResidualDS =>
 
-            // extract weight vector and squared residual sum
-            val weightVector = weightVectorSquaredResidualDS.map{_._1}
-            val squaredResidualSum = weightVectorSquaredResidualDS.map{_._2}
+              // extract weight vector and squared residual sum
+              val weightVector = weightVectorSquaredResidualDS.map{_._1}
+              val squaredResidualSum = weightVectorSquaredResidualDS.map{_._2}
 
-            // TODO: Sample from input to realize proper SGD
-            val newWeightVector = input.map {
-              new LinearRegressionGradientDescent
-            }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce {
-              (left, right) =>
+              // TODO: Sample from input to realize proper SGD
+              val newWeightVector = input.map {
+                new LinearRegressionGradientDescent
+              }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce {
+                (left, right) =>
                   val (leftBetas, leftBeta0, leftCount) = left
                   val (rightBetas, rightBeta0, rightCount) = right
 
                   blas.daxpy(leftBetas.length, 1.0, rightBetas, 1, leftBetas, 1)
 
                   (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount)
-            }.map {
-              new LinearRegressionWeightsUpdate(stepsize)
-            }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
-
-            // calculate the sum of squared residuals for the new weight vector
-            val newResidual = input.map {
-              new SquaredResiduals
-            }.withBroadcastSet(newWeightVector, WEIGHTVECTOR_BROADCAST).reduce {
-              _ + _
-            }
+              }.map {
+                new LinearRegressionWeightsUpdate(stepsize)
+              }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
+
+              // calculate the sum of squared residuals for the new weight vector
+              val newResidual = input.map {
+                new SquaredResiduals
+              }.withBroadcastSet(newWeightVector, WEIGHTVECTOR_BROADCAST).reduce {
+                _ + _
+              }
 
-            // check if the relative change in the squared residual sum is smaller than the
-            // convergence threshold. If yes, then terminate => return empty termination data set
-            val termination = squaredResidualSum.crossWithTiny(newResidual).setParallelism(1).
-              filter{
-              pair => {
-                val (residual, newResidual) = pair
-
-                if (residual <= 0) {
-                  false
-                } else {
-                  math.abs((residual - newResidual)/residual) >= convergence
+              // check if the relative change in the squared residual sum is smaller than the
+              // convergence threshold. If yes, then terminate => return empty termination data set
+              val termination = squaredResidualSum.crossWithTiny(newResidual).setParallelism(1).
+                filter{
+                pair => {
+                  val (residual, newResidual) = pair
+
+                  if (residual <= 0) {
+                    false
+                  } else {
+                    math.abs((residual - newResidual)/residual) >= convergence
+                  }
                 }
               }
-            }
 
-            // result for new iteration
-            (newWeightVector cross newResidual, termination)
-        }
+              // result for new iteration
+              (newWeightVector cross newResidual, termination)
+          }
+
+          // remove squared residual sum to only return the weight vector
+          resultWithResidual.map{_._1}
+
+        case None =>
+          // No convergence criterion
+          initialWeightVector.iterate(numberOfIterations) {
+            weightVector => {
+
+              // TODO: Sample from input to realize proper SGD
+              input.map {
+                new LinearRegressionGradientDescent
+              }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce {
+                (left, right) =>
+                  val (leftBetas, leftBeta0, leftCount) = left
+                  val (rightBetas, rightBeta0, rightCount) = right
 
-        // remove squared residual sum to only return the weight vector
-        resultWithResidual.map{_._1}
-
-      case None =>
-        // No convergence criterion
-        initialWeightVector.iterate(numberOfIterations) {
-          weightVector => {
-
-            // TODO: Sample from input to realize proper SGD
-            input.map {
-              new LinearRegressionGradientDescent
-            }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce {
-              (left, right) =>
-                val (leftBetas, leftBeta0, leftCount) = left
-                val (rightBetas, rightBeta0, rightCount) = right
-
-                blas.daxpy(leftBetas.length, 1, rightBetas, 1, leftBetas, 1)
-                (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount)
-            }.map {
-              new LinearRegressionWeightsUpdate(stepsize)
-            }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
+                  blas.daxpy(leftBetas.length, 1, rightBetas, 1, leftBetas, 1)
+                  (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount)
+              }.map {
+                new LinearRegressionWeightsUpdate(stepsize)
+              }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
+            }
           }
-        }
-    }
+      }
 
-    new MultipleLinearRegressionModel(resultingWeightVector)
+      instance.weightsOption = Some(resultingWeightVector)
+    }
   }
 
   /** Creates a DataSet with one zero vector. The zero vector has dimension d, which is given
@@ -233,28 +295,58 @@ with Serializable {
         (values, 0.0)
     }
   }
-}
 
-object MultipleLinearRegression {
-  val WEIGHTVECTOR_BROADCAST = "weights_broadcast"
+  /** Calculates the predictions for new data with respect to the learned linear model.
+    *
+    * @tparam T Testing data type for which the prediction is calculated. Has to be a subtype of
+    *           [[Vector]]
+    * @return [[PredictOperation]] that labels each input vector with its predicted value
+    */
+  implicit def predictVectors[T <: Vector] = {
+    new PredictOperation[MultipleLinearRegression, T, LabeledVector] {
+      override def predict(
+        instance: MultipleLinearRegression,
+        predictParameters: ParameterMap,
+        input: DataSet[T])
+      : DataSet[LabeledVector] = {
+        instance.weightsOption match {
+          case Some(weights) => {
+            input.map(new LinearRegressionPrediction[T])
+              .withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST)
+          }
 
-  // Define parameters for MultipleLinearRegression
-  case object Stepsize extends Parameter[Double] {
-    val defaultValue = Some(0.1)
+          case None => {
+            throw new RuntimeException("The MultipleLinearRegression has not been fitted to the " +
+              "data. This is necessary to learn the weight vector of the linear function.")
+          }
+        }
+      }
+    }
   }
 
-  case object Iterations extends Parameter[Int] {
-    val defaultValue = Some(10)
-  }
+  private class LinearRegressionPrediction[T <: Vector] extends RichMapFunction[T, LabeledVector] {
+    private var weights: Array[Double] = null
+    private var weight0: Double = 0
 
-  case object ConvergenceThreshold extends Parameter[Double] {
-    val defaultValue = None
-  }
 
-  // ====================== Facotry methods ==========================
+    @throws(classOf[Exception])
+    override def open(configuration: Configuration): Unit = {
+      val t = getRuntimeContext
+        .getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
+
+      val weightsPair = t.get(0)
 
-  def apply(): MultipleLinearRegression = {
-    new MultipleLinearRegression()
+      weights = weightsPair._1
+      weight0 = weightsPair._2
+    }
+
+    override def map(value: T): LabeledVector = {
+      val dotProduct = blas.ddot(weights.length, weights, 1, vector2Array(value), 1)
+
+      val prediction = dotProduct + weight0
+
+      LabeledVector(prediction, value)
+    }
   }
 }
 
@@ -387,63 +479,3 @@ RichMapFunction[(Array[Double], Double, Int), (Array[Double], Double)] {
     (newWeights, newWeight0)
   }
 }
-
-//--------------------------------------------------------------------------------------------------
-//  Model definition
-//--------------------------------------------------------------------------------------------------
-
-/** Multiple linear regression model returned by [[MultipleLinearRegression]]. The model stores the
-  * calculated weight vector and applies the linear model to given vectors v:
-  *
-  * `hat y = w^T*v + w_0`
-  *
-  * with `hat y` being the predicted regression value.
-  *
-  * @param weights DataSet containing the calculated weight vector
-  */
-class MultipleLinearRegressionModel private[regression](
-    val weights: DataSet[(Array[Double], Double)])
-  extends Transformer[ Vector, LabeledVector ]
-  with Serializable {
-
-  import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST
-
-  // predict regression value for input
-  override def transform(input: DataSet[Vector],
-                         parameters: ParameterMap): DataSet[LabeledVector] = {
-    input.map(new LinearRegressionPrediction).withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST)
-  }
-
-  def squaredResidualSum(input: DataSet[LabeledVector]): DataSet[Double] = {
-    input.map{
-      new SquaredResiduals()
-    }.withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST).reduce{
-      _ + _
-    }
-  }
-
-  private class LinearRegressionPrediction extends RichMapFunction[Vector, LabeledVector] {
-    private var weights: Array[Double] = null
-    private var weight0: Double = 0
-
-
-    @throws(classOf[Exception])
-    override def open(configuration: Configuration): Unit = {
-      val t = getRuntimeContext
-        .getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
-
-      val weightsPair = t.get(0)
-
-      weights = weightsPair._1
-      weight0 = weightsPair._2
-    }
-
-    override def map(value: Vector): LabeledVector = {
-      val dotProduct = blas.ddot(weights.length, weights, 1, vector2Array(value), 1)
-
-      val prediction = dotProduct + weight0
-
-      LabeledVector(prediction, value)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/resources/log4j-test.properties b/flink-staging/flink-ml/src/test/resources/log4j-test.properties
index f3c51b8..76b237e 100644
--- a/flink-staging/flink-ml/src/test/resources/log4j-test.properties
+++ b/flink-staging/flink-ml/src/test/resources/log4j-test.properties
@@ -1,20 +1,20 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
 #
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
+#      http://www.apache.org/licenses/LICENSE-2.0
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
 # limitations under the License.
-#
+################################################################################
 
 log4j.rootLogger=OFF, console
 
@@ -32,4 +32,7 @@ log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.file=${log.dir}/{$mvn.forkNumber}.log
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
new file mode 100644
index 0000000..a5e7496
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+class CoCoAITSuite extends FlatSpec with Matchers with FlinkTestBase {
+
+  behavior of "The CoCoA implementation"
+
+  it should "train a SVM" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val cocoa = CoCoA().
+    setBlocks(env.getParallelism).
+    setIterations(100).
+    setLocalIterations(100).
+    setRegularization(0.002).
+    setStepsize(0.1).
+    setSeed(0)
+
+    val trainingDS = env.fromCollection(Classification.trainingData)
+
+    cocoa.fit(trainingDS)
+
+    val weightVector = cocoa.weightsOption.get.collect().apply(0)
+
+    weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach {
+      case (weight, expectedWeight) =>
+        weight should be(expectedWeight +- 0.1)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala
deleted file mode 100644
index 0f000a3..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.classification
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.flink.api.scala._
-import org.apache.flink.test.util.FlinkTestBase
-
-class CoCoAITSuite extends FlatSpec with Matchers with FlinkTestBase {
-
-  behavior of "The CoCoA implementation"
-
-  it should "train a SVM" in {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val learner = CoCoA().
-    setBlocks(env.getParallelism).
-    setIterations(100).
-    setLocalIterations(100).
-    setRegularization(0.002).
-    setStepsize(0.1).
-    setSeed(0)
-
-    val trainingDS = env.fromCollection(Classification.trainingData)
-
-    val model = learner.fit(trainingDS)
-
-    val weightVector = model.weights.collect().apply(0)
-
-    weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach {
-      case (weight, expectedWeight) =>
-        weight should be(expectedWeight +- 0.1)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala
deleted file mode 100644
index a185282..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import org.scalatest.FlatSpec
-
-import org.apache.flink.api.scala._
-import org.apache.flink.ml.common.LabeledVector
-import org.apache.flink.ml.math.{SparseVector, DenseVector, Vector}
-import org.apache.flink.test.util.FlinkTestBase
-
-class SciKitPipelineSuite extends FlatSpec with FlinkTestBase {
-  behavior of "Pipeline"
-
-  it should "work" in {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val scaler = new Scaler
-    val offset = new Offset
-
-    val input: DataSet[Vector] = env.fromCollection(List(DenseVector(2,1,3), SparseVector.fromCOO(3, (1,1), (2,2))))
-    val training = env.fromCollection(List(LabeledVector(1.0, DenseVector(2,3,1)), LabeledVector(2.0, SparseVector.fromCOO(3, (1,1), (2,2)))))
-    val intData = env.fromCollection(List(1,2,3,4))
-
-    val result = scaler.transform(input)
-
-    result.print()
-
-    val result2 = offset.transform(input)
-    result2.print()
-
-    val chain = scaler.chainTransformer(offset)
-
-    val result3 = chain.transform(input)(ChainedTransformer.chainedTransformOperation(Scaler.vTransform, Offset.offsetTransform))
-
-    result3.print()
-
-    val chain2 = chain.chainTransformer(scaler)
-    val result4 = chain2.transform(input)
-
-    result4.print()
-
-    val kmeans = new KMeans()
-
-    val chainedPredictor = chain.chainPredictor(kmeans)
-
-    val prediction = chainedPredictor.predict(result)
-
-    prediction.print()
-
-    env.execute()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
index b930ceb..0f045ab 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
@@ -21,6 +21,7 @@ package org.apache.flink.ml.feature
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.ml.common.LabeledVector
 import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.ml.preprocessing.PolynomialFeatures
 import org.scalatest.{Matchers, FlatSpec}
 
 import org.apache.flink.api.scala._
@@ -45,10 +46,10 @@ class PolynomialBaseITSuite
 
     val inputDS = env.fromCollection (input)
 
-    val transformer = PolynomialBase ()
+    val transformer = PolynomialFeatures()
     .setDegree (3)
 
-    val transformedDS = transformer.transform (inputDS)
+    val transformedDS = transformer.transform(inputDS)
 
     val expectedMap = List (
     (1.0 -> DenseVector (1.0, 1.0, 1.0) ),
@@ -81,7 +82,7 @@ class PolynomialBaseITSuite
 
     val inputDS = env.fromCollection(input)
 
-    val transformer = PolynomialBase()
+    val transformer = PolynomialFeatures()
       .setDegree(3)
 
     val transformedDS = transformer.transform(inputDS)
@@ -106,7 +107,7 @@ class PolynomialBaseITSuite
 
     val inputDS = env.fromCollection(input)
 
-    val transformer = PolynomialBase()
+    val transformer = PolynomialFeatures()
       .setDegree(0)
 
     val transformedDS = transformer.transform(inputDS)

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
new file mode 100644
index 0000000..8803195
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.pipeline
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler}
+import org.apache.flink.ml.regression.MultipleLinearRegression
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+
+class PipelineITSuite extends FlatSpec with Matchers with FlinkTestBase {
+  behavior of "Flink's pipelines"
+
+  it should "support chaining of compatible transformer" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val vData = List(DenseVector(1.0, 2.0, 3.0), DenseVector(2.0, 3.0, 4.0))
+    val lvData = List(LabeledVector(1.0, DenseVector(1.0, 1.0, 1.0)),
+      LabeledVector(2.0, DenseVector(2.0, 2.0, 2.0)))
+
+    val vectorData = env.fromCollection(vData)
+    val labeledVectorData = env.fromCollection(lvData)
+
+    val expectedScaledVectorSet = Set(
+      DenseVector(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -1.0, -1.0, -1.0),
+      DenseVector(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
+    )
+
+    val expectedScaledLabeledVectorSet = Set(
+      LabeledVector(1.0, DenseVector(1.0, 3.0, 5.0, 9.0, 15.0, 25.0, -1.0, -3.0, -5.0)),
+      LabeledVector(2.0, DenseVector(1.0, -1.0, -3.0, 1.0, 3.0, 9.0, 1.0, -1.0, -3.0))
+    )
+
+    val scaler = StandardScaler()
+    val polyFeatures = PolynomialFeatures().setDegree(2)
+
+    val pipeline = scaler.chainTransformer(polyFeatures)
+
+    pipeline.fit(vectorData)
+
+    val scaledVectorDataDS = pipeline.transform(vectorData)
+    val scaledLabeledVectorDataDS = pipeline.transform(labeledVectorData)
+
+    val scaledVectorData = scaledVectorDataDS.collect()
+    val scaledLabeledVectorData = scaledLabeledVectorDataDS.collect()
+
+    scaledVectorData.size should be(expectedScaledVectorSet.size)
+
+    for(scaledVector <- scaledVectorData){
+      expectedScaledVectorSet should contain(scaledVector)
+    }
+
+    scaledLabeledVectorData.size should be(expectedScaledLabeledVectorSet.size)
+
+    for(scaledLabeledVector <- scaledLabeledVectorData) {
+      expectedScaledLabeledVectorSet should contain(scaledLabeledVector)
+    }
+  }
+
+  it should "throw an exception when the pipeline operators are not compatible" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val scaler = StandardScaler()
+    val mlr = MultipleLinearRegression()
+
+    val vData = List(DenseVector(1.0, 2.0, 3.0), DenseVector(2.0, 3.0, 4.0))
+    val vectorData = env.fromCollection(vData)
+
+    val pipeline = scaler.chainPredictor(mlr)
+
+    val exception = intercept[RuntimeException] {
+      pipeline.fit(vectorData)
+    }
+
+    exception.getMessage should equal("There is no FitOperation defined for class org.apache." +
+      "flink.ml.regression.MultipleLinearRegression which trains on a " +
+      "DataSet[class org.apache.flink.ml.math.DenseVector]")
+  }
+
+  it should "throw an exception when the input data is not supported" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dData = List(1.0, 2.0, 3.0)
+    val doubleData = env.fromCollection(dData)
+
+    val scaler = StandardScaler()
+    val polyFeatures = PolynomialFeatures()
+
+    val pipeline = scaler.chainTransformer(polyFeatures)
+
+    val exception = intercept[RuntimeException] {
+      pipeline.fit(doubleData)
+    }
+
+    exception.getMessage should equal("There is no FitOperation defined for class org.apache." +
+      "flink.ml.preprocessing.StandardScaler which trains on a DataSet[double]")
+  }
+
+  it should "support multiple transformers and a predictor" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val data = List(LabeledVector(1.0, DenseVector(1.0, 2.0)),
+      LabeledVector(2.0, DenseVector(2.0, 3.0)),
+      LabeledVector(3.0, DenseVector(3.0, 4.0)))
+
+    val trainingData = env.fromCollection(data)
+
+    val chainedScalers2 = StandardScaler().chainTransformer(StandardScaler())
+    val chainedScalers3 = chainedScalers2.chainTransformer(StandardScaler())
+    val chainedScalers4 = chainedScalers3.chainTransformer(StandardScaler())
+    val chainedScalers5 = chainedScalers4.chainTransformer(StandardScaler())
+
+    val predictor = MultipleLinearRegression()
+
+
+    val pipeline = chainedScalers5.chainPredictor(predictor)
+
+    pipeline.fit(trainingData)
+
+    val weightVector = predictor.weightsOption.get.collect().head
+
+    weightVector._1.foreach{
+      _ should be (0.367282 +- 0.01)
+    }
+
+    weightVector._2 should be (1.3131727 +- 0.01)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
index ac3cbb6..30875b3 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
@@ -41,8 +41,9 @@ class StandardScalerITSuite
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val dataSet = env.fromCollection(data)
-    val transformer = StandardScaler()
-    val scaledVectors = transformer.transform(dataSet).collect
+    val scaler = StandardScaler()
+    scaler.fit(dataSet)
+    val scaledVectors = scaler.transform(dataSet).collect
 
     scaledVectors.length should equal(data.length)
 
@@ -73,8 +74,9 @@ class StandardScalerITSuite
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val dataSet = env.fromCollection(data)
-    val transformer = StandardScaler().setMean(10.0).setStd(2.0)
-    val scaledVectors = transformer.transform(dataSet).collect
+    val scaler = StandardScaler().setMean(10.0).setStd(2.0)
+    scaler.fit(dataSet)
+    val scaledVectors = scaler.transform(dataSet).collect
 
     scaledVectors.length should equal(data.length)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
index 245d7a8..2ad310d 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
@@ -48,13 +48,13 @@ class ALSITSuite
 
     val inputDS = env.fromCollection(data)
 
-    val model = als.fit(inputDS)
+    als.fit(inputDS)
 
     val testData = env.fromCollection(expectedResult.map{
       case (userID, itemID, rating) => (userID, itemID)
     })
 
-    val predictions = model.transform(testData).collect()
+    val predictions = als.predict(testData).collect()
 
     predictions.length should equal(expectedResult.length)
 
@@ -70,7 +70,7 @@ class ALSITSuite
       }
     }
 
-    val risk = model.empiricalRisk(inputDS).collect().apply(0)
+    val risk = als.empiricalRisk(inputDS).collect().apply(0)
 
     risk should be(expectedEmpiricalRisk +- 1)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
index 2d3f770..8be239a 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
@@ -20,7 +20,7 @@ package org.apache.flink.ml.regression
 
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.ml.common.ParameterMap
-import org.apache.flink.ml.feature.PolynomialBase
+import org.apache.flink.ml.preprocessing.PolynomialFeatures
 import org.scalatest.{Matchers, FlatSpec}
 
 import org.apache.flink.api.scala._
@@ -38,7 +38,7 @@ class MultipleLinearRegressionITSuite
 
     env.setParallelism(2)
 
-    val learner = MultipleLinearRegression()
+    val mlr = MultipleLinearRegression()
 
     import RegressionData._
 
@@ -49,9 +49,9 @@ class MultipleLinearRegressionITSuite
     parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
 
     val inputDS = env.fromCollection(data)
-    val model = learner.fit(inputDS, parameters)
+    mlr.fit(inputDS, parameters)
 
-    val weightList = model.weights.collect()
+    val weightList = mlr.weightsOption.get.collect()
 
     weightList.size should equal(1)
 
@@ -63,7 +63,7 @@ class MultipleLinearRegressionITSuite
     }
     weight0 should be (expectedWeight0 +- 0.4)
 
-    val srs = model.squaredResidualSum(inputDS).collect().apply(0)
+    val srs = mlr.squaredResidualSum(inputDS).collect().apply(0)
 
     srs should be (expectedSquaredResidualSum +- 2)
   }
@@ -73,21 +73,21 @@ class MultipleLinearRegressionITSuite
 
     env.setParallelism(2)
 
-    val polynomialBase = PolynomialBase()
-    val learner = MultipleLinearRegression()
+    val polynomialBase = PolynomialFeatures()
+    val mlr = MultipleLinearRegression()
 
-    val pipeline = polynomialBase.chain(learner)
+    val pipeline = polynomialBase.chainPredictor(mlr)
 
     val inputDS = env.fromCollection(RegressionData.polynomialData)
 
     val parameters = ParameterMap()
-      .add(PolynomialBase.Degree, 3)
+      .add(PolynomialFeatures.Degree, 3)
       .add(MultipleLinearRegression.Stepsize, 0.002)
       .add(MultipleLinearRegression.Iterations, 100)
 
-    val model = pipeline.fit(inputDS, parameters)
+    pipeline.fit(inputDS, parameters)
 
-    val weightList = model.weights.collect()
+    val weightList = mlr.weightsOption.get.collect()
 
     weightList.size should equal(1)
 
@@ -102,7 +102,7 @@ class MultipleLinearRegressionITSuite
 
     val transformedInput = polynomialBase.transform(inputDS, parameters)
 
-    val srs = model.squaredResidualSum(transformedInput).collect().apply(0)
+    val srs = mlr.squaredResidualSum(transformedInput).collect().apply(0)
 
     srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
index d039a8b..2ebd606 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
@@ -84,7 +84,7 @@ under the License.
 
 	<build>
 		<plugins>
-			<!-- get default data from flink-java-examples pipeline -->
+			<!-- get default data from flink-java-examples package -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-dependency-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
index 188ac4a..d7fbc8e 100644
--- a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
+++ b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
@@ -19,7 +19,7 @@
 /**
  * <strong>Table API</strong><br>
  *
- * This pipeline contains the generic part of the Table API. It can be used with Flink Streaming
+ * This package contains the generic part of the Table API. It can be used with Flink Streaming
  * and Flink Batch. From Scala as well as from Java.
  *
  * When using the Table API, as user creates a [[org.apache.flink.api.table.Table]] from

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
index 37c5937..e74651b 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -26,7 +26,7 @@ import scala.language.implicitConversions
 /**
  * == Table API (Scala) ==
  *
- * Importing this pipeline with:
+ * Importing this package with:
  *
  * {{{
  *   import org.apache.flink.api.scala.table._

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
index f50ca02..c5c8c94 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.table
 
 /**
- * This pipeline contains the base class of AST nodes and all the expression language AST classes.
+ * This package contains the base class of AST nodes and all the expression language AST classes.
  * Expression trees should not be manually constructed by users. They are implicitly constructed
  * from the implicit DSL conversions in
  * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
index a31ec61..bdcb22c 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api
 /**
  * == Table API ==
  *
- * This pipeline contains the generic part of the Table API. It can be used with Flink Streaming
+ * This package contains the generic part of the Table API. It can be used with Flink Streaming
  * and Flink Batch. From Scala as well as from Java.
  *
  * When using the Table API, as user creates a [[org.apache.flink.api.table.Table]] from

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
index adb9890..a598483 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.table
 
 /**
- * The operations in this pipeline are created by calling methods on [[Table]] they
+ * The operations in this package are created by calling methods on [[Table]] they
  * should not be manually created by users of the API.
  */
 package object plan

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
index 155a17e..a1bc4b7 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
@@ -18,6 +18,6 @@
 package org.apache.flink.api.table
 
 /**
- * The functions in this pipeline are used transforming Table API operations to Java API operations.
+ * The functions in this package are used transforming Table API operations to Java API operations.
  */
 package object runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
index fb80798..3b2fb7f 100644
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
@@ -150,7 +150,7 @@ public class TachyonFileSystemWrapperTest {
 		}
 	}
 
-	// pipeline visible
+	// package visible
 	static final class DopOneTestEnvironment extends LocalEnvironment {
 	 	static {
     		initializeContextEnvironment(new ExecutionEnvironmentFactory() {

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
index 5c30785..d61f80e 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
@@ -86,7 +86,7 @@ public class TPCHQuery3 {
 					}
 				});
 
-		// Join customers with orders and pipeline them into a ShippingPriorityItem
+		// Join customers with orders and package them into a ShippingPriorityItem
 		DataSet<ShippingPriorityItem> customerWithOrders =
 				customers.join(orders).where(0).equalTo(1)
 						.with(

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/assembly/test-custominput-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-custominput-assembly.xml b/flink-tests/src/test/assembly/test-custominput-assembly.xml
index 18adc47..e6f3568 100644
--- a/flink-tests/src/test/assembly/test-custominput-assembly.xml
+++ b/flink-tests/src/test/assembly/test-custominput-assembly.xml
@@ -28,7 +28,7 @@ under the License.
 		<fileSet>
 			<directory>${project.build.testOutputDirectory}</directory>
 			<outputDirectory>/</outputDirectory>
-			<!--modify/add include to match your pipeline(s) -->
+			<!--modify/add include to match your package(s) -->
 			<includes>
 				<include>org/apache/flink/test/classloading/jar/CustomInputSplitProgram.class</include>
 				<include>org/apache/flink/test/classloading/jar/CustomInputSplitProgram$*.class</include>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/assembly/test-kmeans-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-kmeans-assembly.xml b/flink-tests/src/test/assembly/test-kmeans-assembly.xml
index 3c547fb..a8d34ab 100644
--- a/flink-tests/src/test/assembly/test-kmeans-assembly.xml
+++ b/flink-tests/src/test/assembly/test-kmeans-assembly.xml
@@ -28,7 +28,7 @@ under the License.
 		<fileSet>
 			<directory>${project.build.testOutputDirectory}</directory>
 			<outputDirectory>/</outputDirectory>
-			<!--modify/add include to match your pipeline(s) -->
+			<!--modify/add include to match your package(s) -->
 			<includes>
 				<include>org/apache/flink/test/classloading/jar/KMeansForTest.class</include>
 				<include>org/apache/flink/test/classloading/jar/KMeansForTest$*.class</include>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml b/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml
index b311700..8321b21 100644
--- a/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml
+++ b/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml
@@ -28,7 +28,7 @@ under the License.
 		<fileSet>
 			<directory>${project.build.testOutputDirectory}</directory>
 			<outputDirectory>/</outputDirectory>
-			<!--modify/add include to match your pipeline(s) -->
+			<!--modify/add include to match your package(s) -->
 			<includes>
 				<include>org/apache/flink/test/classloading/jar/StreamingProgram.class</include>
 				<include>org/apache/flink/test/classloading/jar/StreamingProgram$*.class</include>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
index b3734a8..349275c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
@@ -92,7 +92,7 @@ public class TPCHQuery10ITCase extends RecordAPITestBase {
 		+ "32|2743|7744|4|4|6582.96|0.09|0.03|R|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n"
 		+ "32|85811|8320|5|44|79059.64|0.05|0.06|R|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n"
 		+ "32|11615|4117|6|6|9159.66|0.04|0.03|R|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n"
-		+ "33|61336|8855|1|31|40217.23|0.09|0.04|R|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic pipeline|\n"
+		+ "33|61336|8855|1|31|40217.23|0.09|0.04|R|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n"
 		+ "33|60519|5532|2|32|47344.32|0.02|0.05|R|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n"
 		+ "33|137469|9983|3|5|7532.30|0.05|0.03|R|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n"
 		+ "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n"

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java
index 7dc76b5..a0236c2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java
@@ -85,7 +85,7 @@ public class TPCHQuery3ITCase extends RecordAPITestBase {
 		+ "32|2743|7744|4|4|6582.96|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n"
 		+ "32|85811|8320|5|44|79059.64|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n"
 		+ "32|11615|4117|6|6|9159.66|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n"
-		+ "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic pipeline|\n"
+		+ "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n"
 		+ "33|60519|5532|2|32|47344.32|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n"
 		+ "33|137469|9983|3|5|7532.30|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n"
 		+ "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n"


Mime
View raw message