airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1350] Add query_uri param to Hive/SparkSQL DataProc operator
Date Tue, 27 Jun 2017 19:44:07 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5fe25d859 -> d32c72969


[AIRFLOW-1350] Add query_uri param to Hive/SparkSQL DataProc operator

Closes #2402 from lukeFalsina/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d32c7296
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d32c7296
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d32c7296

Branch: refs/heads/master
Commit: d32c7296908e6975c4dda7159c1a7a6b9e89f046
Parents: 5fe25d8
Author: Luca Falsina <luca.falsina@booking.com>
Authored: Tue Jun 27 12:43:13 2017 -0700
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Tue Jun 27 12:43:17 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py | 22 +++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d32c7296/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 14245c8..3e006ac 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -472,7 +472,8 @@ class DataProcHiveOperator(BaseOperator):
     @apply_defaults
     def __init__(
             self,
-            query,
+            query=None,
+            query_uri=None,
             variables=None,
             job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
@@ -487,6 +488,8 @@ class DataProcHiveOperator(BaseOperator):
 
         :param query: The query or reference to the query file (q extension).
         :type query: string
+        :param query_uri: The uri of a hive script on Cloud Storage.
+        :type query_uri: string
         :param variables: Map of named parameters for the query.
         :type variables: dict
         :param job_name: The job name used in the DataProc cluster. This name by default
@@ -512,6 +515,7 @@ class DataProcHiveOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.query = query
+        self.query_uri = query_uri
         self.variables = variables
         self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
@@ -525,7 +529,10 @@ class DataProcHiveOperator(BaseOperator):
         job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hiveJob",
                                        self.dataproc_properties)
 
-        job.add_query(self.query)
+        if self.query is None:
+            job.add_query_uri(self.query_uri)
+        else:
+            job.add_query(self.query)
         job.add_variables(self.variables)
         job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)
@@ -544,7 +551,8 @@ class DataProcSparkSqlOperator(BaseOperator):
     @apply_defaults
     def __init__(
             self,
-            query,
+            query=None,
+            query_uri=None,
             variables=None,
             job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
@@ -559,6 +567,8 @@ class DataProcSparkSqlOperator(BaseOperator):
 
         :param query: The query or reference to the query file (q extension).
         :type query: string
+        :param query_uri: The uri of a spark sql script on Cloud Storage.
+        :type query_uri: string
         :param variables: Map of named parameters for the query.
         :type variables: dict
         :param job_name: The job name used in the DataProc cluster. This name by default
@@ -584,6 +594,7 @@ class DataProcSparkSqlOperator(BaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.query = query
+        self.query_uri = query_uri
         self.variables = variables
         self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
@@ -597,7 +608,10 @@ class DataProcSparkSqlOperator(BaseOperator):
         job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkSqlJob",
                                        self.dataproc_properties)
 
-        job.add_query(self.query)
+        if self.query is None:
+            job.add_query_uri(self.query_uri)
+        else:
+            job.add_query(self.query)
         job.add_variables(self.variables)
         job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)


Mime
View raw message