Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F3DBD200C3B for ; Sat, 11 Feb 2017 02:00:16 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id F2804160B5C; Sat, 11 Feb 2017 01:00:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B8D51160B72 for ; Sat, 11 Feb 2017 02:00:15 +0100 (CET) Received: (qmail 48482 invoked by uid 500); 11 Feb 2017 01:00:14 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 48296 invoked by uid 99); 11 Feb 2017 01:00:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 11 Feb 2017 01:00:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 80DFEE0527; Sat, 11 Feb 2017 01:00:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: altay@apache.org To: commits@beam.apache.org Date: Sat, 11 Feb 2017 01:00:23 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [10/11] beam git commit: Remove label from Create archived-at: Sat, 11 Feb 2017 01:00:17 -0000 Remove label from Create Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dec7edfd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dec7edfd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dec7edfd Branch: refs/heads/master Commit: dec7edfdc0b5ad9c94e24864a4788e5c821241f8 Parents: fe39ae1 Author: Sourabh Bajaj Authored: Fri Feb 10 11:43:14 2017 -0800 Committer: Ahmet Altay Committed: Fri Feb 10 16:59:50 2017 -0800 ---------------------------------------------------------------------- .../apache_beam/examples/complete/tfidf_test.py | 3 +-- .../apache_beam/examples/snippets/snippets.py | 3 +-- .../consumer_tracking_pipeline_visitor_test.py | 12 ++++++--- sdks/python/apache_beam/transforms/core.py | 10 +++---- .../python/apache_beam/transforms/ptransform.py | 2 +- .../apache_beam/transforms/ptransform_test.py | 28 ++------------------ 6 files changed, 16 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/dec7edfd/sdks/python/apache_beam/examples/complete/tfidf_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index 404ab44..05e53a4 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -49,8 +49,7 @@ class TfIdfTest(unittest.TestCase): def test_tfidf_transform(self): p = TestPipeline() - uri_to_line = p | beam.Create( - 'create sample', + uri_to_line = p | 'create sample' >> beam.Create( [('1.txt', 'abc def ghi'), ('2.txt', 'abc def'), ('3.txt', 'abc')]) http://git-wip-us.apache.org/repos/asf/beam/blob/dec7edfd/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index d313a12..f15a089 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -775,8 +775,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, # Using the new sink in an example pipeline. # [START model_custom_sink_use_new_sink] p = beam.Pipeline(options=PipelineOptions()) - kvs = p | beam.core.Create( - 'CreateKVs', KVs) + kvs = p | 'CreateKVs' >> beam.Create(KVs) kvs | 'WriteToSimpleKV' >> beam.io.Write( SimpleKVSink('http://url_to_simple_kv/', final_table_name)) http://git-wip-us.apache.org/repos/asf/beam/blob/dec7edfd/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py index 3cc39ea..51dd1fa 100644 --- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py +++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py @@ -46,7 +46,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): self.visitor = ConsumerTrackingPipelineVisitor() def test_root_transforms(self): - root_create = Create('create', [[1, 2, 3]]) + root_create = Create([[1, 2, 3]]) class DummySource(iobase.BoundedSource): pass @@ -55,7 +55,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): root_flatten = Flatten('flatten', pipeline=self.pipeline) pbegin = pvalue.PBegin(self.pipeline) - pcoll_create = pbegin | root_create + pcoll_create = pbegin | 'create' >> root_create pbegin | root_read pcoll_create | FlatMap(lambda x: x) [] | root_flatten @@ -64,6 +64,10 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): root_transforms = sorted( [t.transform for t in self.visitor.root_transforms]) + print root_transforms + print root_read + print root_create + print root_flatten self.assertEqual(root_transforms, sorted( [root_read, root_create, root_flatten])) @@ -87,10 +91,10 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): def process(self, element, negatives): yield element - root_create = Create('create', [[-1, 2, 3]]) + root_create = Create([[-1, 2, 3]]) result = (self.pipeline - | root_create + | 'create' >> root_create | ParDo(SplitNumbersFn()).with_outputs('tag_negative', main='positive')) positive, negative = result http://git-wip-us.apache.org/repos/asf/beam/blob/dec7edfd/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 856d395..9da7cf2 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1321,17 +1321,13 @@ class Flatten(PTransform): class Create(PTransform): """A transform that creates a PCollection from an iterable.""" - def __init__(self, *args, **kwargs): + def __init__(self, value): """Initializes a Create transform. Args: - *args: A tuple of position arguments. - **kwargs: A dictionary of keyword arguments. - - The *args, **kwargs are expected to be (label, value) or (value). + value: An object of values for the PCollection """ - label, value = self.parse_label_and_arg(args, kwargs, 'value') - super(Create, self).__init__(label) + super(Create, self).__init__() if isinstance(value, basestring): raise TypeError('PTransform Create: Refusing to treat string as ' 'an iterable. (string=%r)' % value) http://git-wip-us.apache.org/repos/asf/beam/blob/dec7edfd/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index e4fbf76..994c09b 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -431,7 +431,7 @@ class PTransform(WithTypeHints, HasDisplayData): # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.transforms.core import Create # pylint: enable=wrong-import-order, wrong-import-position - replacements = {id(v): p | Create('CreatePInput%s' % ix, v) + replacements = {id(v): p | 'CreatePInput%s' % ix >> Create(v) for ix, v in enumerate(pvalues) if not isinstance(v, pvalue.PValue) and v is not None} pvalueish = _SetInputPValues().visit(pvalueish, replacements) http://git-wip-us.apache.org/repos/asf/beam/blob/dec7edfd/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 3706f37..0606645 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -81,30 +81,6 @@ class PTransformTest(unittest.TestCase): """inputs=('ci',) side_inputs=('cs',)>""", str(inputs_tr)) - def test_parse_label_and_arg(self): - - def fun(*args, **kwargs): - return PTransform().parse_label_and_arg(args, kwargs, 'name') - - self.assertEqual(('PTransform', 'value'), fun('value')) - self.assertEqual(('PTransform', 'value'), fun(name='value')) - self.assertEqual(('label', 'value'), fun('label', 'value')) - self.assertEqual(('label', 'value'), fun('label', name='value')) - self.assertEqual(('label', 'value'), fun('value', label='label')) - self.assertEqual(('label', 'value'), fun(name='value', label='label')) - - self.assertRaises(ValueError, fun) - self.assertRaises(ValueError, fun, 0, 'value') - self.assertRaises(ValueError, fun, label=0, name='value') - self.assertRaises(ValueError, fun, other='value') - - with self.assertRaises(ValueError) as cm: - fun(0, name='value') - self.assertEqual( - cm.exception.message, - 'PTransform expects a (label, name) or (name) argument list ' - 'instead of args=(0,), kwargs={\'name\': \'value\'}') - def test_do_with_do_fn(self): class AddNDoFn(beam.DoFn): @@ -386,8 +362,8 @@ class PTransformTest(unittest.TestCase): def test_group_by_key(self): pipeline = TestPipeline() - pcoll = pipeline | beam.Create( - 'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]) + pcoll = pipeline | 'start' >> beam.Create( + [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]) result = pcoll | 'Group' >> beam.GroupByKey() assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])])) pipeline.run()