spark-reviews mailing list archives

From mengxr <...@git.apache.org>
Subject [GitHub] spark pull request: [WIP] SPARK-1430: Support sparse data in Pytho...
Date Mon, 14 Apr 2014 03:15:46 GMT
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/341#discussion_r11570444
  
    --- Diff: python/pyspark/mllib/_common.py ---
    @@ -55,159 +103,222 @@ def _serialize_double_vector(v):
         >>> array_equal(y, array([1.0, 2.0, 3.0]))
         True
         """
    -    if type(v) != ndarray:
    -        raise TypeError("_serialize_double_vector called on a %s; "
    -                "wanted ndarray" % type(v))
    -    """complex is only datatype that can't be converted to float64"""
    -    if issubdtype(v.dtype, complex):
    +    v = _convert_vector(v)
    +    if type(v) == ndarray:
    +        return _serialize_dense_vector(v)
    +    elif type(v) == SparseVector:
    +        return _serialize_sparse_vector(v)
    +    else:
             raise TypeError("_serialize_double_vector called on a %s; "
    -                "wanted ndarray" % type(v))
    -    if v.dtype != float64:
    -        v = v.astype(float64)
    +                "wanted ndarray or SparseVector" % type(v))
    +
    +
    +def _serialize_dense_vector(v):
    +    """Serialize a dense vector given as a NumPy array."""
         if v.ndim != 1:
             raise TypeError("_serialize_double_vector called on a %ddarray; "
                     "wanted a 1darray" % v.ndim)
    +    if v.dtype != float64:
    +        if numpy.issubdtype(v.dtype, numpy.complex):
    +            raise TypeError("_serialize_double_vector called on an ndarray of %s; "
    +                    "wanted ndarray of float64" % v.dtype)
    +        v = v.astype(float64)
         length = v.shape[0]
    -    ba = bytearray(16 + 8*length)
    -    header = ndarray(shape=[2], buffer=ba, dtype="int64")
    -    header[0] = 1
    -    header[1] = length
    -    copyto(ndarray(shape=[length], buffer=ba, offset=16,
    -            dtype="float64"), v)
    +    ba = bytearray(5 + 8 * length)
    +    ba[0] = DENSE_VECTOR_MAGIC
    +    length_bytes = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)
    +    length_bytes[0] = length
    +    copyto(ndarray(shape=[length], buffer=ba, offset=5, dtype=float64), v)
    +    return ba
    +
    +
    +def _serialize_sparse_vector(v):
    +    """Serialize a pyspark.mllib.linalg.SparseVector."""
    +    nonzeros = len(v.indices)
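    +    # Sparse format: 1 magic byte, int32 size and nonzero count, the int32 indices, then the float64 values.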
    +    ba = bytearray(9 + 12 * nonzeros)
    +    ba[0] = SPARSE_VECTOR_MAGIC
    +    header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
    +    header[0] = v.size
    +    header[1] = nonzeros
    +    copyto(ndarray(shape=[nonzeros], buffer=ba, offset=9, dtype=int32), v.indices)
    +    values_offset = 9 + 4 * nonzeros
    +    copyto(ndarray(shape=[nonzeros], buffer=ba, offset=values_offset, dtype=float64), v.values)
         return ba
     
    +
     def _deserialize_double_vector(ba):
         """Deserialize a double vector from a mutually understood format.
     
         >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
         >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
         True
    +    >>> s = SparseVector(4, [1, 3], [3.0, 5.5])
    +    >>> s == _deserialize_double_vector(_serialize_double_vector(s))
    +    True
         """
         if type(ba) != bytearray:
             raise TypeError("_deserialize_double_vector called on a %s; "
                     "wanted bytearray" % type(ba))
    -    if len(ba) < 16:
    +    if len(ba) < 5:
             raise TypeError("_deserialize_double_vector called on a %d-byte array, "
                     "which is too short" % len(ba))
    -    if (len(ba) & 7) != 0:
    -        raise TypeError("_deserialize_double_vector called on a %d-byte array, "
    -                "which is not a multiple of 8" % len(ba))
    -    header = ndarray(shape=[2], buffer=ba, dtype="int64")
    -    if header[0] != 1:
    +    if ba[0] == DENSE_VECTOR_MAGIC:
    +        return _deserialize_dense_vector(ba)
    +    elif ba[0] == SPARSE_VECTOR_MAGIC:
    +        return _deserialize_sparse_vector(ba)
    +    else:
             raise TypeError("_deserialize_double_vector called on bytearray "
                             "with wrong magic")
    -    length = header[1]
    -    if len(ba) != 8*length + 16:
    -        raise TypeError("_deserialize_double_vector called on bytearray "
    +
    +
    +def _deserialize_dense_vector(ba):
    +    """Deserialize a dense vector into a numpy array."""
    +    if len(ba) < 5:
    +        raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
    +                "which is too short" % len(ba))
    +    length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
    +    if len(ba) != 8 * length + 5:
    +        raise TypeError("_deserialize_dense_vector called on bytearray "
    +                        "with wrong length")
    +    return _deserialize_numpy_array([length], ba, 5)
    +
    +
    +def _deserialize_sparse_vector(ba):
    +    """Deserialize a sparse vector into a MLlib SparseVector object."""
    +    if len(ba) < 9:
    +        raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
    +                "which is too short" % len(ba))
    +    header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
    +    size = header[0]
    +    nonzeros = header[1]
    +    if len(ba) != 9 + 12 * nonzeros:
    +        raise TypeError("_deserialize_sparse_vector called on bytearray "
                             "with wrong length")
    -    return _deserialize_byte_array([length], ba, 16)
    +    indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32)
    +    values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64)
    +    return SparseVector(int(size), indices, values)
    +
     
     def _serialize_double_matrix(m):
         """Serialize a double matrix into a mutually understood format."""
    -    if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
    +    if (type(m) == ndarray and m.ndim == 2):
    +        if m.dtype != float64:
    +            if numpy.issubdtype(m.dtype, numpy.complex):
    +                raise TypeError("_serialize_double_matrix called on an ndarray of %s;
"
    +                        "wanted ndarray of float64" % m.dtype)
    +            m = m.astype(float64)
             rows = m.shape[0]
             cols = m.shape[1]
    -        ba = bytearray(24 + 8 * rows * cols)
    -        header = ndarray(shape=[3], buffer=ba, dtype="int64")
    -        header[0] = 2
    -        header[1] = rows
    -        header[2] = cols
    -        copyto(ndarray(shape=[rows, cols], buffer=ba, offset=24,
    -                       dtype="float64", order='C'), m)
    +        ba = bytearray(9 + 8 * rows * cols)
    +        ba[0] = DENSE_MATRIX_MAGIC
    +        lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
    +        lengths[0] = rows
    +        lengths[1] = cols
    +        copyto(ndarray(shape=[rows, cols], buffer=ba, offset=9,
    +                       dtype=float64, order='C'), m)
             return ba
         else:
             raise TypeError("_serialize_double_matrix called on a "
                             "non-double-matrix")
     
    +
     def _deserialize_double_matrix(ba):
         """Deserialize a double matrix from a mutually understood format."""
         if type(ba) != bytearray:
             raise TypeError("_deserialize_double_matrix called on a %s; "
                     "wanted bytearray" % type(ba))
    -    if len(ba) < 24:
    +    if len(ba) < 9:
             raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
                     "which is too short" % len(ba))
    -    if (len(ba) & 7) != 0:
    -        raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
    -                "which is not a multiple of 8" % len(ba))
    -    header = ndarray(shape=[3], buffer=ba, dtype="int64")
    -    if (header[0] != 2):
    +    if ba[0] != DENSE_MATRIX_MAGIC:
             raise TypeError("_deserialize_double_matrix called on bytearray "
                             "with wrong magic")
    -    rows = header[1]
    -    cols = header[2]
    -    if (len(ba) != 8*rows*cols + 24):
    +    lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
    +    rows = lengths[0]
    +    cols = lengths[1]
    +    if (len(ba) != 8 * rows * cols + 9):
             raise TypeError("_deserialize_double_matrix called on bytearray "
                             "with wrong length")
    -    return _deserialize_byte_array([rows, cols], ba, 24)
    +    return _deserialize_numpy_array([rows, cols], ba, 9)
    +
    +
    +def _serialize_labeled_point(p):
    +    """Serialize a LabeledPoint with a features vector of any type."""
    +    # Import here rather than at module level to avoid a circular import.
    +    from pyspark.mllib.regression import LabeledPoint
    +    assert type(p) == LabeledPoint, "Expected a LabeledPoint object"
    +    serialized_features = _serialize_double_vector(p.features)
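    +    # LabeledPoint format: 1 magic byte and a float64 label, followed by the serialized features.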
    +    header = bytearray(9)
    +    header[0] = LABELED_POINT_MAGIC
    +    header_float = ndarray(shape=[1], buffer=header, offset=1, dtype=float64)
    +    header_float[0] = p.label
    +    return header + serialized_features
     
    -def _linear_predictor_typecheck(x, coeffs):
    -    """Check that x is a one-dimensional vector of the right shape.
    -    This is a temporary hackaround until I actually implement bulk predict."""
    -    if type(x) == ndarray:
    -        if x.ndim == 1:
    -            if x.shape == coeffs.shape:
    -                pass
    -            else:
    -                raise RuntimeError("Got array of %d elements; wanted %d"
    -                        % (shape(x)[0], shape(coeffs)[0]))
    -        else:
    -            raise RuntimeError("Bulk predict not yet supported.")
    -    elif (type(x) == RDD):
    -        raise RuntimeError("Bulk predict not yet supported.")
    -    else:
    -        raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
     
     def _get_unmangled_rdd(data, serializer):
         dataBytes = data.map(serializer)
         dataBytes._bypass_serializer = True
    -    dataBytes.cache()
    +    dataBytes.cache() # TODO: users should unpersist() this later!
         return dataBytes
     
    -# Map a pickled Python RDD of numpy double vectors to a Java RDD of
    +
    +# Map a pickled Python RDD of Python dense or sparse vectors to a Java RDD of
     # _serialized_double_vectors
     def _get_unmangled_double_vector_rdd(data):
         return _get_unmangled_rdd(data, _serialize_double_vector)
     
    -class LinearModel(object):
    -    """Something that has a vector of coefficients and an intercept."""
    -    def __init__(self, coeff, intercept):
    -        self._coeff = coeff
    -        self._intercept = intercept
     
    -class LinearRegressionModelBase(LinearModel):
    -    """A linear regression model.
    +# Map a pickled Python RDD of LabeledPoint to a Java RDD of _serialized_labeled_points
    +def _get_unmangled_labeled_point_rdd(data):
    +    return _get_unmangled_rdd(data, _serialize_labeled_point)
     
    -    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
    -    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
    -    True
    +
    +# Common functions for dealing with and training linear models
    +
    +def _linear_predictor_typecheck(x, coeffs):
         """
    -    def predict(self, x):
    -        """Predict the value of the dependent variable given a vector x"""
    -        """containing values for the independent variables."""
    -        _linear_predictor_typecheck(x, self._coeff)
    -        return dot(self._coeff, x) + self._intercept
    +    Check that x is a one-dimensional vector of the right shape.
    +    This is a temporary hackaround until we actually implement bulk predict.
    +    """
    +    x = _convert_vector(x)
    +    if type(x) == ndarray:
    +        if x.ndim == 1:
    +            if x.shape != coeffs.shape:
    +                raise RuntimeError("Got array of %d elements; wanted %d"
    +                        % (numpy.shape(x)[0], coeffs.shape[0]))
    +        else:
    +            raise RuntimeError("Bulk predict not yet supported.")
    +    elif type(x) == SparseVector:
    +        if x.size != coeffs.shape[0]:
    +           raise RuntimeError("Got sparse vector of size %d; wanted %d"
    +                   % (x.size, coeffs.shape[0]))
    +    elif (type(x) == RDD):
    +        raise RuntimeError("Bulk predict not yet supported.")
    +    else:
    +        raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
    +
     
     # If we weren't given initial weights, take a zero vector of the appropriate
     # length.
     def _get_initial_weights(initial_weights, data):
         if initial_weights is None:
    -        initial_weights = data.first()
    -        if type(initial_weights) != ndarray:
    -            raise TypeError("At least one data element has type "
    -                    + type(initial_weights).__name__ + " which is not ndarray")
    -        if initial_weights.ndim != 1:
    -            raise TypeError("At least one data element has "
    -                    + initial_weights.ndim + " dimensions, which is not 1")
    -        initial_weights = ones([initial_weights.shape[0] - 1])
    +        initial_weights = _convert_vector(data.first().features)
    +        if type(initial_weights) == ndarray:
    +            if initial_weights.ndim != 1:
    +                raise TypeError("At least one data element has "
    +                        + str(initial_weights.ndim) + " dimensions, which is not 1")
    +            initial_weights = numpy.ones([initial_weights.shape[0]])
    --- End diff --
    
    Should use zeros instead of ones.
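    
    Something like this (just a sketch, keeping the shape handling above unchanged):
    
        initial_weights = numpy.zeros([initial_weights.shape[0]])
    
    Zero weights are the usual starting point for the linear-model optimizers, so the `ones` here looks like a leftover.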


