airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joy...@apache.org
Subject [1/2] incubator-airflow git commit: [AIRFLOW-2169] Add schema to MySqlToGoogleCloudStorageOperator
Date Sat, 10 Mar 2018 01:42:30 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 803767959 -> 398746d8f


[AIRFLOW-2169] Add schema to MySqlToGoogleCloudStorageOperator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6f96f0f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6f96f0f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6f96f0f7

Branch: refs/heads/master
Commit: 6f96f0f79b825f325026ab1f29defb203def3964
Parents: e28f6e2
Author: Hongyi Wang <hongyiw@wepay.com>
Authored: Fri Mar 9 16:21:16 2018 -0800
Committer: Hongyi Wang <hongyiw@wepay.com>
Committed: Fri Mar 9 16:21:16 2018 -0800

----------------------------------------------------------------------
 airflow/contrib/operators/mysql_to_gcs.py | 56 +++++++++++++++++---------
 1 file changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6f96f0f7/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index 41e23f5..9ba84c7 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -24,6 +24,7 @@ from datetime import date, datetime
 from decimal import Decimal
 from MySQLdb.constants import FIELD_TYPE
 from tempfile import NamedTemporaryFile
+from six import string_types
 
 PY3 = sys.version_info[0] == 3
 
@@ -32,7 +33,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
     """
     Copy data from MySQL to Google cloud storage in JSON format.
     """
-    template_fields = ('sql', 'bucket', 'filename', 'schema_filename')
+    template_fields = ('sql', 'bucket', 'filename', 'schema_filename', 'schema')
     template_ext = ('.sql',)
     ui_color = '#a0e08c'
 
@@ -45,6 +46,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
                  approx_max_file_size_bytes=1900000000,
                  mysql_conn_id='mysql_default',
                  google_cloud_storage_conn_id='google_cloud_storage_default',
+                 schema=None,
                  delegate_to=None,
                  *args,
                  **kwargs):
@@ -73,6 +75,10 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
         :param google_cloud_storage_conn_id: Reference to a specific Google
             cloud storage hook.
         :type google_cloud_storage_conn_id: string
+        :param schema: The schema to use, if any. Should be a list of dict or
+            a str. Examples can be seen at: https://cloud.google.com/bigquery
+            /docs/schemas#specifying_a_json_schema_file
+        :type schema: str or list
         :param delegate_to: The account to impersonate, if any. For this to
             work, the service account making the request must have domain-wide
             delegation enabled.
@@ -85,6 +91,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
         self.approx_max_file_size_bytes = approx_max_file_size_bytes
         self.mysql_conn_id = mysql_conn_id
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.schema = schema
         self.delegate_to = delegate_to
 
     def execute(self, context):
@@ -160,26 +167,36 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             contains the BigQuery schema fields in .json format.
         """
         schema = []
-        for field in cursor.description:
-            # See PEP 249 for details about the description tuple.
-            field_name = field[0]
-            field_type = self.type_map(field[1])
-            # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
-            # for required fields because some MySQL timestamps can't be
-            # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
-            field_mode = 'NULLABLE' if field[6] or field_type == 'TIMESTAMP' else 'REQUIRED'
-            schema.append({
-                'name': field_name,
-                'type': field_type,
-                'mode': field_mode,
-            })
+        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
+        if self.schema is not None and isinstance(self.schema, string_types):
+            schema = self.schema
+            tmp_schema_file_handle.write(schema)
+        else:
+            if self.schema is not None and isinstance(self.schema, list):
+                schema = self.schema
+            else:
+                for field in cursor.description:
+                    # See PEP 249 for details about the description tuple.
+                    field_name = field[0]
+                    field_type = self.type_map(field[1])
+                    # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
+                    # for required fields because some MySQL timestamps can't be
+                    # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
+                    if field[6] or field_type == 'TIMESTAMP':
+                        field_mode = 'NULLABLE'
+                    else:
+                        field_mode = 'REQUIRED'
+                    schema.append({
+                        'name': field_name,
+                        'type': field_type,
+                        'mode': field_mode,
+                    })
+            s = json.dumps(schema, tmp_schema_file_handle)
+            if PY3:
+                s = s.encode('utf-8')
+            tmp_schema_file_handle.write(s)
 
         self.log.info('Using schema for %s: %s', self.schema_filename, schema)
-        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
-        s = json.dumps(schema, tmp_schema_file_handle)
-        if PY3:
-            s = s.encode('utf-8')
-        tmp_schema_file_handle.write(s)
         return {self.schema_filename: tmp_schema_file_handle}
 
     def _upload_to_gcs(self, files_to_upload):
@@ -214,6 +231,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
         when a schema_filename is set.
         """
         d = {
+            FIELD_TYPE.INT24: 'INTEGER',
             FIELD_TYPE.TINY: 'INTEGER',
             FIELD_TYPE.BIT: 'INTEGER',
             FIELD_TYPE.DATETIME: 'TIMESTAMP',


Mime
View raw message