airflow-commits mailing list archives

From: criccom...@apache.org
Subject: incubator-airflow git commit: [AIRFLOW-1816] Add region param to Dataproc operators
Date: Wed, 15 Nov 2017 20:05:34 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5157b5a76 -> d04519e60


[AIRFLOW-1816] Add region param to Dataproc operators

Closes #2788 from DanSedov/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d04519e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d04519e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d04519e6

Branch: refs/heads/master
Commit: d04519e6051e39ec95c553c0f550092cfa418a38
Parents: 5157b5a
Author: Dan Sedov <sedov@google.com>
Authored: Wed Nov 15 12:05:00 2017 -0800
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Wed Nov 15 12:05:06 2017 -0800

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
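
For reference, a minimal usage sketch of the new parameter. The DAG id, task id, cluster name, and query below are hypothetical; only region is what this commit adds, and it defaults to 'global' so existing DAGs are unaffected:

# A minimal sketch, assuming a contrib-era Airflow install (circa 1.9).
# dag_id, task_id, cluster_name, and query are illustrative;
# region='europe-west1' exercises the parameter added in AIRFLOW-1816.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataProcPigOperator

dag = DAG(
    dag_id='dataproc_pig_region_example',
    start_date=datetime(2017, 11, 15),
    schedule_interval=None,
)

pig_task = DataProcPigOperator(
    task_id='run_pig_query',
    query='sh echo hello',               # trivial Pig shell command
    cluster_name='my-cluster',           # hypothetical cluster
    region='europe-west1',               # new in this commit; previously always 'global'
    gcp_conn_id='google_cloud_default',
    dag=dag,
)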


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d04519e6/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index ba2c601..9c1eb0f 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -441,6 +441,7 @@ class DataProcPigOperator(BaseOperator):
             dataproc_pig_jars=None,
             gcp_conn_id='google_cloud_default',
             delegate_to=None,
+            region='global',
             *args,
             **kwargs):
         """
@@ -474,6 +475,8 @@ class DataProcPigOperator(BaseOperator):
             For this to work, the service account making the request must have domain-wide
             delegation enabled.
         :type delegate_to: string
+        :param region: The region in which the Dataproc cluster is created.
+        :type region: string
         """
         super(DataProcPigOperator, self).__init__(*args, **kwargs)
         self.gcp_conn_id = gcp_conn_id
@@ -485,6 +488,7 @@ class DataProcPigOperator(BaseOperator):
         self.cluster_name = cluster_name
         self.dataproc_properties = dataproc_pig_properties
         self.dataproc_jars = dataproc_pig_jars
+        self.region = region
 
     def execute(self, context):
         hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
@@ -500,7 +504,7 @@ class DataProcPigOperator(BaseOperator):
         job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)
 
-        hook.submit(hook.project_id, job.build())
+        hook.submit(hook.project_id, job.build(), self.region)
 
 
 class DataProcHiveOperator(BaseOperator):
@@ -606,6 +610,7 @@ class DataProcSparkSqlOperator(BaseOperator):
             dataproc_spark_jars=None,
             gcp_conn_id='google_cloud_default',
             delegate_to=None,
+            region='global',
             *args,
             **kwargs):
         """
@@ -635,6 +640,8 @@ class DataProcSparkSqlOperator(BaseOperator):
             For this to work, the service account making the request must have domain-wide
             delegation enabled.
         :type delegate_to: string
+        :param region: The region in which the Dataproc cluster is created.
+        :type region: string
         """
         super(DataProcSparkSqlOperator, self).__init__(*args, **kwargs)
         self.gcp_conn_id = gcp_conn_id
@@ -646,6 +653,7 @@ class DataProcSparkSqlOperator(BaseOperator):
         self.cluster_name = cluster_name
         self.dataproc_properties = dataproc_spark_properties
         self.dataproc_jars = dataproc_spark_jars
+        self.region = region
 
     def execute(self, context):
         hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
@@ -662,7 +670,7 @@ class DataProcSparkSqlOperator(BaseOperator):
         job.add_jar_file_uris(self.dataproc_jars)
         job.set_job_name(self.job_name)
 
-        hook.submit(hook.project_id, job.build())
+        hook.submit(hook.project_id, job.build(), self.region)
 
 
 class DataProcSparkOperator(BaseOperator):
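
Both operators now pass the region through as a third positional argument to hook.submit(). A sketch of the hook-side signature this relies on (the actual DataProcHook code is not part of this diff; the 'global' default here is an assumption that mirrors the operator default):

# Sketch only: DataProcHook.submit as the operators above assume it.
# Not part of this commit; the real hook lived in
# airflow/contrib/hooks/gcp_dataproc_hook.py at the time.
class DataProcHook(object):
    def submit(self, project_id, job, region='global'):
        # Submit the built job spec to the Dataproc jobs API in the
        # given project and region.
        ...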

