spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject spark git commit: [SPARK-6485] [MLLIB] [PYTHON] Add CoordinateMatrix/RowMatrix/IndexedRowMatrix to PySpark.
Date Tue, 04 Aug 2015 23:30:06 GMT
Repository: spark
Updated Branches:
  refs/heads/master 1833d9c08 -> 571d5b536


[SPARK-6485] [MLLIB] [PYTHON] Add CoordinateMatrix/RowMatrix/IndexedRowMatrix to PySpark.

This PR adds the RowMatrix, IndexedRowMatrix, and CoordinateMatrix distributed matrices to
PySpark.  Each distributed matrix class acts as a wrapper around the Scala/Java counterpart
by maintaining a reference to the Java object.  New distributed matrices can be created using
factory methods added to DistributedMatrices, which creates the Java distributed matrix and
then wraps it with the corresponding PySpark class.  This design allows for simple conversion
between the various distributed matrices, and lets us re-use the Scala code.  Serialization
between Python and Java is implemented using DataFrames as needed for IndexedRowMatrix and
CoordinateMatrix for simplicity.  Associated documentation and unit-tests have also been added.
 To facilitate code review, this PR implements access to the rows/entries as RDDs, the number
of rows & columns, and conversions between the various distributed matrices (not including
BlockMatrix), and does not implement the other linear algebra functions
  of the matrices, although this will be very simple to add now.

Author: Mike Dusenberry <mwdusenb@us.ibm.com>

Closes #7554 from dusenberrymw/SPARK-6485_Add_CoordinateMatrix_RowMatrix_IndexedMatrix_to_PySpark
and squashes the following commits:

bb039cb [Mike Dusenberry] Minor documentation update.
b887c18 [Mike Dusenberry] Updating the matrix conversion logic again to make it even cleaner.
 Now, we allow the 'rows' parameter in the constructors to be either an RDD or the Java matrix
object. If 'rows' is an RDD, we create a Java matrix object, wrap it, and then store that.
 If 'rows' is a Java matrix object of the correct type, we just wrap and store that directly.
 This is only for internal usage, and publicly, we still require 'rows' to be an RDD.  We
no longer store the 'rows' RDD, and instead just compute it from the Java object when needed.
 The point of this is that when we do matrix conversions, we do the conversion on the Scala/Java
side, which returns a Java object, so we should use that directly, but exposing 'java_matrix'
parameter in the public API is not ideal. This non-public feature of allowing 'rows' to be
a Java matrix object is documented in the '__init__' constructor docstrings, which are not
part of the generated public API, and doctests are also included.
7f0dcb6 [Mike Dusenberry] Updating module docstring.
cfc1be5 [Mike Dusenberry] Use 'new SQLContext(matrix.rows.sparkContext)' rather than 'SQLContext.getOrCreate',
as the later doesn't guarantee that the SparkContext will be the same as for the matrix.rows
data.
687e345 [Mike Dusenberry] Improving conversion performance.  This adds an optional 'java_matrix'
parameter to the constructors, and pulls the conversion logic out into a '_create_from_java'
function. Now, if the constructors are given a valid Java distributed matrix object as 'java_matrix',
they will store those internally, rather than create a new one on the Scala/Java side.
3e50b6e [Mike Dusenberry] Moving the distributed matrices to pyspark.mllib.linalg.distributed.
308f197 [Mike Dusenberry] Using properties for better documentation.
1633f86 [Mike Dusenberry] Minor documentation cleanup.
f0c13a7 [Mike Dusenberry] CoordinateMatrix should inherit from DistributedMatrix.
ffdd724 [Mike Dusenberry] Updating doctests to make documentation cleaner.
3fd4016 [Mike Dusenberry] Updating docstrings.
27cd5f6 [Mike Dusenberry] Simplifying input conversions in the constructors for each distributed
matrix.
a409cf5 [Mike Dusenberry] Updating doctests to be less verbose by using lists instead of DenseVectors
explicitly.
d19b0ba [Mike Dusenberry] Updating code and documentation to note that a vector-like object
(numpy array, list, etc.) can be used in place of explicit Vector object, and adding conversions
when necessary to RowMatrix construction.
4bd756d [Mike Dusenberry] Adding param documentation to IndexedRow and MatrixEntry.
c6bded5 [Mike Dusenberry] Move conversion logic from tuples to IndexedRow or MatrixEntry types
from within the IndexedRowMatrix and CoordinateMatrix constructors to separate _convert_to_indexed_row
and _convert_to_matrix_entry functions.
329638b [Mike Dusenberry] Moving the Experimental tag to the top of each docstring.
0be6826 [Mike Dusenberry] Simplifying doctests by removing duplicated rows/entries RDDs within
the various tests.
c0900df [Mike Dusenberry] Adding the colons that were accidentally not inserted.
4ad6819 [Mike Dusenberry] Documenting the  and  parameters.
3b854b9 [Mike Dusenberry] Minor updates to documentation.
10046e8 [Mike Dusenberry] Updating documentation to use class constructors instead of the
removed DistributedMatrices factory methods.
119018d [Mike Dusenberry] Adding static  methods to each of the distributed matrix classes
to consolidate conversion logic.
4d7af86 [Mike Dusenberry] Adding type checks to the constructors.  Although it is slightly
verbose, it is better for the user to have a good error message than a cryptic stacktrace.
93b6a3d [Mike Dusenberry] Pulling the DistributedMatrices Python class out of this pull request.
f6f3c68 [Mike Dusenberry] Pulling the DistributedMatrices Scala class out of this pull request.
6a3ecb7 [Mike Dusenberry] Updating pattern matching.
08f287b [Mike Dusenberry] Slight reformatting of the documentation.
a245dc0 [Mike Dusenberry] Updating Python doctests for compatability between Python 2 &
3. Since Python 3 removed the idea of a separate 'long' type, all values that would have been
outputted as a 'long' (ex: '4L') will now be treated as an 'int' and outputed as one (ex:
'4').  The doctests now explicitly convert to ints so that both Python 2 and 3 will have the
same output.  This is fine since the values are all small, and thus can be easily represented
as ints.
4d3a37e [Mike Dusenberry] Reformatting a few long Python doctest lines.
7e3ca16 [Mike Dusenberry] Fixing long lines.
f721ead [Mike Dusenberry] Updating documentation for each of the distributed matrices.
ab0e8b6 [Mike Dusenberry] Updating unit test to be more useful.
dda2f89 [Mike Dusenberry] Added wrappers for the conversions between the various distributed
matrices.  Added logic to be able to access the rows/entries of the distributed matrices,
which requires serialization through DataFrames for IndexedRowMatrix and CoordinateMatrix
types. Added unit tests.
0cd7166 [Mike Dusenberry] Implemented the CoordinateMatrix API in PySpark, following the idea
of the IndexedRowMatrix API, including using DataFrames for serialization.
3c369cb [Mike Dusenberry] Updating the architecture a bit to make conversions between the
various distributed matrix types easier.  The different distributed matrix classes are now
only wrappers around the Java objects, and take the Java object as an argument during construction.
 This way, we can call  for example on an , which returns a reference to a Java RowMatrix
object, and then construct a PySpark RowMatrix object wrapped around the Java object.  This
is analogous to the behavior of PySpark RDDs and DataFrames.  We now delegate creation of
the various distributed matrices from scratch in PySpark to the factory methods on .
4bdd09b [Mike Dusenberry] Implemented the IndexedRowMatrix API in PySpark, following the idea
of the RowMatrix API.  Note that for the IndexedRowMatrix, we use DataFrames to serialize
the data between Python and Scala/Java, so we accept PySpark RDDs, then convert to a DataFrame,
then convert back to RDDs on the Scala/Java side before constructing the IndexedRowMatrix.
23bf1ec [Mike Dusenberry] Updating documentation to add PySpark RowMatrix. Inserting newline
above doctest so that it renders properly in API docs.
b194623 [Mike Dusenberry] Updating design to have a PySpark RowMatrix simply create and keep
a reference to a wrapper over a Java RowMatrix.  Updating DistributedMatrices factory methods
to accept numRows and numCols with default values.  Updating PySpark DistributedMatrices factory
method to simply create a PySpark RowMatrix. Adding additional doctests for numRows and numCols
parameters.
bc2d220 [Mike Dusenberry] Adding unit tests for RowMatrix methods.
d7e316f [Mike Dusenberry] Implemented the RowMatrix API in PySpark by doing the following:
Added a DistributedMatrices class to contain factory methods for creating the various distributed
matrices.  Added a factory method for creating a RowMatrix from an RDD of Vectors.  Added
a createRowMatrix function to the PythonMLlibAPI to interface with the factory method.  Added
DistributedMatrix, DistributedMatrices, and RowMatrix classes to the pyspark.mllib.linalg
api.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/571d5b53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/571d5b53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/571d5b53

Branch: refs/heads/master
Commit: 571d5b5363ff4dbbce1f7019ab8e86cbc3cba4d5
Parents: 1833d9c
Author: Mike Dusenberry <mwdusenb@us.ibm.com>
Authored: Tue Aug 4 16:30:03 2015 -0700
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Tue Aug 4 16:30:03 2015 -0700

----------------------------------------------------------------------
 dev/sparktestsupport/modules.py                 |   1 +
 docs/mllib-data-types.md                        | 106 +++-
 .../spark/mllib/api/python/PythonMLLibAPI.scala |  53 +-
 python/docs/pyspark.mllib.rst                   |   8 +
 python/pyspark/mllib/common.py                  |   2 +
 python/pyspark/mllib/linalg/distributed.py      | 537 +++++++++++++++++++
 6 files changed, 704 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/571d5b53/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 956dc81..a9717ff 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -331,6 +331,7 @@ pyspark_mllib = Module(
         "pyspark.mllib.feature",
         "pyspark.mllib.fpm",
         "pyspark.mllib.linalg.__init__",
+        "pyspark.mllib.linalg.distributed",
         "pyspark.mllib.random",
         "pyspark.mllib.recommendation",
         "pyspark.mllib.regression",

http://git-wip-us.apache.org/repos/asf/spark/blob/571d5b53/docs/mllib-data-types.md
----------------------------------------------------------------------
diff --git a/docs/mllib-data-types.md b/docs/mllib-data-types.md
index 3aa0400..11033bf 100644
--- a/docs/mllib-data-types.md
+++ b/docs/mllib-data-types.md
@@ -372,12 +372,37 @@ long m = mat.numRows();
 long n = mat.numCols();
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+
+A [`RowMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.RowMatrix)
can be 
+created from an `RDD` of vectors.
+
+{% highlight python %}
+from pyspark.mllib.linalg.distributed import RowMatrix
+
+# Create an RDD of vectors.
+rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
+
+# Create a RowMatrix from an RDD of vectors.
+mat = RowMatrix(rows)
+
+# Get its size.
+m = mat.numRows()  # 4
+n = mat.numCols()  # 3
+
+# Get the rows as an RDD of vectors again.
+rowsRDD = mat.rows
+{% endhighlight %}
+</div>
+
 </div>
 
 ### IndexedRowMatrix
 
 An `IndexedRowMatrix` is similar to a `RowMatrix` but with meaningful row indices.  It is
backed by
-an RDD of indexed rows, so that each row is represented by its index (long-typed) and a local
vector.
+an RDD of indexed rows, so that each row is represented by its index (long-typed) and a local

+vector.
 
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
@@ -431,7 +456,48 @@ long n = mat.numCols();
 // Drop its row indices.
 RowMatrix rowMat = mat.toRowMatrix();
 {% endhighlight %}
-</div></div>
+</div>
+
+<div data-lang="python" markdown="1">
+
+An [`IndexedRowMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.IndexedRowMatrix)
+can be created from an `RDD` of `IndexedRow`s, where 
+[`IndexedRow`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.IndexedRow)
is a 
+wrapper over `(long, vector)`.  An `IndexedRowMatrix` can be converted to a `RowMatrix` by
dropping
+its row indices.
+
+{% highlight python %}
+from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
+
+# Create an RDD of indexed rows.
+#   - This can be done explicitly with the IndexedRow class:
+indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]), 
+                              IndexedRow(1, [4, 5, 6]), 
+                              IndexedRow(2, [7, 8, 9]), 
+                              IndexedRow(3, [10, 11, 12])])
+#   - or by using (long, vector) tuples:
+indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]), 
+                              (2, [7, 8, 9]), (3, [10, 11, 12])])
+
+# Create an IndexedRowMatrix from an RDD of IndexedRows.
+mat = IndexedRowMatrix(indexedRows)
+
+# Get its size.
+m = mat.numRows()  # 4
+n = mat.numCols()  # 3
+
+# Get the rows as an RDD of IndexedRows.
+rowsRDD = mat.rows
+
+# Convert to a RowMatrix by dropping the row indices.
+rowMat = mat.toRowMatrix()
+
+# Convert to a CoordinateMatrix.
+coordinateMat = mat.toCoordinateMatrix()
+{% endhighlight %}
+</div>
+
+</div>
 
 ### CoordinateMatrix
 
@@ -495,6 +561,42 @@ long n = mat.numCols();
 IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+
+A [`CoordinateMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.CoordinateMatrix)
+can be created from an `RDD` of `MatrixEntry` entries, where 
+[`MatrixEntry`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.MatrixEntry)
is a 
+wrapper over `(long, long, float)`.  A `CoordinateMatrix` can be converted to a `RowMatrix`
by 
+calling `toRowMatrix`, or to an `IndexedRowMatrix` with sparse rows by calling `toIndexedRowMatrix`.
+
+{% highlight python %}
+from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
+
+# Create an RDD of coordinate entries.
+#   - This can be done explicitly with the MatrixEntry class:
+entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(6,
1, 3.7)])
+#   - or using (long, long, float) tuples:
+entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])
+
+# Create an CoordinateMatrix from an RDD of MatrixEntries.
+mat = CoordinateMatrix(entries)
+
+# Get its size.
+m = mat.numRows()  # 3
+n = mat.numCols()  # 2
+
+# Get the entries as an RDD of MatrixEntries.
+entriesRDD = mat.entries
+
+# Convert to a RowMatrix.
+rowMat = mat.toRowMatrix()
+
+# Convert to an IndexedRowMatrix.
+indexedRowMat = mat.toIndexedRowMatrix()
+{% endhighlight %}
+</div>
+
 </div>
 
 ### BlockMatrix

http://git-wip-us.apache.org/repos/asf/spark/blob/571d5b53/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 6f080d3..d2b3fae 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -37,6 +37,7 @@ import org.apache.spark.mllib.evaluation.RankingMetrics
 import org.apache.spark.mllib.feature._
 import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel}
 import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.linalg.distributed._
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.random.{RandomRDDs => RG}
 import org.apache.spark.mllib.recommendation._
@@ -54,7 +55,7 @@ import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees,
RandomFo
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.mllib.util.LinearDataGenerator
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
@@ -1096,6 +1097,56 @@ private[python] class PythonMLLibAPI extends Serializable {
     Statistics.kolmogorovSmirnovTest(data, distName, paramsSeq: _*)
   }
 
+  /**
+   * Wrapper around RowMatrix constructor.
+   */
+  def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = {
+    new RowMatrix(rows.rdd, numRows, numCols)
+  }
+
+  /**
+   * Wrapper around IndexedRowMatrix constructor.
+   */
+  def createIndexedRowMatrix(rows: DataFrame, numRows: Long, numCols: Int): IndexedRowMatrix
= {
+    // We use DataFrames for serialization of IndexedRows from Python,
+    // so map each Row in the DataFrame back to an IndexedRow.
+    val indexedRows = rows.map {
+      case Row(index: Long, vector: Vector) => IndexedRow(index, vector)
+    }
+    new IndexedRowMatrix(indexedRows, numRows, numCols)
+  }
+
+  /**
+   * Wrapper around CoordinateMatrix constructor.
+   */
+  def createCoordinateMatrix(rows: DataFrame, numRows: Long, numCols: Long): CoordinateMatrix
= {
+    // We use DataFrames for serialization of MatrixEntry entries from
+    // Python, so map each Row in the DataFrame back to a MatrixEntry.
+    val entries = rows.map {
+      case Row(i: Long, j: Long, value: Double) => MatrixEntry(i, j, value)
+    }
+    new CoordinateMatrix(entries, numRows, numCols)
+  }
+
+  /**
+   * Return the rows of an IndexedRowMatrix.
+   */
+  def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = {
+    // We use DataFrames for serialization of IndexedRows to Python,
+    // so return a DataFrame.
+    val sqlContext = new SQLContext(indexedRowMatrix.rows.sparkContext)
+    sqlContext.createDataFrame(indexedRowMatrix.rows)
+  }
+
+  /**
+   * Return the entries of a CoordinateMatrix.
+   */
+  def getMatrixEntries(coordinateMatrix: CoordinateMatrix): DataFrame = {
+    // We use DataFrames for serialization of MatrixEntry entries to
+    // Python, so return a DataFrame.
+    val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext)
+    sqlContext.createDataFrame(coordinateMatrix.entries)
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/571d5b53/python/docs/pyspark.mllib.rst
----------------------------------------------------------------------
diff --git a/python/docs/pyspark.mllib.rst b/python/docs/pyspark.mllib.rst
index 26ece4c..2d54ab1 100644
--- a/python/docs/pyspark.mllib.rst
+++ b/python/docs/pyspark.mllib.rst
@@ -46,6 +46,14 @@ pyspark.mllib.linalg module
     :undoc-members:
     :show-inheritance:
 
+pyspark.mllib.linalg.distributed module
+---------------------------------------
+
+.. automodule:: pyspark.mllib.linalg.distributed
+    :members:
+    :undoc-members:
+    :show-inheritance:
+
 pyspark.mllib.random module
 ---------------------------
 

http://git-wip-us.apache.org/repos/asf/spark/blob/571d5b53/python/pyspark/mllib/common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py
index 855e85f..a439a48 100644
--- a/python/pyspark/mllib/common.py
+++ b/python/pyspark/mllib/common.py
@@ -73,6 +73,8 @@ def _py2java(sc, obj):
     """ Convert Python object into Java """
     if isinstance(obj, RDD):
         obj = _to_java_object_rdd(obj)
+    elif isinstance(obj, DataFrame):
+        obj = obj._jdf
     elif isinstance(obj, SparkContext):
         obj = obj._jsc
     elif isinstance(obj, list):

http://git-wip-us.apache.org/repos/asf/spark/blob/571d5b53/python/pyspark/mllib/linalg/distributed.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py
new file mode 100644
index 0000000..666d833
--- /dev/null
+++ b/python/pyspark/mllib/linalg/distributed.py
@@ -0,0 +1,537 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Package for distributed linear algebra.
+"""
+
+import sys
+
+if sys.version >= '3':
+    long = int
+
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD
+from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
+from pyspark.mllib.linalg import _convert_to_vector
+
+
+__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
+           'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix']
+
+
+class DistributedMatrix(object):
+    """
+    .. note:: Experimental
+
+    Represents a distributively stored matrix backed by one or
+    more RDDs.
+
+    """
+    def numRows(self):
+        """Get or compute the number of rows."""
+        raise NotImplementedError
+
+    def numCols(self):
+        """Get or compute the number of cols."""
+        raise NotImplementedError
+
+
+class RowMatrix(DistributedMatrix):
+    """
+    .. note:: Experimental
+
+    Represents a row-oriented distributed Matrix with no meaningful
+    row indices.
+
+    :param rows: An RDD of vectors.
+    :param numRows: Number of rows in the matrix. A non-positive
+                    value means unknown, at which point the number
+                    of rows will be determined by the number of
+                    records in the `rows` RDD.
+    :param numCols: Number of columns in the matrix. A non-positive
+                    value means unknown, at which point the number
+                    of columns will be determined by the size of
+                    the first row.
+    """
+    def __init__(self, rows, numRows=0, numCols=0):
+        """
+        Note: This docstring is not shown publicly.
+
+        Create a wrapper over a Java RowMatrix.
+
+        Publicly, we require that `rows` be an RDD.  However, for
+        internal usage, `rows` can also be a Java RowMatrix
+        object, in which case we can wrap it directly.  This
+        assists in clean matrix conversions.
+
+        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+        >>> mat = RowMatrix(rows)
+
+        >>> mat_diff = RowMatrix(rows)
+        >>> (mat_diff._java_matrix_wrapper._java_model ==
+        ...  mat._java_matrix_wrapper._java_model)
+        False
+
+        >>> mat_same = RowMatrix(mat._java_matrix_wrapper._java_model)
+        >>> (mat_same._java_matrix_wrapper._java_model ==
+        ...  mat._java_matrix_wrapper._java_model)
+        True
+        """
+        if isinstance(rows, RDD):
+            rows = rows.map(_convert_to_vector)
+            java_matrix = callMLlibFunc("createRowMatrix", rows, long(numRows), int(numCols))
+        elif (isinstance(rows, JavaObject)
+              and rows.getClass().getSimpleName() == "RowMatrix"):
+            java_matrix = rows
+        else:
+            raise TypeError("rows should be an RDD of vectors, got %s" % type(rows))
+
+        self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
+
+    @property
+    def rows(self):
+        """
+        Rows of the RowMatrix stored as an RDD of vectors.
+
+        >>> mat = RowMatrix(sc.parallelize([[1, 2, 3], [4, 5, 6]]))
+        >>> rows = mat.rows
+        >>> rows.first()
+        DenseVector([1.0, 2.0, 3.0])
+        """
+        return self._java_matrix_wrapper.call("rows")
+
+    def numRows(self):
+        """
+        Get or compute the number of rows.
+
+        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
+        ...                        [7, 8, 9], [10, 11, 12]])
+
+        >>> mat = RowMatrix(rows)
+        >>> print(mat.numRows())
+        4
+
+        >>> mat = RowMatrix(rows, 7, 6)
+        >>> print(mat.numRows())
+        7
+        """
+        return self._java_matrix_wrapper.call("numRows")
+
+    def numCols(self):
+        """
+        Get or compute the number of cols.
+
+        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
+        ...                        [7, 8, 9], [10, 11, 12]])
+
+        >>> mat = RowMatrix(rows)
+        >>> print(mat.numCols())
+        3
+
+        >>> mat = RowMatrix(rows, 7, 6)
+        >>> print(mat.numCols())
+        6
+        """
+        return self._java_matrix_wrapper.call("numCols")
+
+
+class IndexedRow(object):
+    """
+    .. note:: Experimental
+
+    Represents a row of an IndexedRowMatrix.
+
+    Just a wrapper over a (long, vector) tuple.
+
+    :param index: The index for the given row.
+    :param vector: The row in the matrix at the given index.
+    """
+    def __init__(self, index, vector):
+        self.index = long(index)
+        self.vector = _convert_to_vector(vector)
+
+    def __repr__(self):
+        return "IndexedRow(%s, %s)" % (self.index, self.vector)
+
+
+def _convert_to_indexed_row(row):
+    if isinstance(row, IndexedRow):
+        return row
+    elif isinstance(row, tuple) and len(row) == 2:
+        return IndexedRow(*row)
+    else:
+        raise TypeError("Cannot convert type %s into IndexedRow" % type(row))
+
+
+class IndexedRowMatrix(DistributedMatrix):
+    """
+    .. note:: Experimental
+
+    Represents a row-oriented distributed Matrix with indexed rows.
+
+    :param rows: An RDD of IndexedRows or (long, vector) tuples.
+    :param numRows: Number of rows in the matrix. A non-positive
+                    value means unknown, at which point the number
+                    of rows will be determined by the max row
+                    index plus one.
+    :param numCols: Number of columns in the matrix. A non-positive
+                    value means unknown, at which point the number
+                    of columns will be determined by the size of
+                    the first row.
+    """
+    def __init__(self, rows, numRows=0, numCols=0):
+        """
+        Note: This docstring is not shown publicly.
+
+        Create a wrapper over a Java IndexedRowMatrix.
+
+        Publicly, we require that `rows` be an RDD.  However, for
+        internal usage, `rows` can also be a Java IndexedRowMatrix
+        object, in which case we can wrap it directly.  This
+        assists in clean matrix conversions.
+
+        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+        ...                        IndexedRow(1, [4, 5, 6])])
+        >>> mat = IndexedRowMatrix(rows)
+
+        >>> mat_diff = IndexedRowMatrix(rows)
+        >>> (mat_diff._java_matrix_wrapper._java_model ==
+        ...  mat._java_matrix_wrapper._java_model)
+        False
+
+        >>> mat_same = IndexedRowMatrix(mat._java_matrix_wrapper._java_model)
+        >>> (mat_same._java_matrix_wrapper._java_model ==
+        ...  mat._java_matrix_wrapper._java_model)
+        True
+        """
+        if isinstance(rows, RDD):
+            rows = rows.map(_convert_to_indexed_row)
+            # We use DataFrames for serialization of IndexedRows from
+            # Python, so first convert the RDD to a DataFrame on this
+            # side. This will convert each IndexedRow to a Row
+            # containing the 'index' and 'vector' values, which can
+            # both be easily serialized.  We will convert back to
+            # IndexedRows on the Scala side.
+            java_matrix = callMLlibFunc("createIndexedRowMatrix", rows.toDF(),
+                                        long(numRows), int(numCols))
+        elif (isinstance(rows, JavaObject)
+              and rows.getClass().getSimpleName() == "IndexedRowMatrix"):
+            java_matrix = rows
+        else:
+            raise TypeError("rows should be an RDD of IndexedRows or (long, vector) tuples,
"
+                            "got %s" % type(rows))
+
+        self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
+
+    @property
+    def rows(self):
+        """
+        Rows of the IndexedRowMatrix stored as an RDD of IndexedRows.
+
+        >>> mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1, 2, 3]),
+        ...                                        IndexedRow(1, [4, 5, 6])]))
+        >>> rows = mat.rows
+        >>> rows.first()
+        IndexedRow(0, [1.0,2.0,3.0])
+        """
+        # We use DataFrames for serialization of IndexedRows from
+        # Java, so we first convert the RDD of rows to a DataFrame
+        # on the Scala/Java side. Then we map each Row in the
+        # DataFrame back to an IndexedRow on this side.
+        rows_df = callMLlibFunc("getIndexedRows", self._java_matrix_wrapper._java_model)
+        rows = rows_df.map(lambda row: IndexedRow(row[0], row[1]))
+        return rows
+
+    def numRows(self):
+        """
+        Get or compute the number of rows.
+
+        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+        ...                        IndexedRow(1, [4, 5, 6]),
+        ...                        IndexedRow(2, [7, 8, 9]),
+        ...                        IndexedRow(3, [10, 11, 12])])
+
+        >>> mat = IndexedRowMatrix(rows)
+        >>> print(mat.numRows())
+        4
+
+        >>> mat = IndexedRowMatrix(rows, 7, 6)
+        >>> print(mat.numRows())
+        7
+        """
+        return self._java_matrix_wrapper.call("numRows")
+
+    def numCols(self):
+        """
+        Get or compute the number of cols.
+
+        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+        ...                        IndexedRow(1, [4, 5, 6]),
+        ...                        IndexedRow(2, [7, 8, 9]),
+        ...                        IndexedRow(3, [10, 11, 12])])
+
+        >>> mat = IndexedRowMatrix(rows)
+        >>> print(mat.numCols())
+        3
+
+        >>> mat = IndexedRowMatrix(rows, 7, 6)
+        >>> print(mat.numCols())
+        6
+        """
+        return self._java_matrix_wrapper.call("numCols")
+
+    def toRowMatrix(self):
+        """
+        Convert this matrix to a RowMatrix.
+
+        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+        ...                        IndexedRow(6, [4, 5, 6])])
+        >>> mat = IndexedRowMatrix(rows).toRowMatrix()
+        >>> mat.rows.collect()
+        [DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0])]
+        """
+        java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix")
+        return RowMatrix(java_row_matrix)
+
+    def toCoordinateMatrix(self):
+        """
+        Convert this matrix to a CoordinateMatrix.
+
+        >>> rows = sc.parallelize([IndexedRow(0, [1, 0]),
+        ...                        IndexedRow(6, [0, 5])])
+        >>> mat = IndexedRowMatrix(rows).toCoordinateMatrix()
+        >>> mat.entries.take(3)
+        [MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 0.0), MatrixEntry(6, 0, 0.0)]
+        """
+        java_coordinate_matrix = self._java_matrix_wrapper.call("toCoordinateMatrix")
+        return CoordinateMatrix(java_coordinate_matrix)
+
+
+class MatrixEntry(object):
+    """
+    .. note:: Experimental
+
+    Represents an entry of a CoordinateMatrix.
+
+    Just a wrapper over a (long, long, float) tuple.
+
+    :param i: The row index of the matrix.
+    :param j: The column index of the matrix.
+    :param value: The (i, j)th entry of the matrix, as a float.
+    """
+    def __init__(self, i, j, value):
+        self.i = long(i)
+        self.j = long(j)
+        self.value = float(value)
+
+    def __repr__(self):
+        return "MatrixEntry(%s, %s, %s)" % (self.i, self.j, self.value)
+
+
+def _convert_to_matrix_entry(entry):
+    if isinstance(entry, MatrixEntry):
+        return entry
+    elif isinstance(entry, tuple) and len(entry) == 3:
+        return MatrixEntry(*entry)
+    else:
+        raise TypeError("Cannot convert type %s into MatrixEntry" % type(entry))
+
+
+class CoordinateMatrix(DistributedMatrix):
+    """
+    .. note:: Experimental
+
+    Represents a matrix in coordinate format.
+
+    :param entries: An RDD of MatrixEntry inputs or
+                    (long, long, float) tuples.
+    :param numRows: Number of rows in the matrix. A non-positive
+                    value means unknown, at which point the number
+                    of rows will be determined by the max row
+                    index plus one.
+    :param numCols: Number of columns in the matrix. A non-positive
+                    value means unknown, at which point the number
+                    of columns will be determined by the max row
+                    index plus one.
+    """
+    def __init__(self, entries, numRows=0, numCols=0):
+        """
+        Note: This docstring is not shown publicly.
+
+        Create a wrapper over a Java CoordinateMatrix.
+
+        Publicly, we require that `rows` be an RDD.  However, for
+        internal usage, `rows` can also be a Java CoordinateMatrix
+        object, in which case we can wrap it directly.  This
+        assists in clean matrix conversions.
+
+        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+        ...                           MatrixEntry(6, 4, 2.1)])
+        >>> mat = CoordinateMatrix(entries)
+
+        >>> mat_diff = CoordinateMatrix(entries)
+        >>> (mat_diff._java_matrix_wrapper._java_model ==
+        ...  mat._java_matrix_wrapper._java_model)
+        False
+
+        >>> mat_same = CoordinateMatrix(mat._java_matrix_wrapper._java_model)
+        >>> (mat_same._java_matrix_wrapper._java_model ==
+        ...  mat._java_matrix_wrapper._java_model)
+        True
+        """
+        if isinstance(entries, RDD):
+            entries = entries.map(_convert_to_matrix_entry)
+            # We use DataFrames for serialization of MatrixEntry entries
+            # from Python, so first convert the RDD to a DataFrame on
+            # this side. This will convert each MatrixEntry to a Row
+            # containing the 'i', 'j', and 'value' values, which can
+            # each be easily serialized. We will convert back to
+            # MatrixEntry inputs on the Scala side.
+            java_matrix = callMLlibFunc("createCoordinateMatrix", entries.toDF(),
+                                        long(numRows), long(numCols))
+        elif (isinstance(entries, JavaObject)
+              and entries.getClass().getSimpleName() == "CoordinateMatrix"):
+            java_matrix = entries
+        else:
+            raise TypeError("entries should be an RDD of MatrixEntry entries or "
+                            "(long, long, float) tuples, got %s" % type(entries))
+
+        self._java_matrix_wrapper = JavaModelWrapper(java_matrix)
+
+    @property
+    def entries(self):
+        """
+        Entries of the CoordinateMatrix stored as an RDD of
+        MatrixEntries.
+
+        >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
+        ...                                        MatrixEntry(6, 4, 2.1)]))
+        >>> entries = mat.entries
+        >>> entries.first()
+        MatrixEntry(0, 0, 1.2)
+        """
+        # We use DataFrames for serialization of MatrixEntry entries
+        # from Java, so we first convert the RDD of entries to a
+        # DataFrame on the Scala/Java side. Then we map each Row in
+        # the DataFrame back to a MatrixEntry on this side.
+        entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
+        entries = entries_df.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
+        return entries
+
+    def numRows(self):
+        """
+        Get or compute the number of rows.
+
+        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+        ...                           MatrixEntry(1, 0, 2),
+        ...                           MatrixEntry(2, 1, 3.7)])
+
+        >>> mat = CoordinateMatrix(entries)
+        >>> print(mat.numRows())
+        3
+
+        >>> mat = CoordinateMatrix(entries, 7, 6)
+        >>> print(mat.numRows())
+        7
+        """
+        return self._java_matrix_wrapper.call("numRows")
+
+    def numCols(self):
+        """
+        Get or compute the number of cols.
+
+        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+        ...                           MatrixEntry(1, 0, 2),
+        ...                           MatrixEntry(2, 1, 3.7)])
+
+        >>> mat = CoordinateMatrix(entries)
+        >>> print(mat.numCols())
+        2
+
+        >>> mat = CoordinateMatrix(entries, 7, 6)
+        >>> print(mat.numCols())
+        6
+        """
+        return self._java_matrix_wrapper.call("numCols")
+
+    def toRowMatrix(self):
+        """
+        Convert this matrix to a RowMatrix.
+
+        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+        ...                           MatrixEntry(6, 4, 2.1)])
+
+        >>> # This CoordinateMatrix will have 7 effective rows, due to
+        >>> # the highest row index being 6, but the ensuing RowMatrix
+        >>> # will only have 2 rows since there are only entries on 2
+        >>> # unique rows.
+        >>> mat = CoordinateMatrix(entries).toRowMatrix()
+        >>> print(mat.numRows())
+        2
+
+        >>> # This CoordinateMatrix will have 5 columns, due to the
+        >>> # highest column index being 4, and the ensuing RowMatrix
+        >>> # will have 5 columns as well.
+        >>> mat = CoordinateMatrix(entries).toRowMatrix()
+        >>> print(mat.numCols())
+        5
+        """
+        java_row_matrix = self._java_matrix_wrapper.call("toRowMatrix")
+        return RowMatrix(java_row_matrix)
+
+    def toIndexedRowMatrix(self):
+        """
+        Convert this matrix to an IndexedRowMatrix.
+
+        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+        ...                           MatrixEntry(6, 4, 2.1)])
+
+        >>> # This CoordinateMatrix will have 7 effective rows, due to
+        >>> # the highest row index being 6, and the ensuing
+        >>> # IndexedRowMatrix will have 7 rows as well.
+        >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
+        >>> print(mat.numRows())
+        7
+
+        >>> # This CoordinateMatrix will have 5 columns, due to the
+        >>> # highest column index being 4, and the ensuing
+        >>> # IndexedRowMatrix will have 5 columns as well.
+        >>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
+        >>> print(mat.numCols())
+        5
+        """
+        java_indexed_row_matrix = self._java_matrix_wrapper.call("toIndexedRowMatrix")
+        return IndexedRowMatrix(java_indexed_row_matrix)
+
+
+def _test():
+    import doctest
+    from pyspark import SparkContext
+    from pyspark.sql import SQLContext
+    import pyspark.mllib.linalg.distributed
+    globs = pyspark.mllib.linalg.distributed.__dict__.copy()
+    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
+    globs['sqlContext'] = SQLContext(globs['sc'])
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+if __name__ == "__main__":
+    _test()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message