Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id ACF16200C6D for ; Sat, 22 Apr 2017 19:50:55 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AB831160BA2; Sat, 22 Apr 2017 17:50:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 85123160BBE for ; Sat, 22 Apr 2017 19:50:53 +0200 (CEST) Received: (qmail 31498 invoked by uid 500); 22 Apr 2017 17:50:51 -0000 Mailing-List: contact commits-help@mahout.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mahout.apache.org Delivered-To: mailing list commits@mahout.apache.org Received: (qmail 30549 invoked by uid 99); 22 Apr 2017 17:50:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Apr 2017 17:50:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 542C5E8F07; Sat, 22 Apr 2017 17:50:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: akm@apache.org To: commits@mahout.apache.org Date: Sat, 22 Apr 2017 17:51:02 -0000 Message-Id: <39010c33ecdc4656a73bd9ad45957220@git.apache.org> In-Reply-To: <7e4241657e314afa82a254b7f3def23f@git.apache.org> References: <7e4241657e314afa82a254b7f3def23f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/15] mahout git commit: Initial MVP for the website using Jekyll archived-at: Sat, 22 Apr 2017 17:50:55 -0000 http://git-wip-us.apache.org/repos/asf/mahout/blob/54ef150e/website/_pages/docs/0.13.0/algorithms/d-spca.md ---------------------------------------------------------------------- diff --git a/website/_pages/docs/0.13.0/algorithms/d-spca.md b/website/_pages/docs/0.13.0/algorithms/d-spca.md new file mode 100644 index 0000000..3505c6c --- /dev/null +++ b/website/_pages/docs/0.13.0/algorithms/d-spca.md @@ -0,0 +1,174 @@ +--- +layout: mahoutdoc +title: Mahout Samsara Dist Sto PCA +permalink: /docs/0.13.0/algorithms/samsara/dspca +--- +# Distributed Stochastic PCA + + +## Intro + +Mahout has a distributed implementation of Stochastic PCA[1]. This algorithm computes the exact equivalent of Mahout's dssvd(`\(\mathbf{A-1\mu^\top}\)`) by modifying the `dssvd` algorithm so as to avoid forming `\(\mathbf{A-1\mu^\top}\)`, which would densify a sparse input. Thus, it is suitable for work with both dense and sparse inputs. + +## Algorithm + +Given an *m* `\(\times\)` *n* matrix `\(\mathbf{A}\)`, a target rank *k*, and an oversampling parameter *p*, this procedure computes a *k*-rank PCA by finding the unknowns in `\(\mathbf{A−1\mu^\top \approx U\Sigma V^\top}\)`: + +1. Create seed for random *n* `\(\times\)` *(k+p)* matrix `\(\Omega\)`. +2. `\(\mathbf{s_\Omega \leftarrow \Omega^\top \mu}\)`. +3. `\(\mathbf{Y_0 \leftarrow A\Omega − 1 {s_\Omega}^\top, Y \in \mathbb{R}^{m\times(k+p)}}\)`. +4. Column-orthonormalize `\(\mathbf{Y_0} \rightarrow \mathbf{Q}\)` by computing thin decomposition `\(\mathbf{Y_0} = \mathbf{QR}\)`. Also, `\(\mathbf{Q}\in\mathbb{R}^{m\times(k+p)}, \mathbf{R}\in\mathbb{R}^{(k+p)\times(k+p)}\)`. +5. `\(\mathbf{s_Q \leftarrow Q^\top 1}\)`. +6. 
`\(\mathbf{B_0 \leftarrow Q^\top A: B \in \mathbb{R}^{(k+p)\times n}}\)`. +7. `\(\mathbf{s_B \leftarrow {B_0}^\top \mu}\)`. +8. For *i* in 1..*q* repeat (power iterations): + - For *j* in 1..*n* apply `\(\mathbf{(B_{i−1})_{∗j} \leftarrow (B_{i−1})_{∗j}−\mu_j s_Q}\)`. + - `\(\mathbf{Y_i \leftarrow A{B_{i−1}}^\top−1(s_B−\mu^\top \mu s_Q)^\top}\)`. + - Column-orthonormalize `\(\mathbf{Y_i} \rightarrow \mathbf{Q}\)` by computing thin decomposition `\(\mathbf{Y_i = QR}\)`. + - `\(\mathbf{s_Q \leftarrow Q^\top 1}\)`. + - `\(\mathbf{B_i \leftarrow Q^\top A}\)`. + - `\(\mathbf{s_B \leftarrow {B_i}^\top \mu}\)`. +9. Let `\(\mathbf{C \triangleq s_Q {s_B}^\top}\)`. `\(\mathbf{M \leftarrow B_q {B_q}^\top − C − C^\top + \mu^\top \mu s_Q {s_Q}^\top}\)`. +10. Compute an eigensolution of the small symmetric `\(\mathbf{M = \hat{U} \Lambda \hat{U}^\top: M \in \mathbb{R}^{(k+p)\times(k+p)}}\)`. +11. The singular values `\(\Sigma = \Lambda^{\circ 0.5}\)`, or, in other words, `\(\mathbf{\sigma_i= \sqrt{\lambda_i}}\)`. +12. If needed, compute `\(\mathbf{U = Q\hat{U}}\)`. +13. If needed, compute `\(\mathbf{V = B^\top \hat{U} \Sigma^{−1}}\)`. +14. If needed, items converted to the PCA space can be computed as `\(\mathbf{U\Sigma}\)`. + +## Implementation + +Mahout `dspca(...)` is implemented in the mahout `math-scala` algebraic optimizer which translates Mahout's R-like linear algebra operators into a physical plan for both Spark and H2O distributed engines. + + def dspca[K](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): + (DrmLike[K], DrmLike[Int], Vector) = { + + // Some mapBlock() calls need it + implicit val ktag = drmA.keyClassTag + + val drmAcp = drmA.checkpoint() + implicit val ctx = drmAcp.context + + val m = drmAcp.nrow + val n = drmAcp.ncol + assert(k <= (m min n), "k cannot be greater than smaller of m, n.") + val pfxed = safeToNonNegInt((m min n) - k min p) + + // Actual decomposition rank + val r = k + pfxed + + // Dataset mean + val mu = drmAcp.colMeans + + val mtm = mu dot mu + + // We represent Omega by its seed. + val omegaSeed = RandomUtils.getRandom().nextInt() + val omega = Matrices.symmetricUniformView(n, r, omegaSeed) + + // This done in front in a single-threaded fashion for now. Even though it doesn't require any + // memory beyond that is required to keep xi around, it still might be parallelized to backs + // for significantly big n and r. TODO + val s_o = omega.t %*% mu + + val bcastS_o = drmBroadcast(s_o) + val bcastMu = drmBroadcast(mu) + + var drmY = drmAcp.mapBlock(ncol = r) { + case (keys, blockA) ⇒ + val s_o:Vector = bcastS_o + val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed) + for (row ← 0 until blockY.nrow) blockY(row, ::) -= s_o + keys → blockY + } + // Checkpoint Y + .checkpoint() + + var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint() + + var s_q = drmQ.colSums() + var bcastVarS_q = drmBroadcast(s_q) + + // This actually should be optimized as identically partitioned map-side A'B since A and Q should + // still be identically partitioned. + var drmBt = (drmAcp.t %*% drmQ).checkpoint() + + var s_b = (drmBt.t %*% mu).collect(::, 0) + var bcastVarS_b = drmBroadcast(s_b) + + for (i ← 0 until q) { + + // These closures don't seem to live well with outside-scope vars. This doesn't record closure + // attributes correctly. So we create additional set of vals for broadcast vars to properly + // create readonly closure attributes in this very scope. 
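      // (That is, rebinding the broadcast handles to local vals gives the task closures
      // stable, read-only references to serialize for this iteration.)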
+ val bcastS_q = bcastVarS_q + val bcastMuInner = bcastMu + + // Fix Bt as B' -= xi cross s_q + drmBt = drmBt.mapBlock() { + case (keys, block) ⇒ + val s_q: Vector = bcastS_q + val mu: Vector = bcastMuInner + keys.zipWithIndex.foreach { + case (key, idx) ⇒ block(idx, ::) -= s_q * mu(key) + } + keys → block + } + + drmY.uncache() + drmQ.uncache() + + val bCastSt_b = drmBroadcast(s_b -=: mtm * s_q) + + drmY = (drmAcp %*% drmBt) + // Fix Y by subtracting st_b from each row of the AB' + .mapBlock() { + case (keys, block) ⇒ + val st_b: Vector = bCastSt_b + block := { (_, c, v) ⇒ v - st_b(c) } + keys → block + } + // Checkpoint Y + .checkpoint() + + drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint() + + s_q = drmQ.colSums() + bcastVarS_q = drmBroadcast(s_q) + + // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not + // identically partitioned anymore. + drmBt = (drmAcp.t %*% drmQ).checkpoint() + + s_b = (drmBt.t %*% mu).collect(::, 0) + bcastVarS_b = drmBroadcast(s_b) + } + + val c = s_q cross s_b + val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -=: + c -=: c.t +=: mtm *=: (s_q cross s_q) + val (inCoreUHat, d) = eigen(inCoreBBt) + val s = d.sqrt + + // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags + // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it? + val drmU = drmQ %*% inCoreUHat + val drmV = drmBt %*% (inCoreUHat %*% diagv(1 / s)) + + (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k)) + } + +## Usage + +The scala `dspca(...)` method can easily be called in any Spark, Flink, or H2O application built with the `math-scala` library and the corresponding `Spark`, `Flink`, or `H2O` engine module as follows: + + import org.apache.mahout.math._ + import decompositions._ + import drm._ + + val (drmU, drmV, s) = dspca(drmA, k=200, q=1) + +Note the parameter is optional and its default value is zero. + +## References + +[1]: Lyubimov and Palumbo, ["Apache Mahout: Beyond MapReduce; Distributed Algorithm Design"](https://www.amazon.com/Apache-Mahout-MapReduce-Dmitriy-Lyubimov/dp/1523775785) http://git-wip-us.apache.org/repos/asf/mahout/blob/54ef150e/website/_pages/docs/0.13.0/algorithms/d-ssvd.md ---------------------------------------------------------------------- diff --git a/website/_pages/docs/0.13.0/algorithms/d-ssvd.md b/website/_pages/docs/0.13.0/algorithms/d-ssvd.md new file mode 100644 index 0000000..f4ed1a8 --- /dev/null +++ b/website/_pages/docs/0.13.0/algorithms/d-ssvd.md @@ -0,0 +1,141 @@ +--- +layout: mahoutdoc +title: Mahout Samsara DSSVD +permalink: /docs/0.13.0/algorithms/samsara/dssvd +--- +# Distributed Stochastic Singular Value Decomposition + + +## Intro + +Mahout has a distributed implementation of Stochastic Singular Value Decomposition [1] using the parallelization strategy comprehensively defined in Nathan Halko's dissertation ["Randomized methods for computing low-rank approximations of matrices"](http://amath.colorado.edu/faculty/martinss/Pubs/2012_halko_dissertation.pdf) [2]. + +## Modified SSVD Algorithm + +Given an `\(m\times n\)` +matrix `\(\mathbf{A}\)`, a target rank `\(k\in\mathbb{N}_{1}\)` +, an oversampling parameter `\(p\in\mathbb{N}_{1}\)`, +and the number of additional power iterations `\(q\in\mathbb{N}_{0}\)`, +this procedure computes an `\(m\times\left(k+p\right)\)` +SVD `\(\mathbf{A\approx U}\boldsymbol{\Sigma}\mathbf{V}^{\top}\)`: + + 1. 
Create seed for random `\(n\times\left(k+p\right)\)` + matrix `\(\boldsymbol{\Omega}\)`. The seed defines matrix `\(\mathbf{\Omega}\)` + using Gaussian unit vectors per one of suggestions in [Halko, Martinsson, Tropp]. + + 2. `\(\mathbf{Y=A\boldsymbol{\Omega}},\,\mathbf{Y}\in\mathbb{R}^{m\times\left(k+p\right)}\)` + + 3. Column-orthonormalize `\(\mathbf{Y}\rightarrow\mathbf{Q}\)` + by computing thin decomposition `\(\mathbf{Y}=\mathbf{Q}\mathbf{R}\)`. + Also, `\(\mathbf{Q}\in\mathbb{R}^{m\times\left(k+p\right)},\,\mathbf{R}\in\mathbb{R}^{\left(k+p\right)\times\left(k+p\right)}\)`; denoted as `\(\mathbf{Q}=\mbox{qr}\left(\mathbf{Y}\right).\mathbf{Q}\)` + + 4. `\(\mathbf{B}_{0}=\mathbf{Q}^{\top}\mathbf{A}:\,\,\mathbf{B}\in\mathbb{R}^{\left(k+p\right)\times n}\)`. + + 5. If `\(q>0\)` + repeat: for `\(i=1..q\)`: + `\(\mathbf{B}_{i}^{\top}=\mathbf{A}^{\top}\mbox{qr}\left(\mathbf{A}\mathbf{B}_{i-1}^{\top}\right).\mathbf{Q}\)` + (power iterations step). + + 6. Compute Eigensolution of a small Hermitian `\(\mathbf{B}_{q}\mathbf{B}_{q}^{\top}=\mathbf{\hat{U}}\boldsymbol{\Lambda}\mathbf{\hat{U}}^{\top}\)`, + `\(\mathbf{B}_{q}\mathbf{B}_{q}^{\top}\in\mathbb{R}^{\left(k+p\right)\times\left(k+p\right)}\)`. + + 7. Singular values `\(\mathbf{\boldsymbol{\Sigma}}=\boldsymbol{\Lambda}^{0.5}\)`, + or, in other words, `\(s_{i}=\sqrt{\sigma_{i}}\)`. + + 8. If needed, compute `\(\mathbf{U}=\mathbf{Q}\hat{\mathbf{U}}\)`. + + 9. If needed, compute `\(\mathbf{V}=\mathbf{B}_{q}^{\top}\hat{\mathbf{U}}\boldsymbol{\Sigma}^{-1}\)`. +Another way is `\(\mathbf{V}=\mathbf{A}^{\top}\mathbf{U}\boldsymbol{\Sigma}^{-1}\)`. + + + + +## Implementation + +Mahout `dssvd(...)` is implemented in the mahout `math-scala` algebraic optimizer which translates Mahout's R-like linear algebra operators into a physical plan for both Spark and H2O distributed engines. + + def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): + (DrmLike[K], DrmLike[Int], Vector) = { + + val drmAcp = drmA.checkpoint() + + val m = drmAcp.nrow + val n = drmAcp.ncol + assert(k <= (m min n), "k cannot be greater than smaller of m, n.") + val pfxed = safeToNonNegInt((m min n) - k min p) + + // Actual decomposition rank + val r = k + pfxed + + // We represent Omega by its seed. + val omegaSeed = RandomUtils.getRandom().nextInt() + + // Compute Y = A*Omega. + var drmY = drmAcp.mapBlock(ncol = r) { + case (keys, blockA) => + val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed) + keys -> blockY + } + + var drmQ = dqrThin(drmY.checkpoint())._1 + + // Checkpoint Q if last iteration + if (q == 0) drmQ = drmQ.checkpoint() + + var drmBt = drmAcp.t %*% drmQ + + // Checkpoint B' if last iteration + if (q == 0) drmBt = drmBt.checkpoint() + + for (i <- 0 until q) { + drmY = drmAcp %*% drmBt + drmQ = dqrThin(drmY.checkpoint())._1 + + // Checkpoint Q if last iteration + if (i == q - 1) drmQ = drmQ.checkpoint() + + drmBt = drmAcp.t %*% drmQ + + // Checkpoint B' if last iteration + if (i == q - 1) drmBt = drmBt.checkpoint() + } + + val (inCoreUHat, d) = eigen(drmBt.t %*% drmBt) + val s = d.sqrt + + // Since neither drmU nor drmV are actually computed until actually used + // we don't need the flags instructing compute (or not compute) either of the U,V outputs + val drmU = drmQ %*% inCoreUHat + val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s)) + + (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k)) + } + +Note: As a side effect of checkpointing, U and V values are returned as logical operators (i.e. 
they are neither checkpointed nor computed). Therefore there is no physical work actually done to compute `\(\mathbf{U}\)` or `\(\mathbf{V}\)` until they are used in a subsequent expression. + + +## Usage + +The scala `dssvd(...)` method can easily be called in any Spark or H2O application built with the `math-scala` library and the corresponding `Spark` or `H2O` engine module as follows: + + import org.apache.mahout.math._ + import decompositions._ + import drm._ + + + val(drmU, drmV, s) = dssvd(drma, k = 40, q = 1) + + +## References + +[1]: [Mahout Scala and Mahout Spark Bindings for Linear Algebra Subroutines](http://mahout.apache.org/users/sparkbindings/ScalaSparkBindings.pdf) + +[2]: [Randomized methods for computing low-rank +approximations of matrices](http://amath.colorado.edu/faculty/martinss/Pubs/2012_halko_dissertation.pdf) + +[2]: [Halko, Martinsson, Tropp](http://arxiv.org/abs/0909.4061) + +[3]: [Mahout Spark and Scala Bindings](http://mahout.apache.org/users/sparkbindings/home.html) + + + http://git-wip-us.apache.org/repos/asf/mahout/blob/54ef150e/website/_pages/docs/0.13.0/mahout-samsara/faq.md ---------------------------------------------------------------------- diff --git a/website/_pages/docs/0.13.0/mahout-samsara/faq.md b/website/_pages/docs/0.13.0/mahout-samsara/faq.md new file mode 100644 index 0000000..2e5301c --- /dev/null +++ b/website/_pages/docs/0.13.0/mahout-samsara/faq.md @@ -0,0 +1,50 @@ +--- +layout: mahoutdoc +title: Mahout Samsara +permalink: /docs/0.13.0/mahout-samsara/faq/ +--- +# FAQ for using Mahout with Spark + +**Q: Mahout Spark shell doesn't start; "ClassNotFound" problems or various classpath problems.** + +**A:** So far as of the time of this writing all reported problems starting the Spark shell in Mahout were revolving +around classpath issues one way or another. + +If you are getting method signature like errors, most probably you have mismatch between Mahout's Spark dependency +and actual Spark installed. (At the time of this writing the HEAD depends on Spark 1.1.0) but check mahout/pom.xml. + +Troubleshooting general classpath issues is pretty straightforward. Since Mahout is using Spark's installation +and its classpath as reported by Spark itself for Spark-related dependencies, it is important to make sure +the classpath is sane and is made available to Mahout: + +1. Check Spark is of correct version (same as in Mahout's poms), is compiled and SPARK_HOME is set. +2. Check Mahout is compiled and MAHOUT_HOME is set. +3. Run `$SPARK_HOME/bin/compute-classpath.sh` and make sure it produces sane result with no errors. +If it outputs something other than a straightforward classpath string, most likely Spark is not compiled/set correctly (later spark versions require +`sbt/sbt assembly` to be run, simply runnig `sbt/sbt publish-local` is not enough any longer). +4. Run `$MAHOUT_HOME/bin/mahout -spark classpath` and check that path reported in step (3) is included. + +**Q: I am using the command line Mahout jobs that run on Spark or am writing my own application that uses +Mahout's Spark code. When I run the code on my cluster I get ClassNotFound or signature errors during serialization. +What's wrong?** + +**A:** The Spark artifacts in the maven ecosystem may not match the exact binary you are running on your cluster. This may +cause class name or version mismatches. In this case you may wish +to build Spark yourself to guarantee that you are running exactly what you are building Mahout against. To do this follow these steps +in order: + +1. 
Build Spark with maven, but **do not** use the "package" target as described on the Spark site. Build with the "clean install" target instead. +Something like: "mvn clean install -Dhadoop1.2.1" or whatever your particular build options are. This will put the jars for Spark +in the local maven cache. +2. Deploy **your** Spark build to your cluster and test it there. +3. Build Mahout. This will cause maven to pull the jars for Spark from the local maven cache and may resolve missing +or mis-identified classes. +4. if you are building your own code do so against the local builds of Spark and Mahout. + +**Q: The implicit SparkContext 'sc' does not work in the Mahout spark-shell.** + +**A:** In the Mahout spark-shell the SparkContext is called 'sdc', where the 'd' stands for distributed. + + + + http://git-wip-us.apache.org/repos/asf/mahout/blob/54ef150e/website/_pages/docs/0.13.0/mahout-samsara/flink-bindings.md ---------------------------------------------------------------------- diff --git a/website/_pages/docs/0.13.0/mahout-samsara/flink-bindings.md b/website/_pages/docs/0.13.0/mahout-samsara/flink-bindings.md new file mode 100644 index 0000000..8d9260a --- /dev/null +++ b/website/_pages/docs/0.13.0/mahout-samsara/flink-bindings.md @@ -0,0 +1,48 @@ +--- +layout: mahoutdoc +title: Mahout Samsara Flink +permalink: /docs/0.13.0/mahout-samsara/flink-bindings/ +--- +#Introduction + +This document provides an overview of how the Mahout Samsara environment is implemented over the Apache Flink backend engine. This document gives an overview of the code layout for the Flink backend engine, the source code for which can be found under /flink directory in the Mahout codebase. + +Apache Flink is a distributed big data streaming engine that supports both Streaming and Batch interfaces. Batch processing is an extension of Flink’s Stream processing engine. + +The Mahout Flink integration presently supports Flink’s batch processing capabilities leveraging the DataSet API. + +The Mahout DRM, or Distributed Row Matrix, is an abstraction for storing a large matrix of numbers in-memory in a cluster by distributing logical rows among servers. Mahout's scala DSL provides an abstract API on DRMs for backend engines to provide implementations of this API. An example is the Spark backend engine. Each engine has it's own design of mapping the abstract API onto its data model and provides implementations for algebraic operators over that mapping. + +#Flink Overview + +Apache Flink is an open source, distributed Stream and Batch Processing Framework. At it's core, Flink is a Stream Processing engine and Batch processing is an extension of Stream Processing. + +Flink includes several APIs for building applications with the Flink Engine: + +
    +
  1. DataSet API for Batch data in Java, Scala and Python
  2. DataStream API for Stream Processing in Java and Scala
  3. Table API with a SQL-like expression language in Java and Scala
  4. Gelly Graph Processing API in Java and Scala
  5. CEP API, a complex event processing library
  6. FlinkML, a Machine Learning library
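The Mahout integration builds on the first of these, the DataSet API. For orientation, here is a minimal, self-contained sketch of plain Flink DataSet code (this is not Mahout's Flink bindings; the element values are made up purely for illustration):

    import org.apache.flink.api.scala._

    object DataSetSketch {
      def main(args: Array[String]): Unit = {
        // Batch programs run against an ExecutionEnvironment -- the same entry point
        // the Mahout Flink backend works with when evaluating a DRM plan.
        val env = ExecutionEnvironment.getExecutionEnvironment

        // A tiny DataSet of (row key, value) pairs; the values are illustrative only.
        val ds: DataSet[(Int, Double)] = env.fromElements((0, 1.0), (1, 2.5), (2, 4.0))

        // A simple transformation, analogous to the per-element work a physical
        // operator performs over a distributed data structure.
        val scaled = ds.map { case (k, v) => (k, v * 2.0) }

        scaled.print()
      }
    }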
+#Flink Environment Engine + +The Flink backend implements the abstract DRM as a Flink DataSet. A Flink job runs in the context of an ExecutionEnvironment (from the Flink Batch processing API). + +#Source Layout + +Within mahout.git, the top level directory, flink/ holds all the source code for the Flink backend engine. Sections of code that interface with the rest of the Mahout components are in Scala, and sections of the code that interface with Flink DataSet API and implement algebraic operators are in Java. Here is a brief overview of what functionality can be found within flink/ folder. + +flink/ - top level directory containing all Flink related code + +flink/src/main/scala/org/apache/mahout/flinkbindings/blas/*.scala - Physical operator code for the Samsara DSL algebra + +flink/src/main/scala/org/apache/mahout/flinkbindings/drm/*.scala - Flink Dataset DRM and broadcast implementation + +flink/src/main/scala/org/apache/mahout/flinkbindings/io/*.scala - Read / Write between DRMDataSet and files on HDFS + +flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala - DSL operator graph evaluator and various abstract API implementations for a distributed engine. + + http://git-wip-us.apache.org/repos/asf/mahout/blob/54ef150e/website/_pages/docs/0.13.0/mahout-samsara/h2o-internals.md ---------------------------------------------------------------------- diff --git a/website/_pages/docs/0.13.0/mahout-samsara/h2o-internals.md b/website/_pages/docs/0.13.0/mahout-samsara/h2o-internals.md new file mode 100644 index 0000000..a6ae6e6 --- /dev/null +++ b/website/_pages/docs/0.13.0/mahout-samsara/h2o-internals.md @@ -0,0 +1,49 @@ +--- +layout: mahoutdoc +title: Mahout Samsara H20 +permalink: /docs/0.13.0/mahout-samsara/h20-bindings/ +--- +# Introduction + +This document provides an overview of how the Mahout Samsara environment is implemented over the H2O backend engine. The document is aimed at Mahout developers, to give a high level description of the design so that one can explore the code inside `h2o/` with some context. + +## H2O Overview + +H2O is a distributed scalable machine learning system. Internal architecture of H2O has a distributed math engine (h2o-core) and a separate layer on top for algorithms and UI. The Mahout integration requires only the math engine (h2o-core). + +## H2O Data Model + +The data model of the H2O math engine is a distributed columnar store (of primarily numbers, but also strings). A column of numbers is called a Vector, which is broken into Chunks (of a few thousand elements). Chunks are distributed across the cluster based on a deterministic hash. Therefore, any member of the cluster knows where a particular Chunk of a Vector is homed. Each Chunk is separately compressed in memory and elements are individually decompressed on the fly upon access with purely register operations (thereby achieving high memory throughput). An ordered set of similarly partitioned Vecs are composed into a Frame. A Frame is therefore a large two dimensional table of numbers. All elements of a logical row in the Frame are guaranteed to be homed in the same server of the cluster. Generally speaking, H2O works well on "tall skinny" data, i.e, lots of rows (100s of millions) and modest number of columns (10s of thousands). + + +## Mahout DRM + +The Mahout DRM, or Distributed Row Matrix, is an abstraction for storing a large matrix of numbers in-memory in a cluster by distributing logical rows among servers. 
Mahout's scala DSL provides an abstract API on DRMs for backend engines to provide implementations of this API. Examples are the Spark and H2O backend engines. Each engine has it's own design of mapping the abstract API onto its data model and provides implementations for algebraic operators over that mapping. + + +## H2O Environment Engine + +The H2O backend implements the abstract DRM as an H2O Frame. Each logical column in the DRM is an H2O Vector. All elements of a logical DRM row are guaranteed to be homed on the same server. A set of rows stored on a server are presented as a read-only virtual in-core Matrix (i.e BlockMatrix) for the closure method in the `mapBlock(...)` API. + +H2O provides a flexible execution framework called `MRTask`. The `MRTask` framework typically executes over a Frame (or even a Vector), supports various types of map() methods, can optionally modify the Frame or Vector (though this never happens in the Mahout integration), and optionally create a new Vector or set of Vectors (to combine them into a new Frame, and consequently a new DRM). + + +## Source Layout + +Within mahout.git, the top level directory, `h2o/` holds all the source code related to the H2O backend engine. Part of the code (that interfaces with the rest of the Mahout componenets) is in Scala, and part of the code (that interfaces with h2o-core and implements algebraic operators) is in Java. Here is a brief overview of what functionality can be found where within `h2o/`. + + h2o/ - top level directory containing all H2O related code + + h2o/src/main/java/org/apache/mahout/h2obindings/ops/*.java - Physical operator code for the various DSL algebra + + h2o/src/main/java/org/apache/mahout/h2obindings/drm/*.java - DRM backing (onto Frame) and Broadcast implementation + + h2o/src/main/java/org/apache/mahout/h2obindings/H2OHdfs.java - Read / Write between DRM (Frame) and files on HDFS + + h2o/src/main/java/org/apache/mahout/h2obindings/H2OBlockMatrix.java - A vertical block matrix of DRM presented as a virtual copy-on-write in-core Matrix. Used in mapBlock() API + + h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java - A collection of various functionality and helpers. For e.g, convert between in-core Matrix and DRM, various summary statistics on DRM/Frame. 
+ + h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala - DSL operator graph evaluator and various abstract API implementations for a distributed engine + + h2o/src/main/scala/org/apache/mahout/h2obindings/* - Various abstract API implementations ("glue work") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/54ef150e/website/_pages/docs/0.13.0/mahout-samsara/in-core-reference.md ---------------------------------------------------------------------- diff --git a/website/_pages/docs/0.13.0/mahout-samsara/in-core-reference.md b/website/_pages/docs/0.13.0/mahout-samsara/in-core-reference.md new file mode 100644 index 0000000..d754fdb --- /dev/null +++ b/website/_pages/docs/0.13.0/mahout-samsara/in-core-reference.md @@ -0,0 +1,302 @@ +--- +layout: mahoutdoc +title: Mahout Samsara In Core +permalink: /docs/0.13.0/mahout-samsara/incore/ +--- +## Mahout-Samsara's In-Core Linear Algebra DSL Reference + +#### Imports + +The following imports are used to enable Mahout-Samsara's Scala DSL bindings for in-core Linear Algebra: + + import org.apache.mahout.math._ + import scalabindings._ + import RLikeOps._ + +#### Inline initalization + +Dense vectors: + + val densVec1: Vector = (1.0, 1.1, 1.2) + val denseVec2 = dvec(1, 0, 1,1 ,1,2) + +Sparse vectors: + + val sparseVec1: Vector = (5 -> 1.0) :: (10 -> 2.0) :: Nil + val sparseVec1 = svec((5 -> 1.0) :: (10 -> 2.0) :: Nil) + + // to create a vector with specific cardinality + val sparseVec1 = svec((5 -> 1.0) :: (10 -> 2.0) :: Nil, cardinality = 20) + +Inline matrix initialization, either sparse or dense, is always done row wise. + +Dense matrices: + + val A = dense((1, 2, 3), (3, 4, 5)) + +Sparse matrices: + + val A = sparse( + (1, 3) :: Nil, + (0, 2) :: (1, 2.5) :: Nil + ) + +Diagonal matrix with constant diagonal elements: + + diag(3.5, 10) + +Diagonal matrix with main diagonal backed by a vector: + + diagv((1, 2, 3, 4, 5)) + +Identity matrix: + + eye(10) + +####Slicing and Assigning + +Getting a vector element: + + val d = vec(5) + +Setting a vector element: + + vec(5) = 3.0 + +Getting a matrix element: + + val d = m(3,5) + +Setting a matrix element: + + M(3,5) = 3.0 + +Getting a matrix row or column: + + val rowVec = M(3, ::) + val colVec = M(::, 3) + +Setting a matrix row or column via vector assignment: + + M(3, ::) := (1, 2, 3) + M(::, 3) := (1, 2, 3) + +Setting a subslices of a matrix row or column: + + a(0, 0 to 1) = (3, 5) + +Setting a subslices of a matrix row or column via vector assignment: + + a(0, 0 to 1) := (3, 5) + +Getting a matrix as from matrix contiguous block: + + val B = A(2 to 3, 3 to 4) + +Assigning a contiguous block to a matrix: + + A(0 to 1, 1 to 2) = dense((3, 2), (3 ,3)) + +Assigning a contiguous block to a matrix using the matrix assignment operator: + + A(o to 1, 1 to 2) := dense((3, 2), (3, 3)) + +Assignment operator used for copying between vectors or matrices: + + vec1 := vec2 + M1 := M2 + +Assignment operator using assignment through a functional literal for a matrix: + + M := ((row, col, x) => if (row == col) 1 else 0 + +Assignment operator using assignment through a functional literal for a vector: + + vec := ((index, x) => sqrt(x) + +#### BLAS-like operations + +Plus/minus either vector or numeric with assignment or not: + + a + b + a - b + a + 5.0 + a - 5.0 + +Hadamard (elementwise) product, either vector or matrix or numeric operands: + + a * b + a * 0.5 + +Operations with assignment: + + a += b + a -= b + a += 5.0 + a -= 5.0 + a *= b + a *= 5 + +*Some nuanced rules*: + +1/x 
in R (where x is a vector or a matrix) is elementwise inverse. In scala it would be expressed as: + + val xInv = 1 /: x + +and R's 5.0 - x would be: + + val x1 = 5.0 -: x + +*note: All assignment operations, including :=, return the assignee just like in C++*: + + a -= b + +assigns **a - b** to **b** (in-place) and returns **b**. Similarly for **a /=: b** or **1 /=: v** + + +Dot product: + + a dot b + +Matrix and vector equivalency (or non-equivalency). **Dangerous, exact equivalence is rarely useful, better to use norm comparisons with an allowance of small errors.** + + a === b + a !== b + +Matrix multiply: + + a %*% b + +Optimized Right Multiply with a diagonal matrix: + + diag(5, 5) :%*% b + +Optimized Left Multiply with a diagonal matrix: + + A %*%: diag(5, 5) + +Second norm, of a vector or matrix: + + a.norm + +Transpose: + + val Mt = M.t + +*note: Transposition is currently handled via view, i.e. updating a transposed matrix will be updating the original.* Also computing something like `\(\mathbf{X^\top}\mathbf{X}\)`: + + val XtX = X.t %*% X + +will not therefore incur any additional data copying. + +#### Decompositions + +Matrix decompositions require an additional import: + + import org.apache.mahout.math.decompositions._ + + +All arguments in the following are matricies. + +**Cholesky decomposition** + + val ch = chol(M) + +**SVD** + + val (U, V, s) = svd(M) + +**EigenDecomposition** + + val (V, d) = eigen(M) + +**QR decomposition** + + val (Q, R) = qr(M) + +**Rank**: Check for rank deficiency (runs rank-revealing QR) + + M.isFullRank + +**In-core SSVD** + + Val (U, V, s) = ssvd(A, k = 50, p = 15, q = 1) + +**Solving linear equation systems and matrix inversion:** fully similar to R semantics; there are three forms of invocation: + + +Solve `\(\mathbf{AX}=\mathbf{B}\)`: + + solve(A, B) + +Solve `\(\mathbf{Ax}=\mathbf{b}\)`: + + solve(A, b) + +Compute `\(\mathbf{A^{-1}}\)`: + + solve(A) + +#### Misc + +Vector cardinality: + + a.length + +Matrix cardinality: + + m.nrow + m.ncol + +Means and sums: + + m.colSums + m.colMeans + m.rowSums + m.rowMeans + +Copy-By-Value: + + val b = a cloned + +#### Random Matrices + +`\(\mathcal{U}\)`(0,1) random matrix view: + + val incCoreA = Matrices.uniformView(m, n, seed) + + +`\(\mathcal{U}\)`(-1,1) random matrix view: + + val incCoreA = Matrices.symmetricUniformView(m, n, seed) + +`\(\mathcal{N}\)`(-1,1) random matrix view: + + val incCoreA = Matrices.gaussianView(m, n, seed) + +#### Iterators + +Mahout-Math already exposes a number of iterators. Scala code just needs the following imports to enable implicit conversions to scala iterators. + + import collection._ + import JavaConversions._ + +Iterating over rows in a Matrix: + + for (row <- m) { + ... 
do something with row
    }


For more information, including Mahout-Samsara's out-of-core linear algebra bindings, see: [Mahout Scala Bindings and Mahout Spark Bindings for Linear Algebra Subroutines](http://mahout.apache.org/users/sparkbindings/ScalaSparkBindings.pdf)


http://git-wip-us.apache.org/repos/asf/mahout/blob/54ef150e/website/_pages/docs/0.13.0/mahout-samsara/out-of-core-reference.md
----------------------------------------------------------------------
diff --git a/website/_pages/docs/0.13.0/mahout-samsara/out-of-core-reference.md b/website/_pages/docs/0.13.0/mahout-samsara/out-of-core-reference.md
new file mode 100644
index 0000000..c6cdddc
--- /dev/null
+++ b/website/_pages/docs/0.13.0/mahout-samsara/out-of-core-reference.md
@@ -0,0 +1,316 @@
---
layout: mahoutdoc
title: Mahout Samsara Out of Core
permalink: /docs/0.13.0/mahout-samsara/outofcore/
---
# Mahout-Samsara's Distributed Linear Algebra DSL Reference

**Note: this page is meant only as a quick reference to Mahout-Samsara's R-Like DSL semantics. For more information, including information on Mahout-Samsara's Algebraic Optimizer, please see: [Mahout Scala Bindings and Mahout Spark Bindings for Linear Algebra Subroutines](http://mahout.apache.org/users/sparkbindings/ScalaSparkBindings.pdf).**

The subjects of this reference apply solely to Mahout-Samsara's **DRM** (distributed row matrix).

In this reference, DRMs will be denoted as e.g. `A`, and in-core matrices as e.g. `inCoreA`.

#### Imports

The following imports are used to enable seamless in-core and distributed algebraic DSL operations:

    import org.apache.mahout.math._
    import scalabindings._
    import RLikeOps._
    import drm._
    import RLikeDRMOps._

If working with mixed Scala/Java code:

    import collection._
    import JavaConversions._

If you are working with Mahout-Samsara's Spark-specific operations, e.g. for context creation:

    import org.apache.mahout.sparkbindings._

The Mahout shell does all of these imports automatically.


#### DRM Persistence operators

**Mahout-Samsara's DRM persistence to HDFS is compatible with all Mahout-MapReduce algorithms such as seq2sparse.**


Loading a DRM from (HD)FS:

    drmDfsRead(path = hdfsPath)

Parallelizing from an in-core matrix:

    val inCoreA = dense((1, 2, 3), (3, 4, 5))
    val A = drmParallelize(inCoreA)

Creating an empty DRM:

    val A = drmParallelizeEmpty(100, 50)

Collecting to the driver's JVM in-core:

    val inCoreA = A.collect

**Warning: The collection of distributed matrices happens implicitly whenever conversion to an in-core (o.a.m.math.Matrix) type is required. E.g.:**

    val inCoreA: Matrix = ...
    val drmB: DrmLike[Int] = ...
    val inCoreC: Matrix = inCoreA %*%: drmB

**implies (inCoreA %*%: drmB).collect**

Collecting to (HD)FS as a Mahout DRM formatted file:

    A.dfsWrite(path = hdfsPath)

#### Logical algebraic operators on DRM matrices:

A logical set of operators is defined for distributed matrices as a subset of those defined for in-core matrices. In particular, since all distributed matrices are immutable, there are no assignment operators (e.g.
**A += B**).

*Note: please see [Mahout Scala Bindings and Mahout Spark Bindings for Linear Algebra Subroutines](http://mahout.apache.org/users/sparkbindings/ScalaSparkBindings.pdf) for information on Mahout-Samsara's Algebraic Optimizer and the translation from logical operations to a physical plan for the back end.*


Cache a DRM and trigger an optimized physical plan:

    drmA.checkpoint(CacheHint.MEMORY_AND_DISK)

Other valid caching instructions:

    drmA.checkpoint(CacheHint.NONE)
    drmA.checkpoint(CacheHint.DISK_ONLY)
    drmA.checkpoint(CacheHint.DISK_ONLY_2)
    drmA.checkpoint(CacheHint.MEMORY_ONLY)
    drmA.checkpoint(CacheHint.MEMORY_ONLY_2)
    drmA.checkpoint(CacheHint.MEMORY_ONLY_SER)
    drmA.checkpoint(CacheHint.MEMORY_ONLY_SER_2)
    drmA.checkpoint(CacheHint.MEMORY_AND_DISK_2)
    drmA.checkpoint(CacheHint.MEMORY_AND_DISK_SER)
    drmA.checkpoint(CacheHint.MEMORY_AND_DISK_SER_2)

*Note: Logical DRM operations are lazily computed. Currently the actual computations and optional caching will be triggered by dfsWrite(...), collect(...) and blockify(...).*


Transposition:

    A.t

Elementwise addition *(matrices of identical geometry and row key types)*:

    A + B

Elementwise subtraction *(matrices of identical geometry and row key types)*:

    A - B

Elementwise multiplication (Hadamard) *(matrices of identical geometry and row key types)*:

    A * B

Elementwise division *(matrices of identical geometry and row key types)*:

    A / B

**Elementwise operations involving one in-core argument (int-keyed DRMs only)**:

    A + inCoreB
    A - inCoreB
    A * inCoreB
    A / inCoreB
    A :+ inCoreB
    A :- inCoreB
    A :* inCoreB
    A :/ inCoreB
    inCoreA +: B
    inCoreA -: B
    inCoreA *: B
    inCoreA /: B

Note the Spark associativity change (e.g. `A *: inCoreB` means `B.leftMultiply(A)`, same as when both arguments are in core). Whenever operator arguments include both in-core and out-of-core arguments, the operator can only be associated with the out-of-core (DRM) argument to support the distributed implementation.

**Matrix-matrix multiplication %*%**:

`\(\mathbf{M}=\mathbf{AB}\)`

    A %*% B
    A %*% inCoreB
    A %*% inCoreDiagonal
    A %*%: B


*Note: same as above, whenever operator arguments include both in-core and out-of-core arguments, the operator can only be associated with the out-of-core (DRM) argument to support the distributed implementation.*

**Matrix-vector multiplication %*%**

Currently we support a right-multiply product of a DRM and an in-core Vector (`\(\mathbf{Ax}\)`), resulting in a single-column DRM, which then can be collected in front (usually the desired outcome):

    val Ax = A %*% x
    val inCoreX = Ax.collect(::, 0)


**Matrix-scalar +,-,*,/**

Elementwise operations of every matrix element and a scalar:

    A + 5.0
    A - 5.0
    A :- 5.0
    5.0 -: A
    A * 5.0
    A / 5.0
    5.0 /: A

Note that `5.0 -: A` means `\(m_{ij} = 5 - a_{ij}\)` and `5.0 /: A` means `\(m_{ij} = \frac{5}{a_{ij}}\)` for all elements of the result.


#### Slicing

General slice:

    A(100 to 200, 100 to 200)

Horizontal Block:

    A(::, 100 to 200)

Vertical Block:

    A(100 to 200, ::)

*Note: if the row range is not all-range (::) then the DRM must be `Int`-keyed. General-case row slicing is not supported by DRMs with key types other than `Int`*.
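Putting a few of the operators above together, a small sketch (here `drmA` and the column range `0 until 100` are assumptions used only for illustration):

    // Take a vertical block of A, form its Gram matrix, and bring the small result in core.
    val drmB = drmA(::, 0 until 100)   // vertical block of columns 0..99
    val drmG = drmB.t %*% drmB         // distributed B'B, still a lazily evaluated logical plan
    val inCoreG = drmG.collect         // collect triggers the optimized physical plan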
+ + +#### Stitching + +Stitch side by side (cbind R semantics): + + val drmAnextToB = drmA cbind drmB + +Stitch side by side (Scala): + + val drmAnextToB = drmA.cbind(drmB) + +Analogously, vertical concatenation is available via **rbind** + +#### Custom pipelines on blocks +Internally, Mahout-Samsara's DRM is represented as a distributed set of vertical (Key, Block) tuples. + +**drm.mapBlock(...)**: + +The DRM operator `mapBlock` provides transformational access to the distributed vertical blockified tuples of a matrix (Row-Keys, Vertical-Matrix-Block). + +Using `mapBlock` to add 1.0 to a DRM: + + val inCoreA = dense((1, 2, 3), (2, 3 , 4), (3, 4, 5)) + val drmA = drmParallelize(inCoreA) + val B = A.mapBlock() { + case (keys, block) => keys -> (block += 1.0) + } + +#### Broadcasting Vectors and matrices to closures +Generally we can create and use one-way closure attributes to be used on the back end. + +Scalar matrix multiplication: + + val factor: Int = 15 + val drm2 = drm1.mapBlock() { + case (keys, block) => block *= factor + keys -> block + } + +**Closure attributes must be java-serializable. Currently Mahout's in-core Vectors and Matrices are not java-serializable, and must be broadcast to the closure using `drmBroadcast(...)`**: + + val v: Vector ... + val bcastV = drmBroadcast(v) + val drm2 = drm1.mapBlock() { + case (keys, block) => + for(row <- 0 until block.nrow) block(row, ::) -= bcastV + keys -> block + } + +#### Computations providing ad-hoc summaries + + +Matrix cardinality: + + drmA.nrow + drmA.ncol + +*Note: depending on the stage of optimization, these may trigger a computational action. I.e. if one calls `nrow()` n times, then the back end will actually recompute `nrow` n times.* + +Means and sums: + + drmA.colSums + drmA.colMeans + drmA.rowSums + drmA.rowMeans + + +*Note: These will always trigger a computational action. I.e. 
if one calls `colSums()` n times, then the back end will actually recompute `colSums` n times.* + +#### Distributed Matrix Decompositions + +To import the decomposition package: + + import org.apache.mahout.math._ + import decompositions._ + +Distributed thin QR: + + val (drmQ, incoreR) = dqrThin(drmA) + +Distributed SSVD: + + val (drmU, drmV, s) = dssvd(drmA, k = 40, q = 1) + +Distributed SPCA: + + val (drmU, drmV, s) = dspca(drmA, k = 30, q = 1) + +Distributed regularized ALS: + + val (drmU, drmV, i) = dals(drmA, + k = 50, + lambda = 0.0, + maxIterations = 10, + convergenceThreshold = 0.10)) + +#### Adjusting parallelism of computations + +Set the minimum parallelism to 100 for computations on `drmA`: + + drmA.par(min = 100) + +Set the exact parallelism to 100 for computations on `drmA`: + + drmA.par(exact = 100) + + +Set the engine specific automatic parallelism adjustment for computations on `drmA`: + + drmA.par(auto = true) + +#### Retrieving the engine specific data structure backing the DRM: + +**A Spark RDD:** + + val myRDD = drmA.checkpoint().rdd + +**An H2O Frame and Key Vec:** + + val myFrame = drmA.frame + val myKeys = drmA.keys + +**A Flink DataSet:** + + val myDataSet = drmA.ds + +For more information including information on Mahout-Samsara's Algebraic Optimizer and in-core Linear algebra bindings see: [Mahout Scala Bindings and Mahout Spark Bindings for Linear Algebra Subroutines](http://mahout.apache.org/users/sparkbindings/ScalaSparkBindings.pdf) + + + + + + + http://git-wip-us.apache.org/repos/asf/mahout/blob/54ef150e/website/_pages/docs/0.13.0/mahout-samsara/spark-bindings.md ---------------------------------------------------------------------- diff --git a/website/_pages/docs/0.13.0/mahout-samsara/spark-bindings.md b/website/_pages/docs/0.13.0/mahout-samsara/spark-bindings.md new file mode 100644 index 0000000..094db0c --- /dev/null +++ b/website/_pages/docs/0.13.0/mahout-samsara/spark-bindings.md @@ -0,0 +1,100 @@ +--- +layout: mahoutdoc +title: Mahout Samsara Spark +permalink: /docs/0.13.0/mahout-samsara/spark-bindings/ +--- + +# Scala & Spark Bindings: +*Bringing algebraic semantics* + +## What is Scala & Spark Bindings? + +In short, Scala & Spark Bindings for Mahout is Scala DSL and algebraic optimizer of something like this (actual formula from **(d)spca**) + + +`\[\mathbf{G}=\mathbf{B}\mathbf{B}^{\top}-\mathbf{C}-\mathbf{C}^{\top}+\mathbf{s}_{q}\mathbf{s}_{q}^{\top}\boldsymbol{\xi}^{\top}\boldsymbol{\xi}\]` + +bound to in-core and distributed computations (currently, on Apache Spark). + + +Mahout Scala & Spark Bindings expression of the above: + + val g = bt.t %*% bt - c - c.t + (s_q cross s_q) * (xi dot xi) + +The main idea is that a scientist writing algebraic expressions cannot care less of distributed +operation plans and works **entirely on the logical level** just like he or she would do with R. + +Another idea is decoupling logical expression from distributed back-end. As more back-ends are added, +this implies **"write once, run everywhere"**. + +The linear algebra side works with scalars, in-core vectors and matrices, and Mahout Distributed +Row Matrices (DRMs). + +The ecosystem of operators is built in the R's image, i.e. it follows R naming such as %*%, +colSums, nrow, length operating over vectors or matices. + +Important part of Spark Bindings is expression optimizer. It looks at expression as a whole +and figures out how it can be simplified, and which physical operators should be picked. 
For example, +there are currently about 5 different physical operators performing DRM-DRM multiplication +picked based on matrix geometry, distributed dataset partitioning, orientation etc. +If we count in DRM by in-core combinations, that would be another 4, i.e. 9 total -- all of it for just +simple x %*% y logical notation. + + + +Please refer to the documentation for details. + +## Status + +This environment addresses mostly R-like Linear Algebra optmizations for +Spark, Flink and H20. + + +## Documentation + +* Scala and Spark bindings manual: [web](http://apache.github.io/mahout/doc/ScalaSparkBindings.html), [pdf](ScalaSparkBindings.pdf) +* Overview blog on 0.10.x releases: [blog](http://www.weatheringthroughtechdays.com/2015/04/mahout-010x-first-mahout-release-as.html) + +## Distributed methods and solvers using Bindings + +* In-core ([ssvd]) and Distributed ([dssvd]) Stochastic SVD -- guinea pigs -- see the bindings manual +* In-core ([spca]) and Distributed ([dspca]) Stochastic PCA -- guinea pigs -- see the bindings manual +* Distributed thin QR decomposition ([dqrThin]) -- guinea pig -- see the bindings manual +* [Current list of algorithms](https://mahout.apache.org/users/basics/algorithms.html) + +[ssvd]: https://github.com/apache/mahout/blob/trunk/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala +[spca]: https://github.com/apache/mahout/blob/trunk/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala +[dssvd]: https://github.com/apache/mahout/blob/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala +[dspca]: https://github.com/apache/mahout/blob/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala +[dqrThin]: https://github.com/apache/mahout/blob/trunk/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala + + +## Related history of note + +* CLI and Driver for Spark version of item similarity -- [MAHOUT-1541](https://issues.apache.org/jira/browse/MAHOUT-1541) +* Command line interface for generalizable Spark pipelines -- [MAHOUT-1569](https://issues.apache.org/jira/browse/MAHOUT-1569) +* Cooccurrence Analysis / Item-based Recommendation -- [MAHOUT-1464](https://issues.apache.org/jira/browse/MAHOUT-1464) +* Spark Bindings -- [MAHOUT-1346](https://issues.apache.org/jira/browse/MAHOUT-1346) +* Scala Bindings -- [MAHOUT-1297](https://issues.apache.org/jira/browse/MAHOUT-1297) +* Interactive Scala & Spark Bindings Shell & Script processor -- [MAHOUT-1489](https://issues.apache.org/jira/browse/MAHOUT-1489) +* OLS tutorial using Mahout shell -- [MAHOUT-1542](https://issues.apache.org/jira/browse/MAHOUT-1542) +* Full abstraction of DRM apis and algorithms from a distributed engine -- [MAHOUT-1529](https://issues.apache.org/jira/browse/MAHOUT-1529) +* Port Naive Bayes -- [MAHOUT-1493](https://issues.apache.org/jira/browse/MAHOUT-1493) + +## Work in progress +* Text-delimited files for input and output -- [MAHOUT-1568](https://issues.apache.org/jira/browse/MAHOUT-1568) + + + +* *Your issue here!* + + + + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/54ef150e/website/_pages/docs/0.13.0/quickstart.md ---------------------------------------------------------------------- diff --git a/website/_pages/docs/0.13.0/quickstart.md b/website/_pages/docs/0.13.0/quickstart.md new file mode 100644 index 0000000..fdb9ceb --- /dev/null +++ b/website/_pages/docs/0.13.0/quickstart.md @@ -0,0 +1,62 @@ +--- +layout: mahoutdoc +title: 
Quickstart
permalink: /docs/0.13.0/quickstart/
---
# Mahout Quick Start
# TODO : Fill this in with the bare essential basics



# Mahout MapReduce Overview

## Getting Mahout

#### Download the latest release

Download the latest release [here](http://www.apache.org/dyn/closer.cgi/mahout/).

Or check out the latest code from [here](http://mahout.apache.org/developers/version-control.html).

#### Alternatively: Add Mahout 0.13.0 to a Maven project

Mahout is also available via a [Maven repository](http://mvnrepository.com/artifact/org.apache.mahout) under the group id *org.apache.mahout*.
If you would like to import the latest release of Mahout into a Java project, add the following dependency to your *pom.xml*:

    <dependency>
        <groupId>org.apache.mahout</groupId>
        <artifactId>mahout-mr</artifactId>
        <version>0.13.0</version>
    </dependency>


## Features

For a full list of Mahout's features see our [Features by Engine](http://mahout.apache.org/users/basics/algorithms.html) page.


## Using Mahout

Mahout provides a number of examples and tutorials for users to quickly learn how to use its machine learning algorithms.

#### Recommendations

Check the [Recommender Quickstart](/users/recommender/quickstart.html) or the tutorial on [creating a user-based recommender in 5 minutes](/users/recommender/userbased-5-minutes.html).

If you are building a recommender system for the first time, please also refer to a list of [Dos and Don'ts](/users/recommender/recommender-first-timer-faq.html) that might be helpful.

#### Clustering

Check the [Synthetic data](/users/clustering/clustering-of-synthetic-control-data.html) example.

#### Classification

If you are interested in how to train a **Naive Bayes** model, look at the [20 newsgroups](/users/classification/twenty-newsgroups.html) example.

If you plan to build a **Hidden Markov Model** for speech recognition, the example [here](/users/classification/hidden-markov-models.html) might be instructive.

Or you could build a **Random Forest** model by following this [quick start page](/users/classification/partial-implementation.html).

#### Working with Text

If you need to convert raw text into word vectors as input to clustering or classification algorithms, please refer to this page on [how to create vectors from text](/users/basics/creating-vectors-from-text.html).


http://git-wip-us.apache.org/repos/asf/mahout/blob/54ef150e/website/_pages/docs/0.13.0/tutorials/classify-a-doc-from-the-shell.md
----------------------------------------------------------------------
diff --git a/website/_pages/docs/0.13.0/tutorials/classify-a-doc-from-the-shell.md b/website/_pages/docs/0.13.0/tutorials/classify-a-doc-from-the-shell.md
new file mode 100644
index 0000000..b565b61
--- /dev/null
+++ b/website/_pages/docs/0.13.0/tutorials/classify-a-doc-from-the-shell.md
@@ -0,0 +1,257 @@
---
layout: mahoutdoc
title: Text Classification Example
permalink: /docs/0.13.0/tutorials/text-classification
---

#Building a text classifier in Mahout's Spark Shell

This tutorial will take you through the steps used to train a Multinomial Naive Bayes model and create a text classifier based on that model using the ```mahout spark-shell```.

## Prerequisites
This tutorial assumes that you have your Spark environment variables set for the ```mahout spark-shell```; see [Playing with Mahout's Shell](http://mahout.apache.org/users/sparkbindings/play-with-shell.html). We also assume that Mahout is running in cluster mode (i.e.
with the ```MAHOUT_LOCAL``` environment variable **unset**) as we'll be reading and writing to HDFS.

## Downloading and Vectorizing the Wikipedia dataset
*As of Mahout v. 0.10.0, we are still reliant on the MapReduce versions of ```mahout seqwiki``` and ```mahout seq2sparse``` to extract and vectorize our text. A* [*Spark implementation of seq2sparse*](https://issues.apache.org/jira/browse/MAHOUT-1663) *is in the works for Mahout v. 0.11.* However, to download the Wikipedia dataset, extract the bodies of the documents, label each document and vectorize the text into TF-IDF vectors, we can simply run the [wikipedia-classifier.sh](https://github.com/apache/mahout/blob/master/examples/bin/classify-wikipedia.sh) example.

    Please select a number to choose the corresponding task to run
    1. CBayes (may require increased heap space on yarn)
    2. BinaryCBayes
    3. clean -- cleans up the work area in /tmp/mahout-work-wiki
    Enter your choice :

Enter (2). This will download a large recent XML dump of the Wikipedia database into a ```/tmp/mahout-work-wiki``` directory, unzip it and place it into HDFS. It will run a [MapReduce job to parse the wikipedia set](http://mahout.apache.org/users/classification/wikipedia-classifier-example.html), extracting and labeling only pages with category tags for [United States] and [United Kingdom] (~11600 documents). It will then run ```mahout seq2sparse``` to convert the documents into TF-IDF vectors. The script will also build and test a [Naive Bayes model using MapReduce](http://mahout.apache.org/users/classification/bayesian.html). When it completes, you should see a confusion matrix on your screen. For this tutorial, we will ignore the MapReduce model and build a new model using Spark, based on the vectorized text output by ```seq2sparse```.

## Getting Started

Launch the ```mahout spark-shell```. There is an example script: ```spark-document-classifier.mscala``` (.mscala denotes a Mahout-Scala script which can be run similarly to an R script). We will walk through this script in this tutorial, but if you want to simply run the script you can just issue the command:

    mahout> :load /path/to/mahout/examples/bin/spark-document-classifier.mscala

For now, let's take the script apart piece by piece. You can cut and paste the following code blocks into the ```mahout spark-shell```.

## Imports

Our Mahout Naive Bayes imports:

    import org.apache.mahout.classifier.naivebayes._
    import org.apache.mahout.classifier.stats._
    import org.apache.mahout.nlp.tfidf._

Hadoop imports needed to read our dictionary:

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.io.IntWritable
    import org.apache.hadoop.io.LongWritable

## Read in our full set from HDFS as vectorized by seq2sparse in classify-wikipedia.sh

    val pathToData = "/tmp/mahout-work-wiki/"
    val fullData = drmDfsRead(pathToData + "wikipediaVecs/tfidf-vectors")

## Extract the category of each observation and aggregate those observations by category

    val (labelIndex, aggregatedObservations) = SparkNaiveBayes.extractLabelsAndAggregateObservations(
                                                 fullData)

## Build a Multinomial Naive Bayes model and self test on the training set

    val model = SparkNaiveBayes.train(aggregatedObservations, labelIndex, false)
    val resAnalyzer = SparkNaiveBayes.test(model, fullData, false)
    println(resAnalyzer)

Printing the ```ResultAnalyzer``` will display the confusion matrix.
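If you want to reuse the trained model outside of this shell session, it can also be persisted to HDFS and read back later. A sketch of that step (the path is illustrative, and the exact persistence calls should be checked against the NBModel API in your Mahout version):

    // Persist the model to HDFS and read it back (method names assumed from NBModel;
    // verify against your Mahout version).
    val pathToModel = "/tmp/mahout-work-wiki/model"
    model.dfsWrite(pathToModel)
    val storedModel = NBModel.dfsRead(pathToModel)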
+ +## Read in the dictionary and document frequency count from HDFS + + val dictionary = sdc.sequenceFile(pathToData + "wikipediaVecs/dictionary.file-0", + classOf[Text], + classOf[IntWritable]) + val documentFrequencyCount = sdc.sequenceFile(pathToData + "wikipediaVecs/df-count", + classOf[IntWritable], + classOf[LongWritable]) + + // setup the dictionary and document frequency count as maps + val dictionaryRDD = dictionary.map { + case (wKey, wVal) => wKey.asInstanceOf[Text] + .toString() -> wVal.get() + } + + val documentFrequencyCountRDD = documentFrequencyCount.map { + case (wKey, wVal) => wKey.asInstanceOf[IntWritable] + .get() -> wVal.get() + } + + val dictionaryMap = dictionaryRDD.collect.map(x => x._1.toString -> x._2.toInt).toMap + val dfCountMap = documentFrequencyCountRDD.collect.map(x => x._1.toInt -> x._2.toLong).toMap + +## Define a function to tokenize and vectorize new text using our current dictionary + +For this simple example, our function ```vectorizeDocument(...)``` will tokenize a new document into unigrams using native Java String methods and vectorize using our dictionary and document frequencies. You could also use a [Lucene](https://lucene.apache.org/core/) analyzer for bigrams, trigrams, etc., and integrate Apache [Tika](https://tika.apache.org/) to extract text from different document types (PDF, PPT, XLS, etc.). Here, however we will keep it simple, stripping and tokenizing our text using regexs and native String methods. + + def vectorizeDocument(document: String, + dictionaryMap: Map[String,Int], + dfMap: Map[Int,Long]): Vector = { + val wordCounts = document.replaceAll("[^\\p{L}\\p{Nd}]+", " ") + .toLowerCase + .split(" ") + .groupBy(identity) + .mapValues(_.length) + val vec = new RandomAccessSparseVector(dictionaryMap.size) + val totalDFSize = dfMap(-1) + val docSize = wordCounts.size + for (word <- wordCounts) { + val term = word._1 + if (dictionaryMap.contains(term)) { + val tfidf: TermWeight = new TFIDF() + val termFreq = word._2 + val dictIndex = dictionaryMap(term) + val docFreq = dfCountMap(dictIndex) + val currentTfIdf = tfidf.calculate(termFreq, + docFreq.toInt, + docSize, + totalDFSize.toInt) + vec.setQuick(dictIndex, currentTfIdf) + } + } + vec + } + +## Setup our classifier + + val labelMap = model.labelIndex + val numLabels = model.numLabels + val reverseLabelMap = labelMap.map(x => x._2 -> x._1) + + // instantiate the correct type of classifier + val classifier = model.isComplementary match { + case true => new ComplementaryNBClassifier(model) + case _ => new StandardNBClassifier(model) + } + +## Define an argmax function + +The label with the highest score wins the classification for a given document. 
+ + def argmax(v: Vector): (Int, Double) = { + var bestIdx: Int = Integer.MIN_VALUE + var bestScore: Double = Integer.MIN_VALUE.asInstanceOf[Int].toDouble + for(i <- 0 until v.size) { + if(v(i) > bestScore){ + bestScore = v(i) + bestIdx = i + } + } + (bestIdx, bestScore) + } + +## Define our TF(-IDF) vector classifier + + def classifyDocument(clvec: Vector) : String = { + val cvec = classifier.classifyFull(clvec) + val (bestIdx, bestScore) = argmax(cvec) + reverseLabelMap(bestIdx) + } + +## Two sample news articles: United States Football and United Kingdom Football + + // A random United States football article + // http://www.reuters.com/article/2015/01/28/us-nfl-superbowl-security-idUSKBN0L12JR20150128 + val UStextToClassify = new String("(Reuters) - Super Bowl security officials acknowledge" + + " the NFL championship game represents a high profile target on a world stage but are" + + " unaware of any specific credible threats against Sunday's showcase. In advance of" + + " one of the world's biggest single day sporting events, Homeland Security Secretary" + + " Jeh Johnson was in Glendale on Wednesday to review security preparations and tour" + + " University of Phoenix Stadium where the Seattle Seahawks and New England Patriots" + + " will battle. Deadly shootings in Paris and arrest of suspects in Belgium, Greece and" + + " Germany heightened fears of more attacks around the world and social media accounts" + + " linked to Middle East militant groups have carried a number of threats to attack" + + " high-profile U.S. events. There is no specific credible threat, said Johnson, who" + + " has appointed a federal coordination team to work with local, state and federal" + + " agencies to ensure safety of fans, players and other workers associated with the" + + " Super Bowl. I'm confident we will have a safe and secure and successful event." + + " Sunday's game has been given a Special Event Assessment Rating (SEAR) 1 rating, the" + + " same as in previous years, except for the year after the Sept. 11, 2001 attacks, when" + + " a higher level was declared. But security will be tight and visible around Super" + + " Bowl-related events as well as during the game itself. All fans will pass through" + + " metal detectors and pat downs. Over 4,000 private security personnel will be deployed" + + " and the almost 3,000 member Phoenix police force will be on Super Bowl duty. Nuclear" + + " device sniffing teams will be deployed and a network of Bio-Watch detectors will be" + + " set up to provide a warning in the event of a biological attack. The Department of" + + " Homeland Security (DHS) said in a press release it had held special cyber-security" + + " and anti-sniper training sessions. A U.S. official said the Transportation Security" + + " Administration, which is responsible for screening airline passengers, will add" + + " screeners and checkpoint lanes at airports. Federal air marshals, behavior detection" + + " officers and dog teams will help to secure transportation systems in the area. We" + + " will be ramping it (security) up on Sunday, there is no doubt about that, said Federal"+ + " Coordinator Matthew Allen, the DHS point of contact for planning and support. 
I have" + + " every confidence the public safety agencies that represented in the planning process" + + " are going to have their best and brightest out there this weekend and we will have" + + " a very safe Super Bowl.") + + // A random United Kingdom football article + // http://www.reuters.com/article/2015/01/26/manchester-united-swissquote-idUSL6N0V52RZ20150126 + val UKtextToClassify = new String("(Reuters) - Manchester United have signed a sponsorship" + + " deal with online financial trading company Swissquote, expanding the commercial" + + " partnerships that have helped to make the English club one of the richest teams in" + + " world soccer. United did not give a value for the deal, the club's first in the sector," + + " but said on Monday it was a multi-year agreement. The Premier League club, 20 times" + + " English champions, claim to have 659 million followers around the globe, making the" + + " United name attractive to major brands like Chevrolet cars and sportswear group Adidas." + + " Swissquote said the global deal would allow it to use United's popularity in Asia to" + + " help it meet its targets for expansion in China. Among benefits from the deal," + + " Swissquote's clients will have a chance to meet United players and get behind the scenes" + + " at the Old Trafford stadium. Swissquote is a Geneva-based online trading company that" + + " allows retail investors to buy and sell foreign exchange, equities, bonds and other asset" + + " classes. Like other retail FX brokers, Swissquote was left nursing losses on the Swiss" + + " franc after Switzerland's central bank stunned markets this month by abandoning its cap" + + " on the currency. The fallout from the abrupt move put rival and West Ham United shirt" + + " sponsor Alpari UK into administration. Swissquote itself was forced to book a 25 million" + + " Swiss francs ($28 million) provision for its clients who were left out of pocket" + + " following the franc's surge. United's ability to grow revenues off the pitch has made" + + " them the second richest club in the world behind Spain's Real Madrid, despite a" + + " downturn in their playing fortunes. United Managing Director Richard Arnold said" + + " there was still lots of scope for United to develop sponsorships in other areas of" + + " business. The last quoted statistics that we had showed that of the top 25 sponsorship" + + " categories, we were only active in 15 of those, Arnold told Reuters. I think there is a" + + " huge potential still for the club, and the other thing we have seen is there is very" + + " significant growth even within categories. United have endured a tricky transition" + + " following the retirement of manager Alex Ferguson in 2013, finishing seventh in the" + + " Premier League last season and missing out on a place in the lucrative Champions League." 
+ + " ($1 = 0.8910 Swiss francs) (Writing by Neil Maidment, additional reporting by Jemima" + + " Kelly; editing by Keith Weir)") + +## Vectorize and classify our documents + + val usVec = vectorizeDocument(UStextToClassify, dictionaryMap, dfCountMap) + val ukVec = vectorizeDocument(UKtextToClassify, dictionaryMap, dfCountMap) + + println("Classifying the news article about superbowl security (united states)") + classifyDocument(usVec) + + println("Classifying the news article about Manchester United (united kingdom)") + classifyDocument(ukVec) + +## Tie everything together in a new method to classify text + + def classifyText(txt: String): String = { + val v = vectorizeDocument(txt, dictionaryMap, dfCountMap) + classifyDocument(v) + } + +## Now we can simply call our classifyText(...) method on any String + + classifyText("Hello world from Queens") + classifyText("Hello world from London") + +## Model persistance + +You can save the model to HDFS: + + model.dfsWrite("/path/to/model") + +And retrieve it with: + + val model = NBModel.dfsRead("/path/to/model") + +The trained model can now be embedded in an external application. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/54ef150e/website/_pages/docs/0.13.0/tutorials/how-to-build-an-app.md ---------------------------------------------------------------------- diff --git a/website/_pages/docs/0.13.0/tutorials/how-to-build-an-app.md b/website/_pages/docs/0.13.0/tutorials/how-to-build-an-app.md new file mode 100644 index 0000000..9cf624b --- /dev/null +++ b/website/_pages/docs/0.13.0/tutorials/how-to-build-an-app.md @@ -0,0 +1,255 @@ +--- +layout: mahoutdoc +title: Mahout Samsara In Core +permalink: /docs/0.13.0/tutorials/build-app +--- +#How to create and App using Mahout + +This is an example of how to create a simple app using Mahout as a Library. The source is available on Github in the [3-input-cooc project](https://github.com/pferrel/3-input-cooc) with more explanation about what it does (has to do with collaborative filtering). For this tutorial we'll concentrate on the app rather than the data science. + +The app reads in three user-item interactions types and creats indicators for them using cooccurrence and cross-cooccurrence. The indicators will be written to text files in a format ready for search engine indexing in search engine based recommender. + +##Setup +In order to build and run the CooccurrenceDriver you need to install the following: + +* Install the Java 7 JDK from Oracle. Mac users look here: [Java SE Development Kit 7u72](http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html). +* Install sbt (simple build tool) 0.13.x for [Mac](http://www.scala-sbt.org/release/tutorial/Installing-sbt-on-Mac.html), [Linux](http://www.scala-sbt.org/release/tutorial/Installing-sbt-on-Linux.html) or [manual instalation](http://www.scala-sbt.org/release/tutorial/Manual-Installation.html). +* Install [Spark 1.1.1](https://spark.apache.org/docs/1.1.1/spark-standalone.html). Don't forget to setup SPARK_HOME +* Install [Mahout 0.10.0](http://mahout.apache.org/general/downloads.html). Don't forget to setup MAHOUT_HOME and MAHOUT_LOCAL + +Why install if you are only using them as a library? Certain binaries and scripts are required by the libraries to get information about the environment like discovering where jars are located. 
+ +Spark requires a set of jars on the classpath for the client side part of an app, and another set of jars must be passed to the Spark Context for running distributed code. The example should discover all the necessary classes automatically. + +##Application +Using Mahout as a library in an application will require a little Scala code. Scala has an App trait, so we'll create an object that inherits from ```App```: + + + object CooccurrenceDriver extends App { + } + + +This will look a little different from Java: ```App``` uses delayed initialization, so the object's body is executed when the app is launched, playing the role a main method would in Java. + +Before we can execute something on Spark we'll need to create a context. We could use raw Spark calls here, but default values are set up for a Mahout context by using the Mahout helper function: + + implicit val mc = mahoutSparkContext(masterUrl = "local", + appName = "CooccurrenceDriver") + +We need to read in three files containing different interaction types. The files will each be read into a Mahout IndexedDataset. This allows us to preserve application-specific user and item IDs throughout the calculations. + +For example, here is data/purchase.csv: + + u1,iphone + u1,ipad + u2,nexus + u2,galaxy + u3,surface + u4,iphone + u4,galaxy + +Mahout has a helper function, SparkEngine.indexedDatasetDFSReadElements, that reads text-delimited files. The function reads single-element tuples (user-id,item-id) in a distributed way to create the IndexedDataset. Distributed Row Matrices (DRMs) and Vectors are important data types supplied by Mahout; an IndexedDataset is like a very lightweight dataframe in R, wrapping a DRM with HashBiMaps for row and column IDs. + +One important thing to note about this example is that we read in all datasets before we adjust the number of rows in them to match the total number of users in the data. This is so the math works out [(A'A, A'B, A'C)](http://mahout.apache.org/users/algorithms/intro-cooccurrence-spark.html): even if some users took one action but not another, there must be the same number of rows in all matrices. + + /** + * Read files of element tuples and create IndexedDatasets one per action. These + * share a userID BiMap but have their own itemID BiMaps + */ + def readActions(actionInput: Array[(String, String)]): Array[(String, IndexedDataset)] = { + var actions = Array[(String, IndexedDataset)]() + + val userDictionary: BiMap[String, Int] = HashBiMap.create() + + // The first action named in the sequence is the "primary" action and + // begins to fill up the user dictionary + for (actionDescription <- actionInput) { // grab the path to actions + val action: IndexedDataset = SparkEngine.indexedDatasetDFSReadElements( + actionDescription._2, + schema = DefaultIndexedDatasetElementReadSchema, + existingRowIDs = userDictionary) + userDictionary.putAll(action.rowIDs) + // put the name in the tuple with the indexedDataset + actions = actions :+ (actionDescription._1, action) + } + + // After all actions are read in, the userDictionary will contain every user seen, + // even if they may not have taken all actions. 
Now we adjust the row rank of + // all IndexedDatasets to have this number of rows + // Note: this is very important or the cooccurrence calc may fail + val numUsers = userDictionary.size() // one more than the cardinality + + val resizedNameActionPairs = actions.map { a => + // resize the matrix, in effect, by adding empty rows + val resizedMatrix = a._2.create(a._2.matrix, userDictionary, a._2.columnIDs).newRowCardinality(numUsers) + (a._1, resizedMatrix) // return the Tuple of (name, IndexedDataset) + } + resizedNameActionPairs // return the array of Tuples + } + + +Now that we have the data read in, we can perform the cooccurrence calculation. + + // actions.map creates an array of just the IndexedDatasets + val indicatorMatrices = SimilarityAnalysis.cooccurrencesIDSs( + actions.map(a => a._2)) + +All we need to do now is write the indicators. + + // zip a pair of arrays into an array of pairs, reattaching the action names + val indicatorDescriptions = actions.map(a => a._1).zip(indicatorMatrices) + writeIndicators(indicatorDescriptions) + + +The ```writeIndicators``` method uses the default write function ```dfsWrite```. + + /** + * Write indicatorMatrices to the output dir in the default format + * for indexing by a search engine. + */ + def writeIndicators(indicators: Array[(String, IndexedDataset)]) = { + for (indicator <- indicators) { + // create a name based on the type of indicator + val indicatorDir = OutputPath + indicator._1 + indicator._2.dfsWrite( + indicatorDir, + // Schema tells the writer to omit LLR strengths + // and format for search engine indexing + IndexedDatasetWriteBooleanSchema) + } + } + + +See the GitHub project for the full source. Now we create a build.sbt to build the example. + + name := "cooccurrence-driver" + + organization := "com.finderbots" + + version := "0.1" + + scalaVersion := "2.10.4" + + val sparkVersion = "1.1.1" + + libraryDependencies ++= Seq( + "log4j" % "log4j" % "1.2.17", + // Mahout's Spark code + "commons-io" % "commons-io" % "2.4", + "org.apache.mahout" % "mahout-math-scala_2.10" % "0.10.0", + "org.apache.mahout" % "mahout-spark_2.10" % "0.10.0", + "org.apache.mahout" % "mahout-math" % "0.10.0", + "org.apache.mahout" % "mahout-hdfs" % "0.10.0", + // Google collections, AKA Guava + "com.google.guava" % "guava" % "16.0") + + resolvers += "typesafe repo" at "http://repo.typesafe.com/typesafe/releases/" + + resolvers += Resolver.mavenLocal + + packSettings + + packMain := Map( + "cooc" -> "CooccurrenceDriver") + + +##Build +Build the examples from the project's root folder: + + $ sbt pack + +This will automatically set up some launcher scripts for the driver. To run, execute: + + $ target/pack/bin/cooc + +The driver will execute in Spark standalone mode and put the data in /path/to/3-input-cooc/data/indicators/*indicator-type*. + +##Using a Debugger +To build and run this example in a debugger like IntelliJ IDEA, install the IDE from the IntelliJ site and add the Scala plugin. + +Open IDEA and go to the menu File->New->Project from existing sources->SBT->/path/to/3-input-cooc. This will create an IDEA project from ```build.sbt``` in the root directory. + +At this point you may create a "Debug Configuration" to run. In the menu choose Run->Edit Configurations. Under "Default" choose "Application". In the dialog hit the ellipsis button "..." to the right of "Environment Variables" and fill in your versions of JAVA_HOME, SPARK_HOME, and MAHOUT_HOME. In the configuration editor, under "Use classpath from", choose the root-3-input-cooc module. 
+ +![image](http://mahout.apache.org/images/debug-config.png) + +Now choose "Application" in the left pane and hit the plus sign "+". Give the config a name and hit the ellipsis button to the right of the "Main class" field as shown. + +![image](http://mahout.apache.org/images/debug-config-2.png) + + +After setting breakpoints, you are ready to debug the configuration. Go to the Run->Debug... menu and pick your configuration. This will execute using a local standalone instance of Spark. + +##The Mahout Shell + +For small script-like apps you may wish to use the Mahout shell. It is a Scala REPL-type interactive shell built on the Spark shell with Mahout-Samsara extensions. + +To turn CooccurrenceDriver.scala into a script, make the following changes: + +* You won't need the context; since it is created when the shell is launched, comment that line out. +* Replace the logger.info lines with println. +* Remove the package info since it's not needed. This will produce the file ```path/to/3-input-cooc/bin/CooccurrenceDriver.mscala```. + +Note the ```.mscala``` extension, which indicates we are using Mahout's Scala extensions for math, otherwise known as [Mahout-Samsara](http://mahout.apache.org/users/environment/out-of-core-reference.html). + +To run the code, first make sure the output does not already exist: + + $ rm -r /path/to/3-input-cooc/data/indicators + +Launch the Mahout + Spark shell: + + $ mahout spark-shell + +You'll see the Mahout splash: + + MAHOUT_LOCAL is set, so we don't add HADOOP_CONF_DIR to classpath. + + _ _ + _ __ ___ __ _| |__ ___ _ _| |_ + | '_ ` _ \ / _` | '_ \ / _ \| | | | __| + | | | | | | (_| | | | | (_) | |_| | |_ + |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__| version 0.10.0 + + + Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_72) + Type in expressions to have them evaluated. + Type :help for more information. + 15/04/26 09:30:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable + Created spark context.. + Mahout distributed context is available as "implicit val sdc". + mahout> + +To load the driver type: + + mahout> :load /path/to/3-input-cooc/bin/CooccurrenceDriver.mscala + Loading ./bin/CooccurrenceDriver.mscala... + import com.google.common.collect.{HashBiMap, BiMap} + import org.apache.log4j.Logger + import org.apache.mahout.math.cf.SimilarityAnalysis + import org.apache.mahout.math.indexeddataset._ + import org.apache.mahout.sparkbindings._ + import scala.collection.immutable.HashMap + defined module CooccurrenceDriver + mahout> + +To run the driver type: + + mahout> CooccurrenceDriver.main(args = Array("")) + +You'll get some stats printed: + + Total number of users for all actions = 5 + purchase indicator matrix: + Number of rows for matrix = 4 + Number of columns for matrix = 5 + Number of rows after resize = 5 + view indicator matrix: + Number of rows for matrix = 4 + Number of columns for matrix = 5 + Number of rows after resize = 5 + category indicator matrix: + Number of rows for matrix = 5 + Number of columns for matrix = 7 + Number of rows after resize = 5 + +If you look in ```path/to/3-input-cooc/data/indicators```, you should find folders containing the indicator matrices.
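For a quick sanity check from the same shell, you can list those folders with plain JVM file APIs. This is a minimal sketch; it assumes the indicators were written to the local filesystem at the path used above rather than to HDFS:

    mahout> import java.io.File
    mahout> new File("/path/to/3-input-cooc/data/indicators").listFiles.map(_.getName).foreach(println)

You should see one folder per action (purchase, view and category), matching the indicator matrices reported in the stats above.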