beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [1/2] beam git commit: Improve performance of bundle retry
Date Fri, 15 Sep 2017 15:50:23 GMT
Repository: beam
Updated Branches:
  refs/heads/master 082f70c93 -> 979923a4e


Improve performance of bundle retry


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1266ee2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1266ee2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1266ee2

Branch: refs/heads/master
Commit: c1266ee22ff17d29c82c7c67df94c8aae16106c8
Parents: 082f70c
Author: Maria Garcia Herrero <mariagh@google.com>
Authored: Wed Sep 6 21:04:38 2017 -0700
Committer: chamikara@google.com <chamikara@google.com>
Committed: Fri Sep 15 08:50:01 2017 -0700

----------------------------------------------------------------------
 .../python/apache_beam/runners/direct/evaluation_context.py | 9 ++-------
 sdks/python/apache_beam/transforms/trigger.py               | 9 +++++++++
 2 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c1266ee2/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index a927b09..2e8b33b 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -20,7 +20,6 @@
 from __future__ import absolute_import
 
 import collections
-import copy
 import threading
 
 from apache_beam.runners.direct.clock import Clock
@@ -321,11 +320,6 @@ class DirectUnmergedState(InMemoryUnmergedState):
   def __init__(self):
     super(DirectUnmergedState, self).__init__(defensive_copy=False)
 
-  # TODO(mariagh): make a selective deepcopy of just what is needed
-  # to preserve the state while a bundle is processed.
-  def clone(self):
-    return copy.deepcopy(self)
-
 
 class DirectStepContext(object):
   """Context for the currently-executing step."""
@@ -341,5 +335,6 @@ class DirectStepContext(object):
     if not self.existing_keyed_state.get(key):
       self.existing_keyed_state[key] = DirectUnmergedState()
     if not self.partial_keyed_state.get(key):
-      self.partial_keyed_state[key] = self.existing_keyed_state[key].clone()
+      self.partial_keyed_state[key] = (
+          self.existing_keyed_state[key].copy())
     return self.partial_keyed_state[key]

http://git-wip-us.apache.org/repos/asf/beam/blob/c1266ee2/sdks/python/apache_beam/transforms/trigger.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 84d5be9..8175d30 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -1064,6 +1064,15 @@ class InMemoryUnmergedState(UnmergedState):
     self.global_state = {}
     self.defensive_copy = defensive_copy
 
+  def copy(self):
+    cloned_object = InMemoryUnmergedState(defensive_copy=self.defensive_copy)
+    cloned_object.timers = copy.deepcopy(self.timers)
+    cloned_object.global_state = copy.deepcopy(self.global_state)
+    for window in self.state:
+      for tag in self.state[window]:
+        cloned_object.state[window][tag] = copy.copy(self.state[window][tag])
+    return cloned_object
+
   def set_global_state(self, tag, value):
     assert isinstance(tag, _ValueStateTag)
     if self.defensive_copy:


Mime
View raw message