airflow-commits mailing list archives

From: bo...@apache.org
Subject: incubator-airflow git commit: [AIRFLOW-1150] Fix script execution in the SparkSql hook
Date: Fri, 12 May 2017 09:39:32 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master a30fee87d -> f97bc5ed3


[AIRFLOW-1150] Fix script execution in the SparkSql hook

When using the SparkSqlOperator and submitting a file (one ending with
`.sql` or `.hql`), a whitespace character needs to be appended,
otherwise a Jinja error will be raised.
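
For context, a hedged sketch of that workaround (the DAG object, task id,
path, and connection id below are made up, not taken from this commit):

    # Hypothetical operator usage; assumes a `dag` object already exists.
    from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator

    sql_task = SparkSqlOperator(
        task_id='run_sql_file',
        sql='/path/to/sql/file.sql ',  # trailing space is deliberate: it keeps
                                       # Jinja from loading the path as a template file
        conn_id='spark_default',
        dag=dag)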

However, the trailing whitespace confused the hook, since those files
then end not with `.sql` or `.hql` but with `.sql ` and `.hql `. This PR
fixes that by stripping the value before the extension check.
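
A minimal sketch of the failure mode and the fix (the path is hypothetical):

    sql = ' /path/to/sql/file.sql '      # value as it reaches the hook
    print(sql.endswith('.sql'))          # False -> old code fell through to -e
    print(sql.strip().endswith('.sql'))  # True  -> stripped value takes the -f branch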

In the test, I've added a `get_after` helper to easily check whether the
path passed to the `-f` option has really been stripped.
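
Roughly how that helper reads in use (the command list here is made up,
mirroring the test added below):

    from itertools import dropwhile

    def get_after(sentinel, iterable):
        """Return the element that follows `sentinel` in `iterable`."""
        truncated = dropwhile(lambda el: el != sentinel, iterable)
        next(truncated)        # drop the sentinel itself
        return next(truncated)

    # e.g. pull the path that follows the -f flag out of a built command
    cmd = ['spark-sql', '-f', '/path/to/sql/file.sql', '--verbose']
    assert get_after('-f', cmd) == '/path/to/sql/file.sql'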

Closes #2259 from gglanzani/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f97bc5ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f97bc5ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f97bc5ed

Branch: refs/heads/master
Commit: f97bc5ed3dc42f975776175f7a269c0604f49123
Parents: a30fee8
Author: Giovanni Lanzani <gglanzani@users.noreply.github.com>
Authored: Fri May 12 11:39:24 2017 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Fri May 12 11:39:24 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_sql_hook.py    |  7 ++-
 tests/contrib/hooks/test_spark_sql_hook.py | 81 +++++++++++++++++++++++++
 2 files changed, 85 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f97bc5ed/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index 725db21..8d73f60 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -104,10 +104,11 @@ class SparkSqlHook(BaseHook):
         if self._num_executors:
             connection_cmd += ["--num-executors", str(self._num_executors)]
         if self._sql:
-            if self._sql.endswith('.sql') or self._sql.endswith(".hql"):
-                connection_cmd += ["-f", self._sql]
+            sql = self._sql.strip()
+            if sql.endswith(".sql") or sql.endswith(".hql"):
+                connection_cmd += ["-f", sql]
             else:
-                connection_cmd += ["-e", self._sql]
+                connection_cmd += ["-e", sql]
         if self._master:
             connection_cmd += ["--master", self._master]
         if self._name:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f97bc5ed/tests/contrib/hooks/test_spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_sql_hook.py b/tests/contrib/hooks/test_spark_sql_hook.py
new file mode 100644
index 0000000..145892c
--- /dev/null
+++ b/tests/contrib/hooks/test_spark_sql_hook.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import sys
+import unittest
+from io import StringIO
+from itertools import dropwhile
+
+import mock
+
+from airflow import configuration, models
+from airflow.utils import db
+from airflow.contrib.hooks.spark_sql_hook import SparkSqlHook
+
+
+def get_after(sentinel, iterable):
+    "Get the value after `sentinel` in an `iterable`"
+    truncated = dropwhile(lambda el: el != sentinel, iterable)
+    next(truncated)
+    return next(truncated)
+
+class TestSparkSqlHook(unittest.TestCase):
+
+    _config = {
+        'conn_id': 'spark_default',
+        'executor_cores': 4,
+        'executor_memory': '22g',
+        'keytab': 'privileged_user.keytab',
+        'name': 'spark-job',
+        'num_executors': 10,
+        'verbose': True,
+        'sql': ' /path/to/sql/file.sql ',
+        'conf': 'key=value,PROP=VALUE'
+    }
+
+    def setUp(self):
+
+        configuration.load_test_config()
+        db.merge_conn(
+            models.Connection(
+                conn_id='spark_default', conn_type='spark',
+                host='yarn://yarn-master')
+        )
+
+    def test_build_command(self):
+        hook = SparkSqlHook(**self._config)
+
+        # The subprocess requires an array but we build the cmd by joining on a space
+        cmd = ' '.join(hook._prepare_command(""))
+
+        # Check all the parameters
+        assert "--executor-cores {}".format(self._config['executor_cores']) in cmd
+        assert "--executor-memory {}".format(self._config['executor_memory']) in cmd
+        assert "--keytab {}".format(self._config['keytab']) in cmd
+        assert "--name {}".format(self._config['name']) in cmd
+        assert "--num-executors {}".format(self._config['num_executors']) in cmd
+        sql_path = get_after('-f', hook._prepare_command(""))
+        assert self._config['sql'].strip() == sql_path
+
+        # Check if all config settings are there
+        for kv in self._config['conf'].split(","):
+            k, v = kv.split('=')
+            assert "--conf {0}={1}".format(k, v) in cmd
+
+        if self._config['verbose']:
+            assert "--verbose" in cmd
+
+
+if __name__ == '__main__':
+    unittest.main()

