airflow-commits mailing list archives

From kaxiln...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2169] Encode binary data with base64 before importing to BigQuery
Date Sat, 12 May 2018 16:34:44 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test 28cda6b7c -> c0844bae0


[AIRFLOW-2169] Encode binary data with base64 before importing to BigQuery

Closes #3221 from whynick1/master

(cherry picked from commit 5f7bb61f1f22a203acdfa22e448d612ba97155a4)
Signed-off-by: Kaxil Naik <kaxilnaik@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c0844bae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c0844bae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c0844bae

Branch: refs/heads/v1-10-test
Commit: c0844bae0c48c864d778fb0e2cae4474a4bf4847
Parents: 28cda6b
Author: Hongyi Wang <hongyiw@wepay.com>
Authored: Sat May 12 17:33:46 2018 +0100
Committer: Kaxil Naik <kaxilnaik@gmail.com>
Committed: Sat May 12 17:34:37 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/mysql_to_gcs.py       | 114 ++++++++++++-------
 .../operators/test_mysql_to_gcs_operator.py     |  18 ++-
 2 files changed, 85 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
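
For context: the change replaces the old BinaryTypeEncoder (which decoded bytes as UTF-8) with explicit base64 encoding of BYTES columns, since raw bytes are not JSON-serializable under Python 3 and BigQuery expects BYTES values in JSON imports to be base64-encoded. A minimal standalone sketch of that conversion, not part of the patch, using the sample value from the updated test:

    import base64
    import json

    raw = b'byte_str_1'                       # bytes value as returned by MySQLdb
    # json.dumps({'uuid': raw}) would raise TypeError on Python 3
    encoded = base64.urlsafe_b64encode(raw).decode('ascii')
    print(json.dumps({'uuid': encoded}))      # {"uuid": "Ynl0ZV9zdHJfMQ=="}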


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0844bae/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index df3489c..bd3efdb 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,7 @@
 import sys
 import json
 import time
+import base64
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.hooks.mysql_hook import MySqlHook
@@ -29,7 +30,7 @@ from datetime import date, datetime
 from decimal import Decimal
 from MySQLdb.constants import FIELD_TYPE
 from tempfile import NamedTemporaryFile
-from six import string_types, binary_type
+from six import string_types
 
 PY3 = sys.version_info[0] == 3
 
@@ -81,8 +82,9 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             cloud storage hook.
         :type google_cloud_storage_conn_id: string
         :param schema: The schema to use, if any. Should be a list of dict or
-            a str. Examples could be see: https://cloud.google.com/bigquery
-            /docs/schemas#specifying_a_json_schema_file
+            a str. Pass a string if using a Jinja template; otherwise, pass a
+            list of dicts. Example schemas can be found at:
+            https://cloud.google.com/bigquery/docs/schemas#specifying_a_json_schema_file
         :type schema: str or list
         :param delegate_to: The account to impersonate, if any. For this to
             work, the service account making the request must have domain-wide
@@ -107,7 +109,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
         if self.schema_filename:
             files_to_upload.update(self._write_local_schema_file(cursor))
 
-        # Flush all files before uploading
+        # Flush all files before uploading.
         for file_handle in files_to_upload.values():
             file_handle.flush()
 
@@ -135,24 +137,20 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             names in GCS, and values are file handles to local files that
             contain the data for the GCS objects.
         """
-        class BinaryTypeEncoder(json.JSONEncoder):
-            def default(self, obj):
-                if PY3 and isinstance(obj, binary_type):
-                    return str(obj, 'utf-8')
-                return json.JSONEncoder.default(self, obj)
-
         schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
+        col_type_dict = self._get_col_type_dict()
         file_no = 0
         tmp_file_handle = NamedTemporaryFile(delete=True)
         tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
 
         for row in cursor:
-            # Convert datetime objects to utc seconds, and decimals to floats
-            row = map(self.convert_types, row)
+            # Convert datetime objects to utc seconds, and decimals to floats.
+            # Convert binary type object to string encoded with base64.
+            row = self._convert_types(schema, col_type_dict, row)
             row_dict = dict(zip(schema, row))
 
             # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
-            s = json.dumps(row_dict, cls=BinaryTypeEncoder)
+            s = json.dumps(row_dict)
             if PY3:
                 s = s.encode('utf-8')
             tmp_file_handle.write(s)
@@ -181,27 +179,26 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
         tmp_schema_file_handle = NamedTemporaryFile(delete=True)
         if self.schema is not None and isinstance(self.schema, string_types):
             schema_str = self.schema
+        elif self.schema is not None and isinstance(self.schema, list):
+            schema_str = json.dumps(self.schema)
         else:
             schema = []
-            if self.schema is not None and isinstance(self.schema, list):
-                schema = self.schema
-            else:
-                for field in cursor.description:
-                    # See PEP 249 for details about the description tuple.
-                    field_name = field[0]
-                    field_type = self.type_map(field[1])
-                    # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
-                    # for required fields because some MySQL timestamps can't be
-                    # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
-                    if field[6] or field_type == 'TIMESTAMP':
-                        field_mode = 'NULLABLE'
-                    else:
-                        field_mode = 'REQUIRED'
-                    schema.append({
-                        'name': field_name,
-                        'type': field_type,
-                        'mode': field_mode,
-                    })
+            for field in cursor.description:
+                # See PEP 249 for details about the description tuple.
+                field_name = field[0]
+                field_type = self.type_map(field[1])
+                # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
+                # for required fields because some MySQL timestamps can't be
+                # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
+                if field[6] or field_type == 'TIMESTAMP':
+                    field_mode = 'NULLABLE'
+                else:
+                    field_mode = 'REQUIRED'
+                schema.append({
+                    'name': field_name,
+                    'type': field_type,
+                    'mode': field_mode,
+                })
             schema_str = json.dumps(schema)
         if PY3:
             schema_str = schema_str.encode('utf-8')
@@ -221,19 +218,50 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
         for object, tmp_file_handle in files_to_upload.items():
             hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json')
 
-    @classmethod
-    def convert_types(cls, value):
+    def _convert_types(self, schema, col_type_dict, row):
         """
         Takes a value from MySQLdb, and converts it to a value that's safe for
         JSON/Google cloud storage/BigQuery. Dates are converted to UTC seconds.
-        Decimals are converted to floats.
+        Decimals are converted to floats. Binary type fields are encoded with base64,
+        as imported BYTES data must be base64-encoded according to the BigQuery SQL
+        data type documentation: https://cloud.google.com/bigquery/data-types
         """
-        if type(value) in (datetime, date):
-            return time.mktime(value.timetuple())
-        elif isinstance(value, Decimal):
-            return float(value)
-        else:
-            return value
+        converted_row = []
+        for col_name, col_val in zip(schema, row):
+            if type(col_val) in (datetime, date):
+                col_val = time.mktime(col_val.timetuple())
+            elif isinstance(col_val, Decimal):
+                col_val = float(col_val)
+            elif col_type_dict.get(col_name) == "BYTES":
+                col_val = base64.urlsafe_b64encode(col_val)
+                if PY3:
+                    col_val = col_val.decode('ascii')
+            else:
+                col_val = col_val
+            converted_row.append(col_val)
+        return converted_row
+
+    def _get_col_type_dict(self):
+        """
+        Return a dict of column name and column type based on self.schema if not None.
+        """
+        schema = []
+        if isinstance(self.schema, string_types):
+            schema = json.loads(self.schema)
+        elif isinstance(self.schema, list):
+            schema = self.schema
+        elif self.schema is not None:
+            self.log.warn('Using default schema due to unexpected type. '
+                          'Should be a string or list.')
+
+        col_type_dict = {}
+        try:
+            col_type_dict = {col['name']: col['type'] for col in schema}
+        except KeyError:
+            self.log.warn('Using default schema due to missing name or type. Please '
+                          'refer to: https://cloud.google.com/bigquery/docs/schemas'
+                          '#specifying_a_json_schema_file')
+        return col_type_dict
 
     @classmethod
     def type_map(cls, mysql_type):
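
In short, the new _get_col_type_dict helper turns the user-supplied schema into a column-name-to-type lookup, and _convert_types consults it so that only BYTES columns get base64-encoded. A simplified, standalone sketch of the two helpers working together (column names and sample values are illustrative, similar to the schema in the updated test below):

    import base64
    import json

    schema_str = json.dumps([
        {'name': 'location', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'uuid', 'type': 'BYTES', 'mode': 'NULLABLE'},
    ])
    col_type_dict = {col['name']: col['type'] for col in json.loads(schema_str)}

    row = ('New York', b'byte_str_1')
    converted = []
    for col_name, col_val in zip(['location', 'uuid'], row):
        if col_type_dict.get(col_name) == 'BYTES':
            col_val = base64.urlsafe_b64encode(col_val).decode('ascii')
        converted.append(col_val)
    print(dict(zip(['location', 'uuid'], converted)))
    # {'location': 'New York', 'uuid': 'Ynl0ZV9zdHJfMQ=='}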

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0844bae/tests/contrib/operators/test_mysql_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_mysql_to_gcs_operator.py b/tests/contrib/operators/test_mysql_to_gcs_operator.py
index 396bc3a..6e2e3f9 100644
--- a/tests/contrib/operators/test_mysql_to_gcs_operator.py
+++ b/tests/contrib/operators/test_mysql_to_gcs_operator.py
@@ -18,6 +18,7 @@
 # under the License.
 
 import unittest
+import json
 from mock import MagicMock
 
 from airflow.contrib.operators.mysql_to_gcs import \
@@ -33,19 +34,28 @@ class MySqlToGoogleCloudStorageOperatorTest(unittest.TestCase):
         sql = "some_sql"
         bucket = "some_bucket"
         filename = "some_filename"
-        schema = "some_schema"
-        description_list = [['col_integer'], ['col_byte']]
         row_iter = [[1, b'byte_str_1'], [2, b'byte_str_2']]
+        schema = []
+        schema.append({
+            'name': 'location',
+            'type': 'STRING',
+            'mode': 'nullable',
+        })
+        schema.append({
+            'name': 'uuid',
+            'type': 'BYTES',
+            'mode': 'nullable',
+        })
+        schema_str = json.dumps(schema)
 
         op = MySqlToGoogleCloudStorageOperator(
             task_id=task_id,
             sql=sql,
             bucket=bucket,
             filename=filename,
-            schema=schema)
+            schema=schema_str)
 
         cursor_mock = MagicMock()
-        cursor_mock.description = description_list
         cursor_mock.__iter__.return_value = row_iter
 
         # Run


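Usage note: the schema can now be passed either as a list of dicts or as a JSON string (convenient with Jinja templating), and BYTES columns declared in it are base64-encoded in the exported JSON. A hypothetical operator instantiation along the lines of the test fixture above (task id, SQL, bucket and filename are placeholders; connection ids and DAG wiring are omitted):

    import json

    from airflow.contrib.operators.mysql_to_gcs import \
        MySqlToGoogleCloudStorageOperator

    schema_str = json.dumps([
        {'name': 'location', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'uuid', 'type': 'BYTES', 'mode': 'NULLABLE'},
    ])

    export_task = MySqlToGoogleCloudStorageOperator(
        task_id='mysql_to_gcs_with_bytes',                # placeholder
        sql='SELECT location, uuid FROM some_table',      # placeholder
        bucket='some_bucket',
        filename='some_filename',
        schema=schema_str,
    )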