beam-commits mailing list archives

From al...@apache.org
Subject [3/4] beam git commit: [BEAM-2206] Move pipelineOptions into options modules
Date Mon, 08 May 2017 19:56:53 GMT
[BEAM-2206] Move pipelineOptions into options modules
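
This commit relocates the Python SDK's options modules from apache_beam.utils
into a new apache_beam.options package. For downstream pipelines the change is
purely an import-path update, as the diffs below show file by file; a minimal
sketch of the migration (the flag value here is illustrative only):

    # Old import path, replaced throughout this commit:
    # from apache_beam.utils.pipeline_options import PipelineOptions

    # New import path introduced by this commit:
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(['--runner=DirectRunner'])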


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50a4c56d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50a4c56d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50a4c56d

Branch: refs/heads/master
Commit: 50a4c56de1d3aa0e51e8b2b08b71715be4c2ef80
Parents: 63c6bea
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Sun May 7 13:55:20 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Mon May 8 12:56:10 2017 -0700

----------------------------------------------------------------------
 .../examples/complete/autocomplete.py           |   4 +-
 .../examples/complete/estimate_pi.py            |   4 +-
 .../examples/complete/game/hourly_team_score.py |   6 +-
 .../examples/complete/game/user_score.py        |   4 +-
 .../apache_beam/examples/complete/tfidf.py      |   4 +-
 .../examples/complete/top_wikipedia_sessions.py |   4 +-
 .../examples/cookbook/bigquery_side_input.py    |   4 +-
 .../apache_beam/examples/cookbook/coders.py     |   4 +-
 .../examples/cookbook/custom_ptransform.py      |   2 +-
 .../examples/cookbook/datastore_wordcount.py    |   6 +-
 .../examples/cookbook/group_with_coder.py       |   4 +-
 .../examples/cookbook/mergecontacts.py          |   4 +-
 .../examples/cookbook/multiple_output_pardo.py  |   4 +-
 .../apache_beam/examples/snippets/snippets.py   |  34 +-
 .../examples/snippets/snippets_test.py          |   2 +-
 sdks/python/apache_beam/examples/wordcount.py   |   4 +-
 .../apache_beam/examples/wordcount_debugging.py |   4 +-
 .../apache_beam/examples/wordcount_minimal.py   |   4 +-
 .../apache_beam/internal/gcp/json_value.py      |   2 +-
 .../apache_beam/internal/gcp/json_value_test.py |   4 +-
 sdks/python/apache_beam/io/filebasedsource.py   |   6 +-
 .../apache_beam/io/filebasedsource_test.py      |   4 +-
 sdks/python/apache_beam/io/fileio.py            |   6 +-
 sdks/python/apache_beam/io/fileio_test.py       |   2 +-
 sdks/python/apache_beam/io/gcp/bigquery.py      |   2 +-
 sdks/python/apache_beam/io/gcp/bigquery_test.py |   2 +-
 sdks/python/apache_beam/options/__init__.py     |  16 +
 .../apache_beam/options/pipeline_options.py     | 627 +++++++++++++++++++
 .../options/pipeline_options_test.py            | 240 +++++++
 .../options/pipeline_options_validator.py       | 199 ++++++
 .../options/pipeline_options_validator_test.py  | 343 ++++++++++
 .../apache_beam/options/value_provider.py       | 103 +++
 .../apache_beam/options/value_provider_test.py  | 145 +++++
 sdks/python/apache_beam/pipeline.py             |  10 +-
 .../runners/dataflow/dataflow_runner.py         |   2 +-
 .../runners/dataflow/dataflow_runner_test.py    |   2 +-
 .../runners/dataflow/internal/apiclient.py      |   8 +-
 .../runners/dataflow/internal/apiclient_test.py |   2 +-
 .../runners/dataflow/internal/dependency.py     |   4 +-
 .../dataflow/internal/dependency_test.py        |   6 +-
 .../runners/dataflow/template_runner_test.py    |   2 +-
 .../runners/dataflow/test_dataflow_runner.py    |   2 +-
 .../apache_beam/runners/direct/direct_runner.py |   4 +-
 .../runners/direct/transform_evaluator.py       |   2 +-
 .../portability/maptask_executor_runner.py      |   2 +-
 sdks/python/apache_beam/runners/runner_test.py  |   2 +-
 sdks/python/apache_beam/test_pipeline.py        |   2 +-
 sdks/python/apache_beam/test_pipeline_test.py   |   2 +-
 sdks/python/apache_beam/transforms/core.py      |   2 +-
 sdks/python/apache_beam/transforms/display.py   |   2 +-
 .../apache_beam/transforms/display_test.py      |   2 +-
 .../python/apache_beam/transforms/ptransform.py |   2 +-
 .../apache_beam/transforms/ptransform_test.py   |   2 +-
 .../typehints/typed_pipeline_test.py            |   2 +-
 .../apache_beam/utils/pipeline_options.py       | 612 +-----------------
 .../apache_beam/utils/pipeline_options_test.py  | 240 -------
 .../utils/pipeline_options_validator.py         | 199 ------
 .../utils/pipeline_options_validator_test.py    | 343 ----------
 sdks/python/apache_beam/utils/value_provider.py |  88 +--
 .../apache_beam/utils/value_provider_test.py    | 145 -----
 60 files changed, 1776 insertions(+), 1718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index f954ec1..f0acc3f 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -26,8 +26,8 @@ import re
 import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 def run(argv=None):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 3e57f71..c709713 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -37,8 +37,8 @@ from apache_beam.io import WriteToText
 from apache_beam.typehints import Any
 from apache_beam.typehints import Iterable
 from apache_beam.typehints import Tuple
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 @beam.typehints.with_output_types(Tuple[int, int, int])

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
index 1d1cee8..e9d7188 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
@@ -59,9 +59,9 @@ from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.typehints import with_input_types
 from apache_beam.typehints import with_output_types
-from apache_beam.utils.pipeline_options import GoogleCloudOptions
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 class ParseEventFn(beam.DoFn):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/complete/game/user_score.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py
index 02d4775..389d2c6 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score.py
@@ -49,8 +49,8 @@ from apache_beam.io import ReadFromText
 from apache_beam.metrics import Metrics
 from apache_beam.typehints import with_input_types
 from apache_beam.typehints import with_output_types
-from apache_beam.utils.pipeline_options import GoogleCloudOptions
-from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
 
 
 class ParseEventFn(beam.DoFn):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 367e275..a98d906 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -32,8 +32,8 @@ import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
 from apache_beam.pvalue import AsSingleton
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 def read_documents(pipeline, uris):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index c24eb75..aa48e4e 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -52,8 +52,8 @@ from apache_beam.io import WriteToText
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import Sessions
 from apache_beam.transforms.window import TimestampedValue
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 ONE_HOUR_IN_SECONDS = 3600

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index f68c95d..6b28818 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -36,8 +36,8 @@ import apache_beam as beam
 from apache_beam.io import WriteToText
 from apache_beam.pvalue import AsList
 from apache_beam.pvalue import AsSingleton
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 def create_groups(group_ids, corpus, word, ignore_corpus, ignore_word):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index ede9d70..aeeb3c9 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -37,8 +37,8 @@ import logging
 import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 class JsonCoder(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index 67d1ff8..609f2cd 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -28,7 +28,7 @@ import logging
 import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
-from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import PipelineOptions
 
 
 # pylint doesn't understand our pipeline syntax:

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 1f88398..411feb8 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -76,9 +76,9 @@ from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
 from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
 from apache_beam.metrics import Metrics
 from apache_beam.metrics.metric import MetricsFilter
-from apache_beam.utils.pipeline_options import GoogleCloudOptions
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 class WordExtractingDoFn(beam.DoFn):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index cb675bd..6bdadae 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -37,8 +37,8 @@ from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
 from apache_beam.typehints import typehints
 from apache_beam.typehints.decorators import with_output_types
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 class Player(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index c880a9a..5aaba10 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -38,8 +38,8 @@ import re
 import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 def run(argv=None, assert_results=None):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index 9c82df4..9759f48 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -59,8 +59,8 @@ import apache_beam as beam
 from apache_beam import pvalue
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 class SplitLinesToWordsFn(beam.DoFn):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 1b750b4..26af71d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -91,7 +91,7 @@ def construct_pipeline(renames):
 
   # [START pipelines_constructing_creating]
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
 
   p = beam.Pipeline(options=PipelineOptions())
   # [END pipelines_constructing_creating]
@@ -126,7 +126,7 @@ def model_pipelines(argv):
   import re
 
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
 
   class MyOptions(PipelineOptions):
 
@@ -161,7 +161,7 @@ def model_pipelines(argv):
 
 def model_pcollection(argv):
   """Creating a PCollection from data in local memory."""
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
 
   class MyOptions(PipelineOptions):
 
@@ -197,7 +197,7 @@ def pipeline_options_remote(argv):
   """Creating a Pipeline using a PipelineOptions object for remote execution."""
 
   from apache_beam import Pipeline
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
 
   # [START pipeline_options_create]
   options = PipelineOptions(flags=argv)
@@ -212,8 +212,8 @@ def pipeline_options_remote(argv):
       parser.add_argument('--output')
   # [END pipeline_options_define_custom]
 
-  from apache_beam.utils.pipeline_options import GoogleCloudOptions
-  from apache_beam.utils.pipeline_options import StandardOptions
+  from apache_beam.options.pipeline_options import GoogleCloudOptions
+  from apache_beam.options.pipeline_options import StandardOptions
 
   # [START pipeline_options_dataflow_service]
   # Create and set your PipelineOptions.
@@ -248,7 +248,7 @@ def pipeline_options_local(argv):
   """Creating a Pipeline using a PipelineOptions object for local execution."""
 
   from apache_beam import Pipeline
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
 
   options = PipelineOptions(flags=argv)
 
@@ -341,7 +341,7 @@ def pipeline_monitoring(renames):
 
   import re
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
 
   class WordCountOptions(PipelineOptions):
 
@@ -405,9 +405,9 @@ def examples_wordcount_minimal(renames):
 
   import apache_beam as beam
 
-  from apache_beam.utils.pipeline_options import GoogleCloudOptions
-  from apache_beam.utils.pipeline_options import StandardOptions
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import GoogleCloudOptions
+  from apache_beam.options.pipeline_options import StandardOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
 
   # [START examples_wordcount_minimal_options]
   options = PipelineOptions()
@@ -462,7 +462,7 @@ def examples_wordcount_wordcount(renames):
   import re
 
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
 
   argv = []
 
@@ -582,7 +582,7 @@ import apache_beam as beam
 from apache_beam.io import iobase
 from apache_beam.io.range_trackers import OffsetRangeTracker
 from apache_beam.transforms.core import PTransform
-from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import PipelineOptions
 
 
 # Defining a new source.
@@ -747,7 +747,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
   import apache_beam as beam
   from apache_beam.io import iobase
   from apache_beam.transforms.core import PTransform
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
 
   # Defining the new sink.
   # [START model_custom_sink_new_sink]
@@ -841,7 +841,7 @@ def model_textio(renames):
     return re.findall(r'[A-Za-z\']+', x)
 
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
 
   # [START model_textio_read]
   p = beam.Pipeline(options=PipelineOptions())
@@ -885,7 +885,7 @@ def model_datastoreio():
   from google.cloud.proto.datastore.v1 import query_pb2
   import googledatastore
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
   from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
 
@@ -918,7 +918,7 @@ def model_datastoreio():
 def model_bigqueryio():
   """Using a Read and Write transform to read/write to BigQuery."""
   import apache_beam as beam
-  from apache_beam.utils.pipeline_options import PipelineOptions
+  from apache_beam.options.pipeline_options import PipelineOptions
 
   # [START model_bigqueryio_read]
   p = beam.Pipeline(options=PipelineOptions())

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 370b436..0148096 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -31,7 +31,7 @@ from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
-from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.examples.snippets import snippets
 
 # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index adcc4db..8c6a485 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -27,8 +27,8 @@ from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
 from apache_beam.metrics import Metrics
 from apache_beam.metrics.metric import MetricsFilter
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 class WordExtractingDoFn(beam.DoFn):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index 9779b82..98acde4 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -49,8 +49,8 @@ import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
 from apache_beam.metrics import Metrics
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 class FilterTextFn(beam.DoFn):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index b80ed84..5109c08 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -53,8 +53,8 @@ import re
 import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
 
 
 def run(argv=None):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/internal/gcp/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py
index 4099c1a..523db9c 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value.py
@@ -25,7 +25,7 @@ except ImportError:
   extra_types = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
-from apache_beam.utils.value_provider import ValueProvider
+from apache_beam.options.value_provider import ValueProvider
 
 
 _MAXINT64 = (1 << 63) - 1

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/internal/gcp/json_value_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/json_value_test.py b/sdks/python/apache_beam/internal/gcp/json_value_test.py
index a61c50b..b1fd63f 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value_test.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value_test.py
@@ -21,8 +21,8 @@ import unittest
 
 from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import RuntimeValueProvider
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import RuntimeValueProvider
 
 
 # Protect against environments where apitools library is not available.

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index e25f92e..215e015 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -32,9 +32,9 @@ from apache_beam.io import range_trackers
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.utils.value_provider import ValueProvider
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import check_accessible
+from apache_beam.options.value_provider import ValueProvider
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import check_accessible
 
 MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index f6cdf26..4ff23fc 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -43,8 +43,8 @@ from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import RuntimeValueProvider
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import RuntimeValueProvider
 
 
 class LineSource(FileBasedSource):

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index bfed96e..ca3a759 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -31,9 +31,9 @@ from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.utils.value_provider import ValueProvider
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import check_accessible
+from apache_beam.options.value_provider import ValueProvider
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import check_accessible
 
 DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
 

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 3f7211e..e0e9774 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -35,7 +35,7 @@ from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
-from apache_beam.utils.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import StaticValueProvider
 
 
 # TODO: Refactor code so all io tests are using same library

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 4e8d61b..1ed28a8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -117,7 +117,7 @@ from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils import retry
-from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.io.gcp.internal.clients import bigquery
 
 # Protect against environments where bigquery library is not available.

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/io/gcp/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 2b83079..a26050c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -34,7 +34,7 @@ from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import PipelineOptions
 
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/options/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/__init__.py b/sdks/python/apache_beam/options/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/options/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
new file mode 100644
index 0000000..b79d85d
--- /dev/null
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -0,0 +1,627 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pipeline options obtained from command line parsing."""
+
+import argparse
+
+from apache_beam.transforms.display import HasDisplayData
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import RuntimeValueProvider
+from apache_beam.options.value_provider import ValueProvider
+
+
+def _static_value_provider_of(value_type):
+  """Helper function to plug a ValueProvider into argparse.
+
+  Args:
+    value_type: the type of the value. Since the type param of argparse's
+                add_argument will always be ValueProvider, we need to
+                preserve the type of the actual value.
+  Returns:
+    A partially constructed StaticValueProvider in the form of a function.
+
+  """
+  def _f(value):
+    _f.func_name = value_type.__name__
+    return StaticValueProvider(value_type, value)
+  return _f
+
+
+class BeamArgumentParser(argparse.ArgumentParser):
+  """An ArgumentParser that supports ValueProvider options.
+
+  Example Usage::
+
+    class TemplateUserOptions(PipelineOptions):
+
+      @classmethod
+      def _add_argparse_args(cls, parser):
+        parser.add_value_provider_argument('--vp-arg1', default='start')
+        parser.add_value_provider_argument('--vp-arg2')
+        parser.add_argument('--non-vp-arg')
+
+  """
+  def add_value_provider_argument(self, *args, **kwargs):
+    """ValueProvider arguments can be either keyword or positional.
+    At runtime, even positional arguments will need to be supplied in the
+    key/value form.
+    """
+    # Extract the option name from positional argument ['pos_arg']
+    assert args != () and len(args[0]) >= 1
+    if args[0][0] != '-':
+      option_name = args[0]
+      if kwargs.get('nargs') is None:  # make them optionally templated
+        kwargs['nargs'] = '?'
+    else:
+      # or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg]
+      option_name = [i.replace('--', '') for i in args if i[:2] == '--'][0]
+
+    # reassign the type to make room for using
+    # StaticValueProvider as the type for add_argument
+    value_type = kwargs.get('type') or str
+    kwargs['type'] = _static_value_provider_of(value_type)
+
+    # reassign default to default_value to make room for using
+    # RuntimeValueProvider as the default for add_argument
+    default_value = kwargs.get('default')
+    kwargs['default'] = RuntimeValueProvider(
+        option_name=option_name,
+        value_type=value_type,
+        default_value=default_value
+    )
+
+    # have add_argument do most of the work
+    self.add_argument(*args, **kwargs)
+
+
+class PipelineOptions(HasDisplayData):
+  """Pipeline options class used as container for command line options.
+
+  The class is essentially a wrapper over the standard argparse Python module
+  (see https://docs.python.org/3/library/argparse.html).  To define one option
+  or a group of options, you subclass from PipelineOptions::
+
+    class XyzOptions(PipelineOptions):
+
+      @classmethod
+      def _add_argparse_args(cls, parser):
+        parser.add_argument('--abc', default='start')
+        parser.add_argument('--xyz', default='end')
+
+  The arguments for the add_argument() method are exactly the ones
+  described in the argparse public documentation.
+
+  Pipeline objects require an options object during initialization.
+  This is obtained simply by initializing an options class as defined above::
+
+    p = Pipeline(options=XyzOptions())
+    if p.options.xyz == 'end':
+      raise ValueError('Option xyz has an invalid value.')
+
+  By default the options classes will use command line arguments to initialize
+  the options.
+  """
+  def __init__(self, flags=None, **kwargs):
+    """Initialize an options class.
+
+    The initializer will traverse all subclasses, add all their argparse
+    arguments, and then parse the command line specified by flags or, by
+    default, the one obtained from sys.argv.
+
+    The subclasses are not expected to require a redefinition of __init__.
+
+    Args:
+      flags: An iterable of command line arguments to be used. If not specified
+        then sys.argv will be used as input for parsing arguments.
+
+      **kwargs: Add overrides for arguments passed in flags.
+    """
+    self._flags = flags
+    self._all_options = kwargs
+    parser = BeamArgumentParser()
+
+    for cls in type(self).mro():
+      if cls == PipelineOptions:
+        break
+      elif '_add_argparse_args' in cls.__dict__:
+        cls._add_argparse_args(parser)
+    # The _visible_options attribute will contain only those options from the
+    # flags (i.e., command line) that can be recognized. The _all_options
+    # field contains additional overrides.
+    self._visible_options, _ = parser.parse_known_args(flags)
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # Override this in subclasses to provide options.
+    pass
+
+  @classmethod
+  def from_dictionary(cls, options):
+    """Returns a PipelineOptions from a dictionary of arguments.
+
+    Args:
+      options: Dictionary of argument value pairs.
+
+    Returns:
+      A PipelineOptions object representing the given arguments.
+    """
+    flags = []
+    for k, v in options.iteritems():
+      if isinstance(v, bool):
+        if v:
+          flags.append('--%s' % k)
+      else:
+        flags.append('--%s=%s' % (k, v))
+
+    return cls(flags)
+
+  def get_all_options(self, drop_default=False):
+    """Returns a dictionary of all defined arguments.
+
+    Returns a dictionary of all defined arguments (arguments that are defined
+    in any subclass of PipelineOptions).
+
+    Args:
+      drop_default: If set to true, options that are equal to their default
+        values are not returned as part of the result dictionary.
+
+    Returns:
+      Dictionary of all args and values.
+    """
+
+    # TODO(BEAM-1319): PipelineOption sub-classes in the main session might be
+    # repeated. Pick last unique instance of each subclass to avoid conflicts.
+    subset = {}
+    parser = BeamArgumentParser()
+    for cls in PipelineOptions.__subclasses__():
+      subset[str(cls)] = cls
+    for cls in subset.values():
+      cls._add_argparse_args(parser)  # pylint: disable=protected-access
+    known_args, _ = parser.parse_known_args(self._flags)
+    result = vars(known_args)
+
+    # Apply the overrides if any
+    for k in result.keys():
+      if k in self._all_options:
+        result[k] = self._all_options[k]
+      if (drop_default and
+          parser.get_default(k) == result[k] and
+          not isinstance(parser.get_default(k), ValueProvider)):
+        del result[k]
+
+    return result
+
+  def display_data(self):
+    return self.get_all_options(True)
+
+  def view_as(self, cls):
+    view = cls(self._flags)
+    view._all_options = self._all_options
+    return view
+
+  def _visible_option_list(self):
+    return sorted(option
+                  for option in dir(self._visible_options) if option[0] != '_')
+
+  def __dir__(self):
+    return sorted(dir(type(self)) + self.__dict__.keys() +
+                  self._visible_option_list())
+
+  def __getattr__(self, name):
+    # Special methods which may be accessed before the object is
+    # fully constructed (e.g. in unpickling).
+    if name[:2] == name[-2:] == '__':
+      return object.__getattribute__(self, name)
+    elif name in self._visible_option_list():
+      return self._all_options.get(name, getattr(self._visible_options, name))
+    else:
+      raise AttributeError("'%s' object has no attribute '%s'" %
+                           (type(self).__name__, name))
+
+  def __setattr__(self, name, value):
+    if name in ('_flags', '_all_options', '_visible_options'):
+      super(PipelineOptions, self).__setattr__(name, value)
+    elif name in self._visible_option_list():
+      self._all_options[name] = value
+    else:
+      raise AttributeError("'%s' object has no attribute '%s'" %
+                           (type(self).__name__, name))
+
+  def __str__(self):
+    return '%s(%s)' % (type(self).__name__,
+                       ', '.join('%s=%s' % (option, getattr(self, option))
+                                 for option in self._visible_option_list()))
+
+
+class StandardOptions(PipelineOptions):
+
+  DEFAULT_RUNNER = 'DirectRunner'
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--runner',
+        help=('Pipeline runner used to execute the workflow. Valid values are '
+              'DirectRunner, DataflowRunner.'))
+    # Whether to enable streaming mode.
+    parser.add_argument('--streaming',
+                        default=False,
+                        action='store_true',
+                        help='Whether to enable streaming mode.')
+
+  # TODO(BEAM-1265): Remove this error, once at least one runner supports
+  # streaming pipelines.
+  def validate(self, validator):
+    errors = []
+    if self.view_as(StandardOptions).streaming:
+      errors.append('Streaming pipelines are not supported.')
+    return errors
+
+
+class TypeOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # TODO(laolu): Add a type inferencing option here once implemented.
+    parser.add_argument('--type_check_strictness',
+                        default='DEFAULT_TO_ANY',
+                        choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
+                        help='The level of exhaustive manual type-hint '
+                        'annotation required')
+    parser.add_argument('--no_pipeline_type_check',
+                        dest='pipeline_type_check',
+                        action='store_false',
+                        help='Disable type checking at pipeline construction '
+                        'time')
+    parser.add_argument('--runtime_type_check',
+                        default=False,
+                        action='store_true',
+                        help='Enable type checking at pipeline execution '
+                        'time. NOTE: only supported with the '
+                        'DirectRunner')
+
+
+class DirectOptions(PipelineOptions):
+  """DirectRunner-specific execution options."""
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--no_direct_runner_use_stacked_bundle',
+        action='store_false',
+        dest='direct_runner_use_stacked_bundle',
+        help='DirectRunner uses stacked WindowedValues within a Bundle for '
+        'memory optimization. Set --no_direct_runner_use_stacked_bundle to '
+        'avoid it.')
+
+
+class GoogleCloudOptions(PipelineOptions):
+  """Google Cloud Dataflow service execution options."""
+
+  BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
+  COMPUTE_API_SERVICE = 'compute.googleapis.com'
+  STORAGE_API_SERVICE = 'storage.googleapis.com'
+  DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com'
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--dataflow_endpoint',
+        default=cls.DATAFLOW_ENDPOINT,
+        help=
+        ('The URL for the Dataflow API. If not set, the default public URL '
+         'will be used.'))
+    # Remote execution must check that this option is not None.
+    parser.add_argument('--project',
+                        default=None,
+                        help='Name of the Cloud project owning the Dataflow '
+                        'job.')
+    # Remote execution must check that this option is not None.
+    parser.add_argument('--job_name',
+                        default=None,
+                        help='Name of the Cloud Dataflow job.')
+    # Remote execution must check that this option is not None.
+    parser.add_argument('--staging_location',
+                        default=None,
+                        help='GCS path for staging code packages needed by '
+                        'workers.')
+    # Remote execution must check that this option is not None.
+    # If staging_location is not set, it defaults to temp_location.
+    parser.add_argument('--temp_location',
+                        default=None,
+                        help='GCS path for saving temporary workflow jobs.')
+    # The Cloud Dataflow service does not yet honor this setting. However, once
+    # service support is added, users of this SDK will be able to control
+    # the region. Default is up to the Dataflow service. See
+    # https://cloud.google.com/compute/docs/regions-zones/regions-zones for a
+    # list of valid options.
+    parser.add_argument('--region',
+                        default='us-central1',
+                        help='The Google Compute Engine region for creating '
+                        'the Dataflow job.')
+    parser.add_argument('--service_account_email',
+                        default=None,
+                        help='Identity to run virtual machines as.')
+    parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
+    # Option to run templated pipelines
+    parser.add_argument('--template_location',
+                        default=None,
+                        help='Save job to specified local or GCS location.')
+
+  def validate(self, validator):
+    errors = []
+    if validator.is_service_runner():
+      errors.extend(validator.validate_cloud_options(self))
+      errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+      if getattr(self, 'staging_location',
+                 None) or getattr(self, 'temp_location', None) is None:
+        errors.extend(validator.validate_gcs_path(self, 'staging_location'))
+
+    if self.view_as(DebugOptions).dataflow_job_file:
+      if self.view_as(GoogleCloudOptions).template_location:
+        errors.append('--dataflow_job_file and --template_location '
+                      'are mutually exclusive.')
+
+    return errors
+
+
+# Command line options controlling the worker pool configuration.
+# TODO(silviuc): Update description when autoscaling options are in.
+class WorkerOptions(PipelineOptions):
+  """Worker pool configuration options."""
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--num_workers',
+        type=int,
+        default=None,
+        help=
+        ('Number of workers to use when executing the Dataflow job. If not '
+         'set, the Dataflow service will use a reasonable default.'))
+    parser.add_argument(
+        '--max_num_workers',
+        type=int,
+        default=None,
+        help=
+        ('Maximum number of workers to use when executing the Dataflow job.'))
+    parser.add_argument(
+        '--autoscaling_algorithm',
+        type=str,
+        choices=['NONE', 'THROUGHPUT_BASED'],
+        default=None,  # Meaning unset, distinct from 'NONE' meaning don't scale
+        help=
+        ('If and how to autoscale the worker pool.'))
+    parser.add_argument(
+        '--worker_machine_type',
+        dest='machine_type',
+        default=None,
+        help=('Machine type to create Dataflow worker VMs as. See '
+              'https://cloud.google.com/compute/docs/machine-types '
+              'for a list of valid options. If not set, '
+              'the Dataflow service will choose a reasonable '
+              'default.'))
+    parser.add_argument(
+        '--disk_size_gb',
+        type=int,
+        default=None,
+        help=
+        ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
+         'If not set, the Dataflow service will use a reasonable default.'))
+    parser.add_argument(
+        '--worker_disk_type',
+        dest='disk_type',
+        default=None,
+        help=('Specifies what type of persistent disk should be used.'))
+    parser.add_argument(
+        '--zone',
+        default=None,
+        help=(
+            'GCE availability zone for launching workers. Default is up to the '
+            'Dataflow service.'))
+    parser.add_argument(
+        '--network',
+        default=None,
+        help=(
+            'GCE network for launching workers. Default is up to the Dataflow '
+            'service.'))
+    parser.add_argument(
+        '--worker_harness_container_image',
+        default=None,
+        help=('Docker registry location of container image to use for the '
+              'worker harness. Default is the container for the version of the '
+              'SDK. Note: currently, only approved Google Cloud Dataflow '
+              'container images may be used here.'))
+    parser.add_argument(
+        '--use_public_ips',
+        default=None,
+        help='Whether to assign public IP addresses to the worker machines.')
+
+  def validate(self, validator):
+    errors = []
+    if validator.is_service_runner():
+      errors.extend(
+          validator.validate_optional_argument_positive(self, 'num_workers'))
+    return errors
+
+
+class DebugOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument('--dataflow_job_file',
+                        default=None,
+                        help='Debug file to write the workflow specification.')
+    parser.add_argument(
+        '--experiment', '--experiments',
+        dest='experiments',
+        action='append',
+        default=None,
+        help=
+        ('Runners may provide a number of experimental features that can be '
+         'enabled with this flag. Please sync with the owners of the runner '
+         'before enabling any experiments.'))
+
+
+class ProfilingOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument('--profile_cpu',
+                        action='store_true',
+                        help='Enable work item CPU profiling.')
+    parser.add_argument('--profile_memory',
+                        action='store_true',
+                        help='Enable work item heap profiling.')
+    parser.add_argument('--profile_location',
+                        default=None,
+                        help='GCS path for saving profiler data.')
+
+
+class SetupOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # Options for installing dependencies in the worker.
+    parser.add_argument(
+        '--requirements_file',
+        default=None,
+        help=
+        ('Path to a requirements file containing package dependencies. '
+         'Typically it is produced by a pip freeze command. More details: '
+         'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
+         'If used, all the packages specified will be downloaded, '
+         'cached (use --requirements_cache to change default location), '
+         'and then staged so that they can be automatically installed in '
+         'workers during startup. The cache is refreshed as needed, '
+         'avoiding extra downloads for existing packages. Typically the '
+         'file is named requirements.txt.'))
+    parser.add_argument(
+        '--requirements_cache',
+        default=None,
+        help=
+        ('Path to a folder to cache the packages specified in '
+         'the requirements file using the --requirements_file option.'))
+    parser.add_argument(
+        '--setup_file',
+        default=None,
+        help=
+        ('Path to a setup Python file containing package dependencies. If '
+         'specified, the file\'s containing folder is assumed to have the '
+         'structure required for a setuptools setup package. The file must be '
+         'named setup.py. More details: '
+         'https://pythonhosted.org/an_example_pypi_project/setuptools.html '
+         'During job submission a source distribution will be built and the '
+         'worker will install the resulting package before running any custom '
+         'code.'))
+    parser.add_argument(
+        '--save_main_session',
+        default=False,
+        action='store_true',
+        help=
+        ('Save the main session state so that pickled functions and classes '
+         'defined in __main__ (e.g. interactive session) can be unpickled. '
+         'Some workflows do not need the session state if, for instance, all '
+         'their functions/classes are defined in proper modules (not __main__)'
+         ' and the modules are importable in the worker. '))
+    parser.add_argument(
+        '--sdk_location',
+        default='default',
+        help=
+        ('Override the default location from where the Beam SDK is downloaded. '
+         'It can be a URL, a GCS path, or a local path to an SDK tarball. '
+         'Workflow submissions will download or copy an SDK tarball from here. '
+         'If set to the string "default", a standard SDK location is used. If '
+         'empty, no SDK is copied.'))
+    parser.add_argument(
+        '--extra_package', '--extra_packages',
+        dest='extra_packages',
+        action='append',
+        default=None,
+        help=
+        ('Local path to a Python package file. The file is expected to be '
+         '(1) a package tarball (".tar") or (2) a compressed package tarball '
+         '(".tar.gz") that can be installed with the standard "pip install" '
+         'command. Multiple --extra_package options can be specified if more '
+         'than one package is needed. During job submission, the files will '
+         'be staged in the staging area (--staging_location option) and the '
+         'workers will install them in the same order they were specified on '
+         'the command line.'))
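+    # Note: '--extra_package' and '--extra_packages' share a destination, so
+    #   PipelineOptions(['--extra_package', 'a.tar.gz',
+    #                    '--extra_packages', 'b.tar.gz'])
+    # yields extra_packages == ['a.tar.gz', 'b.tar.gz'] (see the tests below).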
+
+
+class TestOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # Options for e2e test pipeline.
+    parser.add_argument(
+        '--on_success_matcher',
+        default=None,
+        help=('Verify state/output of e2e test pipeline. This is the pickled '
+              'version of a matcher that should extend '
+              'hamcrest.core.base_matcher.BaseMatcher.'))
+
+  def validate(self, validator):
+    errors = []
+    if self.view_as(TestOptions).on_success_matcher:
+      errors.extend(validator.validate_test_matcher(self, 'on_success_matcher'))
+    return errors
+
+# TODO(silviuc): Add --files_to_stage option.
+# This could potentially replace the --requirements_file and --setup_file.
+
+# TODO(silviuc): Non-standard options. Keep them? If yes, add help too!
+# Remote execution must check that this option is not None.
+
+
+class OptionsContext(object):
+  """Set default pipeline options for pipelines created in this block.
+
+  This is particularly useful for pipelines implicitly created with the
+
+      [python list] | PTransform
+
+  construct.
+
+  Can also be used as a decorator.
+  """
+  overrides = []
+
+  def __init__(self, **options):
+    self.options = options
+
+  def __enter__(self):
+    self.overrides.append(self.options)
+
+  def __exit__(self, *exn_info):
+    self.overrides.pop()
+
+  def __call__(self, f, *args, **kwargs):
+
+    def wrapper(*args, **kwargs):
+      with self:
+        f(*args, **kwargs)
+
+    return wrapper
+
+  @classmethod
+  def augment_options(cls, options):
+    for override in cls.overrides:
+      for name, value in override.items():
+        setattr(options, name, value)
+    return options
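
For readers skimming the diff, a minimal usage sketch of OptionsContext as both
a context manager and a decorator (the option name 'num_workers' below is
illustrative, and the defaults only take effect for code paths that consult
OptionsContext.augment_options):

    from apache_beam.options.pipeline_options import OptionsContext

    with OptionsContext(num_workers=5):
        pass  # pipelines created here may pick up num_workers=5 as a default

    @OptionsContext(num_workers=5)
    def build_pipeline():
        pass  # the same defaults apply for the duration of the call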

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/options/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
new file mode 100644
index 0000000..1a644b4
--- /dev/null
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -0,0 +1,240 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the pipeline options module."""
+
+import logging
+import unittest
+
+import hamcrest as hc
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import RuntimeValueProvider
+
+
+class PipelineOptionsTest(unittest.TestCase):
+  def tearDown(self):
+    # Clean up the global variable used by RuntimeValueProvider
+    RuntimeValueProvider.runtime_options = None
+
+  TEST_CASES = [
+      {'flags': ['--num_workers', '5'],
+       'expected': {'num_workers': 5, 'mock_flag': False, 'mock_option': None},
+       'display_data': [DisplayDataItemMatcher('num_workers', 5)]},
+      {
+          'flags': [
+              '--profile_cpu', '--profile_location', 'gs://bucket/', 'ignored'],
+          'expected': {
+              'profile_cpu': True, 'profile_location': 'gs://bucket/',
+              'mock_flag': False, 'mock_option': None},
+          'display_data': [
+              DisplayDataItemMatcher('profile_cpu',
+                                     True),
+              DisplayDataItemMatcher('profile_location',
+                                     'gs://bucket/')]
+      },
+      {'flags': ['--num_workers', '5', '--mock_flag'],
+       'expected': {'num_workers': 5, 'mock_flag': True, 'mock_option': None},
+       'display_data': [
+           DisplayDataItemMatcher('num_workers', 5),
+           DisplayDataItemMatcher('mock_flag', True)]
+      },
+      {'flags': ['--mock_option', 'abc'],
+       'expected': {'mock_flag': False, 'mock_option': 'abc'},
+       'display_data': [
+           DisplayDataItemMatcher('mock_option', 'abc')]
+      },
+      {'flags': ['--mock_option', ' abc def '],
+       'expected': {'mock_flag': False, 'mock_option': ' abc def '},
+       'display_data': [
+           DisplayDataItemMatcher('mock_option', ' abc def ')]
+      },
+      {'flags': ['--mock_option= abc xyz '],
+       'expected': {'mock_flag': False, 'mock_option': ' abc xyz '},
+       'display_data': [
+           DisplayDataItemMatcher('mock_option', ' abc xyz ')]
+      },
+      {'flags': ['--mock_option=gs://my bucket/my folder/my file'],
+       'expected': {'mock_flag': False,
+                    'mock_option': 'gs://my bucket/my folder/my file'},
+       'display_data': [
+           DisplayDataItemMatcher(
+               'mock_option', 'gs://my bucket/my folder/my file')]
+      },
+  ]
+
+  # Used for testing newly added flags.
+  class MockOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument('--mock_flag', action='store_true', help='mock flag')
+      parser.add_argument('--mock_option', help='mock option')
+      parser.add_argument('--option with space', help='mock option with space')
+
+  def test_display_data(self):
+    for case in PipelineOptionsTest.TEST_CASES:
+      options = PipelineOptions(flags=case['flags'])
+      dd = DisplayData.create_from(options)
+      hc.assert_that(dd.items, hc.contains_inanyorder(*case['display_data']))
+
+  def test_get_all_options(self):
+    for case in PipelineOptionsTest.TEST_CASES:
+      options = PipelineOptions(flags=case['flags'])
+      self.assertDictContainsSubset(case['expected'], options.get_all_options())
+      self.assertEqual(options.view_as(
+          PipelineOptionsTest.MockOptions).mock_flag,
+                       case['expected']['mock_flag'])
+      self.assertEqual(options.view_as(
+          PipelineOptionsTest.MockOptions).mock_option,
+                       case['expected']['mock_option'])
+
+  def test_from_dictionary(self):
+    for case in PipelineOptionsTest.TEST_CASES:
+      options = PipelineOptions(flags=case['flags'])
+      all_options_dict = options.get_all_options()
+      options_from_dict = PipelineOptions.from_dictionary(all_options_dict)
+      self.assertEqual(options_from_dict.view_as(
+          PipelineOptionsTest.MockOptions).mock_flag,
+                       case['expected']['mock_flag'])
+      self.assertEqual(options_from_dict.view_as(
+          PipelineOptionsTest.MockOptions).mock_option,
+                       case['expected']['mock_option'])
+
+  def test_option_with_space(self):
+    options = PipelineOptions(flags=['--option with space= value with space'])
+    self.assertEqual(
+        getattr(options.view_as(PipelineOptionsTest.MockOptions),
+                'option with space'), ' value with space')
+    options_from_dict = PipelineOptions.from_dictionary(
+        options.get_all_options())
+    self.assertEqual(
+        getattr(options_from_dict.view_as(PipelineOptionsTest.MockOptions),
+                'option with space'), ' value with space')
+
+  def test_override_options(self):
+    base_flags = ['--num_workers', '5']
+    options = PipelineOptions(base_flags)
+    self.assertEqual(options.get_all_options()['num_workers'], 5)
+    self.assertEqual(options.get_all_options()['mock_flag'], False)
+
+    options.view_as(PipelineOptionsTest.MockOptions).mock_flag = True
+    self.assertEqual(options.get_all_options()['num_workers'], 5)
+    self.assertTrue(options.get_all_options()['mock_flag'])
+
+  def test_experiments(self):
+    options = PipelineOptions(['--experiment', 'abc', '--experiment', 'def'])
+    self.assertEqual(
+        sorted(options.get_all_options()['experiments']), ['abc', 'def'])
+
+    options = PipelineOptions(['--experiments', 'abc', '--experiments', 'def'])
+    self.assertEqual(
+        sorted(options.get_all_options()['experiments']), ['abc', 'def'])
+
+    options = PipelineOptions(flags=[''])
+    self.assertEqual(options.get_all_options()['experiments'], None)
+
+  def test_extra_package(self):
+    options = PipelineOptions(['--extra_package', 'abc',
+                               '--extra_packages', 'def',
+                               '--extra_packages', 'ghi'])
+    self.assertEqual(
+        sorted(options.get_all_options()['extra_packages']),
+        ['abc', 'def', 'ghi'])
+
+    options = PipelineOptions(flags=[''])
+    self.assertEqual(options.get_all_options()['extra_packages'], None)
+
+  def test_dataflow_job_file(self):
+    options = PipelineOptions(['--dataflow_job_file', 'abc'])
+    self.assertEqual(options.get_all_options()['dataflow_job_file'], 'abc')
+
+    options = PipelineOptions(flags=[''])
+    self.assertEqual(options.get_all_options()['dataflow_job_file'], None)
+
+  def test_template_location(self):
+    options = PipelineOptions(['--template_location', 'abc'])
+    self.assertEqual(options.get_all_options()['template_location'], 'abc')
+
+    options = PipelineOptions(flags=[''])
+    self.assertEqual(options.get_all_options()['template_location'], None)
+
+  def test_redefine_options(self):
+
+    class TestRedefinedOptions(PipelineOptions):  # pylint: disable=unused-variable
+
+      @classmethod
+      def _add_argparse_args(cls, parser):
+        parser.add_argument('--redefined_flag', action='store_true')
+
+    class TestRedefinedOptions(PipelineOptions):
+
+      @classmethod
+      def _add_argparse_args(cls, parser):
+        parser.add_argument('--redefined_flag', action='store_true')
+
+    options = PipelineOptions(['--redefined_flag'])
+    self.assertTrue(options.get_all_options()['redefined_flag'])
+
+  def test_value_provider_options(self):
+    class UserOptions(PipelineOptions):
+      @classmethod
+      def _add_argparse_args(cls, parser):
+        parser.add_value_provider_argument(
+            '--vp_arg',
+            help='This flag is a value provider')
+
+        parser.add_value_provider_argument(
+            '--vp_arg2',
+            default=1,
+            type=int)
+
+        parser.add_argument(
+            '--non_vp_arg',
+            default=1,
+            type=int
+        )
+
+    # Provide values; an option left unset becomes a RuntimeValueProvider.
+    options = UserOptions(['--vp_arg', 'hello'])
+    self.assertIsInstance(options.vp_arg, StaticValueProvider)
+    self.assertIsInstance(options.vp_arg2, RuntimeValueProvider)
+    self.assertIsInstance(options.non_vp_arg, int)
+
+    # Values can be overwritten
+    options = UserOptions(vp_arg=5,
+                          vp_arg2=StaticValueProvider(value_type=str,
+                                                      value='bye'),
+                          non_vp_arg=RuntimeValueProvider(
+                              option_name='foo',
+                              value_type=int,
+                              default_value=10))
+    self.assertEqual(options.vp_arg, 5)
+    self.assertTrue(options.vp_arg2.is_accessible(),
+                    '%s is not accessible' % options.vp_arg2)
+    self.assertEqual(options.vp_arg2.get(), 'bye')
+    self.assertFalse(options.non_vp_arg.is_accessible())
+
+    with self.assertRaises(RuntimeError):
+      options.non_vp_arg.get()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
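
As the tests above illustrate, the intended pattern for consuming these options
is a small PipelineOptions subclass read back through view_as; a minimal sketch
(MyOptions and --my_flag are illustrative names, not part of the commit):

    from apache_beam.options.pipeline_options import PipelineOptions

    class MyOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument('--my_flag', action='store_true')

    options = PipelineOptions(['--my_flag'])
    assert options.view_as(MyOptions).my_flag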

http://git-wip-us.apache.org/repos/asf/beam/blob/50a4c56d/sdks/python/apache_beam/options/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py
new file mode 100644
index 0000000..5c1ce2a
--- /dev/null
+++ b/sdks/python/apache_beam/options/pipeline_options_validator.py
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pipeline options validator.
+"""
+import re
+
+from apache_beam.internal import pickler
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.options.pipeline_options import TestOptions
+from apache_beam.options.pipeline_options import TypeOptions
+from apache_beam.options.pipeline_options import WorkerOptions
+
+
+class PipelineOptionsValidator(object):
+  """Validates PipelineOptions.
+
+  Goes through a list of known PipelineOptions subclasses and calls::
+
+    validate(validator)
+
+  on each one that implements it, aggregating the validation errors from all
+  of them into a single list.
+  """
+
+  # Validator will call validate on these subclasses of PipelineOptions
+  OPTIONS = [DebugOptions, GoogleCloudOptions, SetupOptions, StandardOptions,
+             TypeOptions, WorkerOptions, TestOptions]
+
+  # Possible validation errors.
+  ERR_MISSING_OPTION = 'Missing required option: %s.'
+  ERR_MISSING_GCS_PATH = 'Missing GCS path option: %s.'
+  ERR_INVALID_GCS_PATH = 'Invalid GCS path (%s), given for the option: %s.'
+  ERR_INVALID_GCS_BUCKET = (
+      'Invalid GCS bucket (%s), given for the option: %s. See '
+      'https://developers.google.com/storage/docs/bucketnaming '
+      'for more details.')
+  ERR_INVALID_GCS_OBJECT = 'Invalid GCS object (%s), given for the option: %s.'
+  ERR_INVALID_JOB_NAME = (
+      'Invalid job_name (%s); the name must consist of only the characters '
+      '[-a-z0-9], starting with a letter and ending with a letter or number')
+  ERR_INVALID_PROJECT_NUMBER = (
+      'Invalid Project ID (%s). Please make sure you specified the Project ID, '
+      'not project number.')
+  ERR_INVALID_PROJECT_ID = (
+      'Invalid Project ID (%s). Please make sure you specified the Project ID, '
+      'not project description.')
+  ERR_INVALID_NOT_POSITIVE = ('Invalid value (%s) for option: %s. Value needs '
+                              'to be positive.')
+  ERR_INVALID_TEST_MATCHER_TYPE = (
+      'Invalid value (%s) for option: %s. Please extend your matcher object '
+      'from hamcrest.core.base_matcher.BaseMatcher.')
+  ERR_INVALID_TEST_MATCHER_UNPICKLABLE = (
+      'Invalid value (%s) for option: %s. Please make sure the test matcher '
+      'can be unpickled.')
+
+  # GCS path specific patterns.
+  GCS_URI = '(?P<SCHEME>[^:]+)://(?P<BUCKET>[^/]+)(/(?P<OBJECT>.*))?'
+  GCS_BUCKET = '^[a-z0-9][-_a-z0-9.]+[a-z0-9]$'
+  GCS_SCHEME = 'gs'
+
+  # GoogleCloudOptions specific patterns.
+  JOB_PATTERN = '[a-z]([-a-z0-9]*[a-z0-9])?'
+  PROJECT_ID_PATTERN = '[a-z][-a-z0-9:.]+[a-z0-9]'
+  PROJECT_NUMBER_PATTERN = '[0-9]*'
+  ENDPOINT_PATTERN = r'https://[\S]*googleapis\.com[/]?'
+
+  def __init__(self, options, runner):
+    self.options = options
+    self.runner = runner
+
+  def validate(self):
+    """Calls validate on subclassess and returns a list of errors.
+
+    Calls the validate method on each subclass that defines one, accumulates
+    the returned lists of errors, and returns the aggregate list.
+
+    Returns:
+      Aggregated list of errors after calling all possible validate methods.
+    """
+    errors = []
+    for cls in self.OPTIONS:
+      if 'validate' in cls.__dict__:
+        errors.extend(self.options.view_as(cls).validate(self))
+    return errors
+
+  def is_service_runner(self):
+    """True if pipeline will execute on the Google Cloud Dataflow service."""
+    is_service_runner = (self.runner is not None and
+                         type(self.runner).__name__ in [
+                             'DataflowRunner',
+                             'TestDataflowRunner'])
+
+    dataflow_endpoint = (
+        self.options.view_as(GoogleCloudOptions).dataflow_endpoint)
+    is_service_endpoint = (dataflow_endpoint is not None and
+                           self.is_full_string_match(
+                               self.ENDPOINT_PATTERN, dataflow_endpoint))
+
+    return is_service_runner and is_service_endpoint
+
+  def is_full_string_match(self, pattern, string):
+    """Returns True if the pattern matches the whole string."""
+    pattern = '^%s$' % pattern
+    return re.search(pattern, string) is not None
+
+  def _validate_error(self, err, *args):
+    return [err % args]
+
+  def validate_gcs_path(self, view, arg_name):
+    """Validates a GCS path against gs://bucket/object URI format."""
+    arg = getattr(view, arg_name, None)
+    if arg is None:
+      return self._validate_error(self.ERR_MISSING_GCS_PATH, arg_name)
+    match = re.match(self.GCS_URI, arg, re.DOTALL)
+    if match is None:
+      return self._validate_error(self.ERR_INVALID_GCS_PATH, arg, arg_name)
+
+    scheme = match.group('SCHEME')
+    bucket = match.group('BUCKET')
+    gcs_object = match.group('OBJECT')
+
+    if ((scheme is None) or (scheme.lower() != self.GCS_SCHEME) or
+        (bucket is None)):
+      return self._validate_error(self.ERR_INVALID_GCS_PATH, arg, arg_name)
+
+    if not self.is_full_string_match(self.GCS_BUCKET, bucket):
+      return self._validate_error(self.ERR_INVALID_GCS_BUCKET, arg, arg_name)
+    if gcs_object is None or '\n' in gcs_object or '\r' in gcs_object:
+      return self._validate_error(self.ERR_INVALID_GCS_OBJECT, arg, arg_name)
+
+    return []
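+  # For example, 'gs://my-bucket/output' passes the checks above, while
+  # 's3://my-bucket/output' (non-GCS scheme) and 'gs://MyBucket/output'
+  # (uppercase bucket name) yield ERR_INVALID_GCS_PATH and
+  # ERR_INVALID_GCS_BUCKET respectively.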
+
+  def validate_cloud_options(self, view):
+    """Validates job_name and project arguments."""
+    errors = []
+    if (view.job_name and
+        not self.is_full_string_match(self.JOB_PATTERN, view.job_name)):
+      errors.extend(self._validate_error(self.ERR_INVALID_JOB_NAME,
+                                         view.job_name))
+    project = view.project
+    if project is None:
+      errors.extend(self._validate_error(self.ERR_MISSING_OPTION, 'project'))
+    else:
+      if self.is_full_string_match(self.PROJECT_NUMBER_PATTERN, project):
+        errors.extend(
+            self._validate_error(self.ERR_INVALID_PROJECT_NUMBER, project))
+      elif not self.is_full_string_match(self.PROJECT_ID_PATTERN, project):
+        errors.extend(
+            self._validate_error(self.ERR_INVALID_PROJECT_ID, project))
+    return errors
+
+  def validate_optional_argument_positive(self, view, arg_name):
+    """Validates that an optional argument (if set) has a positive value."""
+    arg = getattr(view, arg_name, None)
+    if arg is not None and int(arg) <= 0:
+      return self._validate_error(self.ERR_INVALID_NOT_POSITIVE, arg, arg_name)
+    return []
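+  # For example, '--num_workers 0' fails this check, while leaving the
+  # option unset (None) passes.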
+
+  def validate_test_matcher(self, view, arg_name):
+    """Validates that on_success_matcher argument if set.
+
+    Validates that on_success_matcher can be unpickled and is an instance
+    of `hamcrest.core.base_matcher.BaseMatcher`.
+    """
+    # This is a test-only method and requires hamcrest.
+    from hamcrest.core.base_matcher import BaseMatcher
+    pickled_matcher = view.on_success_matcher
+    errors = []
+    try:
+      matcher = pickler.loads(pickled_matcher)
+      if not isinstance(matcher, BaseMatcher):
+        errors.extend(
+            self._validate_error(
+                self.ERR_INVALID_TEST_MATCHER_TYPE, matcher, arg_name))
+    except:   # pylint: disable=bare-except
+      errors.extend(
+          self._validate_error(
+              self.ERR_INVALID_TEST_MATCHER_UNPICKLABLE,
+              pickled_matcher,
+              arg_name))
+    return errors
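
A minimal sketch of how this validator is typically driven (passing runner=None
here is just for illustration; it makes is_service_runner return False, which
skips the service-only checks):

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options_validator import (
        PipelineOptionsValidator)

    options = PipelineOptions(['--num_workers', '5'])
    errors = PipelineOptionsValidator(options, runner=None).validate()
    if errors:
        raise ValueError('\n'.join(errors))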

