From: marmbrus@apache.org
To: commits@spark.apache.org
Message-Id: <92e1105728754cbcb1c58e9fc7c2fe73@git.apache.org>
Subject: spark git commit: [SPARK-5859] [PySpark] [SQL] fix DataFrame Python API
Date: Tue, 17 Feb 2015 18:22:51 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master c74b07fa9 -> d8adefefc


[SPARK-5859] [PySpark] [SQL] fix DataFrame Python API

1. added explain()
2. add isLocal()
3. do not call show() in __repl__
4. add foreach() and foreachPartition()
5. add distinct()
6. fix functions.col()/column()/lit()
7. fix unit tests in sql/functions.py
8. fix unicode in showString()

Author: Davies Liu

Closes #4645 from davies/df6 and squashes the following commits:

6b46a2c [Davies Liu] fix DataFrame Python API


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8adefef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8adefef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8adefef

Branch: refs/heads/master
Commit: d8adefefcc2a4af32295440ed1d4917a6968f017
Parents: c74b07f
Author: Davies Liu
Authored: Tue Feb 17 10:22:48 2015 -0800
Committer: Michael Armbrust
Committed: Tue Feb 17 10:22:48 2015 -0800

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py | 65 ++++++++++++++++++++++++++++++------
 python/pyspark/sql/functions.py | 12 +++----
 2 files changed, 59 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
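
Before the file-by-file diff, here is a rough usage sketch of the DataFrame methods this patch adds or changes (explain, isLocal, distinct, foreach, foreachPartition, and the new __repr__). The session setup mirrors the doctest fixtures used in the patch; the sketch is illustrative only and is not part of the commit itself:

    from pyspark import SparkContext
    from pyspark.sql import Row, SQLContext

    sc = SparkContext('local[4]', 'example')
    sqlCtx = SQLContext(sc)                  # creating the SQLContext also enables RDD.toDF()
    df = sc.parallelize([Row(name='Alice', age=2),
                         Row(name='Bob', age=5)]).toDF()

    df.explain()                             # prints only the physical plan
    df.explain(extended=True)                # prints the logical and physical plans
    print df.isLocal()                       # True if collect()/take() can run without executors
    print repr(df)                           # now something like DataFrame[age: int, name: string]
    df.show()                                # still prints the first 20 rows
    print df.distinct().count()              # distinct() returns a new DataFrame without duplicate rows
    df.foreach(lambda person: None)          # shorthand for df.rdd.foreach()
    df.foreachPartition(lambda people: None) # shorthand for df.rdd.foreachPartition()
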
+ """ + return self._jdf.isLocal() + def show(self): """ Print the first 20 rows. @@ -247,14 +263,12 @@ class DataFrame(object): 2 Alice 5 Bob >>> df - age name - 2 Alice - 5 Bob + DataFrame[age: int, name: string] """ - print (self) + print self._jdf.showString().encode('utf8', 'ignore') def __repr__(self): - return self._jdf.showString() + return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) def count(self): """Return the number of elements in this RDD. @@ -336,6 +350,8 @@ class DataFrame(object): """ Return a new RDD by applying a function to each partition. + It's a shorthand for df.rdd.mapPartitions() + >>> rdd = sc.parallelize([1, 2, 3, 4], 4) >>> def f(iterator): yield 1 >>> rdd.mapPartitions(f).sum() @@ -343,6 +359,31 @@ class DataFrame(object): """ return self.rdd.mapPartitions(f, preservesPartitioning) + def foreach(self, f): + """ + Applies a function to all rows of this DataFrame. + + It's a shorthand for df.rdd.foreach() + + >>> def f(person): + ... print person.name + >>> df.foreach(f) + """ + return self.rdd.foreach(f) + + def foreachPartition(self, f): + """ + Applies a function to each partition of this DataFrame. + + It's a shorthand for df.rdd.foreachPartition() + + >>> def f(people): + ... for person in people: + ... print person.name + >>> df.foreachPartition(f) + """ + return self.rdd.foreachPartition(f) + def cache(self): """ Persist with the default storage level (C{MEMORY_ONLY_SER}). """ @@ -377,8 +418,13 @@ class DataFrame(object): """ Return a new :class:`DataFrame` that has exactly `numPartitions` partitions. """ - rdd = self._jdf.repartition(numPartitions, None) - return DataFrame(rdd, self.sql_ctx) + return DataFrame(self._jdf.repartition(numPartitions, None), self.sql_ctx) + + def distinct(self): + """ + Return a new :class:`DataFrame` containing the distinct rows in this DataFrame. 
+ """ + return DataFrame(self._jdf.distinct(), self.sql_ctx) def sample(self, withReplacement, fraction, seed=None): """ @@ -957,10 +1003,7 @@ class Column(DataFrame): return Column(jc, self.sql_ctx) def __repr__(self): - if self._jdf.isComputable(): - return self._jdf.samples() - else: - return 'Column<%s>' % self._jdf.toString() + return 'Column<%s>' % self._jdf.toString().encode('utf8') def toPandas(self): """ http://git-wip-us.apache.org/repos/asf/spark/blob/d8adefef/python/pyspark/sql/functions.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index d0e0906..fc61162 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -37,7 +37,7 @@ def _create_function(name, doc=""): """ Create a function for aggregator by name""" def _(col): sc = SparkContext._active_spark_context - jc = getattr(sc._jvm.functions, name)(_to_java_column(col)) + jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col) return Column(jc) _.__name__ = name _.__doc__ = doc @@ -140,6 +140,7 @@ class UserDefinedFunction(object): def udf(f, returnType=StringType()): """Create a user defined function (UDF) + >>> from pyspark.sql.types import IntegerType >>> slen = udf(lambda s: len(s), IntegerType()) >>> df.select(slen(df.name).alias('slen')).collect() [Row(slen=5), Row(slen=3)] @@ -151,17 +152,14 @@ def _test(): import doctest from pyspark.context import SparkContext from pyspark.sql import Row, SQLContext - import pyspark.sql.dataframe - globs = pyspark.sql.dataframe.__dict__.copy() + import pyspark.sql.functions + globs = pyspark.sql.functions.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') globs['sc'] = sc globs['sqlCtx'] = SQLContext(sc) globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF() - globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF() - globs['df3'] = sc.parallelize([Row(name='Alice', age=2, height=80), - Row(name='Bob', age=5, height=85)]).toDF() (failure_count, test_count) = doctest.testmod( - pyspark.sql.dataframe, globs=globs, + pyspark.sql.functions, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) globs['sc'].stop() if failure_count: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org