beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [41/50] incubator-beam git commit: Incrementally update Pending elements when work completes
Date Tue, 08 Nov 2016 03:41:40 GMT
Incrementally update Pending elements when work completes

This reduces the number of single-threaded updates the monitor thread
performs before firing timers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/317b5e65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/317b5e65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/317b5e65

Branch: refs/heads/gearpump-runner
Commit: 317b5e6577a623fa8fddeac90e6a3c9510a250e5
Parents: 9de9ce6
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Nov 4 11:28:03 2016 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon Nov 7 15:47:02 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/WatermarkManager.java   | 109 ++++++++++++++-----
 1 file changed, 83 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/317b5e65/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f01c13c..2228cd5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -43,7 +43,10 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -682,10 +685,16 @@ public class WatermarkManager {
   private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates;
 
   /**
+   * A lock used to control concurrency for updating pending values.
+   */
+  private final Lock refreshLock;
+
+  /**
    * A queue of pending {@link AppliedPTransform AppliedPTransforms} that have potentially
    * stale data.
    */
-  private final ConcurrentLinkedQueue<AppliedPTransform<?, ?, ?>> pendingRefreshes;
+  @GuardedBy("refreshLock")
+  private final Set<AppliedPTransform<?, ?, ?>> pendingRefreshes;
 
   /**
    * Creates a new {@link WatermarkManager}. All watermarks within the newly created
@@ -710,7 +719,9 @@ public class WatermarkManager {
     this.clock = clock;
     this.consumers = consumers;
     this.pendingUpdates = new ConcurrentLinkedQueue<>();
-    this.pendingRefreshes = new ConcurrentLinkedQueue<>();
+
+    this.refreshLock = new ReentrantLock();
+    this.pendingRefreshes = new HashSet<>();
 
     transformToWatermarks = new HashMap<>();
 
@@ -795,13 +806,18 @@ public class WatermarkManager {
 
   public void initialize(
      Map<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> initialBundles) {
-    for (Map.Entry<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> rootEntry :
-        initialBundles.entrySet()) {
-      TransformWatermarks rootWms = transformToWatermarks.get(rootEntry.getKey());
-      for (CommittedBundle<?> initialBundle : rootEntry.getValue()) {
-        rootWms.addPending(initialBundle);
+    refreshLock.lock();
+    try {
+      for (Map.Entry<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> rootEntry :
+          initialBundles.entrySet()) {
+        TransformWatermarks rootWms = transformToWatermarks.get(rootEntry.getKey());
+        for (CommittedBundle<?> initialBundle : rootEntry.getValue()) {
+          rootWms.addPending(initialBundle);
+        }
+        pendingRefreshes.add(rootEntry.getKey());
       }
-      pendingRefreshes.offer(rootEntry.getKey());
+    } finally {
+      refreshLock.unlock();
     }
   }
 
@@ -834,6 +850,17 @@ public class WatermarkManager {
         timerUpdate,
         result,
         earliestHold));
+    tryApplyPendingUpdates();
+  }
+
+  private void tryApplyPendingUpdates() {
+    if (refreshLock.tryLock()) {
+      try {
+        applyNUpdates(10);
+      } finally {
+        refreshLock.unlock();
+      }
+    }
   }
 
   /**
@@ -841,14 +868,24 @@ public class WatermarkManager {
    * of all {@link TransformWatermarks} to be advanced as far as possible.
    */
   private void applyPendingUpdates() {
-    Set<AppliedPTransform<?, ?, ?>> updatedTransforms = new HashSet<>();
-    PendingWatermarkUpdate pending = pendingUpdates.poll();
-    while (pending != null) {
+    refreshLock.lock();
+    try {
+      applyNUpdates(-1);
+    } finally {
+      refreshLock.unlock();
+    }
+  }
+
+  @GuardedBy("refreshLock")
+  /**
+   * Applies up to {@code numUpdates}, or all available updates if numUpdates is non-positive.
+   */
+  private void applyNUpdates(int numUpdates) {
+    for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates <= 0); i++) {
+      PendingWatermarkUpdate pending = pendingUpdates.poll();
       applyPendingUpdate(pending);
-      updatedTransforms.add(pending.getTransform());
-      pending = pendingUpdates.poll();
+      pendingRefreshes.add(pending.getTransform());
     }
-    pendingRefreshes.addAll(updatedTransforms);
   }
 
   private void applyPendingUpdate(PendingWatermarkUpdate pending) {
@@ -905,22 +942,37 @@ public class WatermarkManager {
    * watermarks to be advanced as far as possible.
    */
   synchronized void refreshAll() {
-    applyPendingUpdates();
-    while (!pendingRefreshes.isEmpty()) {
-      refreshWatermarks(pendingRefreshes.poll());
+    refreshLock.lock();
+    try {
+      applyPendingUpdates();
+      Set<AppliedPTransform<?, ?, ?>> toRefresh = pendingRefreshes;
+      while (!toRefresh.isEmpty()) {
+        toRefresh = refreshAllOf(toRefresh);
+      }
+    } finally {
+      refreshLock.unlock();
     }
   }
 
-  private void refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) {
+  private Set<AppliedPTransform<?, ?, ?>> refreshAllOf(Set<AppliedPTransform<?, ?, ?>> toRefresh) {
+    Set<AppliedPTransform<?, ?, ?>> newRefreshes = new HashSet<>();
+    for (AppliedPTransform<?, ?, ?> transform : toRefresh) {
+      newRefreshes.addAll(refreshWatermarks(transform));
+    }
+    return newRefreshes;
+  }
+
+  private Set<AppliedPTransform<?, ?, ?>> refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) {
     TransformWatermarks myWatermarks = transformToWatermarks.get(toRefresh);
     WatermarkUpdate updateResult = myWatermarks.refresh();
-    Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
     if (updateResult.isAdvanced()) {
+      Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
       for (PValue outputPValue : toRefresh.getOutput().expand()) {
         additionalRefreshes.addAll(consumers.get(outputPValue));
       }
+      return additionalRefreshes;
     }
-    pendingRefreshes.addAll(additionalRefreshes);
+    return Collections.emptySet();
   }
 
   /**
@@ -929,12 +981,17 @@ public class WatermarkManager {
    */
   public Collection<FiredTimers> extractFiredTimers() {
     Collection<FiredTimers> allTimers = new ArrayList<>();
-    for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
-        transformToWatermarks.entrySet()) {
-      Collection<FiredTimers> firedTimers = watermarksEntry.getValue().extractFiredTimers();
-      allTimers.addAll(firedTimers);
+    refreshLock.lock();
+    try {
+      for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry
+          : transformToWatermarks.entrySet()) {
+        Collection<FiredTimers> firedTimers = watermarksEntry.getValue().extractFiredTimers();
+        allTimers.addAll(firedTimers);
+      }
+      return allTimers;
+    } finally {
+      refreshLock.unlock();
     }
-    return allTimers;
   }
 
   /**
@@ -1032,7 +1089,7 @@ public class WatermarkManager {
      * Removes the hold of the provided key.
      */
     public void removeHold(Object key) {
-      KeyedHold oldHold = keyedHolds.get(key);
+      KeyedHold oldHold = keyedHolds.remove(key);
       if (oldHold != null) {
         allHolds.remove(oldHold);
       }


Mime
View raw message