airflow-commits mailing list archives

From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator
Date Wed, 11 Oct 2017 20:07:39 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-test ace2b1d24 -> afcdd097d


[AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator

Closes #2680 from jgao54/write-binary


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/afcdd097
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/afcdd097
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/afcdd097

Branch: refs/heads/v1-9-test
Commit: afcdd097d765c8e88b4858436bb01585bc04ba7f
Parents: ace2b1d
Author: Joy Gao <joyg@wepay.com>
Authored: Wed Oct 11 13:06:17 2017 -0700
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Wed Oct 11 13:07:32 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py    |  4 ++-
 airflow/contrib/operators/mysql_to_gcs.py | 48 +++++++++++++++++++-------
 2 files changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/afcdd097/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 5fc7e22..fab2a43 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -971,7 +971,9 @@ def _bq_cast(string_field, bq_type):
     if string_field is None:
         return None
     elif bq_type == 'INTEGER' or bq_type == 'TIMESTAMP':
-        return int(string_field)
+        # convert to float first to handle cases where string_field is
+        # represented in scientific notation
+        return int(float(string_field))
     elif bq_type == 'FLOAT':
         return float(string_field)
     elif bq_type == 'BOOLEAN':
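
A quick note on the _bq_cast change above: int() alone cannot parse a string in
scientific notation, which is how some INTEGER/TIMESTAMP values may be returned
as strings, so the cast now goes through float() first. A minimal illustration
(the sample value is made up for demonstration, not taken from the commit):

    string_field = '1.507752439E9'     # e.g. a TIMESTAMP rendered in scientific notation
    # int(string_field) raises ValueError: invalid literal for int() with base 10
    print(int(float(string_field)))    # prints 1507752439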

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/afcdd097/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index f94bc24..47b7ac9 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -14,6 +14,7 @@
 
 import json
 import time
+import base64
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.hooks.mysql_hook import MySqlHook
@@ -21,7 +22,7 @@ from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from datetime import date, datetime
 from decimal import Decimal
-from MySQLdb.constants import FIELD_TYPE
+from MySQLdb.constants import FIELD_TYPE, FLAG
 from tempfile import NamedTemporaryFile
 
 
@@ -120,15 +121,20 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             names in GCS, and values are file handles to local files that
             contain the data for the GCS objects.
         """
-        schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
+        field_names = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
+        mysql_types = list(map(lambda schema_tuple: schema_tuple[1], cursor.description))
+        byte_fields = [self.is_binary(t, f) for t, f in zip(mysql_types, cursor.description_flags)]
+
         file_no = 0
-        tmp_file_handle = NamedTemporaryFile(delete=True)
+        tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
         tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
 
         for row in cursor:
-            # Convert datetime objects to utc seconds, and decimals to floats
-            row = map(self.convert_types, row)
-            row_dict = dict(zip(schema, row))
+            # Convert datetime objects to utc seconds, decimals to floats, and binaries
+            # to base64-encoded strings
+            row_dict = {}
+            for name, value, is_binary in zip(field_names, row, byte_fields):
+                row_dict[name] = self.convert_types(value, is_binary)
 
             # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
             json.dump(row_dict, tmp_file_handle)
@@ -139,7 +145,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             # Stop if the file exceeds the file size limit.
             if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
                 file_no += 1
-                tmp_file_handle = NamedTemporaryFile(delete=True)
+                tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
                 tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
 
         return tmp_file_handles
@@ -154,10 +160,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             contains the BigQuery schema fields in .json format.
         """
         schema = []
-        for field in cursor.description:
+        for field, flag in zip(cursor.description, cursor.description_flags):
             # See PEP 249 for details about the description tuple.
             field_name = field[0]
-            field_type = self.type_map(field[1])
+
+            field_type = self.type_map(field[1], flag)
+
             # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
             # for required fields because some MySQL timestamps can't be
             # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
@@ -169,7 +177,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             })
 
         self.log.info('Using schema for %s: %s', self.schema_filename, schema)
-        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
+        tmp_schema_file_handle = NamedTemporaryFile(mode='w', delete=True)
         json.dump(schema, tmp_schema_file_handle)
         return {self.schema_filename: tmp_schema_file_handle}
 
@@ -184,21 +192,24 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json')
 
     @classmethod
-    def convert_types(cls, value):
+    def convert_types(cls, value, is_binary=False):
         """
         Takes a value from MySQLdb, and converts it to a value that's safe for
         JSON/Google cloud storage/BigQuery. Dates are converted to UTC seconds.
-        Decimals are converted to floats.
+        Decimals are converted to floats. Binaries are converted to base64-encoded
+        strings.
         """
         if type(value) in (datetime, date):
             return time.mktime(value.timetuple())
         elif isinstance(value, Decimal):
             return float(value)
+        elif is_binary:
+            return base64.b64encode(value).decode()
         else:
             return value
 
     @classmethod
-    def type_map(cls, mysql_type):
+    def type_map(cls, mysql_type, flags):
         """
         Helper function that maps from MySQL fields to BigQuery fields. Used
         when a schema_filename is set.
@@ -220,4 +231,15 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             FIELD_TYPE.TIMESTAMP: 'TIMESTAMP',
             FIELD_TYPE.YEAR: 'INTEGER',
         }
+
+        if MySqlToGoogleCloudStorageOperator.is_binary(mysql_type, flags):
+            return 'BYTES'
+
         return d[mysql_type] if mysql_type in d else 'STRING'
+
+    @classmethod
+    def is_binary(cls, mysql_type, flags):
+        # MySQLdb groups both char/varchar and binary/varbinary as STRING/VAR_STRING.
+        # To work around this ambiguity, check the description flag to see if it's a binary field.
+        return mysql_type in [FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING] and \
+            flags & FLAG.BINARY == FLAG.BINARY
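
For reference, a minimal standalone sketch of the binary-column detection and
base64 conversion introduced above. The column metadata and row values below
are invented for illustration; a real cursor supplies them via
cursor.description and cursor.description_flags, and MySQLdb (mysqlclient)
must be installed for the constants import.

    import base64

    from MySQLdb.constants import FIELD_TYPE, FLAG

    def is_binary(mysql_type, flags):
        # char/varchar and binary/varbinary both report STRING/VAR_STRING;
        # only the BINARY description flag tells them apart.
        return mysql_type in [FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING] and \
            flags & FLAG.BINARY == FLAG.BINARY

    # Hypothetical columns: (name, type_code, flags)
    columns = [
        ('name',   FIELD_TYPE.VAR_STRING, 0),
        ('digest', FIELD_TYPE.VAR_STRING, FLAG.BINARY),
    ]
    row = ('alice', b'\x00\xff\x10')

    for (name, type_code, flags), value in zip(columns, row):
        if is_binary(type_code, flags):
            value = base64.b64encode(value).decode()   # JSON-safe string
        print(name, '->', value)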

