From: dhalperi@apache.org
To: commits@beam.apache.org
Date: Thu, 29 Dec 2016 04:33:18 -0000
Subject: [2/3] beam git commit: Rename options.py -> pipeline_options.py

Rename options.py -> pipeline_options.py

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd9c9f88
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd9c9f88
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd9c9f88

Branch: refs/heads/python-sdk
Commit: cd9c9f888879d425a990031f28bf3bff63d2087e
Parents: 89a5b3c
Author: Maria Garcia Herrero
Authored: Wed Dec 28 15:28:37 2016 -0800
Committer: Dan Halperin
Committed: Wed Dec 28 20:32:24 2016 -0800

----------------------------------------------------------------------
 .../examples/complete/autocomplete.py | 4 +-
 .../examples/complete/estimate_pi.py | 4 +-
 .../apache_beam/examples/complete/tfidf.py | 4 +-
 .../examples/complete/top_wikipedia_sessions.py | 4 +-
 .../examples/cookbook/bigquery_side_input.py | 4 +-
 .../apache_beam/examples/cookbook/bigshuffle.py | 4 +-
 .../apache_beam/examples/cookbook/coders.py | 4 +-
 .../examples/cookbook/custom_ptransform.py | 2 +-
 .../examples/cookbook/datastore_wordcount.py | 6 +-
 .../examples/cookbook/group_with_coder.py | 4 +-
 .../examples/cookbook/mergecontacts.py | 4 +-
 .../examples/cookbook/multiple_output_pardo.py | 4 +-
 .../apache_beam/examples/snippets/snippets.py | 50 +-
 .../examples/snippets/snippets_test.py | 2 +-
 sdks/python/apache_beam/examples/wordcount.py | 4 +-
 .../apache_beam/examples/wordcount_debugging.py | 4 +-
 .../apache_beam/examples/wordcount_minimal.py | 4 +-
 sdks/python/apache_beam/internal/apiclient.py | 8 +-
 .../apache_beam/internal/apiclient_test.py | 2 +-
 sdks/python/apache_beam/internal/auth.py | 4 +-
 sdks/python/apache_beam/io/bigquery.py | 2 +-
 sdks/python/apache_beam/io/bigquery_test.py | 2 +-
 sdks/python/apache_beam/pipeline.py | 8 +-
 .../apache_beam/runners/dataflow_runner.py | 2 +-
 .../runners/direct/transform_evaluator.py | 2
+- sdks/python/apache_beam/runners/runner_test.py | 2 +- .../apache_beam/runners/template_runner_test.py | 2 +- .../runners/test/test_dataflow_runner.py | 2 +- sdks/python/apache_beam/test_pipeline.py | 2 +- sdks/python/apache_beam/test_pipeline_test.py | 2 +- sdks/python/apache_beam/transforms/core.py | 2 +- sdks/python/apache_beam/transforms/display.py | 2 +- .../apache_beam/transforms/display_test.py | 2 +- .../python/apache_beam/transforms/ptransform.py | 2 +- .../apache_beam/transforms/ptransform_test.py | 4 +- .../transforms/write_ptransform_test.py | 2 +- .../typehints/typed_pipeline_test.py | 4 +- sdks/python/apache_beam/utils/dependency.py | 4 +- .../python/apache_beam/utils/dependency_test.py | 6 +- sdks/python/apache_beam/utils/options.py | 543 ------------------- .../apache_beam/utils/pipeline_options.py | 540 ++++++++++++++++++ .../apache_beam/utils/pipeline_options_test.py | 2 +- .../utils/pipeline_options_validator.py | 14 +- .../utils/pipeline_options_validator_test.py | 2 +- 44 files changed, 639 insertions(+), 642 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/autocomplete.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py index eaa5ca2..87e6c0c 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete.py @@ -24,8 +24,8 @@ import logging import re import apache_beam as beam -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions def run(argv=None): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/estimate_pi.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py index 682c6d2..d0faefe 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py @@ -36,8 +36,8 @@ import apache_beam as beam from apache_beam.typehints import Any from apache_beam.typehints import Iterable from apache_beam.typehints import Tuple -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions @beam.typehints.with_output_types(Tuple[int, int, int]) http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/tfidf.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index 59b2900..b4d5b45 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -30,8 +30,8 @@ import re import apache_beam as beam from apache_beam.pvalue import AsSingleton -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import 
PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions def read_documents(pipeline, uris): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index fbce641..cbd305a 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -46,8 +46,8 @@ import logging import apache_beam as beam from apache_beam import combiners from apache_beam import window -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions ONE_HOUR_IN_SECONDS = 3600 http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py index 8a53637..25e2c3b 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py @@ -35,8 +35,8 @@ import apache_beam as beam from apache_beam.pvalue import AsList from apache_beam.pvalue import AsSingleton -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions def create_groups(group_ids, corpus, word, ignore_corpus, ignore_word): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/bigshuffle.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py index a076a0c..83d3881 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py +++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py @@ -25,8 +25,8 @@ import logging import apache_beam as beam -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions def crc32line(line): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py index bbe02b3..690ba66 100644 --- a/sdks/python/apache_beam/examples/cookbook/coders.py +++ b/sdks/python/apache_beam/examples/cookbook/coders.py @@ -35,8 +35,8 @@ import json import logging import apache_beam as beam -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions class JsonCoder(object): 
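Each hunk in this change is the same mechanical substitution: the module path apache_beam.utils.options becomes apache_beam.utils.pipeline_options, while the imported names (PipelineOptions, SetupOptions, and the other option classes) stay the same. For user code that has to import against SDKs from both before and after this rename, a guarded import is one possible migration aid; the sketch below is illustrative only and is not part of the commit.

    # Illustrative migration shim, not part of this commit: prefer the new
    # module path and fall back to the pre-rename one if it is absent.
    try:
        from apache_beam.utils.pipeline_options import PipelineOptions
        from apache_beam.utils.pipeline_options import SetupOptions
    except ImportError:
        from apache_beam.utils.options import PipelineOptions
        from apache_beam.utils.options import SetupOptions
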
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py index b9d64cf..56259ed 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py @@ -27,7 +27,7 @@ import logging import apache_beam as beam -from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.pipeline_options import PipelineOptions # pylint doesn't understand our pipeline syntax: http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index 9613402..dd34070 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -73,9 +73,9 @@ from googledatastore import helper as datastore_helper, PropertyFilter import apache_beam as beam from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore -from apache_beam.utils.options import GoogleCloudOptions -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import GoogleCloudOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions empty_line_aggregator = beam.Aggregator('emptyLines') average_word_size_aggregator = beam.Aggregator('averageWordLength', http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/group_with_coder.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py index 651a4f3..c4b8c59 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py @@ -35,8 +35,8 @@ import apache_beam as beam from apache_beam import coders from apache_beam.typehints import typehints from apache_beam.typehints.decorators import with_output_types -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions class Player(object): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/mergecontacts.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index bf6d1b1..6602609 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -36,8 +36,8 @@ import logging import re import apache_beam as beam -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import 
SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions def run(argv=None, assert_results=None): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py index d760e5a..dd91e74 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py @@ -54,8 +54,8 @@ import re import apache_beam as beam from apache_beam import pvalue -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions class SplitLinesToWordsFn(beam.DoFn): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 64878f3..0d55125 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -88,7 +88,7 @@ def construct_pipeline(renames): return True # [START pipelines_constructing_creating] - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions p = beam.Pipeline(options=PipelineOptions()) # [END pipelines_constructing_creating] @@ -125,7 +125,7 @@ def model_pipelines(argv): import re import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions class MyOptions(PipelineOptions): @@ -161,7 +161,7 @@ def model_pcollection(argv): URL: https://cloud.google.com/dataflow/model/pcollection """ - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions class MyOptions(PipelineOptions): @@ -197,7 +197,7 @@ def pipeline_options_remote(argv): """ from apache_beam import Pipeline - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions # [START pipeline_options_create] options = PipelineOptions(flags=argv) @@ -212,8 +212,8 @@ def pipeline_options_remote(argv): parser.add_argument('--output') # [END pipeline_options_define_custom] - from apache_beam.utils.options import GoogleCloudOptions - from apache_beam.utils.options import StandardOptions + from apache_beam.utils.pipeline_options import GoogleCloudOptions + from apache_beam.utils.pipeline_options import StandardOptions # [START pipeline_options_dataflow_service] # Create and set your PipelineOptions. 
@@ -254,7 +254,7 @@ def pipeline_options_local(argv): """ from apache_beam import Pipeline - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions options = PipelineOptions(flags=argv) @@ -320,7 +320,7 @@ def pipeline_logging(lines, output): import re import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions # [START pipeline_logging] # import Python logging module. @@ -357,7 +357,7 @@ def pipeline_monitoring(renames): import re import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions class WordCountOptions(PipelineOptions): @@ -425,9 +425,9 @@ def examples_wordcount_minimal(renames): import apache_beam as beam - from apache_beam.utils.options import GoogleCloudOptions - from apache_beam.utils.options import StandardOptions - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import GoogleCloudOptions + from apache_beam.utils.pipeline_options import StandardOptions + from apache_beam.utils.pipeline_options import PipelineOptions # [START examples_wordcount_minimal_options] options = PipelineOptions() @@ -485,7 +485,7 @@ def examples_wordcount_wordcount(renames): import re import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions argv = [] @@ -544,7 +544,7 @@ def examples_wordcount_debugging(renames): import re import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions # [START example_wordcount_debugging_logging] # [START example_wordcount_debugging_aggregators] @@ -635,7 +635,7 @@ def model_custom_source(count): from apache_beam.io import iobase from apache_beam.io.range_trackers import OffsetRangeTracker from apache_beam.transforms.core import PTransform - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions # Defining a new source. # [START model_custom_source_new_source] @@ -766,7 +766,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, import apache_beam as beam from apache_beam.io import iobase from apache_beam.transforms.core import PTransform - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions # Defining the new sink. 
# [START model_custom_sink_new_sink] @@ -867,7 +867,7 @@ def model_textio(renames): return re.findall(r'[A-Za-z\']+', x) import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions # [START model_textio_read] p = beam.Pipeline(options=PipelineOptions()) @@ -902,7 +902,7 @@ def model_datastoreio(): from google.datastore.v1 import query_pb2 import googledatastore import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore @@ -938,7 +938,7 @@ def model_bigqueryio(): URL: https://cloud.google.com/dataflow/model/bigquery-io """ import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions # [START model_bigqueryio_read] p = beam.Pipeline(options=PipelineOptions()) @@ -1009,7 +1009,7 @@ def model_composite_transform_example(contents, output_path): # [END composite_ptransform_apply_method] # [END composite_transform_example] - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions p = beam.Pipeline(options=PipelineOptions()) (p | beam.Create(contents) @@ -1025,7 +1025,7 @@ def model_multiple_pcollections_flatten(contents, output_path): """ some_hash_fn = lambda s: ord(s[0]) import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions p = beam.Pipeline(options=PipelineOptions()) partition_fn = lambda element, partitions: some_hash_fn(element) % partitions @@ -1066,7 +1066,7 @@ def model_multiple_pcollections_partition(contents, output_path): """Assume i in [0,100).""" return i import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions p = beam.Pipeline(options=PipelineOptions()) students = p | beam.Create(contents) @@ -1096,7 +1096,7 @@ def model_group_by_key(contents, output_path): import re import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions p = beam.Pipeline(options=PipelineOptions()) words_and_counts = ( p @@ -1123,7 +1123,7 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path): URL: https://cloud.google.com/dataflow/model/group-by-key """ import apache_beam as beam - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions p = beam.Pipeline(options=PipelineOptions()) # [START model_group_by_key_cogroupbykey_tuple] # Each data set is represented by key-value pairs in separate PCollections. 
@@ -1161,7 +1161,7 @@ def model_join_using_side_inputs( import apache_beam as beam from apache_beam.pvalue import AsIter - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions p = beam.Pipeline(options=PipelineOptions()) # [START model_join_using_side_inputs] http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index feb06c5..1a84a6e 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -31,7 +31,7 @@ from apache_beam import typehints from apache_beam.io import fileio from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to -from apache_beam.utils.options import TypeOptions +from apache_beam.utils.pipeline_options import TypeOptions from apache_beam.examples.snippets import snippets # pylint: disable=expression-not-assigned http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index 7f347d8..51fc2eb 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -24,8 +24,8 @@ import logging import re import apache_beam as beam -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions empty_line_aggregator = beam.Aggregator('emptyLines') http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/wordcount_debugging.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index ffbfed7..bba09b4 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -46,8 +46,8 @@ import logging import re import apache_beam as beam -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions class FilterTextFn(beam.DoFn): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/wordcount_minimal.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py index 98477df..c02ec16 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal.py @@ -51,8 +51,8 @@ import logging import re import apache_beam as beam -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions def run(argv=None): 
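Once the import path is updated, the wordcount examples keep constructing their pipelines exactly as before. A minimal, self-contained sketch of that pattern against the new module path (the transform labels and the sample input are placeholders, not taken from the commit):

    import apache_beam as beam
    from apache_beam.utils.pipeline_options import PipelineOptions
    from apache_beam.utils.pipeline_options import SetupOptions

    def run(argv=None):
      # Parse the known pipeline flags from argv; unknown args are ignored
      # because PipelineOptions uses argparse's parse_known_args.
      pipeline_options = PipelineOptions(flags=argv)
      # save_main_session is the flag defined by SetupOptions in this module.
      pipeline_options.view_as(SetupOptions).save_main_session = True
      p = beam.Pipeline(options=pipeline_options)
      (p
       | 'create' >> beam.Create(['to be or not to be'])
       | 'split' >> beam.FlatMap(lambda line: line.split()))
      p.run()
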
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index 3a9ba46..001ae64 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -41,10 +41,10 @@ from apache_beam.utils import retry from apache_beam.utils.dependency import get_required_container_version from apache_beam.utils.dependency import get_sdk_name_and_version from apache_beam.utils.names import PropertyNames -from apache_beam.utils.options import DebugOptions -from apache_beam.utils.options import GoogleCloudOptions -from apache_beam.utils.options import StandardOptions -from apache_beam.utils.options import WorkerOptions +from apache_beam.utils.pipeline_options import DebugOptions +from apache_beam.utils.pipeline_options import GoogleCloudOptions +from apache_beam.utils.pipeline_options import StandardOptions +from apache_beam.utils.pipeline_options import WorkerOptions from apache_beam.internal.clients import storage import apache_beam.internal.clients.dataflow as dataflow http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/internal/apiclient_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py index 1a83752..188a5a8 100644 --- a/sdks/python/apache_beam/internal/apiclient_test.py +++ b/sdks/python/apache_beam/internal/apiclient_test.py @@ -18,7 +18,7 @@ import unittest -from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.pipeline_options import PipelineOptions from apache_beam.runners.dataflow_runner import DataflowRunner from apache_beam.internal import apiclient http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/internal/auth.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/auth.py b/sdks/python/apache_beam/internal/auth.py index 056f40c..c645f24 100644 --- a/sdks/python/apache_beam/internal/auth.py +++ b/sdks/python/apache_beam/internal/auth.py @@ -29,8 +29,8 @@ from oauth2client.client import OAuth2Credentials from apache_beam.utils import processes from apache_beam.utils import retry -from apache_beam.utils.options import GoogleCloudOptions -from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.pipeline_options import GoogleCloudOptions +from apache_beam.utils.pipeline_options import PipelineOptions # When we are running in GCE, we can authenticate with VM credentials. 
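apiclient.py and auth.py consume the service-facing option groups (GoogleCloudOptions, StandardOptions, WorkerOptions, DebugOptions) rather than the example-facing ones. A hedged sketch of reaching those groups through view_as under the new module path; the project, job name, and bucket paths below are placeholders:

    from apache_beam.utils.pipeline_options import GoogleCloudOptions
    from apache_beam.utils.pipeline_options import PipelineOptions
    from apache_beam.utils.pipeline_options import StandardOptions

    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'my-project-id'                    # placeholder
    google_cloud_options.job_name = 'myjob'                           # placeholder
    google_cloud_options.staging_location = 'gs://my-bucket/staging'  # placeholder
    google_cloud_options.temp_location = 'gs://my-bucket/temp'        # placeholder
    options.view_as(StandardOptions).runner = 'DataflowRunner'
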
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/io/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 6a766d0..ea0a281 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -120,7 +120,7 @@ from apache_beam.internal.json_value import to_json_value from apache_beam.runners.dataflow.native_io import iobase as dataflow_io from apache_beam.transforms.display import DisplayDataItem from apache_beam.utils import retry -from apache_beam.utils.options import GoogleCloudOptions +from apache_beam.utils.pipeline_options import GoogleCloudOptions # Protect against environments where bigquery library is not available. # pylint: disable=wrong-import-order, wrong-import-position http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/io/bigquery_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index 3a558d7..b8682d1 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -35,7 +35,7 @@ from apache_beam.io.bigquery import TableRowJsonCoder from apache_beam.io.bigquery import parse_table_schema_from_json from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher -from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.pipeline_options import PipelineOptions class TestRowAsDictJsonCoder(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 8b2345e..81343f3 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -55,10 +55,10 @@ from apache_beam.runners import create_runner from apache_beam.runners import PipelineRunner from apache_beam.transforms import ptransform from apache_beam.typehints import TypeCheckError -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions -from apache_beam.utils.options import StandardOptions -from apache_beam.utils.options import TypeOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions +from apache_beam.utils.pipeline_options import StandardOptions +from apache_beam.utils.pipeline_options import TypeOptions from apache_beam.utils.pipeline_options_validator import PipelineOptionsValidator http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 3ee95c5..3505acc 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -41,7 +41,7 @@ from apache_beam.typehints import typehints from apache_beam.utils import names from apache_beam.utils.names import PropertyNames from apache_beam.utils.names import TransformNames -from apache_beam.utils.options import StandardOptions +from 
apache_beam.utils.pipeline_options import StandardOptions from apache_beam.internal.clients import dataflow as dataflow_api http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 24ab754..b4c43ba 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -38,7 +38,7 @@ from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn from apache_beam.typehints.typecheck import TypeCheckError from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn from apache_beam.utils import counters -from apache_beam.utils.options import TypeOptions +from apache_beam.utils.pipeline_options import TypeOptions class TransformEvaluatorRegistry(object): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 74c81c2..ff6a22e 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -36,7 +36,7 @@ from apache_beam.runners import DirectRunner from apache_beam.runners import TestDataflowRunner import apache_beam.transforms as ptransform from apache_beam.transforms.display import DisplayDataItem -from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.pipeline_options import PipelineOptions class RunnerTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/template_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py index 8cd818b..457022d 100644 --- a/sdks/python/apache_beam/runners/template_runner_test.py +++ b/sdks/python/apache_beam/runners/template_runner_test.py @@ -26,7 +26,7 @@ import tempfile import apache_beam as beam from apache_beam.pipeline import Pipeline from apache_beam.runners.dataflow_runner import DataflowRunner -from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.pipeline_options import PipelineOptions from apache_beam.internal import apiclient http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/test/test_dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py index a58ab33..77655bd 100644 --- a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py @@ -19,7 +19,7 @@ from apache_beam.internal import pickler from apache_beam.runners.dataflow_runner import DataflowRunner -from apache_beam.utils.options import TestOptions +from apache_beam.utils.pipeline_options import TestOptions class TestDataflowRunner(DataflowRunner): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/test_pipeline.py ---------------------------------------------------------------------- diff 
--git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py index e758c3d..48b98b2 100644 --- a/sdks/python/apache_beam/test_pipeline.py +++ b/sdks/python/apache_beam/test_pipeline.py @@ -22,7 +22,7 @@ import shlex from apache_beam.internal import pickler from apache_beam.pipeline import Pipeline -from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.pipeline_options import PipelineOptions from nose.plugins.skip import SkipTest http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/test_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py index 747f0ef..42ba2d7 100644 --- a/sdks/python/apache_beam/test_pipeline_test.py +++ b/sdks/python/apache_beam/test_pipeline_test.py @@ -20,7 +20,7 @@ import unittest from apache_beam.test_pipeline import TestPipeline -from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.pipeline_options import PipelineOptions class TestPipelineTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 0ba1c62..6a7bd2e 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -46,7 +46,7 @@ from apache_beam.typehints import TypeCheckError from apache_beam.typehints import Union from apache_beam.typehints import WithTypeHints from apache_beam.typehints.trivial_inference import element_type -from apache_beam.utils.options import TypeOptions +from apache_beam.utils.pipeline_options import TypeOptions # Type variables T = typehints.TypeVariable('T') http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/display.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index c38fd9b..6e74512 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -122,7 +122,7 @@ class DisplayData(object): ValueError: If the has_display_data argument is not an instance of HasDisplayData. 
""" - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions if not isinstance(pipeline_options, PipelineOptions): raise ValueError( 'Element of class {}.{} does not subclass PipelineOptions' http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/display_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py index fc50abe..848746c 100644 --- a/sdks/python/apache_beam/transforms/display_test.py +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -29,7 +29,7 @@ import apache_beam as beam from apache_beam.transforms.display import HasDisplayData from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display import DisplayDataItem -from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.pipeline_options import PipelineOptions class DisplayDataItemMatcher(BaseMatcher): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 39864a6..c022c5e 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -410,7 +410,7 @@ class PTransform(WithTypeHints, HasDisplayData): deferred = False # pylint: disable=wrong-import-order, wrong-import-position from apache_beam import pipeline - from apache_beam.utils.options import PipelineOptions + from apache_beam.utils.pipeline_options import PipelineOptions # pylint: enable=wrong-import-order, wrong-import-position p = pipeline.Pipeline( 'DirectRunner', PipelineOptions(sys.argv)) http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 5ed7d72..705e85e 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -36,8 +36,8 @@ import apache_beam.typehints as typehints from apache_beam.typehints import with_input_types from apache_beam.typehints import with_output_types from apache_beam.typehints.typehints_test import TypeHintTestCase -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import TypeOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import TypeOptions # Disable frequent lint warning due to pipe operator for chaining transforms. 
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/write_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index 9a1a7de..f96dffb 100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -25,7 +25,7 @@ from apache_beam.io import iobase from apache_beam.pipeline import Pipeline from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.util import assert_that, is_empty -from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.pipeline_options import PipelineOptions class _TestSink(iobase.Sink): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/typehints/typed_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 329d657..8b5e3f4 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -25,8 +25,8 @@ from apache_beam import pvalue from apache_beam import typehints from apache_beam.transforms.util import assert_that, equal_to from apache_beam.typehints import WithTypeHints -from apache_beam.utils.options import OptionsContext -from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.pipeline_options import OptionsContext +from apache_beam.utils.pipeline_options import PipelineOptions # These test often construct a pipeline as value | PTransform to test side # effects (e.g. errors). http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/dependency.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py index addcdb2..11a2e1c 100644 --- a/sdks/python/apache_beam/utils/dependency.py +++ b/sdks/python/apache_beam/utils/dependency.py @@ -66,8 +66,8 @@ from apache_beam import version as beam_version from apache_beam.internal import pickler from apache_beam.utils import names from apache_beam.utils import processes -from apache_beam.utils.options import GoogleCloudOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import GoogleCloudOptions +from apache_beam.utils.pipeline_options import SetupOptions # Standard file names used for staging files. 
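dependency.py and its test build options objects programmatically rather than from a real command line. Two PipelineOptions helpers, from_dictionary and get_all_options, are shown unchanged in the moved file and cover that round trip; the flag values below are dummies:

    from apache_beam.utils.pipeline_options import PipelineOptions

    # Dict keys map to flags: booleans become bare flags (--streaming),
    # everything else becomes --key=value.
    options = PipelineOptions.from_dictionary({
        'job_name': 'rename-check',
        'streaming': True,
    })
    all_options = options.get_all_options(drop_default=True)
    assert all_options['job_name'] == 'rename-check'
    assert all_options['streaming'] is True
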
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/dependency_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py index 3549a07..a484d60 100644 --- a/sdks/python/apache_beam/utils/dependency_test.py +++ b/sdks/python/apache_beam/utils/dependency_test.py @@ -26,9 +26,9 @@ import unittest from apache_beam import utils from apache_beam.utils import dependency from apache_beam.utils import names -from apache_beam.utils.options import GoogleCloudOptions -from apache_beam.utils.options import PipelineOptions -from apache_beam.utils.options import SetupOptions +from apache_beam.utils.pipeline_options import GoogleCloudOptions +from apache_beam.utils.pipeline_options import PipelineOptions +from apache_beam.utils.pipeline_options import SetupOptions class SetupTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/options.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py deleted file mode 100644 index 7ca0573..0000000 --- a/sdks/python/apache_beam/utils/options.py +++ /dev/null @@ -1,543 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Pipeline options obtained from command line parsing. - -TODO(silviuc): Should rename this module to pipeline_options. -""" - -import argparse - -from apache_beam.transforms.display import HasDisplayData - - -class PipelineOptions(HasDisplayData): - """Pipeline options class used as container for command line options. - - The class is essentially a wrapper over the standard argparse Python module - (see https://docs.python.org/3/library/argparse.html). To define one option - or a group of options you subclass from PipelineOptions:: - - class XyzOptions(PipelineOptions): - - @classmethod - def _add_argparse_args(cls, parser): - parser.add_argument('--abc', default='start') - parser.add_argument('--xyz', default='end') - - The arguments for the add_argument() method are exactly the ones - described in the argparse public documentation. - - Pipeline objects require an options object during initialization. - This is obtained simply by initializing an options class as defined above:: - - p = Pipeline(options=XyzOptions()) - if p.options.xyz == 'end': - raise ValueError('Option xyz has an invalid value.') - - By default the options classes will use command line arguments to initialize - the options. - """ - - def __init__(self, flags=None, **kwargs): - """Initialize an options class. 
- - The initializer will traverse all subclasses, add all their argparse - arguments and then parse the command line specified by flags or by default - the one obtained from sys.argv. - - The subclasses are not expected to require a redefinition of __init__. - - Args: - flags: An iterable of command line arguments to be used. If not specified - then sys.argv will be used as input for parsing arguments. - - **kwargs: Add overrides for arguments passed in flags. - """ - self._flags = flags - self._all_options = kwargs - parser = argparse.ArgumentParser() - for cls in type(self).mro(): - if cls == PipelineOptions: - break - elif '_add_argparse_args' in cls.__dict__: - cls._add_argparse_args(parser) - # The _visible_options attribute will contain only those options from the - # flags (i.e., command line) that can be recognized. The _all_options - # field contains additional overrides. - self._visible_options, _ = parser.parse_known_args(flags) - - @classmethod - def _add_argparse_args(cls, parser): - # Override this in subclasses to provide options. - pass - - @classmethod - def from_dictionary(cls, options): - """Returns a PipelineOptions from a dictionary of arguments. - - Args: - options: Dictinary of argument value pairs. - - Returns: - A PipelineOptions object representing the given arguments. - """ - flags = [] - for k, v in options.iteritems(): - if isinstance(v, bool): - if v: - flags.append('--%s' % k) - else: - flags.append('--%s=%s' % (k, v)) - - return cls(flags) - - def get_all_options(self, drop_default=False): - """Returns a dictionary of all defined arguments. - - Returns a dictionary of all defined arguments (arguments that are defined in - any subclass of PipelineOptions) into a dictionary. - - Args: - drop_default: If set to true, options that are equal to their default - values, are not returned as part of the result dictionary. - - Returns: - Dictionary of all args and values. - """ - parser = argparse.ArgumentParser() - for cls in PipelineOptions.__subclasses__(): - cls._add_argparse_args(parser) # pylint: disable=protected-access - known_args, _ = parser.parse_known_args(self._flags) - result = vars(known_args) - - # Apply the overrides if any - for k in result.keys(): - if k in self._all_options: - result[k] = self._all_options[k] - if drop_default and parser.get_default(k) == result[k]: - del result[k] - - return result - - def display_data(self): - return self.get_all_options(True) - - def view_as(self, cls): - view = cls(self._flags) - view._all_options = self._all_options - return view - - def _visible_option_list(self): - return sorted(option - for option in dir(self._visible_options) if option[0] != '_') - - def __dir__(self): - return sorted(dir(type(self)) + self.__dict__.keys() + - self._visible_option_list()) - - def __getattr__(self, name): - # Special methods which may be accessed before the object is - # fully constructed (e.g. in unpickling). 
- if name[:2] == name[-2:] == '__': - return object.__getattr__(self, name) - elif name in self._visible_option_list(): - return self._all_options.get(name, getattr(self._visible_options, name)) - else: - raise AttributeError("'%s' object has no attribute '%s'" % - (type(self).__name__, name)) - - def __setattr__(self, name, value): - if name in ('_flags', '_all_options', '_visible_options'): - super(PipelineOptions, self).__setattr__(name, value) - elif name in self._visible_option_list(): - self._all_options[name] = value - else: - raise AttributeError("'%s' object has no attribute '%s'" % - (type(self).__name__, name)) - - def __str__(self): - return '%s(%s)' % (type(self).__name__, - ', '.join('%s=%s' % (option, getattr(self, option)) - for option in self._visible_option_list())) - - -class StandardOptions(PipelineOptions): - - DEFAULT_RUNNER = 'DirectRunner' - - @classmethod - def _add_argparse_args(cls, parser): - parser.add_argument( - '--runner', - help=('Pipeline runner used to execute the workflow. Valid values are ' - 'DirectRunner, DataflowRunner, ' - 'and BlockingDataflowRunner.')) - # Whether to enable streaming mode. - parser.add_argument('--streaming', - default=False, - action='store_true', - help='Whether to enable streaming mode.') - - -class TypeOptions(PipelineOptions): - - @classmethod - def _add_argparse_args(cls, parser): - # TODO(laolu): Add a type inferencing option here once implemented. - parser.add_argument('--type_check_strictness', - default='DEFAULT_TO_ANY', - choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'], - help='The level of exhaustive manual type-hint ' - 'annotation required') - parser.add_argument('--no_pipeline_type_check', - dest='pipeline_type_check', - action='store_false', - help='Disable type checking at pipeline construction ' - 'time') - parser.add_argument('--pipeline_type_check', - action='store_true', - help='Enable type checking at pipeline construction ' - 'time') - parser.add_argument('--runtime_type_check', - default=False, - action='store_true', - help='Enable type checking at pipeline execution ' - 'time. NOTE: only supported with the ' - 'DirectRunner') - - -class GoogleCloudOptions(PipelineOptions): - """Google Cloud Dataflow service execution options.""" - - BIGQUERY_API_SERVICE = 'bigquery.googleapis.com' - COMPUTE_API_SERVICE = 'compute.googleapis.com' - STORAGE_API_SERVICE = 'storage.googleapis.com' - DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com' - - @classmethod - def _add_argparse_args(cls, parser): - parser.add_argument( - '--dataflow_endpoint', - default=cls.DATAFLOW_ENDPOINT, - help= - ('The URL for the Dataflow API. If not set, the default public URL ' - 'will be used.')) - # Remote execution must check that this option is not None. - parser.add_argument('--project', - default=None, - help='Name of the Cloud project owning the Dataflow ' - 'job.') - # Remote execution must check that this option is not None. - parser.add_argument('--job_name', - default=None, - help='Name of the Cloud Dataflow job.') - # Remote execution must check that this option is not None. - parser.add_argument('--staging_location', - default=None, - help='GCS path for staging code packages needed by ' - 'workers.') - # Remote execution must check that this option is not None. - # If staging_location is not set, it defaults to temp_location. 
- parser.add_argument('--temp_location', - default=None, - help='GCS path for saving temporary workflow jobs.') - parser.add_argument('--service_account_name', - default=None, - help='Name of the service account for Google APIs.') - parser.add_argument('--service_account_key_file', - default=None, - help='Path to a file containing the P12 service ' - 'credentials.') - parser.add_argument('--service_account_email', - default=None, - help='Identity to run virtual machines as.') - parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False) - # Option to run templated pipelines - parser.add_argument('--template_location', - default=None, - help='Save job to specified local or GCS location.') - - def validate(self, validator): - errors = [] - if validator.is_service_runner(): - errors.extend(validator.validate_cloud_options(self)) - errors.extend(validator.validate_gcs_path(self, 'temp_location')) - if getattr(self, 'staging_location', - None) or getattr(self, 'temp_location', None) is None: - errors.extend(validator.validate_gcs_path(self, 'staging_location')) - - if self.view_as(DebugOptions).dataflow_job_file: - if self.view_as(GoogleCloudOptions).template_location: - errors.append('--dataflow_job_file and --template_location ' - 'are mutually exclusive.') - - return errors - - -# Command line options controlling the worker pool configuration. -# TODO(silviuc): Update description when autoscaling options are in. -class WorkerOptions(PipelineOptions): - """Worker pool configuration options.""" - - @classmethod - def _add_argparse_args(cls, parser): - parser.add_argument( - '--num_workers', - type=int, - default=None, - help= - ('Number of workers to use when executing the Dataflow job. If not ' - 'set, the Dataflow service will use a reasonable default.')) - parser.add_argument( - '--max_num_workers', - type=int, - default=None, - help= - ('Maximum number of workers to use when executing the Dataflow job.')) - parser.add_argument( - '--autoscaling_algorithm', - type=str, - choices=['NONE', 'THROUGHPUT_BASED'], - default=None, # Meaning unset, distinct from 'NONE' meaning don't scale - help= - ('If and how to auotscale the workerpool.')) - parser.add_argument( - '--worker_machine_type', - dest='machine_type', - default=None, - help=('Machine type to create Dataflow worker VMs as. See ' - 'https://cloud.google.com/compute/docs/machine-types ' - 'for a list of valid options. If not set, ' - 'the Dataflow service will choose a reasonable ' - 'default.')) - parser.add_argument( - '--disk_size_gb', - type=int, - default=None, - help= - ('Remote worker disk size, in gigabytes, or 0 to use the default size. ' - 'If not set, the Dataflow service will use a reasonable default.')) - parser.add_argument( - '--worker_disk_type', - dest='disk_type', - default=None, - help=('Specifies what type of persistent disk should be used.')) - parser.add_argument( - '--zone', - default=None, - help=( - 'GCE availability zone for launching workers. Default is up to the ' - 'Dataflow service.')) - parser.add_argument( - '--network', - default=None, - help=( - 'GCE network for launching workers. Default is up to the Dataflow ' - 'service.')) - parser.add_argument( - '--worker_harness_container_image', - default=None, - help=('Docker registry location of container image to use for the ' - 'worker harness. Default is the container for the version of the ' - 'SDK. 
Note: currently, only approved Google Cloud Dataflow ' - 'container images may be used here.')) - parser.add_argument( - '--teardown_policy', - choices=['TEARDOWN_ALWAYS', 'TEARDOWN_NEVER', 'TEARDOWN_ON_SUCCESS'], - default=None, - help= - ('The teardown policy for the VMs. By default this is left unset and ' - 'the service sets the default policy.')) - parser.add_argument( - '--use_public_ips', - default=None, - help='Whether to assign public IP addresses to the worker machines.') - - def validate(self, validator): - errors = [] - if validator.is_service_runner(): - errors.extend( - validator.validate_optional_argument_positive(self, 'num_workers')) - return errors - - -class DebugOptions(PipelineOptions): - - @classmethod - def _add_argparse_args(cls, parser): - parser.add_argument('--dataflow_job_file', - default=None, - help='Debug file to write the workflow specification.') - parser.add_argument( - '--experiment', '--experiments', - dest='experiments', - action='append', - default=None, - help= - ('Runners may provide a number of experimental features that can be ' - 'enabled with this flag. Please sync with the owners of the runner ' - 'before enabling any experiments.')) - - -class ProfilingOptions(PipelineOptions): - - @classmethod - def _add_argparse_args(cls, parser): - parser.add_argument('--profile_cpu', - action='store_true', - help='Enable work item CPU profiling.') - parser.add_argument('--profile_memory', - action='store_true', - help='Enable work item heap profiling.') - parser.add_argument('--profile_location', - default=None, - help='GCS path for saving profiler data.') - - -class SetupOptions(PipelineOptions): - - @classmethod - def _add_argparse_args(cls, parser): - # Options for installing dependencies in the worker. - parser.add_argument( - '--requirements_file', - default=None, - help= - ('Path to a requirements file containing package dependencies. ' - 'Typically it is produced by a pip freeze command. More details: ' - 'https://pip.pypa.io/en/latest/reference/pip_freeze.html. ' - 'If used, all the packages specified will be downloaded, ' - 'cached (use --requirements_cache to change default location), ' - 'and then staged so that they can be automatically installed in ' - 'workers during startup. The cache is refreshed as needed ' - 'avoiding extra downloads for existing packages. Typically the ' - 'file is named requirements.txt.')) - parser.add_argument( - '--requirements_cache', - default=None, - help= - ('Path to a folder to cache the packages specified in ' - 'the requirements file using the --requirements_file option.')) - parser.add_argument( - '--setup_file', - default=None, - help= - ('Path to a setup Python file containing package dependencies. If ' - 'specified, the file\'s containing folder is assumed to have the ' - 'structure required for a setuptools setup package. The file must be ' - 'named setup.py. More details: ' - 'https://pythonhosted.org/an_example_pypi_project/setuptools.html ' - 'During job submission a source distribution will be built and the ' - 'worker will install the resulting package before running any custom ' - 'code.')) - parser.add_argument( - '--save_main_session', - default=False, - action='store_true', - help= - ('Save the main session state so that pickled functions and classes ' - 'defined in __main__ (e.g. interactive session) can be unpickled. 
' - 'Some workflows do not need the session state if for instance all ' - 'their functions/classes are defined in proper modules (not __main__)' - ' and the modules are importable in the worker. ')) - parser.add_argument( - '--sdk_location', - default='default', - help= - ('Override the default GitHub location from where Dataflow SDK is ' - 'downloaded. It can be an URL, a GCS path, or a local path to an ' - 'SDK tarball. Workflow submissions will download or copy an SDK ' - 'tarball from here. If the string "default", ' - 'a standard SDK location is used. If empty, no SDK is copied.')) - parser.add_argument( - '--extra_package', '--extra_packages', - dest='extra_packages', - action='append', - default=None, - help= - ('Local path to a Python package file. The file is expected to be (1) ' - 'a package tarball (".tar") or (2) a compressed package tarball ' - '(".tar.gz") which can be installed using the "pip install" command ' - 'of the standard pip package. Multiple --extra_package options can ' - 'be specified if more than one package is needed. During job ' - 'submission, the files will be staged in the staging area ' - '(--staging_location option) and the workers will install them in ' - 'same order they were specified on the command line.')) - - -class TestOptions(PipelineOptions): - - @classmethod - def _add_argparse_args(cls, parser): - # Options for e2e test pipeline. - parser.add_argument( - '--on_success_matcher', - default=None, - help=('Verify state/output of e2e test pipeline. This is pickled ' - 'version of the matcher which should extends ' - 'hamcrest.core.base_matcher.BaseMatcher.')) - - def validate(self, validator): - errors = [] - if self.view_as(TestOptions).on_success_matcher: - errors.extend(validator.validate_test_matcher(self, 'on_success_matcher')) - return errors - -# TODO(silviuc): Add --files_to_stage option. -# This could potentially replace the --requirements_file and --setup_file. - -# TODO(silviuc): Non-standard options. Keep them? If yes, add help too! -# Remote execution must check that this option is not None. - - -class OptionsContext(object): - """Set default pipeline options for pipelines created in this block. - - This is particularly useful for pipelines implicitly created with the - - [python list] | PTransform - - construct. - - Can also be used as a decorator. - """ - overrides = [] - - def __init__(self, **options): - self.options = options - - def __enter__(self): - self.overrides.append(self.options) - - def __exit__(self, *exn_info): - self.overrides.pop() - - def __call__(self, f, *args, **kwargs): - - def wrapper(*args, **kwargs): - with self: - f(*args, **kwargs) - - return wrapper - - @classmethod - def augment_options(cls, options): - for override in cls.overrides: - for name, value in override.items(): - setattr(options, name, value) - return options http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/pipeline_options.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py new file mode 100644 index 0000000..3e09a3b --- /dev/null +++ b/sdks/python/apache_beam/utils/pipeline_options.py @@ -0,0 +1,540 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Pipeline options obtained from command line parsing.""" + +import argparse + +from apache_beam.transforms.display import HasDisplayData + + +class PipelineOptions(HasDisplayData): + """Pipeline options class used as container for command line options. + + The class is essentially a wrapper over the standard argparse Python module + (see https://docs.python.org/3/library/argparse.html). To define one option + or a group of options you subclass from PipelineOptions:: + + class XyzOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument('--abc', default='start') + parser.add_argument('--xyz', default='end') + + The arguments for the add_argument() method are exactly the ones + described in the argparse public documentation. + + Pipeline objects require an options object during initialization. + This is obtained simply by initializing an options class as defined above:: + + p = Pipeline(options=XyzOptions()) + if p.options.xyz == 'end': + raise ValueError('Option xyz has an invalid value.') + + By default the options classes will use command line arguments to initialize + the options. + """ + + def __init__(self, flags=None, **kwargs): + """Initialize an options class. + + The initializer will traverse all subclasses, add all their argparse + arguments and then parse the command line specified by flags or by default + the one obtained from sys.argv. + + The subclasses are not expected to require a redefinition of __init__. + + Args: + flags: An iterable of command line arguments to be used. If not specified + then sys.argv will be used as input for parsing arguments. + + **kwargs: Add overrides for arguments passed in flags. + """ + self._flags = flags + self._all_options = kwargs + parser = argparse.ArgumentParser() + for cls in type(self).mro(): + if cls == PipelineOptions: + break + elif '_add_argparse_args' in cls.__dict__: + cls._add_argparse_args(parser) + # The _visible_options attribute will contain only those options from the + # flags (i.e., command line) that can be recognized. The _all_options + # field contains additional overrides. + self._visible_options, _ = parser.parse_known_args(flags) + + @classmethod + def _add_argparse_args(cls, parser): + # Override this in subclasses to provide options. + pass + + @classmethod + def from_dictionary(cls, options): + """Returns a PipelineOptions from a dictionary of arguments. + + Args: + options: Dictinary of argument value pairs. + + Returns: + A PipelineOptions object representing the given arguments. + """ + flags = [] + for k, v in options.iteritems(): + if isinstance(v, bool): + if v: + flags.append('--%s' % k) + else: + flags.append('--%s=%s' % (k, v)) + + return cls(flags) + + def get_all_options(self, drop_default=False): + """Returns a dictionary of all defined arguments. + + Returns a dictionary of all defined arguments (arguments that are defined in + any subclass of PipelineOptions) into a dictionary. 
+ + Args: + drop_default: If set to true, options that are equal to their default + values, are not returned as part of the result dictionary. + + Returns: + Dictionary of all args and values. + """ + parser = argparse.ArgumentParser() + for cls in PipelineOptions.__subclasses__(): + cls._add_argparse_args(parser) # pylint: disable=protected-access + known_args, _ = parser.parse_known_args(self._flags) + result = vars(known_args) + + # Apply the overrides if any + for k in result.keys(): + if k in self._all_options: + result[k] = self._all_options[k] + if drop_default and parser.get_default(k) == result[k]: + del result[k] + + return result + + def display_data(self): + return self.get_all_options(True) + + def view_as(self, cls): + view = cls(self._flags) + view._all_options = self._all_options + return view + + def _visible_option_list(self): + return sorted(option + for option in dir(self._visible_options) if option[0] != '_') + + def __dir__(self): + return sorted(dir(type(self)) + self.__dict__.keys() + + self._visible_option_list()) + + def __getattr__(self, name): + # Special methods which may be accessed before the object is + # fully constructed (e.g. in unpickling). + if name[:2] == name[-2:] == '__': + return object.__getattr__(self, name) + elif name in self._visible_option_list(): + return self._all_options.get(name, getattr(self._visible_options, name)) + else: + raise AttributeError("'%s' object has no attribute '%s'" % + (type(self).__name__, name)) + + def __setattr__(self, name, value): + if name in ('_flags', '_all_options', '_visible_options'): + super(PipelineOptions, self).__setattr__(name, value) + elif name in self._visible_option_list(): + self._all_options[name] = value + else: + raise AttributeError("'%s' object has no attribute '%s'" % + (type(self).__name__, name)) + + def __str__(self): + return '%s(%s)' % (type(self).__name__, + ', '.join('%s=%s' % (option, getattr(self, option)) + for option in self._visible_option_list())) + + +class StandardOptions(PipelineOptions): + + DEFAULT_RUNNER = 'DirectRunner' + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + '--runner', + help=('Pipeline runner used to execute the workflow. Valid values are ' + 'DirectRunner, DataflowRunner, ' + 'and BlockingDataflowRunner.')) + # Whether to enable streaming mode. + parser.add_argument('--streaming', + default=False, + action='store_true', + help='Whether to enable streaming mode.') + + +class TypeOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + # TODO(laolu): Add a type inferencing option here once implemented. + parser.add_argument('--type_check_strictness', + default='DEFAULT_TO_ANY', + choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'], + help='The level of exhaustive manual type-hint ' + 'annotation required') + parser.add_argument('--no_pipeline_type_check', + dest='pipeline_type_check', + action='store_false', + help='Disable type checking at pipeline construction ' + 'time') + parser.add_argument('--pipeline_type_check', + action='store_true', + help='Enable type checking at pipeline construction ' + 'time') + parser.add_argument('--runtime_type_check', + default=False, + action='store_true', + help='Enable type checking at pipeline execution ' + 'time. 
NOTE: only supported with the ' + 'DirectRunner') + + +class GoogleCloudOptions(PipelineOptions): + """Google Cloud Dataflow service execution options.""" + + BIGQUERY_API_SERVICE = 'bigquery.googleapis.com' + COMPUTE_API_SERVICE = 'compute.googleapis.com' + STORAGE_API_SERVICE = 'storage.googleapis.com' + DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com' + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + '--dataflow_endpoint', + default=cls.DATAFLOW_ENDPOINT, + help= + ('The URL for the Dataflow API. If not set, the default public URL ' + 'will be used.')) + # Remote execution must check that this option is not None. + parser.add_argument('--project', + default=None, + help='Name of the Cloud project owning the Dataflow ' + 'job.') + # Remote execution must check that this option is not None. + parser.add_argument('--job_name', + default=None, + help='Name of the Cloud Dataflow job.') + # Remote execution must check that this option is not None. + parser.add_argument('--staging_location', + default=None, + help='GCS path for staging code packages needed by ' + 'workers.') + # Remote execution must check that this option is not None. + # If staging_location is not set, it defaults to temp_location. + parser.add_argument('--temp_location', + default=None, + help='GCS path for saving temporary workflow jobs.') + parser.add_argument('--service_account_name', + default=None, + help='Name of the service account for Google APIs.') + parser.add_argument('--service_account_key_file', + default=None, + help='Path to a file containing the P12 service ' + 'credentials.') + parser.add_argument('--service_account_email', + default=None, + help='Identity to run virtual machines as.') + parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False) + # Option to run templated pipelines + parser.add_argument('--template_location', + default=None, + help='Save job to specified local or GCS location.') + + def validate(self, validator): + errors = [] + if validator.is_service_runner(): + errors.extend(validator.validate_cloud_options(self)) + errors.extend(validator.validate_gcs_path(self, 'temp_location')) + if getattr(self, 'staging_location', + None) or getattr(self, 'temp_location', None) is None: + errors.extend(validator.validate_gcs_path(self, 'staging_location')) + + if self.view_as(DebugOptions).dataflow_job_file: + if self.view_as(GoogleCloudOptions).template_location: + errors.append('--dataflow_job_file and --template_location ' + 'are mutually exclusive.') + + return errors + + +# Command line options controlling the worker pool configuration. +# TODO(silviuc): Update description when autoscaling options are in. +class WorkerOptions(PipelineOptions): + """Worker pool configuration options.""" + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + '--num_workers', + type=int, + default=None, + help= + ('Number of workers to use when executing the Dataflow job. 
If not ' + 'set, the Dataflow service will use a reasonable default.')) + parser.add_argument( + '--max_num_workers', + type=int, + default=None, + help= + ('Maximum number of workers to use when executing the Dataflow job.')) + parser.add_argument( + '--autoscaling_algorithm', + type=str, + choices=['NONE', 'THROUGHPUT_BASED'], + default=None, # Meaning unset, distinct from 'NONE' meaning don't scale + help= + ('If and how to auotscale the workerpool.')) + parser.add_argument( + '--worker_machine_type', + dest='machine_type', + default=None, + help=('Machine type to create Dataflow worker VMs as. See ' + 'https://cloud.google.com/compute/docs/machine-types ' + 'for a list of valid options. If not set, ' + 'the Dataflow service will choose a reasonable ' + 'default.')) + parser.add_argument( + '--disk_size_gb', + type=int, + default=None, + help= + ('Remote worker disk size, in gigabytes, or 0 to use the default size. ' + 'If not set, the Dataflow service will use a reasonable default.')) + parser.add_argument( + '--worker_disk_type', + dest='disk_type', + default=None, + help=('Specifies what type of persistent disk should be used.')) + parser.add_argument( + '--zone', + default=None, + help=( + 'GCE availability zone for launching workers. Default is up to the ' + 'Dataflow service.')) + parser.add_argument( + '--network', + default=None, + help=( + 'GCE network for launching workers. Default is up to the Dataflow ' + 'service.')) + parser.add_argument( + '--worker_harness_container_image', + default=None, + help=('Docker registry location of container image to use for the ' + 'worker harness. Default is the container for the version of the ' + 'SDK. Note: currently, only approved Google Cloud Dataflow ' + 'container images may be used here.')) + parser.add_argument( + '--teardown_policy', + choices=['TEARDOWN_ALWAYS', 'TEARDOWN_NEVER', 'TEARDOWN_ON_SUCCESS'], + default=None, + help= + ('The teardown policy for the VMs. By default this is left unset and ' + 'the service sets the default policy.')) + parser.add_argument( + '--use_public_ips', + default=None, + help='Whether to assign public IP addresses to the worker machines.') + + def validate(self, validator): + errors = [] + if validator.is_service_runner(): + errors.extend( + validator.validate_optional_argument_positive(self, 'num_workers')) + return errors + + +class DebugOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument('--dataflow_job_file', + default=None, + help='Debug file to write the workflow specification.') + parser.add_argument( + '--experiment', '--experiments', + dest='experiments', + action='append', + default=None, + help= + ('Runners may provide a number of experimental features that can be ' + 'enabled with this flag. Please sync with the owners of the runner ' + 'before enabling any experiments.')) + + +class ProfilingOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument('--profile_cpu', + action='store_true', + help='Enable work item CPU profiling.') + parser.add_argument('--profile_memory', + action='store_true', + help='Enable work item heap profiling.') + parser.add_argument('--profile_location', + default=None, + help='GCS path for saving profiler data.') + + +class SetupOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + # Options for installing dependencies in the worker. 
+ parser.add_argument( + '--requirements_file', + default=None, + help= + ('Path to a requirements file containing package dependencies. ' + 'Typically it is produced by a pip freeze command. More details: ' + 'https://pip.pypa.io/en/latest/reference/pip_freeze.html. ' + 'If used, all the packages specified will be downloaded, ' + 'cached (use --requirements_cache to change default location), ' + 'and then staged so that they can be automatically installed in ' + 'workers during startup. The cache is refreshed as needed ' + 'avoiding extra downloads for existing packages. Typically the ' + 'file is named requirements.txt.')) + parser.add_argument( + '--requirements_cache', + default=None, + help= + ('Path to a folder to cache the packages specified in ' + 'the requirements file using the --requirements_file option.')) + parser.add_argument( + '--setup_file', + default=None, + help= + ('Path to a setup Python file containing package dependencies. If ' + 'specified, the file\'s containing folder is assumed to have the ' + 'structure required for a setuptools setup package. The file must be ' + 'named setup.py. More details: ' + 'https://pythonhosted.org/an_example_pypi_project/setuptools.html ' + 'During job submission a source distribution will be built and the ' + 'worker will install the resulting package before running any custom ' + 'code.')) + parser.add_argument( + '--save_main_session', + default=False, + action='store_true', + help= + ('Save the main session state so that pickled functions and classes ' + 'defined in __main__ (e.g. interactive session) can be unpickled. ' + 'Some workflows do not need the session state if for instance all ' + 'their functions/classes are defined in proper modules (not __main__)' + ' and the modules are importable in the worker. ')) + parser.add_argument( + '--sdk_location', + default='default', + help= + ('Override the default GitHub location from where Dataflow SDK is ' + 'downloaded. It can be an URL, a GCS path, or a local path to an ' + 'SDK tarball. Workflow submissions will download or copy an SDK ' + 'tarball from here. If the string "default", ' + 'a standard SDK location is used. If empty, no SDK is copied.')) + parser.add_argument( + '--extra_package', '--extra_packages', + dest='extra_packages', + action='append', + default=None, + help= + ('Local path to a Python package file. The file is expected to be (1) ' + 'a package tarball (".tar") or (2) a compressed package tarball ' + '(".tar.gz") which can be installed using the "pip install" command ' + 'of the standard pip package. Multiple --extra_package options can ' + 'be specified if more than one package is needed. During job ' + 'submission, the files will be staged in the staging area ' + '(--staging_location option) and the workers will install them in ' + 'same order they were specified on the command line.')) + + +class TestOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + # Options for e2e test pipeline. + parser.add_argument( + '--on_success_matcher', + default=None, + help=('Verify state/output of e2e test pipeline. This is pickled ' + 'version of the matcher which should extends ' + 'hamcrest.core.base_matcher.BaseMatcher.')) + + def validate(self, validator): + errors = [] + if self.view_as(TestOptions).on_success_matcher: + errors.extend(validator.validate_test_matcher(self, 'on_success_matcher')) + return errors + +# TODO(silviuc): Add --files_to_stage option. +# This could potentially replace the --requirements_file and --setup_file. 
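A small sketch of how the setup options above can be populated without a command line, going through the from_dictionary helper and read back through a SetupOptions view; the file name and values are placeholders:

    from apache_beam.utils.pipeline_options import PipelineOptions
    from apache_beam.utils.pipeline_options import SetupOptions

    # from_dictionary turns key/value pairs back into flags, so a True boolean
    # becomes the bare switch --save_main_session.
    options = PipelineOptions.from_dictionary({
        'requirements_file': 'requirements.txt',
        'save_main_session': True,
    })
    setup_options = options.view_as(SetupOptions)
    print(setup_options.requirements_file)  # 'requirements.txt'
    print(setup_options.save_main_session)  # True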
+
+# TODO(silviuc): Non-standard options. Keep them? If yes, add help too!
+# Remote execution must check that this option is not None.
+
+
+class OptionsContext(object):
+  """Set default pipeline options for pipelines created in this block.
+
+  This is particularly useful for pipelines implicitly created with the
+
+      [python list] | PTransform
+
+  construct.
+
+  Can also be used as a decorator.
+  """
+  overrides = []
+
+  def __init__(self, **options):
+    self.options = options
+
+  def __enter__(self):
+    self.overrides.append(self.options)
+
+  def __exit__(self, *exn_info):
+    self.overrides.pop()
+
+  def __call__(self, f, *args, **kwargs):
+
+    def wrapper(*args, **kwargs):
+      with self:
+        f(*args, **kwargs)
+
+    return wrapper
+
+  @classmethod
+  def augment_options(cls, options):
+    for override in cls.overrides:
+      for name, value in override.items():
+        setattr(options, name, value)
+    return options

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
index 75bd6a2..054b6a5 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -23,7 +23,7 @@ import unittest
 import hamcrest as hc
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 class PipelineOptionsTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
index c1243ce..85fdc4d 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -20,13 +20,13 @@ import re
 
 from apache_beam.internal import pickler
 
-from apache_beam.utils.options import DebugOptions
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import SetupOptions
-from apache_beam.utils.options import StandardOptions
-from apache_beam.utils.options import TestOptions
-from apache_beam.utils.options import TypeOptions
-from apache_beam.utils.options import WorkerOptions
+from apache_beam.utils.pipeline_options import DebugOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.utils.pipeline_options import StandardOptions
+from apache_beam.utils.pipeline_options import TestOptions
+from apache_beam.utils.pipeline_options import TypeOptions
+from apache_beam.utils.pipeline_options import WorkerOptions
 
 
 class PipelineOptionsValidator(object):
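The OptionsContext helper above can be exercised either as a context manager or as a decorator; a brief sketch, assuming the surrounding code calls augment_options() while the context is active (as implicitly created pipelines are expected to do):

    from apache_beam.utils.pipeline_options import OptionsContext
    from apache_beam.utils.pipeline_options import StandardOptions

    # As a context manager: overrides pushed on entry are applied by
    # augment_options() to any options object passed through it.
    with OptionsContext(streaming=True):
      options = OptionsContext.augment_options(StandardOptions([]))
      print(options.streaming)  # True

    # As a decorator: the wrapped function runs inside the same context, so
    # options augmented during the call pick up streaming=True as well.
    @OptionsContext(streaming=True)
    def build_and_run():
      pass  # construct and run a pipeline here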