From: donald@apache.org
To: commits@predictionio.incubator.apache.org
Reply-To: dev@predictionio.incubator.apache.org
Date: Mon, 26 Dec 2016 22:17:05 -0000
Message-Id: <5a99c6b22ea4477590e882634f344015@git.apache.org>
In-Reply-To: <40dc786eff8d45ca814ed58c3d4d852b@git.apache.org>
References: <40dc786eff8d45ca814ed58c3d4d852b@git.apache.org>
Subject: [02/51] [abbrv] [partial] incubator-predictionio-site git commit: Documentation based on apache/incubator-predictionio#3d1b777d0ec2e4e6d6c77d43a7d528ac44287cb5
archived-at: Mon, 26 Dec 2016 22:17:22 -0000

http://git-wip-us.apache.org/repos/asf/incubator-predictionio-site/blob/5b520d3f/templates/recommendation/batch-evaluator/index.html
----------------------------------------------------------------------
diff --git a/templates/recommendation/batch-evaluator/index.html b/templates/recommendation/batch-evaluator/index.html
new file mode 100644
index 0000000..dfd4232
--- /dev/null
+++ b/templates/recommendation/batch-evaluator/index.html
@@ -0,0 +1,297 @@
+Batch Persistable Evaluator (Recommendation)

This how-to tutorial explains how you can use $ pio eval to persist the predicted results for a batch of queries. Please read the Evaluation documentation first to understand the usage of DataSource's readEval() and the Evaluation component.

This tutorial is based on experimental and developer features, which may change in future releases.

This tutorial is based on Recommendation template version v0.3.2.

1. Modify DataSource

Modify DataSource's readEval() to generate the batch of Queries for which you want to run batch prediction.

+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    // This function returns only one evaluation data set.
+
+    // Create your own queries here. The ones below are provided as examples.
+    // For example, you may get all distinct user IDs from the trainingData to create the queries.
+    val batchQueries: RDD[Query] = sc.parallelize(
+      Seq(
+        Query(user = "1", num = 10),
+        Query(user = "3", num = 15),
+        Query(user = "5", num = 20)
+      )
+    )
+
+    val queryAndActual: RDD[(Query, ActualResult)] = batchQueries.map (q =>
+      // The ActualResult contains a dummy empty rating array
+      // because we are not interested in the actual result for batch prediction.
+      (q, ActualResult(Array()))
+    )
+
+    val evalDataSet = (
+      readTraining(sc),
+      new EmptyEvaluationInfo(),
+      queryAndActual
+    )
+
+    Seq(evalDataSet)
+  }
+
+
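The comment in readEval() suggests deriving the queries from the distinct user IDs in the training data. A minimal sketch of that idea on plain Scala collections is shown here. The Rating and Query case classes are simplified stand-ins assumed for illustration, and fromRatings is a hypothetical helper, not part of the template. On an RDD, a similar map and distinct chain would apply.

```scala
// Simplified stand-ins for the template's classes (assumed for illustration only).
case class Query(user: String, num: Int)
case class Rating(user: String, item: String, rating: Double)

object BatchQueries {
  // Hypothetical helper: build one Query per distinct user ID found in the ratings.
  // The result is sorted by user ID so that the output order is deterministic.
  def fromRatings(ratings: Seq[Rating], num: Int): Seq[Query] =
    ratings.map(_.user).distinct.sorted.map(u => Query(user = u, num = num))

  def main(args: Array[String]): Unit = {
    val ratings = Seq(
      Rating("1", "a", 4.0),
      Rating("3", "b", 5.0),
      Rating("1", "c", 2.0)
    )
    // User "1" appears twice in the ratings but yields a single query.
    fromRatings(ratings, num = 10).foreach(println)
  }
}
```

Deduplicating first keeps the batch to one prediction per user, which matters when the same user has many events.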

Alternatively, you can create a new DataSource that extends the original DataSource, add it in Engine.scala, and specify which one to use in engine.json.

2. Add a new Evaluator

Create a new file BatchPersistableEvaluator.scala. Unlike the MetricEvaluator, this Evaluator simply writes each Query and its corresponding PredictedResult to the output directory without performing any metrics calculation.

Note that the output directory is specified by the variable outputDir.

package org.template.recommendation
+
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.Engine
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.core.BaseEvaluator
+import org.apache.predictionio.core.BaseEvaluatorResult
+import org.apache.predictionio.workflow.WorkflowParams
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+import org.json4s.DefaultFormats
+import org.json4s.Formats
+import org.json4s.native.Serialization
+
+import grizzled.slf4j.Logger
+
+class BatchPersistableEvaluatorResult extends BaseEvaluatorResult {}
+
+class BatchPersistableEvaluator extends BaseEvaluator[
+  EmptyEvaluationInfo,
+  Query,
+  PredictedResult,
+  ActualResult,
+  BatchPersistableEvaluatorResult] {
+  @transient lazy val logger = Logger[this.type]
+
+  // A helper object for the json4s serialization
+  case class Row(query: Query, predictedResult: PredictedResult)
+    extends Serializable
+
+  def evaluateBase(
+    sc: SparkContext,
+    evaluation: Evaluation,
+    engineEvalDataSet: Seq[(
+      EngineParams,
+      Seq[(EmptyEvaluationInfo, RDD[(Query, PredictedResult, ActualResult)])])],
+    params: WorkflowParams): BatchPersistableEvaluatorResult = {
+
+    /** Extract the first data, as we are only interested in the first
+      * evaluation. It is possible to relax this restriction, and have the
+      * output logic below to write to different directory for different engine
+      * params.
+      */
+
+    require(
+      engineEvalDataSet.size == 1, "There should be only one engine params")
+
+    val evalDataSet = engineEvalDataSet.head._2
+
+    require(evalDataSet.size == 1, "There should be only one RDD[(Q, P, A)]")
+
+    val qpaRDD = evalDataSet.head._2
+
+    // qpaRDD contains 3 queries we specified in readEval, the corresponding
+    // predictedResults, and the dummy actual result.
+
+    /** The output directory. Prefer an absolute path if you run on a cluster.
+      * If your database has a Hadoop interface, you can also convert the
+      * following to write to your database in parallel as well.
+      */
+    val outputDir = "batch_result"
+
+    logger.info("Writing result to disk")
+    qpaRDD
+      .map { case (q, p, a) => Row(q, p) }
+      .map { row =>
+        // Convert into a json
+        implicit val formats: Formats = DefaultFormats
+        Serialization.write(row)
+      }
+      .saveAsTextFile(outputDir)
+
+    logger.info(s"Result can be found in $outputDir")
+
+    new BatchPersistableEvaluatorResult()
+  }
+}
+
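Spark's saveAsTextFile writes a directory of part-* files rather than a single file, with one record per line; in this evaluator each line is one JSON string. As a rough sketch of how the output could be read back on a local filesystem, the hypothetical BatchResultReader helper below (not part of the template) collects the lines of all part files in file-name order. It assumes the directory is local and fits in memory.

```scala
import java.io.File
import scala.io.Source

object BatchResultReader {
  // Hypothetical helper: read every line from the part-* files in outputDir.
  // Marker files such as _SUCCESS and hidden checksum files are skipped
  // because their names do not start with "part-".
  def readLines(outputDir: String): Vector[String] = {
    // listFiles() returns null if the path is not a directory.
    val files = Option(new File(outputDir).listFiles()).getOrElse(Array.empty[File])
    files
      .filter(f => f.isFile && f.getName.startsWith("part-"))
      .sortBy(_.getName)
      .toVector
      .flatMap { f =>
        val source = Source.fromFile(f, "UTF-8")
        try source.getLines().toVector finally source.close()
      }
  }

  def main(args: Array[String]): Unit = {
    // "batch_result" matches the outputDir used in the evaluator above.
    readLines("batch_result").foreach(println)
  }
}
```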

3. Define Evaluation and EngineParamsGenerator object

Create a new file BatchEvaluation.scala. Note that the new BatchPersistableEvaluator is used. The BatchEngineParamsList specifies the parameters of the engine.

Modify the appName parameter in DataSourceParams to match your app name.

package org.template.recommendation
+
+import org.apache.predictionio.controller.EngineParamsGenerator
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.Evaluation
+
+object BatchEvaluation extends Evaluation {
+  // Define Engine and Evaluator used in Evaluation
+
+  /**
+    * Specify the new BatchPersistableEvaluator.
+    */
+  engineEvaluator =
+    (RecommendationEngine(), new BatchPersistableEvaluator())
+}
+
+object BatchEngineParamsList extends EngineParamsGenerator {
+  // We are only interested in a single set of engine params.
+  engineParamsList = Seq(
+    EngineParams(
+      dataSourceParams =
+        DataSourceParams(appName = "INVALID_APP_NAME", evalParams = None),
+      algorithmParamsList = Seq(("als", ALSAlgorithmParams(
+        rank = 10,
+        numIterations = 20,
+        lambda = 0.01,
+        seed = Some(3L))))))
+}
+
+

4. Build and run

Run the following command to build:

$ pio build
+

After the build succeeds, you should see the following output:

[INFO] [Console$] Your engine is ready for training.
+

To run the BatchEvaluation with BatchEngineParamsList, run the following command:

$ pio eval org.template.recommendation.BatchEvaluation   org.template.recommendation.BatchEngineParamsList
+

You should see the following output:

[INFO] [BatchPersistableEvaluator] Writing result to disk
+[INFO] [BatchPersistableEvaluator] Result can be found in batch_result
+[INFO] [CoreWorkflow$] Updating evaluation instance with result: org.template.recommendation.BatchPersistableEvaluatorResult@2f886889
+[INFO] [CoreWorkflow$] runEvaluation completed
+

You should find the batch queries and the predicted results in the output directory batch_result/.

\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio-site/blob/5b520d3f/templates/recommendation/batch-evaluator/index.html.gz
----------------------------------------------------------------------
diff --git a/templates/recommendation/batch-evaluator/index.html.gz b/templates/recommendation/batch-evaluator/index.html.gz
new file mode 100644
index 0000000..709bc11
Binary files /dev/null and b/templates/recommendation/batch-evaluator/index.html.gz differ
http://git-wip-us.apache.org/repos/asf/incubator-predictionio-site/blob/5b520d3f/templates/recommendation/blacklist-items/index.html
----------------------------------------------------------------------
diff --git a/templates/recommendation/blacklist-items/index.html b/templates/recommendation/blacklist-items/index.html
new file mode 100644
index 0000000..c8f7fcc
--- /dev/null
+++ b/templates/recommendation/blacklist-items/index.html
@@ -0,0 +1,159 @@
+Filter Recommended Items by Blacklist in Query (Recommendation)

Let's say you want to supply a blackList with each query to exclude some items from the recommendations. For example, during a browsing session the user has just added some items to the shopping cart, or you have a list of items you want to filter out. This how-to demonstrates how you can do it.

Note that you may also use E-Commerce Recommendation Template which supports this feature by default.

If you want to filter out items based on specific user-to-item events logged by the EventServer (e.g. filter all items on which the user has "buy" events), you can use the E-Commerce Recommendation Template. Please refer to the algorithm parameters "unseenOnly" and "seenEvents" of the E-Commerce Recommendation Template.

Add Query Parameter

First, we need to add a query parameter for sending the IDs of items that the user has already seen. Let's modify the case class Query in MyRecommendation/src/main/scala/Engine.scala:

case class Query(
+  user: String,
+  num: Int,
+  blackList: Set[String] // NOTE: line added
+) extends Serializable
+

Filter the Data

Then we need to change the code that computes recommendation scores so that it filters out the seen items. Let's modify the class in MyRecommendation/src/main/scala/ALSModel.scala by adding the following two methods to it.

  def recommendProductsWithFilter(
+      user: Int,
+      num: Int,
+      productIdFilter: Set[Int]) = {
+    val filteredProductFeatures = productFeatures
+      .filter(features => !productIdFilter.contains(features._1)) // (*)
+    recommend(userFeatures.lookup(user).head, filteredProductFeatures, num)
+      .map(t => Rating(user, t._1, t._2))
+  }
+
+  private def recommend(
+      recommendToFeatures: Array[Double],
+      recommendableFeatures: RDD[(Int, Array[Double])],
+      num: Int): Array[(Int, Double)] = {
+    val recommendToVector = new DoubleMatrix(recommendToFeatures)
+    val scored = recommendableFeatures.map { case (id,features) =>
+      (id, recommendToVector.dot(new DoubleMatrix(features)))
+    }
+    scored.top(num)(Ordering.by(_._2))
+  }
+

You also need to import the org.jblas.DoubleMatrix class. Note that the method recommend is a copy of org.apache.spark.mllib.recommendation.MatrixFactorizationModel#recommend; we can't reuse the original because it is private. The method recommendProductsWithFilter is an almost exact copy of org.apache.spark.mllib.recommendation.MatrixFactorizationModel#recommendProducts. The only difference is the line marked with the comment '(*)', where we apply the filtering.
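To make the filter-then-score logic easier to follow, here is a minimal sketch of the same idea on local Scala collections, without Spark or jblas. FilteredTopN and its methods are hypothetical names used only for illustration, and the dot product is computed by hand instead of with DoubleMatrix.

```scala
object FilteredTopN {
  // Dot product of two feature vectors of equal length.
  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "feature vectors must have the same length")
    a.zip(b).map { case (x, y) => x * y }.sum
  }

  // Drop the filtered product IDs first, then score the remaining products
  // against the user's features and keep the num highest scores.
  def recommend(
      userFeatures: Array[Double],
      productFeatures: Seq[(Int, Array[Double])],
      num: Int,
      productIdFilter: Set[Int]): Seq[(Int, Double)] =
    productFeatures
      .filter { case (id, _) => !productIdFilter.contains(id) }
      .map { case (id, features) => (id, dot(userFeatures, features)) }
      .sortBy { case (_, score) => -score }
      .take(num)

  def main(args: Array[String]): Unit = {
    val user = Array(1.0, 2.0)
    val products = Seq(
      (1, Array(1.0, 0.0)),
      (2, Array(0.0, 1.0)),
      (3, Array(1.0, 1.0))
    )
    // Product 3 would score highest, but it is filtered out.
    recommend(user, products, num = 2, productIdFilter = Set(3)).foreach(println)
  }
}
```

Filtering before scoring means the blacklisted items never compete for the top num slots, so the result still contains num items whenever enough unfiltered products exist.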

Put It All Together

Next, we need to invoke our new filtering method when we query for recommendations. Let's modify the predict method in MyRecommendation/src/main/scala/ALSAlgorithm.scala:

  def predict(model: ALSModel, query: Query): PredictedResult = {
+    // Convert String ID to Int index for Mllib
+    model.userStringIntMap.get(query.user).map { userInt =>
+      // create inverse view of itemStringIntMap
+      val itemIntStringMap = model.itemStringIntMap.inverse
+      // recommendProducts() returns Array[MLlibRating], which uses item Int
+      // index. Convert it to String ID for returning PredictedResult
+      val blackListedIds = query.blackList.flatMap(model.itemStringIntMap.get) // NOTE: line added
+      val itemScores = model
+        .recommendProductsWithFilter(userInt, query.num, blackListedIds) // NOTE: line modified
+        .map(r => ItemScore(itemIntStringMap(r.product), r.rating))
+      new PredictedResult(itemScores)
+    }.getOrElse{
+      logger.info(s"No prediction for unknown user ${query.user}.")
+      new PredictedResult(Array.empty)
+    }
+  }
+
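The blackList arrives as String item IDs, while the model works with Int indices, so the added line in predict translates one into the other. The sketch below illustrates that step with a plain Map in place of the model's itemStringIntMap; BlackListMapping and toIndices are hypothetical names. One consequence worth noting is that IDs unknown to the model are silently dropped rather than causing an error.

```scala
object BlackListMapping {
  // Translate String item IDs to Int indices. Map.get returns an Option,
  // so flatMap keeps only the IDs that the map actually contains.
  def toIndices(
      blackList: Set[String],
      itemStringIntMap: Map[String, Int]): Set[Int] =
    blackList.flatMap(id => itemStringIntMap.get(id))

  def main(args: Array[String]): Unit = {
    val itemStringIntMap = Map("32" -> 0, "90" -> 1, "75" -> 2)
    // "999" is not a known item, so only the index of "32" is returned.
    println(toIndices(Set("32", "999"), itemStringIntMap))
  }
}
```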

Test the Result

Then we can build, train, and deploy the engine and test the result.

The query

curl \
+-H "Content-Type: application/json" \
+-d '{ "user": "1", "num": 4 }' \
+http://localhost:8000/queries.json
+

will return the result

{
+    "itemScores": [{
+        "item": "32",
+        "score": 13.405593705856901
+    }, {
+        "item": "90",
+        "score": 10.980439687813178
+    }, {
+        "item": "75",
+        "score": 10.748973860065737
+    }, {
+        "item": "1",
+        "score": 9.769636099226231
+    }]
+}
+

Let's say that the user has already seen item 32. The query

curl \
+-H "Content-Type: application/json" \
+-d '{ "user": "1", "num": 4, "blackList": ["32"] }' \
+http://localhost:8000/queries.json
+

will return the result

{
+    "itemScores": [{
+        "item": "90",
+        "score": 10.980439687813178
+    }, {
+        "item": "75",
+        "score": 10.748973860065737
+    }, {
+        "item": "1",
+        "score": 9.769636099226231
+    }, {
+        "item": "49",
+        "score": 8.653951817512265
+    }]
+}
+

without item 32.

\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio-site/blob/5b520d3f/templates/recommendation/blacklist-items/index.html.gz
----------------------------------------------------------------------
diff --git a/templates/recommendation/blacklist-items/index.html.gz b/templates/recommendation/blacklist-items/index.html.gz
new file mode 100644
index 0000000..763a005
Binary files /dev/null and b/templates/recommendation/blacklist-items/index.html.gz differ