Date: Sat, 2 Dec 2017 00:28:13 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Mailing-List: contact commits-help@beam.apache.org; run by ezmlm
Subject: [jira] [Commented] (BEAM-1630) Add Splittable DoFn to Python SDK
archived-at: Sat, 02 Dec 2017 00:28:25 -0000

    [ https://issues.apache.org/jira/browse/BEAM-1630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275295#comment-16275295 ]

ASF GitHub Bot commented on BEAM-1630:
--------------------------------------

chamikaramj commented on a change in pull request #4064: [BEAM-1630] Adds support for processing Splittable DoFns using DirectRunner.
URL: https://github.com/apache/beam/pull/4064#discussion_r154449348

 ##########
 File path: sdks/python/apache_beam/runners/direct/transform_evaluator.py
 ##########
 @@ -826,3 +830,74 @@ def finish_bundle(self):
           None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)

     return TransformResult(self, [], [], None, {None: hold})
+
+
+class _ProcessElemenetsEvaluator(_TransformEvaluator):
+  """An evaluator for sdf_direct_runner.ProcessElements transform."""
+
+  DEFAULT_MAX_NUM_OUTPUTS = 100
+  DEFAULT_MAX_DURATION = 1
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs, scoped_metrics_container):
+    super(_ProcessElemenetsEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs, scoped_metrics_container)
+
+    process_elements_transform = applied_ptransform.transform
+    assert isinstance(process_elements_transform, ProcessElements)
+
+    # Replacing the do_fn of the transform with a wrapper do_fn that performs
+    # SDF magic.
+    transform = applied_ptransform.transform
+    sdf = transform.sdf
+    self._process_fn = transform.new_process_fn(sdf)
+    transform.dofn = self._process_fn
+
+    assert isinstance(self._process_fn, ProcessFn)
+
+    self.step_context = self._execution_context.get_step_context()
+    # self.global_state = self.step_context.get_keyed_state(None)
+    self._process_fn.set_step_context(self.step_context)
+
+    process_element_invoker = (
+        SDFProcessElementInvoker(
+            max_num_outputs=self.DEFAULT_MAX_NUM_OUTPUTS,
+            max_duration=self.DEFAULT_MAX_DURATION))
+    self._process_fn.set_process_element_invoker(process_element_invoker)
+
+    self._par_do_evaluator = _ParDoEvaluator(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs, scoped_metrics_container)
+    self.keyed_holds = {}
+
+  def start_bundle(self):
+    self._par_do_evaluator.start_bundle()
+
+  def process_element(self, element):
+    assert isinstance(element, WindowedValue)
+    assert len(element.windows) == 1
+    window = element.windows[0]
+    if isinstance(element.value, KeyedWorkItem):
+      encoded_k = element.value.encoded_key
+    else:
+      assert isinstance(element.value, tuple)

Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add Splittable DoFn to Python SDK
> ---------------------------------
>
>                 Key: BEAM-1630
>                 URL: https://issues.apache.org/jira/browse/BEAM-1630
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Chamikara Jayalath
>            Assignee: Chamikara Jayalath
>
> Splittable DoFn [1] is currently being implemented for Java SDK [2]. We should add this to Python SDK as well.
> Following document proposes an API for this.
> https://docs.google.com/document/d/1h_zprJrOilivK2xfvl4L42vaX4DMYGfH1YDmi-s_ozM/edit?usp=sharing
> [1] https://s.apache.org/splittable-do-fn
> [2] https://lists.apache.org/thread.html/0ce61ac162460a149d5c93cdface37cc383f8030fe86ca09e5699b18@%3Cdev.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)