airflow-commits mailing list archives

From: bo...@apache.org
Subject: incubator-airflow git commit: [AIRFLOW-1323] Made Dataproc operator parameter names consistent
Date: Tue, 03 Oct 2017 09:15:32 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 032a3e6c8 -> dd861f8cd


[AIRFLOW-1323] Made Dataproc operator parameter names consistent

Closes #2636 from cjqian/1323


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dd861f8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dd861f8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dd861f8c

Branch: refs/heads/master
Commit: dd861f8cd0491f977ba545415d667f3d4611b5b8
Parents: 032a3e6
Author: Crystal Qian <crystaljqian@gmail.com>
Authored: Tue Oct 3 11:15:27 2017 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Tue Oct 3 11:15:27 2017 +0200

----------------------------------------------------------------------
 UPDATING.md                                    |  3 +-
 airflow/contrib/hooks/gcp_dataproc_hook.py     |  8 +-
 airflow/contrib/operators/dataproc_operator.py | 98 ++++++++++-----------
 3 files changed, 55 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dd861f8c/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 0b21dd1..329f416 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -146,6 +146,7 @@ A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters.
 ### Deprecated Features
 These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer
 supported and will be removed entirely in Airflow 2.0
+- If you're using the `google_cloud_conn_id` or `dataproc_cluster` argument names explicitly in `contrib.operators.Dataproc{*}Operator`(s), be sure to rename them to `gcp_conn_id` or `cluster_name`, respectively. We've renamed these arguments for consistency. (AIRFLOW-1323)
 
 - `post_execute()` hooks now take two arguments, `context` and `result`
   (AIRFLOW-886)
@@ -157,7 +158,7 @@ supported and will be removed entirely in Airflow 2.0
 - The pickle type for XCom messages has been replaced by json to prevent RCE attacks.
   Note that JSON serialization is stricter than pickling, so if you want to e.g. pass
   raw bytes through XCom you must encode them using an encoding like base64.
-  By default pickling is still enabled until Airflow 2.0. To disable it 
+  By default pickling is still enabled until Airflow 2.0. To disable it
   Set enable_xcom_pickling = False in your Airflow config.
 
 ## Airflow 1.8.1
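
For DAG authors upgrading past this change, a minimal sketch of the rename described in the UPDATING.md entry above, written as a `default_args` dict in the style of the `DataProcPigOperator` docstring; the connection id and cluster name values are placeholders, not taken from this commit:

```
# Dataproc settings carried in default_args must use the new key names.
# 'cluster-1' and 'google_cloud_default' below are illustrative values only.
default_args = {
    # 'dataproc_cluster': 'cluster-1',                 # old name (job operators)
    'cluster_name': 'cluster-1',                        # new name
    # 'google_cloud_conn_id': 'google_cloud_default',  # old name (cluster create/delete operators)
    'gcp_conn_id': 'google_cloud_default',              # new name
}
```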

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dd861f8c/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index a1bba0b..4be166e 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -74,7 +74,7 @@ class _DataProcJob(LoggingMixin):
 
 
 class _DataProcJobBuilder:
-    def __init__(self, project_id, task_id, dataproc_cluster, job_type, properties):
+    def __init__(self, project_id, task_id, cluster_name, job_type, properties):
         name = task_id + "_" + str(uuid.uuid1())[:8]
         self.job_type = job_type
         self.job = {
@@ -84,7 +84,7 @@ class _DataProcJobBuilder:
                     "jobId": name,
                 },
                 "placement": {
-                    "clusterName": dataproc_cluster
+                    "clusterName": cluster_name
                 },
                 job_type: {
                 }
@@ -159,6 +159,6 @@ class DataProcHook(GoogleCloudBaseHook):
         if not submitted.wait_for_done():
             submitted.raise_error("DataProcTask has errors")
 
-    def create_job_template(self, task_id, dataproc_cluster, job_type, properties):
-        return _DataProcJobBuilder(self.project_id, task_id, dataproc_cluster, job_type,
+    def create_job_template(self, task_id, cluster_name, job_type, properties):
+        return _DataProcJobBuilder(self.project_id, task_id, cluster_name, job_type,
                                    properties)
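
For reference, a minimal sketch of the renamed hook API, assuming a configured `google_cloud_default` connection; the task id, cluster name, job type and properties below are placeholders rather than values from this commit:

```
from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook

# Build a Dataproc job template through the hook; `cluster_name` replaces the
# former `dataproc_cluster` argument of create_job_template().
hook = DataProcHook(gcp_conn_id='google_cloud_default')
job = hook.create_job_template(
    task_id='run_pig',
    cluster_name='cluster-1',   # was: dataproc_cluster
    job_type='pigJob',
    properties={'mapred.job.queue.name': 'default'},
)
# The resulting builder targets the named cluster via placement.clusterName
# in the job description it assembles.
```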

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dd861f8c/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 6ef89ba..0823ed8 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -55,7 +55,7 @@ class DataprocClusterCreateOperator(BaseOperator):
                  num_preemptible_workers=0,
                  labels=None,
                  region='global',
-                 google_cloud_conn_id='google_cloud_default',
+                 gcp_conn_id='google_cloud_default',
                  delegate_to=None,
                  service_account=None,
                  service_account_scopes=None,
@@ -68,7 +68,7 @@ class DataprocClusterCreateOperator(BaseOperator):
 
         https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters
 
-        :param cluster_name: The name of the cluster to create
+        :param cluster_name: The name of the DataProc cluster to create.
         :type cluster_name: string
         :param project_id: The ID of the google cloud project in which
             to create the cluster
@@ -106,8 +106,8 @@ class DataprocClusterCreateOperator(BaseOperator):
         :param zone: The zone where the cluster will be located
         :type zone: string
         :param region: leave as 'global', might become relevant in the future
-        :param google_cloud_conn_id: The connection id to use when connecting to dataproc
-        :type google_cloud_conn_id: string
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
         :param delegate_to: The account to impersonate, if any.
             For this to work, the service account making the request must have domain-wide
             delegation enabled.
@@ -118,7 +118,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         :type service_account_scopes: list[string]
         """
         super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs)
-        self.google_cloud_conn_id = google_cloud_conn_id
+        self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.cluster_name = cluster_name
         self.project_id = project_id
@@ -266,7 +266,7 @@ class DataprocClusterCreateOperator(BaseOperator):
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataProcHook(
-            gcp_conn_id=self.google_cloud_conn_id,
+            gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to
         )
         service = hook.get_conn()
@@ -315,29 +315,29 @@ class DataprocClusterDeleteOperator(BaseOperator):
                  cluster_name,
                  project_id,
                  region='global',
-                 google_cloud_conn_id='google_cloud_default',
+                 gcp_conn_id='google_cloud_default',
                  delegate_to=None,
                  *args,
                  **kwargs):
         """
         Delete a cluster on Google Cloud Dataproc.
 
-        :param cluster_name: The name of the cluster to create
+        :param cluster_name: The name of the cluster to create.
         :type cluster_name: string
         :param project_id: The ID of the google cloud project in which
             the cluster runs
         :type project_id: string
         :param region: leave as 'global', might become relevant in the future
         :type region: string
-        :param google_cloud_conn_id: The connection id to use when connecting to dataproc
-        :type google_cloud_conn_id: string
+        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+        :type gcp_conn_id: string
         :param delegate_to: The account to impersonate, if any.
             For this to work, the service account making the request must have domain-wide
             delegation enabled.
         :type delegate_to: string
         """
         super(DataprocClusterDeleteOperator, self).__init__(*args, **kwargs)
-        self.google_cloud_conn_id = google_cloud_conn_id
+        self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.cluster_name = cluster_name
         self.project_id = project_id
@@ -360,7 +360,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
     def execute(self, context):
         self.log.info('Deleting cluster: %s', self.cluster_name)
         hook = DataProcHook(
-            gcp_conn_id=self.google_cloud_conn_id,
+            gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to
         )
         service = hook.get_conn()
@@ -385,7 +385,7 @@ class DataProcPigOperator(BaseOperator):
 
     ```
     default_args = {
-        'dataproc_cluster': 'cluster-1',
+        'cluster_name': 'cluster-1',
         'dataproc_pig_jars': [
             'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
             'gs://example/udf/jar/gpig/1.2/gpig.jar'
@@ -405,7 +405,7 @@ class DataProcPigOperator(BaseOperator):
     dag=dag)
     ```
     """
-    template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
+    template_fields = ['query', 'variables', 'job_name', 'cluster_name']
     template_ext = ('.pg', '.pig',)
     ui_color = '#0273d4'
 
@@ -416,7 +416,7 @@ class DataProcPigOperator(BaseOperator):
             query_uri=None,
             variables=None,
             job_name='{{task.task_id}}_{{ds_nodash}}',
-            dataproc_cluster='cluster-1',
+            cluster_name='cluster-1',
             dataproc_pig_properties=None,
             dataproc_pig_jars=None,
             gcp_conn_id='google_cloud_default',
@@ -440,8 +440,8 @@ class DataProcPigOperator(BaseOperator):
             is the task_id appended with the execution data, but can be templated. The
             name will always be appended with a random number to avoid name clashes.
         :type job_name: string
-        :param dataproc_cluster: The id of the DataProc cluster.
-        :type dataproc_cluster: string
+        :param cluster_name: The name of the DataProc cluster.
+        :type cluster_name: string
         :param dataproc_pig_properties: Map for the Pig properties. Ideal to put in
             default arguments
         :type dataproc_pig_properties: dict
@@ -462,14 +462,14 @@ class DataProcPigOperator(BaseOperator):
         self.query_uri = query_uri
         self.variables = variables
         self.job_name = job_name
-        self.dataproc_cluster = dataproc_cluster
+        self.cluster_name = cluster_name
         self.dataproc_properties = dataproc_pig_properties
         self.dataproc_jars = dataproc_pig_jars
 
     def execute(self, context):
         hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to)
-        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pigJob",
+        job = hook.create_job_template(self.task_id, self.cluster_name, "pigJob",
                                        self.dataproc_properties)
 
         if self.query is None:
@@ -487,7 +487,7 @@ class DataProcHiveOperator(BaseOperator):
     """
     Start a Hive query Job on a Cloud DataProc cluster.
     """
-    template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
+    template_fields = ['query', 'variables', 'job_name', 'cluster_name']
     template_ext = ('.q',)
     ui_color = '#0273d4'
 
@@ -498,7 +498,7 @@ class DataProcHiveOperator(BaseOperator):
             query_uri=None,
             variables=None,
             job_name='{{task.task_id}}_{{ds_nodash}}',
-            dataproc_cluster='cluster-1',
+            cluster_name='cluster-1',
             dataproc_hive_properties=None,
             dataproc_hive_jars=None,
             gcp_conn_id='google_cloud_default',
@@ -519,8 +519,8 @@ class DataProcHiveOperator(BaseOperator):
             is the task_id appended with the execution data, but can be templated. The
             name will always be appended with a random number to avoid name clashes.
         :type job_name: string
-        :param dataproc_cluster: The id of the DataProc cluster.
-        :type dataproc_cluster: string
+        :param cluster_name: The name of the DataProc cluster.
+        :type cluster_name: string
         :param dataproc_hive_properties: Map for the Pig properties. Ideal to put in
             default arguments
         :type dataproc_hive_properties: dict
@@ -543,7 +543,7 @@ class DataProcHiveOperator(BaseOperator):
         self.query_uri = query_uri
         self.variables = variables
         self.job_name = job_name
-        self.dataproc_cluster = dataproc_cluster
+        self.cluster_name = cluster_name
         self.dataproc_properties = dataproc_hive_properties
         self.dataproc_jars = dataproc_hive_jars
         self.region = region
@@ -552,7 +552,7 @@ class DataProcHiveOperator(BaseOperator):
         hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to)
 
-        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hiveJob",
+        job = hook.create_job_template(self.task_id, self.cluster_name, "hiveJob",
                                        self.dataproc_properties)
 
         if self.query is None:
@@ -570,7 +570,7 @@ class DataProcSparkSqlOperator(BaseOperator):
     """
     Start a Spark SQL query Job on a Cloud DataProc cluster.
     """
-    template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
+    template_fields = ['query', 'variables', 'job_name', 'cluster_name']
     template_ext = ('.q',)
     ui_color = '#0273d4'
 
@@ -581,7 +581,7 @@ class DataProcSparkSqlOperator(BaseOperator):
             query_uri=None,
             variables=None,
             job_name='{{task.task_id}}_{{ds_nodash}}',
-            dataproc_cluster='cluster-1',
+            cluster_name='cluster-1',
             dataproc_spark_properties=None,
             dataproc_spark_jars=None,
             gcp_conn_id='google_cloud_default',
@@ -601,8 +601,8 @@ class DataProcSparkSqlOperator(BaseOperator):
             is the task_id appended with the execution data, but can be templated. The
             name will always be appended with a random number to avoid name clashes.
         :type job_name: string
-        :param dataproc_cluster: The id of the DataProc cluster.
-        :type dataproc_cluster: string
+        :param cluster_name: The name of the DataProc cluster.
+        :type cluster_name: string
         :param dataproc_spark_properties: Map for the Pig properties. Ideal to put in
             default arguments
         :type dataproc_spark_properties: dict
@@ -623,7 +623,7 @@ class DataProcSparkSqlOperator(BaseOperator):
         self.query_uri = query_uri
         self.variables = variables
         self.job_name = job_name
-        self.dataproc_cluster = dataproc_cluster
+        self.cluster_name = cluster_name
         self.dataproc_properties = dataproc_spark_properties
         self.dataproc_jars = dataproc_spark_jars
 
@@ -631,7 +631,7 @@ class DataProcSparkSqlOperator(BaseOperator):
         hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to)
 
-        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkSqlJob",
+        job = hook.create_job_template(self.task_id, self.cluster_name, "sparkSqlJob",
                                        self.dataproc_properties)
 
         if self.query is None:
@@ -650,7 +650,7 @@ class DataProcSparkOperator(BaseOperator):
     Start a Spark Job on a Cloud DataProc cluster.
     """
 
-    template_fields = ['arguments', 'job_name', 'dataproc_cluster']
+    template_fields = ['arguments', 'job_name', 'cluster_name']
     ui_color = '#0273d4'
 
     @apply_defaults
@@ -662,7 +662,7 @@ class DataProcSparkOperator(BaseOperator):
             archives=None,
             files=None,
             job_name='{{task.task_id}}_{{ds_nodash}}',
-            dataproc_cluster='cluster-1',
+            cluster_name='cluster-1',
             dataproc_spark_properties=None,
             dataproc_spark_jars=None,
             gcp_conn_id='google_cloud_default',
@@ -690,8 +690,8 @@ class DataProcSparkOperator(BaseOperator):
             is the task_id appended with the execution data, but can be templated. The
             name will always be appended with a random number to avoid name clashes.
         :type job_name: string
-        :param dataproc_cluster: The id of the DataProc cluster.
-        :type dataproc_cluster: string
+        :param cluster_name: The name of the DataProc cluster.
+        :type cluster_name: string
         :param dataproc_spark_properties: Map for the Pig properties. Ideal to put in
             default arguments
         :type dataproc_spark_properties: dict
@@ -716,7 +716,7 @@ class DataProcSparkOperator(BaseOperator):
         self.archives = archives
         self.files = files
         self.job_name = job_name
-        self.dataproc_cluster = dataproc_cluster
+        self.cluster_name = cluster_name
         self.dataproc_properties = dataproc_spark_properties
         self.dataproc_jars = dataproc_spark_jars
         self.region = region
@@ -724,7 +724,7 @@ class DataProcSparkOperator(BaseOperator):
     def execute(self, context):
         hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to)
-        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkJob",
+        job = hook.create_job_template(self.task_id, self.cluster_name, "sparkJob",
                                        self.dataproc_properties)
 
         job.set_main(self.main_jar, self.main_class)
@@ -742,7 +742,7 @@ class DataProcHadoopOperator(BaseOperator):
     Start a Hadoop Job on a Cloud DataProc cluster.
     """
 
-    template_fields = ['arguments', 'job_name', 'dataproc_cluster']
+    template_fields = ['arguments', 'job_name', 'cluster_name']
     ui_color = '#0273d4'
 
     @apply_defaults
@@ -754,7 +754,7 @@ class DataProcHadoopOperator(BaseOperator):
             archives=None,
             files=None,
             job_name='{{task.task_id}}_{{ds_nodash}}',
-            dataproc_cluster='cluster-1',
+            cluster_name='cluster-1',
             dataproc_hadoop_properties=None,
             dataproc_hadoop_jars=None,
             gcp_conn_id='google_cloud_default',
@@ -782,8 +782,8 @@ class DataProcHadoopOperator(BaseOperator):
             is the task_id appended with the execution data, but can be templated. The
             name will always be appended with a random number to avoid name clashes.
         :type job_name: string
-        :param dataproc_cluster: The id of the DataProc cluster.
-        :type dataproc_cluster: string
+        :param cluster_name: The name of the DataProc cluster.
+        :type cluster_name: string
         :param dataproc_hadoop_properties: Map for the Pig properties. Ideal to put in
             default arguments
         :type dataproc_hadoop_properties: dict
@@ -808,7 +808,7 @@ class DataProcHadoopOperator(BaseOperator):
         self.archives = archives
         self.files = files
         self.job_name = job_name
-        self.dataproc_cluster = dataproc_cluster
+        self.cluster_name = cluster_name
         self.dataproc_properties = dataproc_hadoop_properties
         self.dataproc_jars = dataproc_hadoop_jars
         self.region = region
@@ -816,7 +816,7 @@ class DataProcHadoopOperator(BaseOperator):
     def execute(self, context):
         hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to)
-        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hadoopJob",
+        job = hook.create_job_template(self.task_id, self.cluster_name, "hadoopJob",
                                        self.dataproc_properties)
 
         job.set_main(self.main_jar, self.main_class)
@@ -834,7 +834,7 @@ class DataProcPySparkOperator(BaseOperator):
     Start a PySpark Job on a Cloud DataProc cluster.
     """
 
-    template_fields = ['arguments', 'job_name', 'dataproc_cluster']
+    template_fields = ['arguments', 'job_name', 'cluster_name']
     ui_color = '#0273d4'
 
     @apply_defaults
@@ -846,7 +846,7 @@ class DataProcPySparkOperator(BaseOperator):
             pyfiles=None,
             files=None,
             job_name='{{task.task_id}}_{{ds_nodash}}',
-            dataproc_cluster='cluster-1',
+            cluster_name='cluster-1',
             dataproc_pyspark_properties=None,
             dataproc_pyspark_jars=None,
             gcp_conn_id='google_cloud_default',
@@ -874,8 +874,8 @@ class DataProcPySparkOperator(BaseOperator):
             is the task_id appended with the execution data, but can be templated. The
             name will always be appended with a random number to avoid name clashes.
         :type job_name: string
-        :param dataproc_cluster: The id of the DataProc cluster.
-        :type dataproc_cluster: string
+        :param cluster_name: The name of the DataProc cluster.
+        :type cluster_name: string
         :param dataproc_pyspark_properties: Map for the Pig properties. Ideal to put in
             default arguments
         :type dataproc_pyspark_properties: dict
@@ -900,7 +900,7 @@ class DataProcPySparkOperator(BaseOperator):
         self.files = files
         self.pyfiles = pyfiles
         self.job_name = job_name
-        self.dataproc_cluster = dataproc_cluster
+        self.cluster_name = cluster_name
         self.dataproc_properties = dataproc_pyspark_properties
         self.dataproc_jars = dataproc_pyspark_jars
         self.region = region
@@ -908,7 +908,7 @@ class DataProcPySparkOperator(BaseOperator):
     def execute(self, context):
         hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to)
-        job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pysparkJob",
+        job = hook.create_job_template(self.task_id, self.cluster_name, "pysparkJob",
                                        self.dataproc_properties)
 
         job.set_python_main(self.main)
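
Putting the renamed arguments together, a hedged end-to-end sketch of a DAG that creates a cluster, runs a PySpark job on it, and tears it down; every project, bucket, zone and cluster value is a placeholder:

```
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import (
    DataprocClusterCreateOperator,
    DataprocClusterDeleteOperator,
    DataProcPySparkOperator,
)

dag = DAG(
    dag_id='dataproc_rename_example',
    start_date=datetime(2017, 10, 3),
    schedule_interval=None,
)

create_cluster = DataprocClusterCreateOperator(
    task_id='create_cluster',
    cluster_name='example-cluster',
    project_id='example-project',
    num_workers=2,
    zone='europe-west1-b',
    gcp_conn_id='google_cloud_default',   # was: google_cloud_conn_id
    dag=dag,
)

run_pyspark = DataProcPySparkOperator(
    task_id='run_pyspark',
    main='gs://example-bucket/jobs/job.py',
    cluster_name='example-cluster',       # was: dataproc_cluster
    gcp_conn_id='google_cloud_default',
    dag=dag,
)

delete_cluster = DataprocClusterDeleteOperator(
    task_id='delete_cluster',
    cluster_name='example-cluster',
    project_id='example-project',
    gcp_conn_id='google_cloud_default',   # was: google_cloud_conn_id
    dag=dag,
)

create_cluster >> run_pyspark >> delete_cluster
```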

