[ https://issues.apache.org/jira/browse/FLINK-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15654378#comment-15654378 ]

ASF GitHub Bot commented on FLINK-4613:

Github user thvasilo commented on a diff in the pull request:
https://github.com/apache/flink/pull/2542#discussion_r87421513
--- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala ---

@@ -675,7 +756,69 @@ object ALS {
collector.collect((blockID, array))
}
}
- }.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+ }
+
+ // broadcasting XtX matrix in the implicit case
+ val updatedFactorMatrix = if (implicitPrefs) {
+ newMatrix.withBroadcastSet(XtXtoBroadcast.get, "XtX")
+ } else {
+ newMatrix
+ }
+
+ updatedFactorMatrix.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0")
+ }
+
+ /**
+ * Computes the XtX matrix for the implicit version before updating the factors.
+ * This matrix is intended to be broadcast, but as we cannot use a sink inside a Flink
+ * iteration, we represent it as a [[DataSet]] with a single element containing the matrix.
+ *
+ * The algorithm computes `X_i^T * X_i` for every block `X_i` of `X`,
+ * then sums all these computed matrices to get `X^T * X`.
+ */
+ private[recommendation] def computeXtX(x: DataSet[(Int, Array[Array[Double]])], factors: Int):
+ DataSet[Array[Double]] = {
+ val triangleSize = factors * (factors - 1) / 2 + factors
+
+ type MtxBlock = (Int, Array[Array[Double]])
+ // construct XtX for all blocks
+ val xtx = x
+ .mapPartition(new MapPartitionFunction[MtxBlock, Array[Double]]() {
+ var xtxForBlock: Array[Double] = null
+
+ override def mapPartition(blocks: Iterable[(Int, Array[Array[Double]])],
+ out: Collector[Array[Double]]): Unit = {
+
+ if (xtxForBlock == null) {
+ // creating the matrix if not yet created
+ xtxForBlock = Array.fill(triangleSize)(0.0)
+ } else {
+ // erasing the matrix
+ var i = 0
+ while (i < xtxForBlock.length) {
--- End diff ---
I don't imagine this making a major difference in performance, so let's just go with the
cleaner-code angle and use `fill`.
I wish we had an easy-to-use, integrated way to do proper profiling so such decisions were
easier (i.e. if this is 0.5% of the CPU cost, then optimizing is pointless, but right now
we don't know).
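The diff above stores X^T * X in a packed upper-triangular array of length `factors * (factors - 1) / 2 + factors`, accumulates each block's local X_i^T * X_i into it, and then sums the per-block results. A standalone sketch of that idea (hypothetical object name `XtXSketch`; plain Scala, not the actual ALS.scala code or the Flink DataSet plumbing):

```scala
// Sketch (hypothetical, not the actual ALS.scala code) of the packed
// upper-triangular storage used for X^T * X: the matrix is symmetric, so only
// factors * (factors - 1) / 2 + factors entries are kept, and each block
// contributes its local X_i^T * X_i before per-block results are summed.
object XtXSketch {
  def triangleSize(factors: Int): Int = factors * (factors - 1) / 2 + factors

  // Accumulate the upper triangle of x^T * x for one block of factor vectors,
  // walking the (i, j), j >= i entries in row-major order.
  def addBlock(xtx: Array[Double], block: Array[Array[Double]]): Unit =
    for (row <- block) {
      var pos = 0
      var i = 0
      while (i < row.length) {
        var j = i
        while (j < row.length) {
          xtx(pos) += row(i) * row(j)
          pos += 1
          j += 1
        }
        i += 1
      }
    }

  // The reduce step across blocks: element-wise sum of two packed triangles.
  def sum(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map { case (p, q) => p + q }
}
```

For example, splitting X = [[1, 2], [3, 0]] into two one-row blocks and summing the two per-block triangles gives [10.0, 2.0, 4.0], the packed form of X^T * X = [[10, 2], [2, 4]].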
> Extend ALS to handle implicit feedback datasets
> 
>
> Key: FLINK-4613
> URL: https://issues.apache.org/jira/browse/FLINK-4613
> Project: Flink
> Issue Type: New Feature
> Components: Machine Learning Library
> Reporter: Gábor Hermann
> Assignee: Gábor Hermann
>
> The Alternating Least Squares implementation should be extended to handle _implicit feedback_
> datasets. These datasets do not contain explicit ratings by users; they are rather built by
> collecting user behavior (e.g. user listened to artist X for Y minutes), and they require
> a slightly different optimization objective. See details by [Hu et al.|http://dx.doi.org/10.1109/ICDM.2008.22].
> We do not need to modify much in the original ALS algorithm. See the [Spark ALS implementation|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala],
> which could be a basis for this extension. Only the factor-updating part is modified, and
> most of the changes are in the local parts of the algorithm (i.e. UDFs). In fact, the only
> modification that is not local is precomputing the matrix product Y^T * Y and broadcasting
> it to all the nodes, which we can do with broadcast DataSets.
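The Y^T * Y precomputation the description mentions can be sketched in plain Scala (no Flink; `ImplicitAlsSketch` and all names here are hypothetical illustrations, not the PR's code). The point of the Hu et al. trick is that Y^T * Y is identical for every user, so it is computed once and shared (in Flink, via a broadcast DataSet), while each user's normal-equation left-hand side only adds corrections for that user's observed items:

```scala
// Hypothetical sketch of the implicit-feedback update, not the actual PR code.
// Y^T * Y is computed once for all users; the per-user system
// (YtY + sum_{observed i} (c_ui - 1) * y_i * y_i^T + lambda * I) only touches
// the items the user interacted with, which is what makes broadcasting pay off.
object ImplicitAlsSketch {
  type Matrix = Array[Array[Double]]

  // Full Y^T * Y, shared across all user updates.
  def ytY(y: Matrix, factors: Int): Matrix = {
    val out = Array.fill(factors, factors)(0.0)
    for (row <- y; i <- 0 until factors; j <- 0 until factors)
      out(i)(j) += row(i) * row(j)
    out
  }

  // Per-user left-hand side: a fresh copy of YtY plus (c_ui - 1) * y_i * y_i^T
  // for each observed (item, confidence) pair, plus lambda on the diagonal.
  def userLhs(ytY: Matrix, y: Matrix, observed: Seq[(Int, Double)],
              lambda: Double): Matrix = {
    val factors = ytY.length
    val a = Array.tabulate(factors, factors)((i, j) => ytY(i)(j))
    for ((item, c) <- observed; i <- 0 until factors; j <- 0 until factors)
      a(i)(j) += (c - 1.0) * y(item)(i) * y(item)(j)
    for (i <- 0 until factors) a(i)(i) += lambda
    a
  }
}
```

Because unobserved entries contribute nothing beyond the shared YtY term, each user's update costs O(n_u * k^2) for n_u observed items rather than O(items * k^2).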

This message was sent by Atlassian JIRA
(v6.3.4#6332)
