airflow-commits mailing list archives

From criccom...@apache.org
Subject [1/3] incubator-airflow git commit: [AIRFLOW-1613] make mysql_to_gcs_operator py3 compatible
Date Mon, 27 Nov 2017 18:56:01 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 59aba3064 -> eff68882b


[AIRFLOW-1613] make mysql_to_gcs_operator py3 compatible

Replaces calls to `json.dump` with `json.dumps` followed by
`tmp_file_handle.write` to write JSON lines to the ndjson file. Under
Python 3, `json.dumps` returns a unicode string instead of a byte
string, so we encode it as `utf-8`, which is compatible with BigQuery (see:
https://cloud.google.com/bigquery/docs/loading-data#loading_encoded_data).
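
For context, the pattern this commit applies is roughly the following
(a minimal standalone sketch, not the operator's actual code; the
sample `rows` data is illustrative). `NamedTemporaryFile` defaults to
binary mode (`w+b`), and on Python 3 `json.dumps` returns a `str`, so
the result must be encoded before it can be written:

----------------------------------------------------------------------
import json
import sys
from tempfile import NamedTemporaryFile

PY3 = sys.version_info[0] == 3

# Illustrative rows, standing in for a MySQL cursor's output.
rows = [{'id': 1, 'name': u'alice'}, {'id': 2, 'name': u'bob'}]

tmp_file_handle = NamedTemporaryFile(delete=True)  # binary mode: 'w+b'
for row_dict in rows:
    s = json.dumps(row_dict)
    if PY3:
        # Python 3: str -> bytes; UTF-8 is accepted by BigQuery.
        s = s.encode('utf-8')
    tmp_file_handle.write(s)
    # One JSON object per line keeps the file BigQuery-compatible.
    tmp_file_handle.write(b'\n')
tmp_file_handle.flush()
----------------------------------------------------------------------

On Python 2 the same code works unchanged, since `json.dumps` already
returns a byte string there and `b'\n'` is an ordinary `str`.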


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2f79610a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2f79610a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2f79610a

Branch: refs/heads/master
Commit: 2f79610a3ef726e88dec238de000d9295ae7d2a9
Parents: 313f5ba
Author: Devon Peticolas <devon@peticol.as>
Authored: Mon Nov 13 18:12:34 2017 -0500
Committer: Devon Peticolas <devon@peticol.as>
Committed: Wed Nov 15 14:00:02 2017 -0500

----------------------------------------------------------------------
 airflow/contrib/operators/mysql_to_gcs.py | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f79610a/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index f94bc24..784481d 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import sys
 import json
 import time
 
@@ -24,6 +25,8 @@ from decimal import Decimal
 from MySQLdb.constants import FIELD_TYPE
 from tempfile import NamedTemporaryFile
 
+PY3 = sys.version_info[0] == 3
+
 
 class MySqlToGoogleCloudStorageOperator(BaseOperator):
     """
@@ -131,10 +134,13 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             row_dict = dict(zip(schema, row))
 
             # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
-            json.dump(row_dict, tmp_file_handle)
+            s = json.dumps(row_dict)
+            if PY3:
+                s = s.encode('utf-8')
+            tmp_file_handle.write(s)
 
             # Append newline to make dumps BigQuery compatible.
-            tmp_file_handle.write('\n')
+            tmp_file_handle.write(b'\n')
 
             # Stop if the file exceeds the file size limit.
             if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
@@ -170,7 +176,10 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
 
         self.log.info('Using schema for %s: %s', self.schema_filename, schema)
         tmp_schema_file_handle = NamedTemporaryFile(delete=True)
-        json.dump(schema, tmp_schema_file_handle)
+        s = json.dumps(schema)
+        if PY3:
+            s = s.encode('utf-8')
+        tmp_schema_file_handle.write(s)
         return {self.schema_filename: tmp_schema_file_handle}
 
     def _upload_to_gcs(self, files_to_upload):
@@ -178,8 +187,9 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
         Upload all of the file splits (and optionally the schema .json file) to
         Google cloud storage.
         """
-        hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
-                                      delegate_to=self.delegate_to)
+        hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+            delegate_to=self.delegate_to)
         for object, tmp_file_handle in files_to_upload.items():
             hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json')
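
For reference, the operator might be wired into a DAG roughly as
follows. This is a hedged sketch: parameters other than those visible
in this diff (`sql`, `filename`, `mysql_conn_id`, and the DAG wiring)
are assumptions about the operator's signature, not confirmed by the
commit:

----------------------------------------------------------------------
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import (
    MySqlToGoogleCloudStorageOperator)

dag = DAG('mysql_to_gcs_example',
          start_date=datetime(2017, 11, 1),
          schedule_interval='@daily')

export_users = MySqlToGoogleCloudStorageOperator(
    task_id='export_users',
    sql='SELECT * FROM users',
    bucket='my-export-bucket',
    filename='exports/users_{}.json',       # '{}' numbers the file splits
    schema_filename='schemas/users.json',   # BigQuery schema JSON
    approx_max_file_size_bytes=1900000000,  # start a new split past ~1.9 GB
    mysql_conn_id='mysql_default',
    google_cloud_storage_conn_id='google_cloud_default',
    dag=dag)
----------------------------------------------------------------------

Each split and the optional schema file are written as UTF-8 bytes, so
the same DAG runs under both Python 2 and Python 3.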
 

