Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0F369200C5E for ; Sat, 22 Apr 2017 20:58:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0DE57160BA2; Sat, 22 Apr 2017 18:58:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 580C7160B91 for ; Sat, 22 Apr 2017 20:58:37 +0200 (CEST) Received: (qmail 16226 invoked by uid 500); 22 Apr 2017 18:58:36 -0000 Mailing-List: contact commits-help@mahout.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mahout.apache.org Delivered-To: mailing list commits@mahout.apache.org Received: (qmail 16217 invoked by uid 99); 22 Apr 2017 18:58:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Apr 2017 18:58:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 35543E3A9C; Sat, 22 Apr 2017 18:58:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: rawkintrevo@apache.org To: commits@mahout.apache.org Message-Id: <100a8fff12c4478c8946c21d6942f07f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: mahout git commit: MAHOUT-1971 Aggregate Transpose Bug closes apache/mahout#307 Date: Sat, 22 Apr 2017 18:58:36 +0000 (UTC) archived-at: Sat, 22 Apr 2017 18:58:38 -0000 Repository: mahout Updated Branches: refs/heads/master c397ef7f7 -> 08e02602e MAHOUT-1971 Aggregate Transpose Bug closes apache/mahout#307 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/08e02602 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/08e02602 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/08e02602 Branch: refs/heads/master Commit: 08e02602e947ff945b9bd73ab5f0b45863df3e53 Parents: c397ef7 Author: rawkintrevo Authored: Sat Apr 22 13:58:17 2017 -0500 Committer: rawkintrevo Committed: Sat Apr 22 13:58:17 2017 -0500 ---------------------------------------------------------------------- .../mahout/flinkbindings/blas/FlinkOpAt.scala | 2 +- .../mahout/flinkbindings/DrmLikeOpsSuite.scala | 20 ++++++++++++++++++++ .../apache/mahout/sparkbindings/blas/At.scala | 2 +- .../mahout/sparkbindings/drm/DrmLikeSuite.scala | 20 ++++++++++++++++++++ 4 files changed, 42 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/08e02602/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala index 45214e5..5093216 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala @@ -49,7 +49,7 @@ object FlinkOpAt { val columnVector: Vector = new SequentialAccessSparseVector(ncol) keys.zipWithIndex.foreach { - case (key, idx) => columnVector(key) = block(idx, columnIndex) + case (key, idx) => columnVector(key) += block(idx, columnIndex) } (columnIndex, columnVector) http://git-wip-us.apache.org/repos/asf/mahout/blob/08e02602/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala index fe2277c..288561b 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala @@ -18,6 +18,8 @@ */ package org.apache.mahout.flinkbindings +import org.apache.mahout.logging.info +import org.apache.mahout.math.DenseMatrix import org.apache.mahout.math.drm.RLikeDrmOps._ import org.apache.mahout.math.drm._ import org.apache.mahout.math.scalabindings.RLikeOps._ @@ -70,4 +72,22 @@ class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite { (emptyDrm.collect - expected).norm should be < 1e-6 } + test("Aggregating transpose") { + + val mxA = new DenseMatrix(20, 10) := 1 + + val drmA = drmParallelize(mxA, numPartitions = 3) + + val reassignedA = drmA.mapBlock() { case (keys, block) ⇒ + keys.map(_ % 3) → block + } + + val mxAggrA = reassignedA.t(::, 0 until 3).collect + + info(mxAggrA.toString) + + mxAggrA(0,0) shouldBe 7 + mxAggrA(0,1) shouldBe 7 + mxAggrA(0,2) shouldBe 6 + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/08e02602/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala index fa25b73..b8e6025 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala @@ -62,7 +62,7 @@ object At { // Compute sparse vector. This should be quick if we assign values siquentially. val colV: Vector = new SequentialAccessSparseVector(ncol) keys.view.zipWithIndex.foreach({ - case (row, blockRow) => colV(row) = blockA(blockRow, blockCol) + case (row, blockRow) => colV(row) += blockA(blockRow, blockCol) }) blockCol -> colV http://git-wip-us.apache.org/repos/asf/mahout/blob/08e02602/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala index e88e7ef..bc6ee72 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeSuite.scala @@ -23,6 +23,7 @@ import scalabindings._ import drm._ import RLikeOps._ import RLikeDrmOps._ +import org.apache.mahout.logging.info import org.apache.mahout.sparkbindings._ import org.apache.mahout.sparkbindings.test.DistributedSparkSuite @@ -139,4 +140,23 @@ class DrmLikeSuite extends FunSuite with DistributedSparkSuite with DrmLikeSuite assert(dfM === testM) } + + test("Aggregating transpose") { + + val mxA = new DenseMatrix(20, 10) := 1 + + val drmA = drmParallelize(mxA, numPartitions = 3) + + val reassignedA = drmA.mapBlock() { case (keys, block) ⇒ + keys.map(_ % 3) → block + } + + val mxAggrA = reassignedA.t(::, 0 until 3).collect + + info(mxAggrA.toString) + + mxAggrA(0,0) shouldBe 7 + mxAggrA(0,1) shouldBe 7 + mxAggrA(0,2) shouldBe 6 + } }