From commits-return-13626-archive-asf-public=cust-asf.ponee.io@airflow.incubator.apache.org Thu Apr 5 02:50:23 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id EB9C3180675 for ; Thu, 5 Apr 2018 02:50:22 +0200 (CEST) Received: (qmail 47662 invoked by uid 500); 5 Apr 2018 00:50:22 -0000 Mailing-List: contact commits-help@airflow.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airflow.incubator.apache.org Delivered-To: mailing list commits@airflow.incubator.apache.org Received: (qmail 47653 invoked by uid 99); 5 Apr 2018 00:50:21 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Apr 2018 00:50:21 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 91CC1181434 for ; Thu, 5 Apr 2018 00:50:21 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -11.212 X-Spam-Level: X-Spam-Status: No, score=-11.212 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_NUMSUBJECT=0.5, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H2=-0.001, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01, USER_IN_DEF_SPF_WL=-7.5] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 0P3XHTzx0VWW for ; Thu, 5 Apr 2018 00:50:20 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 5E7CA5F180 for ; Thu, 5 Apr 2018 00:50:19 +0000 (UTC) Received: (qmail 47458 invoked by uid 99); 5 Apr 2018 00:50:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Apr 2018 00:50:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 34FE1E00B7; Thu, 5 Apr 2018 00:50:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: joygao@apache.org To: commits@airflow.incubator.apache.org Date: Thu, 05 Apr 2018 00:50:18 -0000 Message-Id: <903543ed52a547f1a6a4f1ecf6f59355@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-airflow git commit: [AIRFLOW-2169] Fix type 'bytes' is not JSON serializable in python3 Repository: incubator-airflow Updated Branches: refs/heads/master 9c0c4264c -> f865c7898 [AIRFLOW-2169] Fix type 'bytes' is not JSON serializable in python3 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4c89e440 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4c89e440 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4c89e440 Branch: refs/heads/master Commit: 4c89e440ef34b43142ef7f61a2fb6424dfc7f00f Parents: d1f94fe Author: Hongyi Wang Authored: Tue Apr 3 18:10:42 2018 -0700 Committer: Hongyi Wang Committed: Tue Apr 3 18:10:42 2018 -0700 ---------------------------------------------------------------------- airflow/contrib/operators/mysql_to_gcs.py | 26 ++++++----- .../operators/test_mysql_to_gcs_operator.py | 47 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c89e440/airflow/contrib/operators/mysql_to_gcs.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py index 2249b9c..4e238ca 100644 --- a/airflow/contrib/operators/mysql_to_gcs.py +++ b/airflow/contrib/operators/mysql_to_gcs.py @@ -24,7 +24,7 @@ from datetime import date, datetime from decimal import Decimal from MySQLdb.constants import FIELD_TYPE from tempfile import NamedTemporaryFile -from six import string_types +from six import string_types, binary_type PY3 = sys.version_info[0] == 3 @@ -130,6 +130,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): names in GCS, and values are file handles to local files that contain the data for the GCS objects. """ + class BinaryTypeEncoder(json.JSONEncoder): + def default(self, obj): + if PY3 and isinstance(obj, binary_type): + return str(obj, 'utf-8') + return json.JSONEncoder.default(self, obj) + schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description)) file_no = 0 tmp_file_handle = NamedTemporaryFile(delete=True) @@ -141,7 +147,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): row_dict = dict(zip(schema, row)) # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB. - s = json.dumps(row_dict) + s = json.dumps(row_dict, cls=BinaryTypeEncoder) if PY3: s = s.encode('utf-8') tmp_file_handle.write(s) @@ -166,12 +172,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): name in GCS, and values are file handles to local files that contains the BigQuery schema fields in .json format. """ - schema = [] + schema_str = None tmp_schema_file_handle = NamedTemporaryFile(delete=True) if self.schema is not None and isinstance(self.schema, string_types): - schema = self.schema - tmp_schema_file_handle.write(schema) + schema_str = self.schema else: + schema = [] if self.schema is not None and isinstance(self.schema, list): schema = self.schema else: @@ -191,12 +197,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): 'type': field_type, 'mode': field_mode, }) - s = json.dumps(schema, tmp_schema_file_handle) - if PY3: - s = s.encode('utf-8') - tmp_schema_file_handle.write(s) + schema_str = json.dumps(schema) + if PY3: + schema_str = schema_str.encode('utf-8') + tmp_schema_file_handle.write(schema_str) - self.log.info('Using schema for %s: %s', self.schema_filename, schema) + self.log.info('Using schema for %s: %s', self.schema_filename, schema_str) return {self.schema_filename: tmp_schema_file_handle} def _upload_to_gcs(self, files_to_upload): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c89e440/tests/contrib/operators/test_mysql_to_gcs_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_mysql_to_gcs_operator.py b/tests/contrib/operators/test_mysql_to_gcs_operator.py new file mode 100644 index 0000000..c985ac3 --- /dev/null +++ b/tests/contrib/operators/test_mysql_to_gcs_operator.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from mock import MagicMock + +from airflow.contrib.operators.mysql_to_gcs import \ + MySqlToGoogleCloudStorageOperator + + +class MySqlToGoogleCloudStorageOperatorTest(unittest.TestCase): + + def test_write_local_data_files(self): + + # Configure + task_id = "some_test_id" + sql = "some_sql" + bucket = "some_bucket" + filename = "some_filename" + schema = "some_schema" + description_list = [['col_integer'], ['col_byte']] + row_iter = [[1, b'byte_str_1'], [2, b'byte_str_2']] + + op = MySqlToGoogleCloudStorageOperator( + task_id=task_id, + sql=sql, + bucket=bucket, + filename=filename, + schema=schema) + + cursor_mock = MagicMock() + cursor_mock.description = description_list + cursor_mock.__iter__.return_value = row_iter + + # Run + op._write_local_data_files(cursor_mock)