beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] incubator-beam git commit: Fixing inconsistencies in PipelineOptions
Date Wed, 21 Dec 2016 23:15:35 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 3454d691f -> bb09c07b6


Fixing inconsistencies in PipelineOptions

The following options have changed:

* job_name - The default is now 'beamapp-<username>-<date>-<microseconds>'. A test was added.
* staging_location and temp_location - temp_location used to default to
  staging_location. Now it's the other way around (staging_location defaults
  to temp_location, which is required), and the tests reflect that.
* machine_type alias of worker_machine_type has been removed.
* disk_type alias of worker_disk_type has been removed.
* disk_source_image option has been removed.
* no_save_main_session option has been removed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35e2fdc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35e2fdc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35e2fdc7

Branch: refs/heads/python-sdk
Commit: 35e2fdc7f22f20d74a569e86ced931209661dec1
Parents: 3454d69
Author: Pablo <pabloem@google.com>
Authored: Tue Dec 6 18:01:54 2016 -0800
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Wed Dec 21 15:14:52 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py   | 45 ++++++++++++--------
 .../apache_beam/internal/apiclient_test.py      |  6 +++
 sdks/python/apache_beam/utils/options.py        | 33 ++++++--------
 .../utils/pipeline_options_validator.py         | 11 ++---
 .../utils/pipeline_options_validator_test.py    |  8 ++--
 5 files changed, 54 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index f1341a7..3a9ba46 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -18,6 +18,8 @@
 """Dataflow client utility functions."""
 
 import codecs
+from datetime import datetime
+import getpass
 import json
 import logging
 import os
@@ -46,10 +48,6 @@ from apache_beam.utils.options import WorkerOptions
 from apache_beam.internal.clients import storage
 import apache_beam.internal.clients.dataflow as dataflow
 
-BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
-COMPUTE_API_SERVICE = 'compute.googleapis.com'
-STORAGE_API_SERVICE = 'storage.googleapis.com'
-
 
 class Step(object):
   """Wrapper for a dataflow Step protobuf."""
@@ -121,11 +119,13 @@ class Environment(object):
     self.worker_options = options.view_as(WorkerOptions)
     self.debug_options = options.view_as(DebugOptions)
     self.proto = dataflow.Environment()
-    self.proto.clusterManagerApiService = COMPUTE_API_SERVICE
-    self.proto.dataset = '%s/cloud_dataflow' % BIGQUERY_API_SERVICE
+    self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
+    self.proto.dataset = '{}/cloud_dataflow'.format(
+        GoogleCloudOptions.BIGQUERY_API_SERVICE)
     self.proto.tempStoragePrefix = (
-        self.google_cloud_options.temp_location.replace('gs:/',
-                                                        STORAGE_API_SERVICE))
+        self.google_cloud_options.temp_location.replace(
+            'gs:/',
+            GoogleCloudOptions.STORAGE_API_SERVICE))
     # User agent information.
     self.proto.userAgent = dataflow.Environment.UserAgentValue()
     self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
@@ -165,7 +165,7 @@ class Environment(object):
           dataflow.Package(
               location='%s/%s' % (
                   self.google_cloud_options.staging_location.replace(
-                      'gs:/', STORAGE_API_SERVICE),
+                      'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE),
                   package),
               name=package))
 
@@ -174,7 +174,7 @@ class Environment(object):
         packages=package_descriptors,
         taskrunnerSettings=dataflow.TaskRunnerSettings(
             parallelWorkerSettings=dataflow.WorkerSettings(
-                baseUrl='https://dataflow.googleapis.com',
+                baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
                 servicePath=self.google_cloud_options.dataflow_endpoint)))
     pool.autoscalingSettings = dataflow.AutoscalingSettings()
     # Set worker pool options received through command line.
@@ -195,8 +195,6 @@ class Environment(object):
       pool.diskSizeGb = self.worker_options.disk_size_gb
     if self.worker_options.disk_type:
       pool.diskType = self.worker_options.disk_type
-    if self.worker_options.disk_source_image:
-      pool.diskSourceImage = self.worker_options.disk_source_image
     if self.worker_options.zone:
       pool.zone = self.worker_options.zone
     if self.worker_options.network:
@@ -299,10 +297,23 @@ class Job(object):
         json.loads(encoding.MessageToJson(self.proto), encoding='shortstrings'),
         indent=2, sort_keys=True)
 
+  @staticmethod
+  def default_job_name(job_name):
+    if job_name is None:
+      user_name = getpass.getuser().lower()
+      date_component = datetime.utcnow().strftime('%m%d%H%M%S-%f')
+      app_name = 'beamapp'
+      job_name = '{}-{}-{}'.format(app_name, user_name, date_component)
+    return job_name
+
   def __init__(self, options):
     self.options = options
     self.google_cloud_options = options.view_as(GoogleCloudOptions)
-    required_google_cloud_options = ['project', 'job_name', 'staging_location']
+    if not self.google_cloud_options.job_name:
+      self.google_cloud_options.job_name = self.default_job_name(
+          self.google_cloud_options.job_name)
+
+    required_google_cloud_options = ['project', 'job_name', 'temp_location']
     missing = [
         option for option in required_google_cloud_options
         if not getattr(self.google_cloud_options, option)]
@@ -310,11 +321,11 @@ class Job(object):
       raise ValueError(
           'Missing required configuration parameters: %s' % missing)
 
-    if not self.google_cloud_options.temp_location:
-      logging.info('Defaulting to the staging_location as temp_location: %s',
-                   self.google_cloud_options.staging_location)
+    if not self.google_cloud_options.staging_location:
+      logging.info('Defaulting to the temp_location as staging_location: %s',
+                   self.google_cloud_options.temp_location)
       (self.google_cloud_options
-       .temp_location) = self.google_cloud_options.staging_location
+       .staging_location) = self.google_cloud_options.temp_location
 
     # Make the staging and temp locations job name and time specific. This is
     # needed to avoid clashes between job submissions using the same staging

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py
index 31b2dad..75d00e0 100644
--- a/sdks/python/apache_beam/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/internal/apiclient_test.py
@@ -16,6 +16,7 @@
 #
 """Unit tests for the apiclient module."""
 
+import re
 import unittest
 
 from apache_beam.utils.options import PipelineOptions
@@ -32,6 +33,11 @@ class UtilTest(unittest.TestCase):
         pipeline_options,
         DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
 
+  def test_default_job_name(self):
+    job_name = apiclient.Job.default_job_name(None)
+    regexp = 'beamapp-[a-z]*-[0-9]{10}-[0-9]{6}'
+    self.assertTrue(re.match(regexp, job_name))
+
 
 if __name__ == '__main__':
   unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
index eaa1065..085c09c 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -224,11 +224,16 @@ class TypeOptions(PipelineOptions):
 class GoogleCloudOptions(PipelineOptions):
   """Google Cloud Dataflow service execution options."""
 
+  BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
+  COMPUTE_API_SERVICE = 'compute.googleapis.com'
+  STORAGE_API_SERVICE = 'storage.googleapis.com'
+  DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com'
+
   @classmethod
   def _add_argparse_args(cls, parser):
     parser.add_argument(
         '--dataflow_endpoint',
-        default='https://dataflow.googleapis.com',
+        default=cls.DATAFLOW_ENDPOINT,
         help=
         ('The URL for the Dataflow API. If not set, the default public URL '
          'will be used.'))
@@ -251,7 +256,6 @@ class GoogleCloudOptions(PipelineOptions):
     parser.add_argument('--temp_location',
                         default=None,
                         help='GCS path for saving temporary workflow jobs.')
-    # Options for using service account credentials.
     parser.add_argument('--service_account_name',
                         default=None,
                         help='Name of the service account for Google APIs.')
@@ -272,10 +276,10 @@ class GoogleCloudOptions(PipelineOptions):
     errors = []
     if validator.is_service_runner():
       errors.extend(validator.validate_cloud_options(self))
-      errors.extend(validator.validate_gcs_path(self, 'staging_location'))
-      if getattr(self, 'temp_location',
-                 None) or getattr(self, 'staging_location', None) is None:
-        errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+      errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+      if getattr(self, 'staging_location',
+                 None) or getattr(self, 'temp_location', None) is None:
+        errors.extend(validator.validate_gcs_path(self, 'staging_location'))
 
     if self.view_as(DebugOptions).dataflow_job_file:
       if self.view_as(GoogleCloudOptions).template_location:
@@ -312,9 +316,8 @@ class WorkerOptions(PipelineOptions):
         default=None,  # Meaning unset, distinct from 'NONE' meaning don't scale
         help=
         ('If and how to auotscale the workerpool.'))
-    # TODO(silviuc): Remove --machine_type variant of the flag.
     parser.add_argument(
-        '--worker_machine_type', '--machine_type',
+        '--worker_machine_type',
         dest='machine_type',
         default=None,
         help=('Machine type to create Dataflow worker VMs as. See '
@@ -329,21 +332,12 @@ class WorkerOptions(PipelineOptions):
         help=
         ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
          'If not set, the Dataflow service will use a reasonable default.'))
-    # TODO(silviuc): Remove --disk_type variant of the flag.
     parser.add_argument(
-        '--worker_disk_type', '--disk_type',
+        '--worker_disk_type',
         dest='disk_type',
         default=None,
         help=('Specifies what type of persistent disk should be used.'))
     parser.add_argument(
-        '--disk_source_image',
-        default=None,
-        help=
-        ('Disk source image to use by VMs for jobs. See '
-         'https://developers.google.com/compute/docs/images for further '
-         'details. If not set, the Dataflow service will use a reasonable '
-         'default.'))
-    parser.add_argument(
         '--zone',
         default=None,
         help=(
@@ -461,9 +455,6 @@ class SetupOptions(PipelineOptions):
          'Some workflows do not need the session state if for instance all '
          'their functions/classes are defined in proper modules (not __main__)'
          ' and the modules are importable in the worker. '))
-    parser.add_argument('--no_save_main_session',
-                        dest='save_main_session',
-                        action='store_false')
     parser.add_argument(
         '--sdk_location',
         default='default',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
index b7b2978..c248022 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -17,7 +17,6 @@
 
 """Pipeline options validator.
 """
-
 import re
 
 from apache_beam.utils.options import DebugOptions
@@ -144,12 +143,10 @@ class PipelineOptionsValidator(object):
   def validate_cloud_options(self, view):
     """Validates job_name and project arguments."""
     errors = []
-    job_name = view.job_name
-    if job_name is None:
-      errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'job_name'))
-    elif not self.is_full_string_match(self.JOB_PATTERN, job_name):
-      errors.extend(self._validate_error(self.ERR_INVALID_JOB_NAME, job_name))
-
+    if (view.job_name and
+        not self.is_full_string_match(self.JOB_PATTERN, view.job_name)):
+      errors.extend(self._validate_error(self.ERR_INVALID_JOB_NAME,
+                                         view.job_name))
     project = view.project
     if project is None:
       errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'project'))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
index bffbeca..5e93ff6 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -70,7 +70,7 @@ class SetupTest(unittest.TestCase):
     self.assertEqual(
         self.check_errors_for_arguments(
             errors,
-            ['project', 'job_name', 'staging_location', 'temp_location']),
+            ['project', 'staging_location', 'temp_location']),
         [])
 
   def test_gcs_path(self):
@@ -91,13 +91,13 @@ class SetupTest(unittest.TestCase):
     test_cases = [
         {'temp_location': None,
          'staging_location': 'gs://foo/bar',
-         'errors': []},
+         'errors': ['temp_location']},
         {'temp_location': None,
          'staging_location': None,
          'errors': ['staging_location', 'temp_location']},
         {'temp_location': 'gs://foo/bar',
          'staging_location': None,
-         'errors': ['staging_location']},
+         'errors': []},
         {'temp_location': 'gs://foo/bar',
          'staging_location': 'gs://ABC/bar',
          'errors': ['staging_location']},
@@ -172,7 +172,7 @@ class SetupTest(unittest.TestCase):
       return validator
 
     test_cases = [
-        {'job_name': None, 'errors': ['job_name']},
+        {'job_name': None, 'errors': []},
         {'job_name': '12345', 'errors': ['job_name']},
         {'job_name': 'FOO', 'errors': ['job_name']},
         {'job_name': 'foo:bar', 'errors': ['job_name']},


Mime
View raw message