beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] incubator-beam git commit: Add snippet for datastoreio
Date Fri, 02 Dec 2016 19:34:41 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk a463f000e -> 2363ee510


Add snippet for datastoreio


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/557a2f92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/557a2f92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/557a2f92

Branch: refs/heads/python-sdk
Commit: 557a2f92f67bfc545533fa35852485e9c4c0b785
Parents: a463f00
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Thu Dec 1 10:27:05 2016 -0800
Committer: Robert Bradshaw <robertwb@google.com>
Committed: Fri Dec 2 11:34:28 2016 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   | 41 ++++++++++++++++++++
 .../examples/snippets/snippets_test.py          |  9 ++++-
 2 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/557a2f92/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 8d05130..c2a047f 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -891,6 +891,47 @@ def model_textio(renames):
   p.run()
 
 
+def model_datastoreio():
+  """Using a Read and Write transform to read/write to Cloud Datastore.
+
+  URL: https://cloud.google.com/dataflow/model/datastoreio
+  """
+
+  import uuid
+  from google.datastore.v1 import entity_pb2
+  from google.datastore.v1 import query_pb2
+  import googledatastore
+  import apache_beam as beam
+  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
+  from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
+
+  project = 'my_project'
+  kind = 'my_kind'
+  query = query_pb2.Query()
+  query.kind.add().name = kind
+
+  # [START model_datastoreio_read]
+  p = beam.Pipeline(options=PipelineOptions())
+  entities = p | 'Read From Datastore' >> ReadFromDatastore(project, query)
+  # [END model_datastoreio_read]
+
+  # [START model_datastoreio_write]
+  p = beam.Pipeline(options=PipelineOptions())
+  musicians = p | 'Musicians' >> beam.Create(
+      ['Mozart', 'Chopin', 'Beethoven', 'Bach'])
+
+  def to_entity(content):
+    entity = entity_pb2.Entity()
+    googledatastore.helper.add_key_path(entity.key, kind, str(uuid.uuid4()))
+    googledatastore.helper.add_properties(entity, {'content': unicode(content)})
+    return entity
+
+  entities = musicians | 'To Entity' >> beam.Map(to_entity)
+  entities | 'Write To Datastore' >> WriteToDatastore(project)
+  # [END model_datastoreio_write]
+
+
 def model_bigqueryio():
   """Using a Read and Write transform to read/write to BigQuery.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/557a2f92/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 72fccb2..09b4ba4 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -470,11 +470,18 @@ class SnippetsTest(unittest.TestCase):
         ['aa', 'bb', 'bb', 'cc', 'cc', 'cc'],
         self.get_output(result_path, suffix='.csv'))
 
+  def test_model_datastoreio(self):
+    # We cannot test datastoreio functionality in unit tests therefore we limit
+    # ourselves to making sure the pipeline containing Datastore read and write
+    # transforms can be built.
+    # TODO(vikasrk): Explore using the Datastore Emulator.
+    snippets.model_datastoreio()
+
   def test_model_bigqueryio(self):
     # We cannot test BigQueryIO functionality in unit tests therefore we limit
     # ourselves to making sure the pipeline containing BigQuery sources and
     # sinks can be built.
-    self.assertEqual(None, snippets.model_bigqueryio())
+    snippets.model_bigqueryio()
 
   def _run_test_pipeline_for_options(self, fn):
     temp_path = self.create_temp_file('aa\nbb\ncc')


Mime
View raw message