beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [1/2] beam git commit: Updates Python datastore wordcount example to take a dataset parameter.
Date Wed, 01 Nov 2017 20:05:20 GMT
Repository: beam
Updated Branches:
  refs/heads/master b013d7c5a -> 8247b5346


Updates Python datastore wordcount example to take a dataset parameter.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ac436349
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ac436349
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ac436349

Branch: refs/heads/master
Commit: ac4363491e6c300cf34c05bf547cee3ccc37c98e
Parents: b013d7c
Author: chamikara@google.com <chamikara@google.com>
Authored: Tue Oct 31 18:37:29 2017 -0700
Committer: chamikara@google.com <chamikara@google.com>
Committed: Wed Nov 1 13:03:37 2017 -0700

----------------------------------------------------------------------
 .../examples/cookbook/datastore_wordcount.py    | 24 +++++++++++---------
 .../io/gcp/datastore/v1/datastoreio.py          | 16 +++++++++++--
 2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ac436349/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 099fb08..7204e3b 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -32,12 +32,14 @@ counts them and write the output to a set of files.
 
 The following options must be provided to run this pipeline in read-only mode:
 ``
---project YOUR_PROJECT_ID
+--dataset YOUR_DATASET
 --kind YOUR_DATASTORE_KIND
 --output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH]
 --read_only
 ``
 
+The dataset maps to the Project ID in the v1 version of Cloud Datastore.
+
 Read-write Mode: In this mode, this example reads words from an input file,
 converts them to Cloud Datastore ``Entity`` objects and writes them to
 Cloud Datastore using the ``datastoreio.Write`` transform. The second pipeline
@@ -47,7 +49,7 @@ write the output to a set of files.
 
 The following options must be provided to run this pipeline in read-write mode:
 ``
---project YOUR_PROJECT_ID
+--dataset YOUR_DATASET
 --kind YOUR_DATASTORE_KIND
 --output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH]
 ``
@@ -77,7 +79,6 @@ from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
 from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
 from apache_beam.metrics import Metrics
 from apache_beam.metrics.metric import MetricsFilter
-from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
 
@@ -134,7 +135,7 @@ class EntityWrapper(object):
     return entity
 
 
-def write_to_datastore(project, user_options, pipeline_options):
+def write_to_datastore(user_options, pipeline_options):
   """Creates a pipeline that writes entities to Cloud Datastore."""
   with beam.Pipeline(options=pipeline_options) as p:
 
@@ -144,7 +145,7 @@ def write_to_datastore(project, user_options, pipeline_options):
      | 'create entity' >> beam.Map(
          EntityWrapper(user_options.namespace, user_options.kind,
                        user_options.ancestor).make_entity)
-     | 'write to datastore' >> WriteToDatastore(project))
+     | 'write to datastore' >> WriteToDatastore(user_options.dataset))
 
 
 def make_ancestor_query(kind, namespace, ancestor):
@@ -167,7 +168,7 @@ def make_ancestor_query(kind, namespace, ancestor):
   return query
 
 
-def read_from_datastore(project, user_options, pipeline_options):
+def read_from_datastore(user_options, pipeline_options):
   """Creates a pipeline that reads entities from Cloud Datastore."""
   p = beam.Pipeline(options=pipeline_options)
   # Create a query to read entities from datastore.
@@ -176,7 +177,7 @@ def read_from_datastore(project, user_options, pipeline_options):
 
   # Read entities from Cloud Datastore into a PCollection.
   lines = p | 'read from datastore' >> ReadFromDatastore(
-      project, query, user_options.namespace)
+      user_options.dataset, query, user_options.namespace)
 
   # Count the occurrences of each word.
   def count_ones(word_ones):
@@ -216,6 +217,9 @@ def run(argv=None):
                       dest='input',
                       default='gs://dataflow-samples/shakespeare/kinglear.txt',
                       help='Input file to process.')
+  parser.add_argument('--dataset',
+                      dest='dataset',
+                      help='Dataset ID to read from Cloud Datastore.')
   parser.add_argument('--kind',
                       dest='kind',
                       required=True,
@@ -246,15 +250,13 @@ def run(argv=None):
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
-  gcloud_options = pipeline_options.view_as(GoogleCloudOptions)
 
   # Write to Datastore if `read_only` options is not specified.
   if not known_args.read_only:
-    write_to_datastore(gcloud_options.project, known_args, pipeline_options)
+    write_to_datastore(known_args, pipeline_options)
 
   # Read entities from Datastore.
-  result = read_from_datastore(gcloud_options.project, known_args,
-                               pipeline_options)
+  result = read_from_datastore(known_args, pipeline_options)
 
   empty_lines_filter = MetricsFilter().with_name('empty_lines')
   query_result = result.metrics().query(empty_lines_filter)

http://git-wip-us.apache.org/repos/asf/beam/blob/ac436349/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index 078002c..13209c1 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -89,10 +89,10 @@ class ReadFromDatastore(PTransform):
   _DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024
 
   def __init__(self, project, query, namespace=None, num_splits=0):
-    """Initialize the ReadFromDatastore transform.
+    """Initialize the `ReadFromDatastore` transform.
 
     Args:
-      project: The Project ID
+      project: The ID of the project to read from.
       query: Cloud Datastore query to be read from.
       namespace: An optional namespace.
       num_splits: Number of splits for the query.
@@ -459,7 +459,13 @@ class _Mutate(PTransform):
 
 class WriteToDatastore(_Mutate):
   """A ``PTransform`` to write a ``PCollection[Entity]`` to Cloud Datastore."""
+
   def __init__(self, project):
+    """Initialize the `WriteToDatastore` transform.
+
+    Args:
+      project: The ID of the project to write to.
+    """
 
     # Import here to avoid adding the dependency for local running scenarios.
     try:
@@ -486,6 +492,12 @@ class WriteToDatastore(_Mutate):
 class DeleteFromDatastore(_Mutate):
   """A ``PTransform`` to delete a ``PCollection[Key]`` from Cloud Datastore."""
   def __init__(self, project):
+    """Initialize the `DeleteFromDatastore` transform.
+
+    Args:
+      project: The ID of the project from which the entities will be deleted.
+    """
+
     super(DeleteFromDatastore, self).__init__(
         project, DeleteFromDatastore.to_delete_mutation)
 


Mime
View raw message