airflow-commits mailing list archives

From joy...@apache.org
Subject [1/2] incubator-airflow git commit: [AIRFLOW-2169] Fix type 'bytes' is not JSON serializable in python3
Date Thu, 05 Apr 2018 00:50:18 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 9c0c4264c -> f865c7898


[AIRFLOW-2169] Fix type 'bytes' is not JSON serializable in python3


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4c89e440
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4c89e440
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4c89e440

Branch: refs/heads/master
Commit: 4c89e440ef34b43142ef7f61a2fb6424dfc7f00f
Parents: d1f94fe
Author: Hongyi Wang <hongyiw@wepay.com>
Authored: Tue Apr 3 18:10:42 2018 -0700
Committer: Hongyi Wang <hongyiw@wepay.com>
Committed: Tue Apr 3 18:10:42 2018 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/mysql_to_gcs.py       | 26 ++++++-----
 .../operators/test_mysql_to_gcs_operator.py     | 47 ++++++++++++++++++++
 2 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
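
Background on the fix: under Python 3, binary/BLOB columns come back from MySQLdb as bytes, and json.dumps raises a TypeError reporting that type 'bytes' is not JSON serializable when it meets such a value. The commit works around this with a json.JSONEncoder subclass that decodes bytes to UTF-8 text before falling back to the default behaviour. A minimal standalone sketch of that approach (the sample row below is invented for illustration):

    import json
    import sys

    PY3 = sys.version_info[0] == 3

    class BinaryTypeEncoder(json.JSONEncoder):
        # Decode bytes values to UTF-8 text so json.dumps can handle them on Python 3.
        def default(self, obj):
            if PY3 and isinstance(obj, bytes):
                return str(obj, 'utf-8')
            # Everything else falls through to the base class, which raises TypeError.
            return json.JSONEncoder.default(self, obj)

    row_dict = {'col_integer': 1, 'col_byte': b'byte_str_1'}
    # Plain json.dumps(row_dict) raises TypeError on Python 3; with the encoder:
    print(json.dumps(row_dict, cls=BinaryTypeEncoder))
    # e.g. {"col_integer": 1, "col_byte": "byte_str_1"}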


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c89e440/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index 2249b9c..4e238ca 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -24,7 +24,7 @@ from datetime import date, datetime
 from decimal import Decimal
 from MySQLdb.constants import FIELD_TYPE
 from tempfile import NamedTemporaryFile
-from six import string_types
+from six import string_types, binary_type
 
 PY3 = sys.version_info[0] == 3
 
@@ -130,6 +130,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             names in GCS, and values are file handles to local files that
             contain the data for the GCS objects.
         """
+        class BinaryTypeEncoder(json.JSONEncoder):
+            def default(self, obj):
+                if PY3 and isinstance(obj, binary_type):
+                    return str(obj, 'utf-8')
+                return json.JSONEncoder.default(self, obj)
+
         schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
         file_no = 0
         tmp_file_handle = NamedTemporaryFile(delete=True)
@@ -141,7 +147,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             row_dict = dict(zip(schema, row))
 
             # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
-            s = json.dumps(row_dict)
+            s = json.dumps(row_dict, cls=BinaryTypeEncoder)
             if PY3:
                 s = s.encode('utf-8')
             tmp_file_handle.write(s)
@@ -166,12 +172,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             name in GCS, and values are file handles to local files that
             contains the BigQuery schema fields in .json format.
         """
-        schema = []
+        schema_str = None
         tmp_schema_file_handle = NamedTemporaryFile(delete=True)
         if self.schema is not None and isinstance(self.schema, string_types):
-            schema = self.schema
-            tmp_schema_file_handle.write(schema)
+            schema_str = self.schema
         else:
+            schema = []
             if self.schema is not None and isinstance(self.schema, list):
                 schema = self.schema
             else:
@@ -191,12 +197,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
                         'type': field_type,
                         'mode': field_mode,
                     })
-            s = json.dumps(schema, tmp_schema_file_handle)
-            if PY3:
-                s = s.encode('utf-8')
-            tmp_schema_file_handle.write(s)
+            schema_str = json.dumps(schema)
+        if PY3:
+            schema_str = schema_str.encode('utf-8')
+        tmp_schema_file_handle.write(schema_str)
 
-        self.log.info('Using schema for %s: %s', self.schema_filename, schema)
+        self.log.info('Using schema for %s: %s', self.schema_filename, schema_str)
         return {self.schema_filename: tmp_schema_file_handle}
 
     def _upload_to_gcs(self, files_to_upload):
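
Two things change in _write_local_schema_file above. The removed line json.dumps(schema, tmp_schema_file_handle) passed the temp file handle as an extra positional argument to json.dumps (it reads like a mix-up with json.dump), and the string-schema branch wrote self.schema to the binary temp file without encoding it, which fails under Python 3. The patch instead builds a single schema_str, encodes it once when running under Python 3, and performs one write. A minimal sketch of the resulting write path, with an invented one-column schema:

    import json
    import sys
    from tempfile import NamedTemporaryFile

    PY3 = sys.version_info[0] == 3

    # Invented one-column schema for illustration; the operator builds this from
    # self.schema or from the cursor's field descriptions.
    schema = [{'name': 'col_integer', 'type': 'INTEGER', 'mode': 'NULLABLE'}]

    schema_str = json.dumps(schema)
    if PY3:
        # The NamedTemporaryFile is opened in binary mode, so encode before writing.
        schema_str = schema_str.encode('utf-8')

    tmp_schema_file_handle = NamedTemporaryFile(delete=True)
    tmp_schema_file_handle.write(schema_str)
    tmp_schema_file_handle.flush()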

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c89e440/tests/contrib/operators/test_mysql_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_mysql_to_gcs_operator.py b/tests/contrib/operators/test_mysql_to_gcs_operator.py
new file mode 100644
index 0000000..c985ac3
--- /dev/null
+++ b/tests/contrib/operators/test_mysql_to_gcs_operator.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import MagicMock
+
+from airflow.contrib.operators.mysql_to_gcs import \
+    MySqlToGoogleCloudStorageOperator
+
+
+class MySqlToGoogleCloudStorageOperatorTest(unittest.TestCase):
+
+    def test_write_local_data_files(self):
+
+        # Configure
+        task_id = "some_test_id"
+        sql = "some_sql"
+        bucket = "some_bucket"
+        filename = "some_filename"
+        schema = "some_schema"
+        description_list = [['col_integer'], ['col_byte']]
+        row_iter = [[1, b'byte_str_1'], [2, b'byte_str_2']]
+
+        op = MySqlToGoogleCloudStorageOperator(
+            task_id=task_id,
+            sql=sql,
+            bucket=bucket,
+            filename=filename,
+            schema=schema)
+
+        cursor_mock = MagicMock()
+        cursor_mock.description = description_list
+        cursor_mock.__iter__.return_value = row_iter
+
+        # Run
+        op._write_local_data_files(cursor_mock)
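
The new test drives _write_local_data_files with a mocked cursor whose rows contain bytes values, which is enough to trip the old TypeError under Python 3, but it does not inspect what was written. A hypothetical follow-up, not part of this commit, could capture the returned dict of file handles (per the method's docstring, filenames mapped to local file handles) and check the byte values were serialized as JSON strings, for example by appending inside test_write_local_data_files:

    # Hypothetical assertions, not part of this commit: capture the returned file
    # handles, read them back, and check the bytes values came out as JSON strings.
    files = op._write_local_data_files(cursor_mock)
    content = ''
    for handle in files.values():
        handle.flush()
        handle.seek(0)
        content += handle.read().decode('utf-8')
    self.assertIn('"byte_str_1"', content)
    self.assertIn('"byte_str_2"', content)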

