beam-commits mailing list archives

From pabl...@apache.org
Subject [beam] branch master updated: [BEAM-12751] Set clientRequestId for Dataflow python job creation
Date Fri, 27 Aug 2021 03:15:26 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 81fe195  [BEAM-12751] Set clientRequestId for Dataflow python job creation
     new cbbebcd  Merge pull request #15335 from [BEAM-12751] Set clientRequestId for Dataflow python job creation
81fe195 is described below

commit 81fe195239cdc399e86950c4c3b67559acda555d
Author: Minbo Bae <baeminbo@google.com>
AuthorDate: Sun Aug 15 20:35:33 2021 -0700

    [BEAM-12751] Set clientRequestId for Dataflow python job creation
---
 .../runners/dataflow/internal/apiclient.py         | 28 ++++++++
 .../runners/dataflow/internal/apiclient_test.py    | 80 ++++++++++++++++++++++
 2 files changed, 108 insertions(+)
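
For reference, the clientRequestId attached to each Job proto below is a UTC
microsecond timestamp joined to a random four-digit suffix. A minimal
standalone sketch of the same scheme (the helper name make_client_request_id
is illustrative, not part of this commit):

    import random
    from datetime import datetime

    def make_client_request_id():
        # e.g. '20210827031526123456-4821': UTC timestamp with
        # microseconds, plus a random integer in [1000, 9999].
        return '{}-{}'.format(
            datetime.utcnow().strftime('%Y%m%d%H%M%S%f'),
            random.randrange(9000) + 1000)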

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 9dc75cf..209d131 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -27,6 +27,8 @@ import io
 import json
 import logging
 import os
+import random
+
 import pkg_resources
 import re
 import sys
@@ -496,6 +498,11 @@ class Job(object):
         self.proto.labels.additionalProperties.append(
             dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))
 
+    # Client Request ID
+    self.proto.clientRequestId = '{}-{}'.format(
+        datetime.utcnow().strftime('%Y%m%d%H%M%S%f'),
+        random.randrange(9000) + 1000)
+
     self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
     self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
 
@@ -794,6 +801,20 @@ class DataflowApplicationClient(object):
           self.google_cloud_options.dataflow_endpoint)
       _LOGGER.fatal('details of server error: %s', e)
       raise
+
+    if response.clientRequestId and \
+        response.clientRequestId != job.proto.clientRequestId:
+      if self.google_cloud_options.update:
+        raise DataflowJobAlreadyExistsError(
+            "The job named %s with id: %s has already been updated into job "
+            "id: %s and cannot be updated again." %
+            (response.name, job.proto.replaceJobId, response.id))
+      else:
+        raise DataflowJobAlreadyExistsError(
+            'There is already an active job named %s with id: %s. If you '
+            'want to submit a second job, try again by setting a different '
+            'name using --job_name.' % (response.name, response.id))
+
     _LOGGER.info('Create job: %s', response)
     # The response is a Job proto with the id for the new job.
     _LOGGER.info('Created job with id: [%s]', response.id)
@@ -1028,6 +1049,13 @@ class _LegacyDataflowStager(Stager):
     return shared_names.BEAM_PACKAGE_NAME
 
 
+class DataflowJobAlreadyExistsError(retry.PermanentException):
+  """A non-retryable exception that a job with the given name already exists."""
+  # Inherits retry.PermanentException to avoid retry in
+  # DataflowApplicationClient.submit_job_description
+  pass
+
+
 def to_split_int(n):
   res = dataflow.SplitInt64()
   res.lowBits = n & 0xffffffff
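
Dataflow echoes back the clientRequestId of whichever job owns the requested
name, so a mismatch with the locally generated id means the Create call was
deduplicated against an existing job. A sketch of how a caller might handle
the new error (the surrounding client/job setup is assumed, not shown):

    from apache_beam.runners.dataflow.internal import apiclient

    try:
        response = client.create_job(job)  # client: DataflowApplicationClient
    except apiclient.DataflowJobAlreadyExistsError as exn:
        # Non-retryable: a job with this name already exists (or was
        # already updated), so resubmitting the same request won't help.
        print('Not submitting again: %s' % exn)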
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index a4f81a3..41e79f3 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -1039,6 +1039,86 @@ class UtilTest(unittest.TestCase):
               mock.ANY, "dataflow_graph.json", mock.ANY)
           client.create_job_description.assert_called_once()
 
+  def test_create_job_returns_existing_job(self):
+    pipeline_options = PipelineOptions([
+        '--project',
+        'test_project',
+        '--job_name',
+        'test_job_name',
+        '--temp_location',
+        'gs://test-location/temp',
+    ])
+    job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
+    self.assertTrue(job.proto.clientRequestId)  # asserts non-empty string
+    pipeline_options.view_as(GoogleCloudOptions).no_auth = True
+    client = apiclient.DataflowApplicationClient(pipeline_options)
+
+    response = dataflow.Job()
+    # different clientRequestId from `job`
+    response.clientRequestId = "20210821081910123456-1234"
+    response.name = 'test_job_name'
+    response.id = '2021-08-19_21_18_43-9756917246311111021'
+
+    with mock.patch.object(client._client.projects_locations_jobs,
+                           'Create',
+                           side_effect=[response]):
+      with mock.patch.object(client, 'create_job_description',
+                             side_effect=None):
+        with self.assertRaises(
+            apiclient.DataflowJobAlreadyExistsError) as context:
+          client.create_job(job)
+
+        self.assertEqual(
+            str(context.exception),
+            'There is already an active job named %s with id: %s. If you '
+            'want to submit a second job, try again by setting a different '
+            'name using --job_name.' % ('test_job_name', response.id))
+
+  def test_update_job_returns_existing_job(self):
+    pipeline_options = PipelineOptions([
+        '--project',
+        'test_project',
+        '--job_name',
+        'test_job_name',
+        '--temp_location',
+        'gs://test-location/temp',
+        '--region',
+        'us-central1',
+        '--update',
+    ])
+    replace_job_id = '2021-08-21_00_00_01-6081497447916622336'
+    with mock.patch('apache_beam.runners.dataflow.internal.apiclient'
+                    '.DataflowApplicationClient.job_id_for_name',
+                    return_value=replace_job_id) as job_id_for_name_mock:
+      job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
+    job_id_for_name_mock.assert_called_once()
+
+    self.assertTrue(job.proto.clientRequestId)  # asserts non-empty string
+
+    pipeline_options.view_as(GoogleCloudOptions).no_auth = True
+    client = apiclient.DataflowApplicationClient(pipeline_options)
+
+    response = dataflow.Job()
+    # different clientRequestId from `job`
+    response.clientRequestId = "20210821083254123456-1234"
+    response.name = 'test_job_name'
+    response.id = '2021-08-19_21_29_07-5725551945600207770'
+
+    with mock.patch.object(client, 'create_job_description', side_effect=None):
+      with mock.patch.object(client._client.projects_locations_jobs,
+                             'Create',
+                             side_effect=[response]):
+
+        with self.assertRaises(
+            apiclient.DataflowJobAlreadyExistsError) as context:
+          client.create_job(job)
+
+      self.assertEqual(
+          str(context.exception),
+          'The job named %s with id: %s has already been updated into job '
+          'id: %s and cannot be updated again.' %
+          ('test_job_name', replace_job_id, response.id))
+
   def test_template_file_generation_with_upload_graph(self):
     pipeline_options = PipelineOptions([
         '--project',
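
Because DataflowJobAlreadyExistsError subclasses retry.PermanentException,
the retry decorators used in this module rethrow it immediately instead of
backing off. A small sketch of that behavior (submit here is a hypothetical
stand-in for DataflowApplicationClient.submit_job_description):

    from apache_beam.utils import retry
    from apache_beam.runners.dataflow.internal import apiclient

    @retry.with_exponential_backoff(num_retries=3)
    def submit():
        # A retry.PermanentException subclass is raised straight through
        # on the first attempt; it is never retried.
        raise apiclient.DataflowJobAlreadyExistsError('job already exists')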
