Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 45438200CB4 for ; Tue, 13 Jun 2017 00:43:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 43C76160BEC; Mon, 12 Jun 2017 22:43:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 91AE7160BDE for ; Tue, 13 Jun 2017 00:43:53 +0200 (CEST) Received: (qmail 56800 invoked by uid 500); 12 Jun 2017 22:43:52 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 56791 invoked by uid 99); 12 Jun 2017 22:43:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Jun 2017 22:43:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A5EEEDFAB0; Mon, 12 Jun 2017 22:43:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: altay@apache.org To: commits@beam.apache.org Date: Mon, 12 Jun 2017 22:43:52 -0000 Message-Id: <1e675a920f0a44aa972dc5c545ce2f93@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: Reverse removal of NativeWrite evaluator in Python DirectRunner archived-at: Mon, 12 Jun 2017 22:43:54 -0000 Repository: beam Updated Branches: refs/heads/master f9d51aa5c -> 86e04893a Reverse removal of NativeWrite evaluator in Python DirectRunner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/809f1787 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/809f1787 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/809f1787 Branch: refs/heads/master Commit: 809f17876d847002ba76979cb3362451fa01c110 Parents: f9d51aa Author: Charles Chen Authored: Mon Jun 12 14:17:50 2017 -0700 Committer: Ahmet Altay Committed: Mon Jun 12 15:43:31 2017 -0700 ---------------------------------------------------------------------- .../runners/direct/transform_evaluator.py | 62 +++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/809f1787/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 0fec8b8..b1cb626 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -29,6 +29,7 @@ from apache_beam.runners.common import DoFnRunner from apache_beam.runners.common import DoFnState from apache_beam.runners.direct.watermark_manager import WatermarkManager from apache_beam.runners.direct.transform_result import TransformResult +from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite # pylint: disable=protected-access from apache_beam.transforms import core from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import WindowedValue @@ -53,6 +54,7 @@ class TransformEvaluatorRegistry(object): core.Flatten: _FlattenEvaluator, core.ParDo: _ParDoEvaluator, core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator, + _NativeWrite: _NativeWriteEvaluator, } def for_application( @@ -96,7 +98,8 @@ class TransformEvaluatorRegistry(object): Returns: True if executor should execute applied_ptransform serially. """ - return isinstance(applied_ptransform.transform, core._GroupByKeyOnly) + return isinstance(applied_ptransform.transform, + (core._GroupByKeyOnly, _NativeWrite)) class _TransformEvaluator(object): @@ -400,3 +403,60 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator): return TransformResult( self._applied_ptransform, bundles, state, None, None, hold) + + +class _NativeWriteEvaluator(_TransformEvaluator): + """TransformEvaluator for _NativeWrite transform.""" + + def __init__(self, evaluation_context, applied_ptransform, + input_committed_bundle, side_inputs, scoped_metrics_container): + assert not side_inputs + super(_NativeWriteEvaluator, self).__init__( + evaluation_context, applied_ptransform, input_committed_bundle, + side_inputs, scoped_metrics_container) + + assert applied_ptransform.transform.sink + self._sink = applied_ptransform.transform.sink + + @property + def _is_final_bundle(self): + return (self._execution_context.watermarks.input_watermark + == WatermarkManager.WATERMARK_POS_INF) + + @property + def _has_already_produced_output(self): + return (self._execution_context.watermarks.output_watermark + == WatermarkManager.WATERMARK_POS_INF) + + def start_bundle(self): + # state: [values] + self.state = (self._execution_context.existing_state + if self._execution_context.existing_state else []) + + def process_element(self, element): + self.state.append(element) + + def finish_bundle(self): + # finish_bundle will append incoming bundles in memory until all the bundles + # carrying data is processed. This is done to produce only a single output + # shard (some tests depends on this behavior). It is possible to have + # incoming empty bundles after the output is produced, these bundles will be + # ignored and would not generate additional output files. + # TODO(altay): Do not wait until the last bundle to write in a single shard. + if self._is_final_bundle: + if self._has_already_produced_output: + # Ignore empty bundles that arrive after the output is produced. + assert self.state == [] + else: + self._sink.pipeline_options = self._evaluation_context.pipeline_options + with self._sink.writer() as writer: + for v in self.state: + writer.Write(v.value) + state = None + hold = WatermarkManager.WATERMARK_POS_INF + else: + state = self.state + hold = WatermarkManager.WATERMARK_NEG_INF + + return TransformResult( + self._applied_ptransform, [], state, None, None, hold)