airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1331] add SparkSubmitOperator option
Date Sun, 24 Sep 2017 11:18:46 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 601045027 -> e4a984a6b


[AIRFLOW-1331] add SparkSubmitOperator option

spark-submit has a --packages option for including
additional Java packages,
but the current version of SparkSubmitOperator
couldn't handle it.
I added a "packages" option to SparkSubmitOperator
to resolve this.
I added the same option to TestSparkSubmitOperator
as well.

Closes #2622 from chie8842/AIRFLOW-1331


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e4a984a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e4a984a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e4a984a6

Branch: refs/heads/master
Commit: e4a984a6b87888753415bdd4308c89622c983917
Parents: 6010450
Author: chie8842 <chie8842@gmail.com>
Authored: Sun Sep 24 13:18:09 2017 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Sun Sep 24 13:18:15 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_submit_hook.py            | 4 ++++
 airflow/contrib/operators/spark_submit_operator.py    | 5 +++++
 tests/contrib/hooks/test_spark_submit_hook.py         | 2 ++
 tests/contrib/operators/test_spark_submit_operator.py | 3 +++
 4 files changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4a984a6/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 7d59cd2..7f8e35e 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -68,6 +68,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
                  py_files=None,
                  jars=None,
                  java_class=None,
+                 packages=None,
                  total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
@@ -84,6 +85,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         self._py_files = py_files
         self._jars = jars
         self._java_class = java_class
+        self._packages = packages
         self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
@@ -160,6 +162,8 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             connection_cmd += ["--py-files", self._py_files]
         if self._jars:
             connection_cmd += ["--jars", self._jars]
+        if self._packages:
+            connection_cmd += ["--packages", self._packages]
         if self._num_executors:
             connection_cmd += ["--num-executors", str(self._num_executors)]
         if self._total_executor_cores:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4a984a6/airflow/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py
index 2aed4c6..277c55f 100644
--- a/airflow/contrib/operators/spark_submit_operator.py
+++ b/airflow/contrib/operators/spark_submit_operator.py
@@ -39,6 +39,8 @@ class SparkSubmitOperator(BaseOperator):
     :type jars: str
     :param java_class: the main class of the Java application
     :type java_class: str
+    :param packages: Comma-separated list of maven coordinates of jars to include on the
driver and executor classpaths
+    :type packages: str
     :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors
(Default: all the available cores on the worker)
     :type total_executor_cores: int
     :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default:
2)
@@ -72,6 +74,7 @@ class SparkSubmitOperator(BaseOperator):
                  py_files=None,
                  jars=None,
                  java_class=None,
+                 packages=None,
                  total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
@@ -91,6 +94,7 @@ class SparkSubmitOperator(BaseOperator):
         self._py_files = py_files
         self._jars = jars
         self._java_class = java_class
+        self._packages = packages
         self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
@@ -115,6 +119,7 @@ class SparkSubmitOperator(BaseOperator):
             py_files=self._py_files,
             jars=self._jars,
             java_class=self._java_class,
+            packages=self._packages,
             total_executor_cores=self._total_executor_cores,
             executor_cores=self._executor_cores,
             executor_memory=self._executor_memory,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4a984a6/tests/contrib/hooks/test_spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index 826576f..be88897 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -34,6 +34,7 @@ class TestSparkSubmitHook(unittest.TestCase):
         'files': 'hive-site.xml',
         'py_files': 'sample_library.py',
         'jars': 'parquet.jar',
+        'packages': 'com.databricks:spark-avro_2.11:3.2.0',
         'total_executor_cores': 4,
         'executor_cores': 4,
         'executor_memory': '22g',
@@ -113,6 +114,7 @@ class TestSparkSubmitHook(unittest.TestCase):
             '--files', 'hive-site.xml',
             '--py-files', 'sample_library.py',
             '--jars', 'parquet.jar',
+            '--packages', 'com.databricks:spark-avro_2.11:3.2.0',
             '--num-executors', '10',
             '--total-executor-cores', '4',
             '--executor-cores', '4',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4a984a6/tests/contrib/operators/test_spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_spark_submit_operator.py b/tests/contrib/operators/test_spark_submit_operator.py
index 1d41941..33705e9 100644
--- a/tests/contrib/operators/test_spark_submit_operator.py
+++ b/tests/contrib/operators/test_spark_submit_operator.py
@@ -34,6 +34,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
         'files': 'hive-site.xml',
         'py_files': 'sample_library.py',
         'jars': 'parquet.jar',
+        'packages': 'com.databricks:spark-avro_2.11:3.2.0',
         'total_executor_cores':4,
         'executor_cores': 4,
         'executor_memory': '22g',
@@ -84,6 +85,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
             'files': 'hive-site.xml',
             'py_files': 'sample_library.py',
             'jars': 'parquet.jar',
+            'packages': 'com.databricks:spark-avro_2.11:3.2.0',
             'total_executor_cores': 4,
             'executor_cores': 4,
             'executor_memory': '22g',
@@ -111,6 +113,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
         self.assertEqual(expected_dict['files'], operator._files)
         self.assertEqual(expected_dict['py_files'], operator._py_files)
         self.assertEqual(expected_dict['jars'], operator._jars)
+        self.assertEqual(expected_dict['packages'], operator._packages)
         self.assertEqual(expected_dict['total_executor_cores'], operator._total_executor_cores)
         self.assertEqual(expected_dict['executor_cores'], operator._executor_cores)
         self.assertEqual(expected_dict['executor_memory'], operator._executor_memory)


Mime
View raw message