spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject spark git commit: [SPARK-10162] [SQL] Fix the timezone omitting for PySpark Dataframe filter function
Date Tue, 01 Sep 2015 21:58:26 GMT
Repository: spark
Updated Branches:
  refs/heads/master ec0128053 -> bf550a4b5


[SPARK-10162] [SQL] Fix the timezone omitting for PySpark Dataframe filter function

This PR addresses [SPARK-10162](https://issues.apache.org/jira/browse/SPARK-10162)
The issue is with DataFrame filter() function, if datetime.datetime is passed to it:
* Timezone information of this datetime is ignored
* This datetime is assumed to be in local timezone, which depends on the OS timezone setting

Fix includes both code change and regression test. Problem reproduction code on master:
```python
import pytz
from datetime import datetime
from pyspark.sql import *
from pyspark.sql.types import *
sqc = SQLContext(sc)
df = sqc.createDataFrame([], StructType([StructField("dt", TimestampType())]))

m1 = pytz.timezone('UTC')
m2 = pytz.timezone('Etc/GMT+3')

df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m1)).explain()
df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m2)).explain()
```
It gives the same timestamp ignoring time zone:
```
>>> df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m1)).explain()
Filter (dt#0 > 946713600000000)
 Scan PhysicalRDD[dt#0]

>>> df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m2)).explain()
Filter (dt#0 > 946713600000000)
 Scan PhysicalRDD[dt#0]
```
After the fix:
```
>>> df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m1)).explain()
Filter (dt#0 > 946684800000000)
 Scan PhysicalRDD[dt#0]

>>> df.filter(df.dt > datetime(2000, 01, 01, tzinfo=m2)).explain()
Filter (dt#0 > 946695600000000)
 Scan PhysicalRDD[dt#0]
```
PR [8536](https://github.com/apache/spark/pull/8536) was occasionally closed by me dropping
the repo

Author: 0x0FFF <programmerag@gmail.com>

Closes #8555 from 0x0FFF/SPARK-10162.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf550a4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf550a4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf550a4b

Branch: refs/heads/master
Commit: bf550a4b551b6dd18fea3eb3f70497f9a6ad8e6c
Parents: ec01280
Author: 0x0FFF <programmerag@gmail.com>
Authored: Tue Sep 1 14:34:59 2015 -0700
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Tue Sep 1 14:34:59 2015 -0700

----------------------------------------------------------------------
 python/pyspark/sql/tests.py | 26 ++++++++++++++++++--------
 python/pyspark/sql/types.py |  7 +++++--
 2 files changed, 23 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf550a4b/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index cd32e26..59a891b 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -50,16 +50,17 @@ from pyspark.sql.window import Window
 from pyspark.sql.utils import AnalysisException, IllegalArgumentException
 
 
-class UTC(datetime.tzinfo):
-    """UTC"""
-    ZERO = datetime.timedelta(0)
+class UTCOffsetTimezone(datetime.tzinfo):
+    """
+    Specifies timezone in UTC offset
+    """
+
+    def __init__(self, offset=0):
+        self.ZERO = datetime.timedelta(hours=offset)
 
     def utcoffset(self, dt):
         return self.ZERO
 
-    def tzname(self, dt):
-        return "UTC"
-
     def dst(self, dt):
         return self.ZERO
 
@@ -841,13 +842,22 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertEqual(0, df.filter(df.date > date).count())
         self.assertEqual(0, df.filter(df.time > time).count())
 
+    def test_filter_with_datetime_timezone(self):
+        dt1 = datetime.datetime(2015, 4, 17, 23, 1, 2, 3000, tzinfo=UTCOffsetTimezone(0))
+        dt2 = datetime.datetime(2015, 4, 17, 23, 1, 2, 3000, tzinfo=UTCOffsetTimezone(1))
+        row = Row(date=dt1)
+        df = self.sqlCtx.createDataFrame([row])
+        self.assertEqual(0, df.filter(df.date == dt2).count())
+        self.assertEqual(1, df.filter(df.date > dt2).count())
+        self.assertEqual(0, df.filter(df.date < dt2).count())
+
     def test_time_with_timezone(self):
         day = datetime.date.today()
         now = datetime.datetime.now()
         ts = time.mktime(now.timetuple())
         # class in __main__ is not serializable
-        from pyspark.sql.tests import UTC
-        utc = UTC()
+        from pyspark.sql.tests import UTCOffsetTimezone
+        utc = UTCOffsetTimezone()
         utcnow = datetime.datetime.utcfromtimestamp(ts)  # without microseconds
         # add microseconds to utcnow (keeping year,month,day,hour,minute,second)
         utcnow = datetime.datetime(*(utcnow.timetuple()[:6] + (now.microsecond, utc)))

http://git-wip-us.apache.org/repos/asf/spark/blob/bf550a4b/python/pyspark/sql/types.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 94e581a..f84d08d 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1290,8 +1290,11 @@ class DatetimeConverter(object):
 
     def convert(self, obj, gateway_client):
         Timestamp = JavaClass("java.sql.Timestamp", gateway_client)
-        return Timestamp(int(time.mktime(obj.timetuple())) * 1000 + obj.microsecond // 1000)
-
+        seconds = (calendar.timegm(obj.utctimetuple()) if obj.tzinfo
+                   else time.mktime(obj.timetuple()))
+        t = Timestamp(int(seconds) * 1000)
+        t.setNanos(obj.microsecond * 1000)
+        return t
 
 # datetime is a subclass of date, we should register DatetimeConverter first
 register_input_converter(DatetimeConverter())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message