beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-1872) implement Reshuffle transform in python, make it experimental in Java
Date Wed, 29 Nov 2017 23:36:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-1872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271788#comment-16271788 ]

ASF GitHub Bot commented on BEAM-1872:
--------------------------------------

robertwb commented on a change in pull request #4040: [BEAM-1872] Add IdentityWindowFn for use in Reshuffle
URL: https://github.com/apache/beam/pull/4040#discussion_r153941071
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/util.py
 ##########
 @@ -423,3 +434,105 @@ def expand(self, pcoll):
           self._batch_size_estimator))
     else:
       return pcoll | ParDo(_WindowAwareBatchingDoFn(self._batch_size_estimator))
+
+
+class _IdentityWindowFn(NonMergingWindowFn):
+  """Windowing function that preserves existing windows.
+
+  To be used internally with the Reshuffle transform.
+  Will raise an exception when used after DoFns that return TimestampedValue
+  elements.
+  """
+
+  def __init__(self, window_coder):
+    """Create a new WindowFn with compatible coder.
+    To be applied to PCollections with windows that are compatible with the
+    given coder.
+
+    Arguments:
+      window_coder: coders.Coder object to be used on windows.
+    """
+    super(_IdentityWindowFn, self).__init__()
+    if window_coder is None:
+      raise ValueError('window_coder should not be None')
+    self._window_coder = window_coder
+
+  def assign(self, assign_context):
+    if assign_context.window is None:
+      raise ValueError(
+          'assign_context.window should not be None. '
+          'This might be due to a DoFn returning a TimestampedValue.')
+    return [assign_context.window]
+
+  def get_window_coder(self):
+    return self._window_coder
+
+  def to_runner_api_parameter(self, unused_context):
+    pass  # Overridden by register_pickle_urn below.
+
+  urns.RunnerApiFn.register_pickle_urn(urns.RESHUFFLE_TRANSFORM)
+
+
+@typehints.with_input_types(typehints.KV[K, V])
+@typehints.with_output_types(typehints.KV[K, V])
+class ReshufflePerKey(PTransform):
+  """PTransform that returns a PCollection equivalent to its input,
+  but operationally provides some of the side effects of a GroupByKey,
+  in particular preventing fusion of the surrounding transforms,
+  checkpointing, and deduplication by id.
+
+  ReshufflePerKey is experimental. No backwards compatibility guarantees.
+  """
+
+  def expand(self, pcoll):
+    class ReifyTimestampsIn(DoFn):
+      def process(self, element, timestamp=DoFn.TimestampParam):
+        if (isinstance(timestamp, type(DoFn.TimestampParam)) and
+            timestamp == DoFn.TimestampParam):
+          raise ValueError('timestamp was unset for element: %r' % element)
+        yield element[0], TimestampedValue(element[1], timestamp)
+
+    class ReifyTimestampsExtract(DoFn):
+      def process(self, element, window=DoFn.WindowParam):
+        # Return a WindowedValue so that IdentityWindowFn can reuse the window
 
 Review comment:
   I'm not following this comment... 
   
   Shouldn't things already be in the correct window? We just need to emit a TimestampedValue here.
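
   The code comment in the diff (return a `WindowedValue` so the window can be reused) and the question above both turn on what `_IdentityWindowFn.assign` receives. The following is a minimal pure-Python model, not Beam code: `IntervalWindow`, `AssignContext`, `IdentityWindowFnModel`, and `assign_windows` are hypothetical stand-ins. It assumes, as the `ValueError` message in the diff suggests, that an element emitted as a bare `TimestampedValue` reaches `assign` without a window, while an element that keeps its window can be assigned to it unchanged.

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass(frozen=True)
class IntervalWindow:
    # Hypothetical stand-in for a Beam window covering [start, end).
    start: int
    end: int


@dataclass
class AssignContext:
    # Hypothetical stand-in for the context a WindowFn's assign() receives.
    timestamp: int
    element: Any
    window: Optional[IntervalWindow] = None


class IdentityWindowFnModel:
    """Reuses whatever window an element already has, like _IdentityWindowFn."""

    def assign(self, ctx: AssignContext) -> List[IntervalWindow]:
        if ctx.window is None:
            # Same failure mode as the diff: a value that carries only a
            # timestamp gives the identity WindowFn no window to reuse.
            raise ValueError(
                'assign_context.window should not be None. '
                'This might be due to a DoFn returning a TimestampedValue.')
        return [ctx.window]


def assign_windows(window_fn, value, timestamp, window=None):
    # Hypothetical helper: build the assign context for one emitted element.
    return window_fn.assign(AssignContext(timestamp, value, window))


fn = IdentityWindowFnModel()
w = IntervalWindow(0, 60)
print(assign_windows(fn, 'a', 10, window=w))  # [IntervalWindow(start=0, end=60)]
try:
    assign_windows(fn, 'a', 10)  # timestamp only, as with a TimestampedValue
except ValueError as err:
    print('rejected:', err)
```

   In this model an identity assignment can only succeed when the window travels with the element; a timestamp alone is not enough.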

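   The round trip that `ReshufflePerKey.expand` sets up (reify timestamps into the value, group by key, then restore them) can also be modeled in plain Python to show why its output is equivalent to its input. This is a sketch, not Beam code: `Elem`, `reify_timestamps`, `group_by_key_and_window`, `restore_timestamps`, and `reshuffle_per_key` are hypothetical names, windows are plain string labels, and it assumes the grouping step partitions by key and window, so the restore step can read the window from the group, much as a DoFn would through `DoFn.WindowParam`.

```python
from collections import Counter, defaultdict
from typing import Dict, Hashable, Iterable, List, NamedTuple, Tuple


class Elem(NamedTuple):
    # Hypothetical model of a windowed element: (key, value) plus metadata.
    key: Hashable
    value: Hashable
    timestamp: int
    window: Hashable


def reify_timestamps(elems: Iterable[Elem]):
    # Like ReifyTimestampsIn: move the timestamp into the value so the
    # grouping step cannot lose or rewrite it.
    for e in elems:
        yield e.key, (e.value, e.timestamp), e.window


def group_by_key_and_window(reified):
    # Grouping is per key *and* per window, so each group has one window.
    groups: Dict[Tuple[Hashable, Hashable], List[Tuple[Hashable, int]]] = (
        defaultdict(list))
    for key, timestamped, window in reified:
        groups[(key, window)].append(timestamped)
    return groups


def restore_timestamps(groups):
    # Like ReifyTimestampsExtract: re-emit each value with its original
    # timestamp, taking the window from the group it was collected in.
    for (key, window), values in groups.items():
        for value, timestamp in values:
            yield Elem(key, value, timestamp, window)


def reshuffle_per_key(elems):
    return list(restore_timestamps(
        group_by_key_and_window(reify_timestamps(elems))))


inputs = [Elem('a', 1, 10, 'w0'), Elem('a', 2, 70, 'w1'), Elem('b', 3, 15, 'w0')]
outputs = reshuffle_per_key(inputs)
print(Counter(outputs) == Counter(inputs))  # True
```

   The model only checks that elements, timestamps, and windows survive the round trip; it does not cover fusion prevention, checkpointing, or deduplication, which the docstring attributes to the runner's handling of a GroupByKey.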
----------------------------------------------------------------
This is an automated message from the Apache Git Service.


> implement Reshuffle transform in python, make it experimental in Java
> ---------------------------------------------------------------------
>
>                 Key: BEAM-1872
>                 URL: https://issues.apache.org/jira/browse/BEAM-1872
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Udi Meiri
>              Labels: sdk-consistency
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
