From: mlnick@apache.org
To: commits@spark.apache.org
Message-Id: <1e8975e562fd46bcb2eb0fc89b227298@git.apache.org>
Subject: spark git commit: [SPARK-22797][PYSPARK] Bucketizer support multi-column
Date: Fri, 26 Jan 2018 10:28:32 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master cd3956df0 -> c22eaa94e


[SPARK-22797][PYSPARK] Bucketizer support multi-column

## What changes were proposed in this pull request?

Add multi-column support to Bucketizer on the Python side.

## How was this patch tested?

Existing tests and newly added tests.

Author: Zheng RuiFeng

Closes #19892 from zhengruifeng/20542_py.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c22eaa94
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c22eaa94
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c22eaa94

Branch: refs/heads/master
Commit: c22eaa94e85aaac649566495dcf763a5de3c8d06
Parents: cd3956d
Author: Zheng RuiFeng
Authored: Fri Jan 26 12:28:27 2018 +0200
Committer: Nick Pentreath
Committed: Fri Jan 26 12:28:27 2018 +0200

----------------------------------------------------------------------
 python/pyspark/ml/feature.py        | 105 +++++++++++++++++++++++--------
 python/pyspark/ml/param/__init__.py |  10 +++
 python/pyspark/ml/tests.py          |   9 +++
 3 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
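Before the diff, a minimal sketch of the multi-column usage this patch
enables, modeled on the new doctests. It assumes a live SparkSession bound
to `spark` (as in the doctests); the data and column names are illustrative
only, not part of the patch.

    from pyspark.ml.feature import Bucketizer

    df = spark.createDataFrame([(0.1, 0.0), (1.5, float("nan"))],
                               ["values1", "values2"])

    # One Bucketizer instance now bins several columns in a single pass:
    # splitsArray supplies one split list per entry in inputCols.
    bucketizer = Bucketizer(
        splitsArray=[[-float("inf"), 0.5, float("inf")],
                     [-float("inf"), 0.5, float("inf")]],
        inputCols=["values1", "values2"],
        outputCols=["buckets1", "buckets2"],
        handleInvalid="keep")  # NaN rows land in an extra bucket

    bucketizer.transform(df).show()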
http://git-wip-us.apache.org/repos/asf/spark/blob/c22eaa94/python/pyspark/ml/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index da85ba7..fdc7787 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -317,26 +317,33 @@ class BucketedRandomProjectionLSHModel(LSHModel, JavaMLReadable, JavaMLWritable)


 @inherit_doc
-class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasHandleInvalid,
-                 JavaMLReadable, JavaMLWritable):
-    """
-    Maps a column of continuous features to a column of feature buckets.
-
-    >>> values = [(0.1,), (0.4,), (1.2,), (1.5,), (float("nan"),), (float("nan"),)]
-    >>> df = spark.createDataFrame(values, ["values"])
+class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasInputCols, HasOutputCols,
+                 HasHandleInvalid, JavaMLReadable, JavaMLWritable):
+    """
+    Maps a column of continuous features to a column of feature buckets. Since 2.3.0,
+    :py:class:`Bucketizer` can map multiple columns at once by setting the :py:attr:`inputCols`
+    parameter. Note that when both the :py:attr:`inputCol` and :py:attr:`inputCols` parameters
+    are set, an Exception will be thrown. The :py:attr:`splits` parameter is only used for single
+    column usage, and :py:attr:`splitsArray` is for multiple columns.
+
+    >>> values = [(0.1, 0.0), (0.4, 1.0), (1.2, 1.3), (1.5, float("nan")),
+    ...     (float("nan"), 1.0), (float("nan"), 0.0)]
+    >>> df = spark.createDataFrame(values, ["values1", "values2"])
     >>> bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4, float("inf")],
-    ...     inputCol="values", outputCol="buckets")
-    >>> bucketed = bucketizer.setHandleInvalid("keep").transform(df).collect()
-    >>> len(bucketed)
-    6
-    >>> bucketed[0].buckets
-    0.0
-    >>> bucketed[1].buckets
-    0.0
-    >>> bucketed[2].buckets
-    1.0
-    >>> bucketed[3].buckets
-    2.0
+    ...     inputCol="values1", outputCol="buckets")
+    >>> bucketed = bucketizer.setHandleInvalid("keep").transform(df.select("values1"))
+    >>> bucketed.show(truncate=False)
+    +-------+-------+
+    |values1|buckets|
+    +-------+-------+
+    |0.1    |0.0    |
+    |0.4    |0.0    |
+    |1.2    |1.0    |
+    |1.5    |2.0    |
+    |NaN    |3.0    |
+    |NaN    |3.0    |
+    +-------+-------+
+    ...
     >>> bucketizer.setParams(outputCol="b").transform(df).head().b
     0.0
     >>> bucketizerPath = temp_path + "/bucketizer"
@@ -347,6 +354,22 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasHandleInvalid,
     >>> bucketed = bucketizer.setHandleInvalid("skip").transform(df).collect()
     >>> len(bucketed)
     4
+    >>> bucketizer2 = Bucketizer(splitsArray=
+    ...     [[-float("inf"), 0.5, 1.4, float("inf")], [-float("inf"), 0.5, float("inf")]],
+    ...     inputCols=["values1", "values2"], outputCols=["buckets1", "buckets2"])
+    >>> bucketed2 = bucketizer2.setHandleInvalid("keep").transform(df)
+    >>> bucketed2.show(truncate=False)
+    +-------+-------+--------+--------+
+    |values1|values2|buckets1|buckets2|
+    +-------+-------+--------+--------+
+    |0.1    |0.0    |0.0     |0.0     |
+    |0.4    |1.0    |0.0     |1.0     |
+    |1.2    |1.3    |1.0     |1.0     |
+    |1.5    |NaN    |2.0     |2.0     |
+    |NaN    |1.0    |3.0     |1.0     |
+    |NaN    |0.0    |3.0     |0.0     |
+    +-------+-------+--------+--------+
+    ...

     .. versionadded:: 1.4.0
     """
@@ -363,14 +386,30 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasHandleInvalid,
     handleInvalid = Param(Params._dummy(), "handleInvalid", "how to handle invalid entries. " +
                           "Options are 'skip' (filter out rows with invalid values), " +
-                          "'error' (throw an error), or 'keep' (keep invalid values in a special " +
-                          "additional bucket).",
+                          "'error' (throw an error), or 'keep' (keep invalid values in a " +
+                          "special additional bucket). Note that in the multiple column " +
+                          "case, the invalid handling is applied to all columns. That said " +
+                          "for 'error' it will throw an error if any invalids are found in " +
+                          "any column, for 'skip' it will skip rows with any invalids in " +
+                          "any columns, etc.",
                           typeConverter=TypeConverters.toString)

+    splitsArray = Param(Params._dummy(), "splitsArray", "The array of split points for mapping " +
+                        "continuous features into buckets for multiple columns. For each input " +
+                        "column, with n+1 splits, there are n buckets. A bucket defined by " +
+                        "splits x,y holds values in the range [x,y) except the last bucket, " +
+                        "which also includes y. The splits should be of length >= 3 and " +
+                        "strictly increasing. Values at -inf, inf must be explicitly provided " +
+                        "to cover all Double values; otherwise, values outside the splits " +
+                        "specified will be treated as errors.",
+                        typeConverter=TypeConverters.toListListFloat)
+
     @keyword_only
-    def __init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error"):
+    def __init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error",
+                 splitsArray=None, inputCols=None, outputCols=None):
         """
-        __init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error")
+        __init__(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error", \
+                 splitsArray=None, inputCols=None, outputCols=None)
         """
         super(Bucketizer, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.Bucketizer", self.uid)
@@ -380,9 +419,11 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasHandleInvalid,

     @keyword_only
     @since("1.4.0")
-    def setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error"):
+    def setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error",
+                  splitsArray=None, inputCols=None, outputCols=None):
         """
-        setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error")
+        setParams(self, splits=None, inputCol=None, outputCol=None, handleInvalid="error", \
+                  splitsArray=None, inputCols=None, outputCols=None)
         Sets params for this Bucketizer.
         """
         kwargs = self._input_kwargs
@@ -402,6 +443,20 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, HasHandleInvalid,
         """
         return self.getOrDefault(self.splits)

+    @since("2.3.0")
+    def setSplitsArray(self, value):
+        """
+        Sets the value of :py:attr:`splitsArray`.
+        """
+        return self._set(splitsArray=value)
+
+    @since("2.3.0")
+    def getSplitsArray(self):
+        """
+        Gets the array of split points or its default value.
+        """
+        return self.getOrDefault(self.splitsArray)
+

 @inherit_doc
 class CountVectorizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):
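The splitsArray doc above pins down the binning rule: each bucket covers
the half-open range [x, y), except the last bucket, which also includes its
upper split. A rough pure-Python sketch of just that rule (`bucket_index`
is a hypothetical helper for illustration, not part of the patch; it does
not model the extra NaN bucket used under handleInvalid="keep"):

    import bisect

    def bucket_index(splits, v):
        # Bucket i covers [splits[i], splits[i+1]); the final bucket
        # additionally includes the last split value itself.
        if v == splits[-1]:
            return len(splits) - 2
        return bisect.bisect_right(splits, v) - 1

    splits = [-float("inf"), 0.5, 1.4, float("inf")]
    print(bucket_index(splits, 0.4))   # 0, matching the doctest above
    print(bucket_index(splits, 1.5))   # 2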
http://git-wip-us.apache.org/repos/asf/spark/blob/c22eaa94/python/pyspark/ml/param/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index 043c25c..5b6b702 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -135,6 +135,16 @@ class TypeConverters(object):
         raise TypeError("Could not convert %s to list of floats" % value)

     @staticmethod
+    def toListListFloat(value):
+        """
+        Convert a value to list of list of floats, if possible.
+        """
+        if TypeConverters._can_convert_to_list(value):
+            value = TypeConverters.toList(value)
+            return [TypeConverters.toListFloat(v) for v in value]
+        raise TypeError("Could not convert %s to list of list of floats" % value)
+
+    @staticmethod
     def toListInt(value):
         """
         Convert a value to list of ints, if possible.
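A quick sketch of how the new converter behaves, mirroring the unit test
added below (assumes a PySpark build that includes this commit):

    from pyspark.ml.param import TypeConverters

    # Each inner list is passed through toListFloat, so ints are
    # coerced to floats element-wise.
    TypeConverters.toListListFloat([[-5, 1.5], [0, 1, 2]])
    # -> [[-5.0, 1.5], [0.0, 1.0, 2.0]]

    # Non-numeric elements are rejected, as the new test asserts.
    TypeConverters.toListListFloat([["a", 1.0]])  # raises TypeError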
http://git-wip-us.apache.org/repos/asf/spark/blob/c22eaa94/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 1af2b91..b8bddbd 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -238,6 +238,15 @@ class ParamTypeConversionTests(PySparkTestCase):
         self.assertRaises(TypeError, lambda: LogisticRegression(fitIntercept=1))
         self.assertRaises(TypeError, lambda: LogisticRegression(fitIntercept="false"))

+    def test_list_list_float(self):
+        b = Bucketizer(splitsArray=[[-0.1, 0.5, 3], [-5, 1.5]])
+        self.assertEqual(b.getSplitsArray(), [[-0.1, 0.5, 3.0], [-5.0, 1.5]])
+        self.assertTrue(all([type(v) == list for v in b.getSplitsArray()]))
+        self.assertTrue(all([type(v) == float for v in b.getSplitsArray()[0]]))
+        self.assertTrue(all([type(v) == float for v in b.getSplitsArray()[1]]))
+        self.assertRaises(TypeError, lambda: Bucketizer(splitsArray=["a", 1.0]))
+        self.assertRaises(TypeError, lambda: Bucketizer(splitsArray=[[-5, 1.5], ["a", 1.0]]))
+

 class PipelineTests(PySparkTestCase):
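Finally, the updated docstring notes that inputCol and inputCols are
mutually exclusive. A sketch of that failure mode (the exact exception type
and the point at which it surfaces come from the Scala side, so treat this
as illustrative rather than definitive):

    from pyspark.ml.feature import Bucketizer

    b = Bucketizer(splits=[-float("inf"), 0.5, float("inf")],
                   inputCol="values1", outputCol="buckets",
                   inputCols=["values1", "values2"])
    # Setting both the single- and multi-column input params is
    # expected to raise an exception when the transformer is used.
    b.transform(df)  # df as in the doctests above; raises an exception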