http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
index 5c6de55..d8efdaf 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order
import org.apache.flink.core.memory.{DataOutputView, DataInputView}
import org.apache.flink.ml.common._
-import org.apache.flink.ml.recommendation.ALS.Factors
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
import org.apache.flink.types.Value
import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.{Partitioner => FlinkPartitioner, GroupReduceFunction, CoGroupFunction}
@@ -73,55 +73,58 @@ import scala.util.Random
* .setIterations(10)
* .setNumFactors(10)
*
- * val model = als.fit(inputDS)
+ * als.fit(inputDS)
*
* val data2Predict: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)
*
- * model.transform(data2Predict)
+ * als.predict(data2Predict)
* }}}
*
* =Parameters=
*
- * - [[ALS.NumFactors]]:
+ * - [[org.apache.flink.ml.recommendation.ALS.NumFactors]]:
* The number of latent factors. It is the dimension of the calculated user and item vectors.
* (Default value: '''10''')
*
- * - [[ALS.Lambda]]:
+ * - [[org.apache.flink.ml.recommendation.ALS.Lambda]]:
 * Regularization factor. Tune this value in order to avoid overfitting or poor generalization.
* (Default value: '''1''')
*
- * - [[ALS.Iterations]]: The number of iterations to perform. (Default value: '''10''')
+ * - [[org.apache.flink.ml.recommendation.ALS.Iterations]]:
+ * The number of iterations to perform. (Default value: '''10''')
*
- * - [[ALS.Blocks]]:
+ * - [[org.apache.flink.ml.recommendation.ALS.Blocks]]:
 * The number of blocks into which the user and item matrices are grouped. The fewer
* blocks one uses, the less data is sent redundantly. However, bigger blocks entail bigger
* update messages which have to be stored on the Heap. If the algorithm fails because of
* an OutOfMemoryException, then try to increase the number of blocks. (Default value: '''None''')
*
- * - [[ALS.Seed]]:
+ * - [[org.apache.flink.ml.recommendation.ALS.Seed]]:
* Random seed used to generate the initial item matrix for the algorithm.
* (Default value: '''0''')
*
- * - [[ALS.TemporaryPath]]:
+ * - [[org.apache.flink.ml.recommendation.ALS.TemporaryPath]]:
* Path to a temporary directory into which intermediate results are stored. If
* this value is set, then the algorithm is split into two preprocessing steps, the ALS iteration
* and a post-processing step which calculates a last ALS half-step. The preprocessing steps
- * calculate the [[org.apache.flink.ml.recommendation.ALS.OutBlockInformation]] and [[org.apache
- * .flink.ml.recommendation.ALS.InBlockInformation]] for the given rating matrix. The result of
- * the individual steps are stored in the specified directory. By splitting the algorithm
- * into multiple smaller steps, Flink does not have to split the available memory amongst too many
- * operators. This allows the system to process bigger individual messasges and improves the
- * overall performance. (Default value: '''None''')
+ * calculate the [[org.apache.flink.ml.recommendation.ALS.OutBlockInformation]] and
+ * [[org.apache.flink.ml.recommendation.ALS.InBlockInformation]] for the given rating matrix.
+ * The results of the individual steps are stored in the specified directory. By splitting the
+ * algorithm into multiple smaller steps, Flink does not have to split the available memory
+ * amongst too many operators. This allows the system to process bigger individual messages and
+ * improves the overall performance. (Default value: '''None''')
*
* The ALS implementation is based on Spark's MLLib implementation of ALS which you can find
* [[https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/
* recommendation/ALS.scala here]].
*/
-class
-ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
+class ALS extends Predictor[ALS] {
import ALS._
+ // Stores the matrix factorization after the fitting phase
+ var factorsOption: Option[(DataSet[Factors], DataSet[Factors])] = None
+
/** Sets the number of latent factors/row dimension of the latent model
*
* @param numFactors
@@ -183,91 +186,334 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
this
}
- /** Calculates the matrix factorization for the given ratings. A rating is defined as
- * a tuple of user ID, item ID and the corresponding rating.
+ /** Empirical risk of the trained model (matrix factorization).
*
- * @param input Set of user/item ratings for which the factorization has to be calculated
- * @return Factorization containing the user and item matrix
+ * @param labeledData Reference data
+ * @param riskParameters Additional parameters for the empirical risk calculation
+ * @return
*/
- def fit(input: DataSet[(Int, Int, Double)], fitParameters: ParameterMap): ALSModel = {
- val resultParameters = this.parameters ++ fitParameters
-
- val userBlocks = resultParameters.get(Blocks).getOrElse(input.count.toInt)
- val itemBlocks = userBlocks
- val persistencePath = resultParameters.get(TemporaryPath)
- val seed = resultParameters(Seed)
- val factors = resultParameters(NumFactors)
- val iterations = resultParameters(Iterations)
- val lambda = resultParameters(Lambda)
-
- val ratings = input.map {
- entry => {
- val (userID, itemID, rating) = entry
- Rating(userID, itemID, rating)
+ def empiricalRisk(
+ labeledData: DataSet[(Int, Int, Double)],
+ riskParameters: ParameterMap = ParameterMap.Empty)
+ : DataSet[Double] = {
+ val resultingParameters = parameters ++ riskParameters
+
+ val lambda = resultingParameters(Lambda)
+
+ val data = labeledData map {
+ x => (x._1, x._2)
+ }
+
+ factorsOption match {
+ case Some((userFactors, itemFactors)) => {
+ val predictions = data.join(userFactors).where(0).equalTo(0)
+ .join(itemFactors).where("_1._2").equalTo(0).map {
+ triple => {
+ val (((uID, iID), uFactors), iFactors) = triple
+
+ val uFactorsVector = uFactors.factors
+ val iFactorsVector = iFactors.factors
+
+ val squaredUNorm2 = blas.ddot(
+ uFactorsVector.length,
+ uFactorsVector,
+ 1,
+ uFactorsVector,
+ 1)
+ val squaredINorm2 = blas.ddot(
+ iFactorsVector.length,
+ iFactorsVector,
+ 1,
+ iFactorsVector,
+ 1)
+
+ val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1)
+
+ (uID, iID, prediction, squaredUNorm2, squaredINorm2)
+ }
+ }
+
+ labeledData.join(predictions).where(0,1).equalTo(0,1) {
+ (left, right) => {
+ val (_, _, expected) = left
+ val (_, _, predicted, squaredUNorm2, squaredINorm2) = right
+
+ val residual = expected - predicted
+
+ residual * residual + lambda * (squaredUNorm2 + squaredINorm2)
+ }
+ } reduce {
+ _ + _
+ }
}
+
+ case None => throw new RuntimeException("The ALS model has not been fitted to data. " +
+ "Prior to predicting values, it has to be trained on data.")
}
+ }
+}
- val blockIDPartitioner = new BlockIDPartitioner()
+object ALS {
+ val USER_FACTORS_FILE = "userFactorsFile"
+ val ITEM_FACTORS_FILE = "itemFactorsFile"
- val ratingsByUserBlock = ratings.map{
- rating =>
- val blockID = rating.user % userBlocks
- (blockID, rating)
- } partitionCustom(blockIDPartitioner, 0)
+ // ========================================= Parameters ==========================================
- val ratingsByItemBlock = ratings map {
- rating =>
- val blockID = rating.item % itemBlocks
- (blockID, new Rating(rating.item, rating.user, rating.rating))
- } partitionCustom(blockIDPartitioner, 0)
+ case object NumFactors extends Parameter[Int] {
+ val defaultValue: Option[Int] = Some(10)
+ }
- val (uIn, uOut) = createBlockInformation(userBlocks, itemBlocks, ratingsByUserBlock,
- blockIDPartitioner)
- val (iIn, iOut) = createBlockInformation(itemBlocks, userBlocks, ratingsByItemBlock,
- blockIDPartitioner)
+ case object Lambda extends Parameter[Double] {
+ val defaultValue: Option[Double] = Some(1.0)
+ }
+
+ case object Iterations extends Parameter[Int] {
+ val defaultValue: Option[Int] = Some(10)
+ }
+
+ case object Blocks extends Parameter[Int] {
+ val defaultValue: Option[Int] = None
+ }
+
+ case object Seed extends Parameter[Long] {
+ val defaultValue: Option[Long] = Some(0L)
+ }
- val (userIn, userOut) = persistencePath match {
- case Some(path) => FlinkTools.persist(uIn, uOut, path + "userIn", path + "userOut")
- case None => (uIn, uOut)
+ case object TemporaryPath extends Parameter[String] {
+ val defaultValue: Option[String] = None
+ }
+
+ // ==================================== ALS type definitions =====================================
+
+ /** Representation of a user-item rating
+ *
+ * @param user User ID of the rating user
+ * @param item Item ID of the rated item
+ * @param rating Rating value
+ */
+ case class Rating(user: Int, item: Int, rating: Double)
+
+ /** Latent factor model vector
+ *
+ * @param id
+ * @param factors
+ */
+ case class Factors(id: Int, factors: Array[Double]) {
+ override def toString = s"($id, ${factors.mkString(",")})"
+ }
+
+ case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors])
+
+ case class OutBlockInformation(elementIDs: Array[Int], outLinks: OutLinks) {
+ override def toString: String = {
+ s"OutBlockInformation:((${elementIDs.mkString(",")}), ($outLinks))"
}
+ }
+
+ class OutLinks(var links: Array[scala.collection.mutable.BitSet]) extends Value {
+ def this() = this(null)
- val (itemIn, itemOut) = persistencePath match {
- case Some(path) => FlinkTools.persist(iIn, iOut, path + "itemIn", path + "itemOut")
- case None => (iIn, iOut)
+ override def toString: String = {
+ s"${links.mkString("\n")}"
}
- val initialItems = itemOut.partitionCustom(blockIDPartitioner, 0).map{
- outInfos =>
- val blockID = outInfos._1
- val infos = outInfos._2
+ override def write(out: DataOutputView): Unit = {
+ out.writeInt(links.length)
+ links foreach {
+ link => {
+ val bitMask = link.toBitMask
+ out.writeInt(bitMask.length)
+ for (element <- bitMask) {
+ out.writeLong(element)
+ }
+ }
+ }
+ }
- (blockID, infos.elementIDs.map{
- id =>
- val random = new Random(id ^ seed)
- randomFactors(factors, random)
- })
- }.withForwardedFields("0")
+ override def read(in: DataInputView): Unit = {
+ val length = in.readInt()
+ links = new Array[scala.collection.mutable.BitSet](length)
- // iteration to calculate the item matrix
- val items = initialItems.iterate(iterations) {
- items => {
- val users = updateFactors(userBlocks, items, itemOut, userIn, factors, lambda,
- blockIDPartitioner)
- updateFactors(itemBlocks, users, userOut, itemIn, factors, lambda, blockIDPartitioner)
+ for (i <- 0 until length) {
+ val bitMaskLength = in.readInt()
+ val bitMask = new Array[Long](bitMaskLength)
+ for (j <- 0 until bitMaskLength) {
+ bitMask(j) = in.readLong()
+ }
+ links(i) = mutable.BitSet.fromBitMask(bitMask)
}
}
- val pItems = persistencePath match {
- case Some(path) => FlinkTools.persist(items, path + "items")
- case None => items
+ def apply(idx: Int) = links(idx)
+ }
+
+ case class InBlockInformation(elementIDs: Array[Int], ratingsForBlock: Array[BlockRating]) {
+
+ override def toString: String = {
+ s"InBlockInformation:((${elementIDs.mkString(",")}), (${ratingsForBlock.mkString("\n")}))"
+ }
+ }
+
+ case class BlockRating(var ratings: Array[(Array[Int], Array[Double])]) {
+ def apply(idx: Int) = ratings(idx)
+
+ override def toString: String = {
+ ratings.map {
+ case (left, right) => s"((${left.mkString(",")}),(${right.mkString(",")}))"
+ }.mkString(",")
+ }
+ }
+
+ case class BlockedFactorization(userFactors: DataSet[(Int, Array[Array[Double]])],
+ itemFactors: DataSet[(Int, Array[Array[Double]])])
+
+ class BlockIDPartitioner extends FlinkPartitioner[Int] {
+ override def partition(blockID: Int, numberOfPartitions: Int): Int = {
+ blockID % numberOfPartitions
}
+ }
+
+ class BlockIDGenerator(blocks: Int) extends Serializable {
+ def apply(id: Int): Int = {
+ id % blocks
+ }
+ }
+
+ // ================================= Factory methods =============================================
+
+ def apply(): ALS = {
+ new ALS()
+ }
+
+ // ===================================== Operations ==============================================
+
+ /** Predict operation which calculates the matrix entry for the given indices */
+ implicit val predictRating = new PredictOperation[ALS, (Int, Int), (Int, Int, Double)] {
+ override def predict(
+ instance: ALS,
+ predictParameters: ParameterMap,
+ input: DataSet[(Int, Int)])
+ : DataSet[(Int, Int, Double)] = {
+
+ instance.factorsOption match {
+ case Some((userFactors, itemFactors)) => {
+ input.join(userFactors).where(0).equalTo(0)
+ .join(itemFactors).where("_1._2").equalTo(0).map {
+ triple => {
+ val (((uID, iID), uFactors), iFactors) = triple
+
+ val uFactorsVector = uFactors.factors
+ val iFactorsVector = iFactors.factors
+
+ val prediction = blas.ddot(
+ uFactorsVector.length,
+ uFactorsVector,
+ 1,
+ iFactorsVector,
+ 1)
+
+ (uID, iID, prediction)
+ }
+ }
+ }
+
+ case None => throw new RuntimeException("The ALS model has not been fitted to data. " +
+ "Prior to predicting values, it has to be trained on data.")
+ }
+ }
+ }
+
+ /** Calculates the matrix factorization for the given ratings. A rating is defined as
+ * a tuple of user ID, item ID and the corresponding rating.
+ *
+ * @return Factorization containing the user and item matrix
+ */
+ implicit val fitALS = new FitOperation[ALS, (Int, Int, Double)] {
+ override def fit(
+ instance: ALS,
+ fitParameters: ParameterMap,
+ input: DataSet[(Int, Int, Double)])
+ : Unit = {
+ val resultParameters = instance.parameters ++ fitParameters
+
+ val userBlocks = resultParameters.get(Blocks).getOrElse(input.count.toInt)
+ val itemBlocks = userBlocks
+ val persistencePath = resultParameters.get(TemporaryPath)
+ val seed = resultParameters(Seed)
+ val factors = resultParameters(NumFactors)
+ val iterations = resultParameters(Iterations)
+ val lambda = resultParameters(Lambda)
+
+ val ratings = input.map {
+ entry => {
+ val (userID, itemID, rating) = entry
+ Rating(userID, itemID, rating)
+ }
+ }
+
+ val blockIDPartitioner = new BlockIDPartitioner()
+
+ val ratingsByUserBlock = ratings.map{
+ rating =>
+ val blockID = rating.user % userBlocks
+ (blockID, rating)
+ } partitionCustom(blockIDPartitioner, 0)
+
+ val ratingsByItemBlock = ratings map {
+ rating =>
+ val blockID = rating.item % itemBlocks
+ (blockID, new Rating(rating.item, rating.user, rating.rating))
+ } partitionCustom(blockIDPartitioner, 0)
+
+ val (uIn, uOut) = createBlockInformation(userBlocks, itemBlocks, ratingsByUserBlock,
+ blockIDPartitioner)
+ val (iIn, iOut) = createBlockInformation(itemBlocks, userBlocks, ratingsByItemBlock,
+ blockIDPartitioner)
+
+ val (userIn, userOut) = persistencePath match {
+ case Some(path) => FlinkTools.persist(uIn, uOut, path + "userIn", path + "userOut")
+ case None => (uIn, uOut)
+ }
+
+ val (itemIn, itemOut) = persistencePath match {
+ case Some(path) => FlinkTools.persist(iIn, iOut, path + "itemIn", path + "itemOut")
+ case None => (iIn, iOut)
+ }
- // perform last half-step to calculate the user matrix
- val users = updateFactors(userBlocks, pItems, itemOut, userIn, factors, lambda,
- blockIDPartitioner)
+ val initialItems = itemOut.partitionCustom(blockIDPartitioner, 0).map{
+ outInfos =>
+ val blockID = outInfos._1
+ val infos = outInfos._2
+
+ (blockID, infos.elementIDs.map{
+ id =>
+ val random = new Random(id ^ seed)
+ randomFactors(factors, random)
+ })
+ }.withForwardedFields("0")
+
+ // iteration to calculate the item matrix
+ val items = initialItems.iterate(iterations) {
+ items => {
+ val users = updateFactors(userBlocks, items, itemOut, userIn, factors, lambda,
+ blockIDPartitioner)
+ updateFactors(itemBlocks, users, userOut, itemIn, factors, lambda, blockIDPartitioner)
+ }
+ }
+
+ val pItems = persistencePath match {
+ case Some(path) => FlinkTools.persist(items, path + "items")
+ case None => items
+ }
+
+ // perform last half-step to calculate the user matrix
+ val users = updateFactors(userBlocks, pItems, itemOut, userIn, factors, lambda,
+ blockIDPartitioner)
- new ALSModel(unblock(users, userOut, blockIDPartitioner), unblock(pItems, itemOut,
- blockIDPartitioner), lambda)
+ instance.factorsOption = Some((
+ unblock(users, userOut, blockIDPartitioner),
+ unblock(pItems, itemOut, blockIDPartitioner)))
+ }
}
/** Calculates a single half step of the ALS optimization. The result is the new value for
@@ -283,11 +529,11 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
* @return New value for the optimized matrix (either user or item)
*/
def updateFactors(numUserBlocks: Int,
- items: DataSet[(Int, Array[Array[Double]])],
- itemOut: DataSet[(Int, OutBlockInformation)],
- userIn: DataSet[(Int, InBlockInformation)],
- factors: Int,
- lambda: Double, blockIDPartitioner: FlinkPartitioner[Int]):
+ items: DataSet[(Int, Array[Array[Double]])],
+ itemOut: DataSet[(Int, OutBlockInformation)],
+ userIn: DataSet[(Int, InBlockInformation)],
+ factors: Int,
+ lambda: Double, blockIDPartitioner: FlinkPartitioner[Int]):
DataSet[(Int, Array[Array[Double]])] = {
// send the item vectors to the blocks whose users have rated the items
val partialBlockMsgs = itemOut.join(items).where(0).equalTo(0).
@@ -334,8 +580,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
val numRatings = new ArrayBuffer[Int]()
override def coGroup(left: lang.Iterable[(Int, Int, Array[Array[Double]])],
- right: lang.Iterable[(Int, InBlockInformation)],
- collector: Collector[(Int, Array[Array[Double]])]): Unit = {
+ right: lang.Iterable[(Int, InBlockInformation)],
+ collector: Collector[(Int, Array[Array[Double]])]): Unit = {
// there is only one InBlockInformation per user block
val inInfo = right.iterator().next()._2
val updates = left.iterator()
@@ -439,7 +685,7 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
* @return
*/
def createBlockInformation(userBlocks: Int, itemBlocks: Int, ratings: DataSet[(Int, Rating)],
- blockIDPartitioner: BlockIDPartitioner):
+ blockIDPartitioner: BlockIDPartitioner):
(DataSet[(Int, InBlockInformation)], DataSet[(Int, OutBlockInformation)]) = {
val blockIDGenerator = new BlockIDGenerator(itemBlocks)
@@ -498,8 +744,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
* @return
*/
def createOutBlockInformation(ratings: DataSet[(Int, Rating)],
- usersPerBlock: DataSet[(Int, Array[Int])],
- itemBlocks: Int, blockIDGenerator: BlockIDGenerator):
+ usersPerBlock: DataSet[(Int, Array[Int])],
+ itemBlocks: Int, blockIDGenerator: BlockIDGenerator):
DataSet[(Int, OutBlockInformation)] = {
ratings.coGroup(usersPerBlock).where(0).equalTo(0).apply {
(ratings, users) =>
@@ -547,8 +793,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
* @return
*/
def createInBlockInformation(ratings: DataSet[(Int, Rating)],
- usersPerBlock: DataSet[(Int, Array[Int])],
- blockIDGenerator: BlockIDGenerator):
+ usersPerBlock: DataSet[(Int, Array[Int])],
+ blockIDGenerator: BlockIDGenerator):
DataSet[(Int, InBlockInformation)] = {
// Group for every user block the users which have rated the same item and collect their ratings
val partialInInfos = ratings.map { x => (x._1, x._2.item, x._2.user, x._2.rating)}
@@ -628,9 +874,9 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
val buffer = ArrayBuffer[BlockRating]()
override def coGroup(partialInfosIterable:
- lang.Iterable[(Int, Int, Array[(Array[Int], Array[Double])])],
- userIterable: lang.Iterable[(Int, Array[Int])],
- collector: Collector[(Int, InBlockInformation)]): Unit = {
+ lang.Iterable[(Int, Int, Array[(Array[Int], Array[Double])])],
+ userIterable: lang.Iterable[(Int, Array[Int])],
+ collector: Collector[(Int, InBlockInformation)]): Unit = {
val users = userIterable.iterator()
val partialInfos = partialInfosIterable.iterator()
@@ -691,8 +937,8 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
* @return
*/
def unblock(users: DataSet[(Int, Array[Array[Double]])],
- outInfo: DataSet[(Int, OutBlockInformation)],
- blockIDPartitioner: BlockIDPartitioner): DataSet[Factors] = {
+ outInfo: DataSet[(Int, OutBlockInformation)],
+ blockIDPartitioner: BlockIDPartitioner): DataSet[Factors] = {
users.join(outInfo).where(0).equalTo(0).withPartitioner(blockIDPartitioner).apply {
(left, right, col: Collector[Factors]) => {
val outInfo = right._2
@@ -725,7 +971,7 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
}
def generateFullMatrix(triangularMatrix: Array[Double], fullMatrix: Array[Double],
- factors: Int): Unit = {
+ factors: Int): Unit = {
var row = 0
var pos = 0
@@ -759,206 +1005,3 @@ ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
Array.fill(factors)(random.nextDouble())
}
}
-
-object ALS {
- val USER_FACTORS_FILE = "userFactorsFile"
- val ITEM_FACTORS_FILE = "itemFactorsFile"
-
- case object NumFactors extends Parameter[Int] {
- val defaultValue: Option[Int] = Some(10)
- }
-
- case object Lambda extends Parameter[Double] {
- val defaultValue: Option[Double] = Some(1.0)
- }
-
- case object Iterations extends Parameter[Int] {
- val defaultValue: Option[Int] = Some(10)
- }
-
- case object Blocks extends Parameter[Int] {
- val defaultValue: Option[Int] = None
- }
-
- case object Seed extends Parameter[Long] {
- val defaultValue: Option[Long] = Some(0L)
- }
-
- case object TemporaryPath extends Parameter[String] {
- val defaultValue: Option[String] = None
- }
-
- // ==================================== ALS type definitions =====================================
-
- /** Representation of a user-item rating
- *
- * @param user User ID of the rating user
- * @param item Item iD of the rated item
- * @param rating Rating value
- */
- case class Rating(user: Int, item: Int, rating: Double)
-
- /** Latent factor model vector
- *
- * @param id
- * @param factors
- */
- case class Factors(id: Int, factors: Array[Double]) {
- override def toString = s"($id, ${factors.mkString(",")})"
- }
-
- case class Factorization(userFactors: DataSet[Factors], itemFactors: DataSet[Factors])
-
- case class OutBlockInformation(elementIDs: Array[Int], outLinks: OutLinks) {
- override def toString: String = {
- s"OutBlockInformation:((${elementIDs.mkString(",")}), ($outLinks))"
- }
- }
-
- class OutLinks(var links: Array[scala.collection.mutable.BitSet]) extends Value {
- def this() = this(null)
-
- override def toString: String = {
- s"${links.mkString("\n")}"
- }
-
- override def write(out: DataOutputView): Unit = {
- out.writeInt(links.length)
- links foreach {
- link => {
- val bitMask = link.toBitMask
- out.writeInt(bitMask.length)
- for (element <- bitMask) {
- out.writeLong(element)
- }
- }
- }
- }
-
- override def read(in: DataInputView): Unit = {
- val length = in.readInt()
- links = new Array[scala.collection.mutable.BitSet](length)
-
- for (i <- 0 until length) {
- val bitMaskLength = in.readInt()
- val bitMask = new Array[Long](bitMaskLength)
- for (j <- 0 until bitMaskLength) {
- bitMask(j) = in.readLong()
- }
- links(i) = mutable.BitSet.fromBitMask(bitMask)
- }
- }
-
- def apply(idx: Int) = links(idx)
- }
-
- case class InBlockInformation(elementIDs: Array[Int], ratingsForBlock: Array[BlockRating]) {
-
- override def toString: String = {
- s"InBlockInformation:((${elementIDs.mkString(",")}), (${ratingsForBlock.mkString("\n")}))"
- }
- }
-
- case class BlockRating(var ratings: Array[(Array[Int], Array[Double])]) {
- def apply(idx: Int) = ratings(idx)
-
- override def toString: String = {
- ratings.map {
- case (left, right) => s"((${left.mkString(",")}),(${right.mkString(",")}))"
- }.mkString(",")
- }
- }
-
- case class BlockedFactorization(userFactors: DataSet[(Int, Array[Array[Double]])],
- itemFactors: DataSet[(Int, Array[Array[Double]])])
-
- class BlockIDPartitioner extends FlinkPartitioner[Int] {
- override def partition(blockID: Int, numberOfPartitions: Int): Int = {
- blockID % numberOfPartitions
- }
- }
-
- class BlockIDGenerator(blocks: Int) extends Serializable {
- def apply(id: Int): Int = {
- id % blocks
- }
- }
-
- // ========================= Factory methods =====================================
-
- def apply(): ALS = {
- new ALS()
- }
-}
-
-/** Resulting model of the ALS algorithm.
- *
- * It contains the calculated factors, user and item matrix, of the given
- * ratings matrix. Additionally it stores the used regularization value lambda in order to
- * calculate the empirical risk of the model.
- *
- * @param userFactors Calculated user matrix
- * @param itemFactors Calcualted item matrix
- * @param lambda Regularization value used to calculate the model
- */
-class ALSModel(
- @transient val userFactors: DataSet[Factors],
- @transient val itemFactors: DataSet[Factors],
- val lambda: Double)
- extends Transformer[(Int, Int), (Int, Int, Double)]
- with Serializable {
-
- override def transform(input: DataSet[(Int, Int)], parameters: ParameterMap): DataSet[(Int,
- Int, Double)] = {
-
- input.join(userFactors).where(0).equalTo(0)
- .join(itemFactors).where("_1._2").equalTo(0).map {
- triple => {
- val (((uID, iID), uFactors), iFactors) = triple
-
- val uFactorsVector = uFactors.factors
- val iFactorsVector = iFactors.factors
-
- val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1)
-
- (uID, iID, prediction)
- }
- }
- }
-
- def empiricalRisk(labeledData: DataSet[(Int, Int, Double)]): DataSet[Double] = {
- val data = labeledData map {
- x => (x._1, x._2)
- }
-
- val predictions = data.join(userFactors).where(0).equalTo(0)
- .join(itemFactors).where("_1._2").equalTo(0).map {
- triple => {
- val (((uID, iID), uFactors), iFactors) = triple
-
- val uFactorsVector = uFactors.factors
- val iFactorsVector = iFactors.factors
-
- val squaredUNorm2 = blas.ddot(uFactorsVector.length, uFactorsVector, 1, uFactorsVector, 1)
- val squaredINorm2 = blas.ddot(iFactorsVector.length, iFactorsVector, 1, iFactorsVector, 1)
-
- val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1)
-
- (uID, iID, prediction, squaredUNorm2, squaredINorm2)
- }
- }
-
- labeledData.join(predictions).where(0,1).equalTo(0,1) {
- (left, right) => {
- val (_, _, expected) = left
- val (_, _, predicted, squaredUNorm2, squaredINorm2) = right
-
- val residual = expected - predicted
-
- residual * residual + lambda * (squaredUNorm2 + squaredINorm2)
- }
- } reduce {
- _ + _
- }
- }
-}
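
In short, this file's change moves ALS from the old Learner/ALSModel pair to the new Predictor interface: fit stores the factorization in factorsOption on the estimator, and predictions are obtained through predict via the implicit predictRating operation. A minimal usage sketch of the resulting API (the CSV paths and the surrounding program setup are illustrative assumptions, not part of this commit):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.recommendation.ALS

    val env = ExecutionEnvironment.getExecutionEnvironment

    // (user ID, item ID, rating) triples; the path is a placeholder
    val ratings: DataSet[(Int, Int, Double)] =
      env.readCsvFile[(Int, Int, Double)]("/tmp/ratings.csv")

    val als = ALS()
      .setIterations(10)
      .setNumFactors(10)

    // fit now trains the estimator in place instead of returning an ALSModel
    als.fit(ratings)

    // (user ID, item ID) pairs for which ratings should be predicted
    val toPredict: DataSet[(Int, Int)] =
      env.readCsvFile[(Int, Int)]("/tmp/pairs.csv")

    // yields (user ID, item ID, predicted rating) tuples
    val predictions: DataSet[(Int, Int, Double)] = als.predict(toPredict)
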
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
index 87352fa..64b24dc 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
@@ -29,6 +29,7 @@ import org.apache.flink.ml.math.vector2Array
import org.apache.flink.api.scala._
import com.github.fommil.netlib.BLAS.{ getInstance => blas }
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
/** Multiple linear regression using the ordinary least squares (OLS) estimator.
*
@@ -67,30 +68,33 @@ import com.github.fommil.netlib.BLAS.{ getInstance => blas }
* val trainingDS: DataSet[LabeledVector] = ...
* val testingDS: DataSet[Vector] = ...
*
- * val model = mlr.fit(trainingDS)
+ * mlr.fit(trainingDS)
*
- * val predictions = model.transform(testingDS)
+ * val predictions = mlr.predict(testingDS)
* }}}
*
* =Parameters=
*
- * - [[MultipleLinearRegression.Iterations]]: Maximum number of iterations.
+ * - [[org.apache.flink.ml.regression.MultipleLinearRegression.Iterations]]:
+ * Maximum number of iterations.
*
- * - [[MultipleLinearRegression.Stepsize]]:
+ * - [[org.apache.flink.ml.regression.MultipleLinearRegression.Stepsize]]:
* Initial step size for the gradient descent method.
* This value controls how far the gradient descent method moves in the opposite direction of the
* gradient. Tuning this parameter might be crucial to make it stable and to obtain a better
* performance.
*
- * - [[MultipleLinearRegression.ConvergenceThreshold]]:
+ * - [[org.apache.flink.ml.regression.MultipleLinearRegression.ConvergenceThreshold]]:
* Threshold for relative change of sum of squared residuals until convergence.
*
*/
-class
-MultipleLinearRegression extends Learner[LabeledVector, MultipleLinearRegressionModel]
-with Serializable {
+class MultipleLinearRegression extends Predictor[MultipleLinearRegression] {
+
import MultipleLinearRegression._
+ // Stores the weights of the linear model after the fitting phase
+ var weightsOption: Option[DataSet[(Array[Double], Double)]] = None
+
def setIterations(iterations: Int): MultipleLinearRegression = {
parameters.add(Iterations, iterations)
this
@@ -106,116 +110,174 @@ with Serializable {
this
}
- override def fit(input: DataSet[LabeledVector], fitParameters: ParameterMap):
- MultipleLinearRegressionModel = {
- val map = this.parameters ++ fitParameters
-
- // retrieve parameters of the algorithm
- val numberOfIterations = map(Iterations)
- val stepsize = map(Stepsize)
- val convergenceThreshold = map.get(ConvergenceThreshold)
+ def squaredResidualSum(input: DataSet[LabeledVector]): DataSet[Double] = {
+ weightsOption match {
+ case Some(weights) => {
+ input.map {
+ new SquaredResiduals
+ }.withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST).reduce {
+ _ + _
+ }
+ }
- // calculate dimension of the feature vectors
- val dimension = input.map{_.vector.size}.reduce {
- (a, b) =>
- require(a == b, "All input vector must have the same dimension.")
- a
+ case None => {
+ throw new RuntimeException("The MultipleLinearRegression has not been fitted to the " +
+ "data. This is necessary to learn the weight vector of the linear function.")
+ }
}
- // initial weight vector is set to 0
- val initialWeightVector = createInitialWeightVector(dimension)
+ }
+}
+
+object MultipleLinearRegression {
+ val WEIGHTVECTOR_BROADCAST = "weights_broadcast"
- // check if a convergence threshold has been set
- val resultingWeightVector = convergenceThreshold match {
- case Some(convergence) =>
+ // ====================================== Parameters =============================================
- // we have to calculate for each weight vector the sum of squared residuals
- val initialSquaredResidualSum = input.map {
- new SquaredResiduals
- }.withBroadcastSet(initialWeightVector, WEIGHTVECTOR_BROADCAST).reduce {
- _ + _
- }
+ case object Stepsize extends Parameter[Double] {
+ val defaultValue = Some(0.1)
+ }
+
+ case object Iterations extends Parameter[Int] {
+ val defaultValue = Some(10)
+ }
+
+ case object ConvergenceThreshold extends Parameter[Double] {
+ val defaultValue = None
+ }
+
+ // ======================================== Factory methods ======================================
+
+ def apply(): MultipleLinearRegression = {
+ new MultipleLinearRegression()
+ }
+
+ // ====================================== Operations =============================================
- // combine weight vector with current sum of squared residuals
- val initialWeightVectorWithSquaredResidualSum = initialWeightVector.
- crossWithTiny(initialSquaredResidualSum).setParallelism(1)
+ /** Trains the linear model to fit the training data. The resulting weight vector is stored in
+ * the [[MultipleLinearRegression]] instance.
+ *
+ */
+ implicit val fitMLR = new FitOperation[MultipleLinearRegression, LabeledVector] {
+ override def fit(
+ instance: MultipleLinearRegression,
+ fitParameters: ParameterMap,
+ input: DataSet[LabeledVector])
+ : Unit = {
+ val map = instance.parameters ++ fitParameters
+
+ // retrieve parameters of the algorithm
+ val numberOfIterations = map(Iterations)
+ val stepsize = map(Stepsize)
+ val convergenceThreshold = map.get(ConvergenceThreshold)
+
+ // calculate dimension of the feature vectors
+ val dimension = input.map{_.vector.size}.reduce {
+ (a, b) =>
+ require(a == b, "All input vectors must have the same dimension.")
+ a
+ }
+
+ input.flatMap{
+ t =>
+ Seq(t)
+ }
+
+ // initial weight vector is set to 0
+ val initialWeightVector = createInitialWeightVector(dimension)
+
+ // check if a convergence threshold has been set
+ val resultingWeightVector = convergenceThreshold match {
+ case Some(convergence) =>
+
+ // we have to calculate for each weight vector the sum of squared residuals
+ val initialSquaredResidualSum = input.map {
+ new SquaredResiduals
+ }.withBroadcastSet(initialWeightVector, WEIGHTVECTOR_BROADCAST).reduce {
+ _ + _
+ }
+
+ // combine weight vector with current sum of squared residuals
+ val initialWeightVectorWithSquaredResidualSum = initialWeightVector.
+ crossWithTiny(initialSquaredResidualSum).setParallelism(1)
- // start SGD iteration
- val resultWithResidual = initialWeightVectorWithSquaredResidualSum.
- iterateWithTermination(numberOfIterations) {
- weightVectorSquaredResidualDS =>
+ // start SGD iteration
+ val resultWithResidual = initialWeightVectorWithSquaredResidualSum.
+ iterateWithTermination(numberOfIterations) {
+ weightVectorSquaredResidualDS =>
- // extract weight vector and squared residual sum
- val weightVector = weightVectorSquaredResidualDS.map{_._1}
- val squaredResidualSum = weightVectorSquaredResidualDS.map{_._2}
+ // extract weight vector and squared residual sum
+ val weightVector = weightVectorSquaredResidualDS.map{_._1}
+ val squaredResidualSum = weightVectorSquaredResidualDS.map{_._2}
- // TODO: Sample from input to realize proper SGD
- val newWeightVector = input.map {
- new LinearRegressionGradientDescent
- }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce {
- (left, right) =>
+ // TODO: Sample from input to realize proper SGD
+ val newWeightVector = input.map {
+ new LinearRegressionGradientDescent
+ }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce {
+ (left, right) =>
val (leftBetas, leftBeta0, leftCount) = left
val (rightBetas, rightBeta0, rightCount) = right
blas.daxpy(leftBetas.length, 1.0, rightBetas, 1, leftBetas, 1)
(leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount)
- }.map {
- new LinearRegressionWeightsUpdate(stepsize)
- }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
-
- // calculate the sum of squared residuals for the new weight vector
- val newResidual = input.map {
- new SquaredResiduals
- }.withBroadcastSet(newWeightVector, WEIGHTVECTOR_BROADCAST).reduce {
- _ + _
- }
+ }.map {
+ new LinearRegressionWeightsUpdate(stepsize)
+ }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
+
+ // calculate the sum of squared residuals for the new weight vector
+ val newResidual = input.map {
+ new SquaredResiduals
+ }.withBroadcastSet(newWeightVector, WEIGHTVECTOR_BROADCAST).reduce {
+ _ + _
+ }
- // check if the relative change in the squared residual sum is smaller than the
- // convergence threshold. If yes, then terminate => return empty termination data set
- val termination = squaredResidualSum.crossWithTiny(newResidual).setParallelism(1).
- filter{
- pair => {
- val (residual, newResidual) = pair
-
- if (residual <= 0) {
- false
- } else {
- math.abs((residual - newResidual)/residual) >= convergence
+ // check if the relative change in the squared residual sum is smaller than the
+ // convergence threshold. If yes, then terminate => return empty termination data set
+ val termination = squaredResidualSum.crossWithTiny(newResidual).setParallelism(1).
+ filter{
+ pair => {
+ val (residual, newResidual) = pair
+
+ if (residual <= 0) {
+ false
+ } else {
+ math.abs((residual - newResidual)/residual) >= convergence
+ }
}
}
- }
- // result for new iteration
- (newWeightVector cross newResidual, termination)
- }
+ // result for new iteration
+ (newWeightVector cross newResidual, termination)
+ }
+
+ // remove squared residual sum to only return the weight vector
+ resultWithResidual.map{_._1}
+
+ case None =>
+ // No convergence criterion
+ initialWeightVector.iterate(numberOfIterations) {
+ weightVector => {
+
+ // TODO: Sample from input to realize proper SGD
+ input.map {
+ new LinearRegressionGradientDescent
+ }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce {
+ (left, right) =>
+ val (leftBetas, leftBeta0, leftCount) = left
+ val (rightBetas, rightBeta0, rightCount) = right
- // remove squared residual sum to only return the weight vector
- resultWithResidual.map{_._1}
-
- case None =>
- // No convergence criterion
- initialWeightVector.iterate(numberOfIterations) {
- weightVector => {
-
- // TODO: Sample from input to realize proper SGD
- input.map {
- new LinearRegressionGradientDescent
- }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST).reduce {
- (left, right) =>
- val (leftBetas, leftBeta0, leftCount) = left
- val (rightBetas, rightBeta0, rightCount) = right
-
- blas.daxpy(leftBetas.length, 1, rightBetas, 1, leftBetas, 1)
- (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount)
- }.map {
- new LinearRegressionWeightsUpdate(stepsize)
- }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
+ blas.daxpy(leftBetas.length, 1, rightBetas, 1, leftBetas, 1)
+ (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount)
+ }.map {
+ new LinearRegressionWeightsUpdate(stepsize)
+ }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
+ }
}
- }
- }
+ }
- new MultipleLinearRegressionModel(resultingWeightVector)
+ instance.weightsOption = Some(resultingWeightVector)
+ }
}
/** Creates a DataSet with one zero vector. The zero vector has dimension d, which is given
@@ -233,28 +295,58 @@ with Serializable {
(values, 0.0)
}
}
-}
-object MultipleLinearRegression {
- val WEIGHTVECTOR_BROADCAST = "weights_broadcast"
+ /** Calculates the predictions for new data with respect to the learned linear model.
+ *
+ * @tparam T Testing data type for which the prediction is calculated. Has to be a subtype of
+ * [[Vector]]
+ * @return
+ */
+ implicit def predictVectors[T <: Vector] = {
+ new PredictOperation[MultipleLinearRegression, T, LabeledVector] {
+ override def predict(
+ instance: MultipleLinearRegression,
+ predictParameters: ParameterMap,
+ input: DataSet[T])
+ : DataSet[LabeledVector] = {
+ instance.weightsOption match {
+ case Some(weights) => {
+ input.map(new LinearRegressionPrediction[T])
+ .withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST)
+ }
- // Define parameters for MultipleLinearRegression
- case object Stepsize extends Parameter[Double] {
- val defaultValue = Some(0.1)
+ case None => {
+ throw new RuntimeException("The MultipleLinearRegression has not been fitted to the " +
+ "data. This is necessary to learn the weight vector of the linear function.")
+ }
+ }
+ }
+ }
}
- case object Iterations extends Parameter[Int] {
- val defaultValue = Some(10)
- }
+ private class LinearRegressionPrediction[T <: Vector] extends RichMapFunction[T, LabeledVector] {
+ private var weights: Array[Double] = null
+ private var weight0: Double = 0
- case object ConvergenceThreshold extends Parameter[Double] {
- val defaultValue = None
- }
- // ====================== Facotry methods ==========================
+ @throws(classOf[Exception])
+ override def open(configuration: Configuration): Unit = {
+ val t = getRuntimeContext
+ .getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
+
+ val weightsPair = t.get(0)
- def apply(): MultipleLinearRegression = {
- new MultipleLinearRegression()
+ weights = weightsPair._1
+ weight0 = weightsPair._2
+ }
+
+ override def map(value: T): LabeledVector = {
+ val dotProduct = blas.ddot(weights.length, weights, 1, vector2Array(value), 1)
+
+ val prediction = dotProduct + weight0
+
+ LabeledVector(prediction, value)
+ }
}
}
@@ -387,63 +479,3 @@ RichMapFunction[(Array[Double], Double, Int), (Array[Double], Double)] {
(newWeights, newWeight0)
}
}
-
-//--------------------------------------------------------------------------------------------------
-// Model definition
-//--------------------------------------------------------------------------------------------------
-
-/** Multiple linear regression model returned by [[MultipleLinearRegression]]. The model stores the
- * calculated weight vector and applies the linear model to given vectors v:
- *
- * `hat y = w^T*v + w_0`
- *
- * with `hat y` being the predicted regression value.
- *
- * @param weights DataSet containing the calculated weight vector
- */
-class MultipleLinearRegressionModel private[regression](
- val weights: DataSet[(Array[Double], Double)])
- extends Transformer[ Vector, LabeledVector ]
- with Serializable {
-
- import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST
-
- // predict regression value for input
- override def transform(input: DataSet[Vector],
- parameters: ParameterMap): DataSet[LabeledVector] = {
- input.map(new LinearRegressionPrediction).withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST)
- }
-
- def squaredResidualSum(input: DataSet[LabeledVector]): DataSet[Double] = {
- input.map{
- new SquaredResiduals()
- }.withBroadcastSet(weights, WEIGHTVECTOR_BROADCAST).reduce{
- _ + _
- }
- }
-
- private class LinearRegressionPrediction extends RichMapFunction[Vector, LabeledVector] {
- private var weights: Array[Double] = null
- private var weight0: Double = 0
-
-
- @throws(classOf[Exception])
- override def open(configuration: Configuration): Unit = {
- val t = getRuntimeContext
- .getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
-
- val weightsPair = t.get(0)
-
- weights = weightsPair._1
- weight0 = weightsPair._2
- }
-
- override def map(value: Vector): LabeledVector = {
- val dotProduct = blas.ddot(weights.length, weights, 1, vector2Array(value), 1)
-
- val prediction = dotProduct + weight0
-
- LabeledVector(prediction, value)
- }
- }
-}
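
The same pattern applies to MultipleLinearRegression: the separate MultipleLinearRegressionModel class is removed, the learned weights are kept in weightsOption, and prediction as well as squaredResidualSum are now called on the estimator itself. A short sketch of the new call pattern (the toy training data, the chosen setters and values are illustrative assumptions):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.LabeledVector
    import org.apache.flink.ml.math.DenseVector
    import org.apache.flink.ml.regression.MultipleLinearRegression

    val env = ExecutionEnvironment.getExecutionEnvironment

    // toy training set; real data would typically be read from a file
    val training: DataSet[LabeledVector] = env.fromCollection(List(
      LabeledVector(1.0, DenseVector(1.0, 2.0)),
      LabeledVector(2.0, DenseVector(2.0, 3.0))))

    val mlr = MultipleLinearRegression()
      .setIterations(10)
      .setStepsize(0.1)

    // training stores the weight vector in mlr.weightsOption
    mlr.fit(training)

    // predictions for plain vectors via the implicit predictVectors operation
    val testing: DataSet[DenseVector] = env.fromCollection(List(DenseVector(3.0, 4.0)))
    val predictions: DataSet[LabeledVector] = mlr.predict(testing)

    // sum of squared residuals of the fitted model on the reference data
    val srs: DataSet[Double] = mlr.squaredResidualSum(training)
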
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/resources/log4j-test.properties b/flink-staging/flink-ml/src/test/resources/log4j-test.properties
index f3c51b8..76b237e 100644
--- a/flink-staging/flink-ml/src/test/resources/log4j-test.properties
+++ b/flink-staging/flink-ml/src/test/resources/log4j-test.properties
@@ -1,20 +1,20 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
# limitations under the License.
-#
+################################################################################
log4j.rootLogger=OFF, console
@@ -32,4 +32,7 @@ log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.dir}/{$mvn.forkNumber}.log
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
new file mode 100644
index 0000000..a5e7496
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+class CoCoAITSuite extends FlatSpec with Matchers with FlinkTestBase {
+
+ behavior of "The CoCoA implementation"
+
+ it should "train a SVM" in {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val cocoa = CoCoA().
+ setBlocks(env.getParallelism).
+ setIterations(100).
+ setLocalIterations(100).
+ setRegularization(0.002).
+ setStepsize(0.1).
+ setSeed(0)
+
+ val trainingDS = env.fromCollection(Classification.trainingData)
+
+ cocoa.fit(trainingDS)
+
+ val weightVector = cocoa.weightsOption.get.collect().apply(0)
+
+ weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach {
+ case (weight, expectedWeight) =>
+ weight should be(expectedWeight +- 0.1)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala
deleted file mode 100644
index 0f000a3..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoASuite.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.classification
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.flink.api.scala._
-import org.apache.flink.test.util.FlinkTestBase
-
-class CoCoAITSuite extends FlatSpec with Matchers with FlinkTestBase {
-
- behavior of "The CoCoA implementation"
-
- it should "train a SVM" in {
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val learner = CoCoA().
- setBlocks(env.getParallelism).
- setIterations(100).
- setLocalIterations(100).
- setRegularization(0.002).
- setStepsize(0.1).
- setSeed(0)
-
- val trainingDS = env.fromCollection(Classification.trainingData)
-
- val model = learner.fit(trainingDS)
-
- val weightVector = model.weights.collect().apply(0)
-
- weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach {
- case (weight, expectedWeight) =>
- weight should be(expectedWeight +- 0.1)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala
deleted file mode 100644
index a185282..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/experimental/SciKitPipelineSuite.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import org.scalatest.FlatSpec
-
-import org.apache.flink.api.scala._
-import org.apache.flink.ml.common.LabeledVector
-import org.apache.flink.ml.math.{SparseVector, DenseVector, Vector}
-import org.apache.flink.test.util.FlinkTestBase
-
-class SciKitPipelineSuite extends FlatSpec with FlinkTestBase {
- behavior of "Pipeline"
-
- it should "work" in {
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val scaler = new Scaler
- val offset = new Offset
-
- val input: DataSet[Vector] = env.fromCollection(List(DenseVector(2,1,3), SparseVector.fromCOO(3, (1,1), (2,2))))
- val training = env.fromCollection(List(LabeledVector(1.0, DenseVector(2,3,1)), LabeledVector(2.0, SparseVector.fromCOO(3, (1,1), (2,2)))))
- val intData = env.fromCollection(List(1,2,3,4))
-
- val result = scaler.transform(input)
-
- result.print()
-
- val result2 = offset.transform(input)
- result2.print()
-
- val chain = scaler.chainTransformer(offset)
-
- val result3 = chain.transform(input)(ChainedTransformer.chainedTransformOperation(Scaler.vTransform, Offset.offsetTransform))
-
- result3.print()
-
- val chain2 = chain.chainTransformer(scaler)
- val result4 = chain2.transform(input)
-
- result4.print()
-
- val kmeans = new KMeans()
-
- val chainedPredictor = chain.chainPredictor(kmeans)
-
- val prediction = chainedPredictor.predict(result)
-
- prediction.print()
-
- env.execute()
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
index b930ceb..0f045ab 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITSuite.scala
@@ -21,6 +21,7 @@ package org.apache.flink.ml.feature
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.ml.preprocessing.PolynomialFeatures
import org.scalatest.{Matchers, FlatSpec}
import org.apache.flink.api.scala._
@@ -45,10 +46,10 @@ class PolynomialBaseITSuite
val inputDS = env.fromCollection (input)
- val transformer = PolynomialBase ()
+ val transformer = PolynomialFeatures()
.setDegree (3)
- val transformedDS = transformer.transform (inputDS)
+ val transformedDS = transformer.transform(inputDS)
val expectedMap = List (
(1.0 -> DenseVector (1.0, 1.0, 1.0) ),
@@ -81,7 +82,7 @@ class PolynomialBaseITSuite
val inputDS = env.fromCollection(input)
- val transformer = PolynomialBase()
+ val transformer = PolynomialFeatures()
.setDegree(3)
val transformedDS = transformer.transform(inputDS)
@@ -106,7 +107,7 @@ class PolynomialBaseITSuite
val inputDS = env.fromCollection(input)
- val transformer = PolynomialBase()
+ val transformer = PolynomialFeatures()
.setDegree(0)
val transformedDS = transformer.transform(inputDS)
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
new file mode 100644
index 0000000..8803195
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.pipeline
+
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler}
+import org.apache.flink.ml.regression.MultipleLinearRegression
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{Matchers, FlatSpec}
+
+class PipelineITSuite extends FlatSpec with Matchers with FlinkTestBase {
+ behavior of "Flink's pipelines"
+
+ it should "support chaining of compatible transformer" in {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val vData = List(DenseVector(1.0, 2.0, 3.0), DenseVector(2.0, 3.0, 4.0))
+ val lvData = List(LabeledVector(1.0, DenseVector(1.0, 1.0, 1.0)),
+ LabeledVector(2.0, DenseVector(2.0, 2.0, 2.0)))
+
+ val vectorData = env.fromCollection(vData)
+ val labeledVectorData = env.fromCollection(lvData)
+
+ val expectedScaledVectorSet = Set(
+ DenseVector(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -1.0, -1.0, -1.0),
+ DenseVector(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
+ )
+
+ val expectedScaledLabeledVectorSet = Set(
+ LabeledVector(1.0, DenseVector(1.0, 3.0, 5.0, 9.0, 15.0, 25.0, -1.0, -3.0, -5.0)),
+ LabeledVector(2.0, DenseVector(1.0, -1.0, -3.0, 1.0, 3.0, 9.0, 1.0, -1.0, -3.0))
+ )
+
+ val scaler = StandardScaler()
+ val polyFeatures = PolynomialFeatures().setDegree(2)
+
+ val pipeline = scaler.chainTransformer(polyFeatures)
+
+ pipeline.fit(vectorData)
+
+ val scaledVectorDataDS = pipeline.transform(vectorData)
+ val scaledLabeledVectorDataDS = pipeline.transform(labeledVectorData)
+
+ val scaledVectorData = scaledVectorDataDS.collect()
+ val scaledLabeledVectorData = scaledLabeledVectorDataDS.collect()
+
+ scaledVectorData.size should be(expectedScaledVectorSet.size)
+
+ for(scaledVector <- scaledVectorData){
+ expectedScaledVectorSet should contain(scaledVector)
+ }
+
+ scaledLabeledVectorData.size should be(expectedScaledLabeledVectorSet.size)
+
+ for(scaledLabeledVector <- scaledLabeledVectorData) {
+ expectedScaledLabeledVectorSet should contain(scaledLabeledVector)
+ }
+ }
+
+ it should "throw an exception when the pipeline operators are not compatible" in {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val scaler = StandardScaler()
+ val mlr = MultipleLinearRegression()
+
+ val vData = List(DenseVector(1.0, 2.0, 3.0), DenseVector(2.0, 3.0, 4.0))
+ val vectorData = env.fromCollection(vData)
+
+ val pipeline = scaler.chainPredictor(mlr)
+
+ val exception = intercept[RuntimeException] {
+ pipeline.fit(vectorData)
+ }
+
+ exception.getMessage should equal("There is no FitOperation defined for class org.apache." +
+ "flink.ml.regression.MultipleLinearRegression which trains on a " +
+ "DataSet[class org.apache.flink.ml.math.DenseVector]")
+ }
+
+ it should "throw an exception when the input data is not supported" in {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val dData = List(1.0, 2.0, 3.0)
+ val doubleData = env.fromCollection(dData)
+
+ val scaler = StandardScaler()
+ val polyFeatures = PolynomialFeatures()
+
+ val pipeline = scaler.chainTransformer(polyFeatures)
+
+ val exception = intercept[RuntimeException] {
+ pipeline.fit(doubleData)
+ }
+
+ exception.getMessage should equal("There is no FitOperation defined for class org.apache." +
+ "flink.ml.preprocessing.StandardScaler which trains on a DataSet[double]")
+ }
+
+ it should "support multiple transformers and a predictor" in {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val data = List(LabeledVector(1.0, DenseVector(1.0, 2.0)),
+ LabeledVector(2.0, DenseVector(2.0, 3.0)),
+ LabeledVector(3.0, DenseVector(3.0, 4.0)))
+
+ val trainingData = env.fromCollection(data)
+
+ val chainedScalers2 = StandardScaler().chainTransformer(StandardScaler())
+ val chainedScalers3 = chainedScalers2.chainTransformer(StandardScaler())
+ val chainedScalers4 = chainedScalers3.chainTransformer(StandardScaler())
+ val chainedScalers5 = chainedScalers4.chainTransformer(StandardScaler())
+
+ val predictor = MultipleLinearRegression()
+
+
+ val pipeline = chainedScalers5.chainPredictor(predictor)
+
+ pipeline.fit(trainingData)
+
+ val weightVector = predictor.weightsOption.get.collect().head
+
+ weightVector._1.foreach{
+ _ should be (0.367282 +- 0.01)
+ }
+
+ weightVector._2 should be (1.3131727 +- 0.01)
+ }
+}
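
PipelineITSuite is the first coverage of the new chaining API: chainTransformer composes two transformers into a new transformer, chainPredictor closes a chain with a predictor, and calling fit on the composite fits the whole chain at once, which is what the multi-scaler test relies on. When no FitOperation (or Transform-/PredictOperation) matches the combination of operator and input type, fitting fails at runtime with the messages asserted in the two exception tests. Stripped of the test harness, the pattern reduces to a short sketch like the following (data values are illustrative):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.LabeledVector
    import org.apache.flink.ml.math.DenseVector
    import org.apache.flink.ml.preprocessing.{PolynomialFeatures, StandardScaler}
    import org.apache.flink.ml.regression.MultipleLinearRegression

    val env = ExecutionEnvironment.getExecutionEnvironment

    val training = env.fromCollection(List(
      LabeledVector(1.0, DenseVector(1.0, 2.0)),
      LabeledVector(2.0, DenseVector(2.0, 3.0)),
      LabeledVector(3.0, DenseVector(3.0, 4.0))))

    // transformer -> transformer -> predictor, built from the two chaining calls
    val pipeline = StandardScaler()
      .chainTransformer(PolynomialFeatures().setDegree(2))
      .chainPredictor(MultipleLinearRegression())

    // fits the scaler, pushes the scaled data through the polynomial expansion,
    // and trains the regression on the result
    pipeline.fit(training)
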
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
index ac3cbb6..30875b3 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
@@ -41,8 +41,9 @@ class StandardScalerITSuite
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromCollection(data)
- val transformer = StandardScaler()
- val scaledVectors = transformer.transform(dataSet).collect
+ val scaler = StandardScaler()
+ scaler.fit(dataSet)
+ val scaledVectors = scaler.transform(dataSet).collect
scaledVectors.length should equal(data.length)
@@ -73,8 +74,9 @@ class StandardScalerITSuite
val env = ExecutionEnvironment.getExecutionEnvironment
val dataSet = env.fromCollection(data)
- val transformer = StandardScaler().setMean(10.0).setStd(2.0)
- val scaledVectors = transformer.transform(dataSet).collect
+ val scaler = StandardScaler().setMean(10.0).setStd(2.0)
+ scaler.fit(dataSet)
+ val scaledVectors = scaler.transform(dataSet).collect
scaledVectors.length should equal(data.length)
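
The change above reflects the new estimator contract: StandardScaler must now be fit before transform, because fit is the step that computes the scaling statistics (and, with setMean/setStd, honours a user-specified target mean and standard deviation) that transform later applies. A minimal sketch of the two-step usage, assuming a DataSet of DenseVector as in this suite (the concrete values are illustrative):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.math.DenseVector
    import org.apache.flink.ml.preprocessing.StandardScaler

    val env = ExecutionEnvironment.getExecutionEnvironment

    val data = env.fromCollection(List(DenseVector(1.0, 2.0), DenseVector(3.0, 4.0)))

    // rescale to a user-specified target mean of 10.0 and standard deviation of 2.0
    val scaler = StandardScaler().setMean(10.0).setStd(2.0)

    scaler.fit(data)                     // learns the statistics of `data`
    val scaled = scaler.transform(data)  // applies the learned scaling

    val scaledVectors = scaled.collect()
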
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
index 245d7a8..2ad310d 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
@@ -48,13 +48,13 @@ class ALSITSuite
val inputDS = env.fromCollection(data)
- val model = als.fit(inputDS)
+ als.fit(inputDS)
val testData = env.fromCollection(expectedResult.map{
case (userID, itemID, rating) => (userID, itemID)
})
- val predictions = model.transform(testData).collect()
+ val predictions = als.predict(testData).collect()
predictions.length should equal(expectedResult.length)
@@ -70,7 +70,7 @@ class ALSITSuite
}
}
- val risk = model.empiricalRisk(inputDS).collect().apply(0)
+ val risk = als.empiricalRisk(inputDS).collect().apply(0)
risk should be(expectedEmpiricalRisk +- 1)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
index 2d3f770..8be239a 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
@@ -20,7 +20,7 @@ package org.apache.flink.ml.regression
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.ml.common.ParameterMap
-import org.apache.flink.ml.feature.PolynomialBase
+import org.apache.flink.ml.preprocessing.PolynomialFeatures
import org.scalatest.{Matchers, FlatSpec}
import org.apache.flink.api.scala._
@@ -38,7 +38,7 @@ class MultipleLinearRegressionITSuite
env.setParallelism(2)
- val learner = MultipleLinearRegression()
+ val mlr = MultipleLinearRegression()
import RegressionData._
@@ -49,9 +49,9 @@ class MultipleLinearRegressionITSuite
parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
val inputDS = env.fromCollection(data)
- val model = learner.fit(inputDS, parameters)
+ mlr.fit(inputDS, parameters)
- val weightList = model.weights.collect()
+ val weightList = mlr.weightsOption.get.collect()
weightList.size should equal(1)
@@ -63,7 +63,7 @@ class MultipleLinearRegressionITSuite
}
weight0 should be (expectedWeight0 +- 0.4)
- val srs = model.squaredResidualSum(inputDS).collect().apply(0)
+ val srs = mlr.squaredResidualSum(inputDS).collect().apply(0)
srs should be (expectedSquaredResidualSum +- 2)
}
@@ -73,21 +73,21 @@ class MultipleLinearRegressionITSuite
env.setParallelism(2)
- val polynomialBase = PolynomialBase()
- val learner = MultipleLinearRegression()
+ val polynomialBase = PolynomialFeatures()
+ val mlr = MultipleLinearRegression()
- val pipeline = polynomialBase.chain(learner)
+ val pipeline = polynomialBase.chainPredictor(mlr)
val inputDS = env.fromCollection(RegressionData.polynomialData)
val parameters = ParameterMap()
- .add(PolynomialBase.Degree, 3)
+ .add(PolynomialFeatures.Degree, 3)
.add(MultipleLinearRegression.Stepsize, 0.002)
.add(MultipleLinearRegression.Iterations, 100)
- val model = pipeline.fit(inputDS, parameters)
+ pipeline.fit(inputDS, parameters)
- val weightList = model.weights.collect()
+ val weightList = mlr.weightsOption.get.collect()
weightList.size should equal(1)
@@ -102,7 +102,7 @@ class MultipleLinearRegressionITSuite
val transformedInput = polynomialBase.transform(inputDS, parameters)
- val srs = model.squaredResidualSum(transformedInput).collect().apply(0)
+ val srs = mlr.squaredResidualSum(transformedInput).collect().apply(0)
srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5)
}
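
Two API changes are visible in this suite: configuration can be passed as a ParameterMap at fit time instead of on a returned model, and the learned weights now live on the predictor itself as weightsOption, an Option that is empty until fit has run. A condensed sketch of that calling pattern, using the parameter keys that appear in the hunks above (the training values are illustrative):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.{LabeledVector, ParameterMap}
    import org.apache.flink.ml.math.DenseVector
    import org.apache.flink.ml.regression.MultipleLinearRegression

    val env = ExecutionEnvironment.getExecutionEnvironment

    val training = env.fromCollection(List(
      LabeledVector(1.0, DenseVector(1.0)),
      LabeledVector(2.0, DenseVector(2.0)),
      LabeledVector(3.0, DenseVector(3.0))))

    val mlr = MultipleLinearRegression()

    val parameters = ParameterMap()
      .add(MultipleLinearRegression.Stepsize, 0.002)
      .add(MultipleLinearRegression.Iterations, 100)
      .add(MultipleLinearRegression.ConvergenceThreshold, 0.001)

    mlr.fit(training, parameters)

    // weightsOption is None before fit; afterwards it holds a DataSet whose single
    // element the tests above read as (weight vector, intercept)
    val (weights, intercept) = mlr.weightsOption.get.collect().head
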
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
index d039a8b..2ebd606 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
@@ -84,7 +84,7 @@ under the License.
<build>
<plugins>
- <!-- get default data from flink-java-examples pipeline -->
+ <!-- get default data from flink-java-examples package -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
index 188ac4a..d7fbc8e 100644
--- a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
+++ b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
@@ -19,7 +19,7 @@
/**
* <strong>Table API</strong><br>
*
- * This pipeline contains the generic part of the Table API. It can be used with Flink Streaming
+ * This package contains the generic part of the Table API. It can be used with Flink Streaming
* and Flink Batch. From Scala as well as from Java.
*
* When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
index 37c5937..e74651b 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -26,7 +26,7 @@ import scala.language.implicitConversions
/**
* == Table API (Scala) ==
*
- * Importing this pipeline with:
+ * Importing this package with:
*
* {{{
* import org.apache.flink.api.scala.table._
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
index f50ca02..c5c8c94 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
@@ -18,7 +18,7 @@
package org.apache.flink.api.table
/**
- * This pipeline contains the base class of AST nodes and all the expression language AST classes.
+ * This package contains the base class of AST nodes and all the expression language AST classes.
* Expression trees should not be manually constructed by users. They are implicitly constructed
* from the implicit DSL conversions in
* [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
index a31ec61..bdcb22c 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api
/**
* == Table API ==
*
- * This pipeline contains the generic part of the Table API. It can be used with Flink Streaming
+ * This package contains the generic part of the Table API. It can be used with Flink Streaming
* and Flink Batch. From Scala as well as from Java.
*
* When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
index adb9890..a598483 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
@@ -18,7 +18,7 @@
package org.apache.flink.api.table
/**
- * The operations in this pipeline are created by calling methods on [[Table]] they
+ * The operations in this package are created by calling methods on [[Table]]; they
* should not be manually created by users of the API.
*/
package object plan
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
index 155a17e..a1bc4b7 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
@@ -18,6 +18,6 @@
package org.apache.flink.api.table
/**
- * The functions in this pipeline are used transforming Table API operations to Java API operations.
+ * The functions in this package are used to transform Table API operations into Java API operations.
*/
package object runtime
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
index fb80798..3b2fb7f 100644
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
@@ -150,7 +150,7 @@ public class TachyonFileSystemWrapperTest {
}
}
- // pipeline visible
+ // package visible
static final class DopOneTestEnvironment extends LocalEnvironment {
static {
initializeContextEnvironment(new ExecutionEnvironmentFactory() {
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
index 5c30785..d61f80e 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
@@ -86,7 +86,7 @@ public class TPCHQuery3 {
}
});
- // Join customers with orders and pipeline them into a ShippingPriorityItem
+ // Join customers with orders and package them into a ShippingPriorityItem
DataSet<ShippingPriorityItem> customerWithOrders =
customers.join(orders).where(0).equalTo(1)
.with(
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/assembly/test-custominput-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-custominput-assembly.xml b/flink-tests/src/test/assembly/test-custominput-assembly.xml
index 18adc47..e6f3568 100644
--- a/flink-tests/src/test/assembly/test-custominput-assembly.xml
+++ b/flink-tests/src/test/assembly/test-custominput-assembly.xml
@@ -28,7 +28,7 @@ under the License.
<fileSet>
<directory>${project.build.testOutputDirectory}</directory>
<outputDirectory>/</outputDirectory>
- <!--modify/add include to match your pipeline(s) -->
+ <!--modify/add include to match your package(s) -->
<includes>
<include>org/apache/flink/test/classloading/jar/CustomInputSplitProgram.class</include>
<include>org/apache/flink/test/classloading/jar/CustomInputSplitProgram$*.class</include>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/assembly/test-kmeans-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-kmeans-assembly.xml b/flink-tests/src/test/assembly/test-kmeans-assembly.xml
index 3c547fb..a8d34ab 100644
--- a/flink-tests/src/test/assembly/test-kmeans-assembly.xml
+++ b/flink-tests/src/test/assembly/test-kmeans-assembly.xml
@@ -28,7 +28,7 @@ under the License.
<fileSet>
<directory>${project.build.testOutputDirectory}</directory>
<outputDirectory>/</outputDirectory>
- <!--modify/add include to match your pipeline(s) -->
+ <!--modify/add include to match your package(s) -->
<includes>
<include>org/apache/flink/test/classloading/jar/KMeansForTest.class</include>
<include>org/apache/flink/test/classloading/jar/KMeansForTest$*.class</include>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml b/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml
index b311700..8321b21 100644
--- a/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml
+++ b/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml
@@ -28,7 +28,7 @@ under the License.
<fileSet>
<directory>${project.build.testOutputDirectory}</directory>
<outputDirectory>/</outputDirectory>
- <!--modify/add include to match your pipeline(s) -->
+ <!--modify/add include to match your package(s) -->
<includes>
<include>org/apache/flink/test/classloading/jar/StreamingProgram.class</include>
<include>org/apache/flink/test/classloading/jar/StreamingProgram$*.class</include>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
index b3734a8..349275c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery10ITCase.java
@@ -92,7 +92,7 @@ public class TPCHQuery10ITCase extends RecordAPITestBase {
+ "32|2743|7744|4|4|6582.96|0.09|0.03|R|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n"
+ "32|85811|8320|5|44|79059.64|0.05|0.06|R|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n"
+ "32|11615|4117|6|6|9159.66|0.04|0.03|R|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n"
- + "33|61336|8855|1|31|40217.23|0.09|0.04|R|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic pipeline|\n"
+ + "33|61336|8855|1|31|40217.23|0.09|0.04|R|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n"
+ "33|60519|5532|2|32|47344.32|0.02|0.05|R|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n"
+ "33|137469|9983|3|5|7532.30|0.05|0.03|R|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n"
+ "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n"
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java
index 7dc76b5..a0236c2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobTests/TPCHQuery3ITCase.java
@@ -85,7 +85,7 @@ public class TPCHQuery3ITCase extends RecordAPITestBase {
+ "32|2743|7744|4|4|6582.96|0.09|0.03|N|O|1995-08-04|1995-10-01|1995-09-03|NONE|REG AIR|e slyly final pac|\n"
+ "32|85811|8320|5|44|79059.64|0.05|0.06|N|O|1995-08-28|1995-08-20|1995-09-14|DELIVER IN PERSON|AIR|symptotes nag according to the ironic depo|\n"
+ "32|11615|4117|6|6|9159.66|0.04|0.03|N|O|1995-07-21|1995-09-23|1995-07-25|COLLECT COD|RAIL| gifts cajole carefully.|\n"
- + "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic pipeline|\n"
+ + "33|61336|8855|1|31|40217.23|0.09|0.04|A|F|1993-10-29|1993-12-19|1993-11-08|COLLECT COD|TRUCK|ng to the furiously ironic package|\n"
+ "33|60519|5532|2|32|47344.32|0.02|0.05|A|F|1993-12-09|1994-01-04|1993-12-28|COLLECT COD|MAIL|gular theodolites|\n"
+ "33|137469|9983|3|5|7532.30|0.05|0.03|A|F|1993-12-09|1993-12-25|1993-12-23|TAKE BACK RETURN|AIR|. stealthily bold exc|\n"
+ "33|33918|3919|4|41|75928.31|0.09|0.00|R|F|1993-11-09|1994-01-24|1993-11-11|TAKE BACK RETURN|MAIL|unusual packages doubt caref|\n"
|