beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [3/4] incubator-beam git commit: Remove References to Instant#now in the DirectRunner
Date Fri, 05 Aug 2016 17:40:32 GMT
Remove References to Instant#now in the DirectRunner

The DirectRunner should use exclusively the configured clock to
determine the processing time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7585cfc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7585cfc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7585cfc3

Branch: refs/heads/master
Commit: 7585cfc3693800b00c4ccc799c27f0311e9b0cc1
Parents: fcf6b1d
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Aug 5 09:58:05 2016 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Fri Aug 5 10:04:21 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/EvaluationContext.java | 14 ++++++++++----
 .../direct/ExecutorServiceParallelExecutor.java       |  5 ++---
 2 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7585cfc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 23c139d..94f28e2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -48,6 +48,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
 
+import org.joda.time.Instant;
+
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
@@ -81,6 +83,7 @@ class EvaluationContext {
 
   /** The options that were used to create this {@link Pipeline}. */
   private final DirectOptions options;
+  private final Clock clock;
 
   private final BundleFactory bundleFactory;
   /** The current processing time and event time watermarks and timers. */
@@ -116,6 +119,7 @@ class EvaluationContext {
       Map<AppliedPTransform<?, ?, ?>, String> stepNames,
       Collection<PCollectionView<?>> views) {
     this.options = checkNotNull(options);
+    this.clock = options.getClock();
     this.bundleFactory = checkNotNull(bundleFactory);
     checkNotNull(rootTransforms);
     checkNotNull(valueToConsumers);
@@ -123,9 +127,7 @@ class EvaluationContext {
     checkNotNull(views);
     this.stepNames = stepNames;
 
-    this.watermarkManager =
-        WatermarkManager.create(
-            NanosOffsetClock.create(), rootTransforms, valueToConsumers);
+    this.watermarkManager = WatermarkManager.create(clock, rootTransforms, valueToConsumers);
     this.sideInputContainer = SideInputContainer.create(this, views);
 
     this.applicationStateInternals = new ConcurrentHashMap<>();
@@ -314,7 +316,7 @@ class EvaluationContext {
       AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) {
     StepAndKey stepAndKey = StepAndKey.of(application, key);
     return new DirectExecutionContext(
-        options.getClock(),
+        clock,
         key,
         (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
         watermarkManager.getWatermarks(application));
@@ -427,4 +429,8 @@ class EvaluationContext {
     }
     return true;
   }
+
+  public Instant now() {
+    return clock.now();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7585cfc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 64836d8..a0a5ec0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -40,7 +40,6 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
-import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -433,9 +432,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
                       .createKeyedBundle(
                           null, keyTimers.getKey(), (PCollection) transform.getInput())
                       .add(WindowedValue.valueInEmptyWindows(work))
-                      .commit(Instant.now());
-              state.set(ExecutorState.ACTIVE);
+                      .commit(evaluationContext.now());
               scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery));
+              state.set(ExecutorState.ACTIVE);
             }
           }
         }


Mime
View raw message