beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchamb...@apache.org
Subject [1/2] incubator-beam git commit: Explicitly track the Source a ReadEvaluator is using
Date Fri, 01 Apr 2016 16:45:20 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master fba1259b4 -> dad60f6bd


Explicitly track the Source a ReadEvaluator is using

This permits an evaluator to read from a source other than the initial source
specified by the transform.

BoundedSource#splitIntoBundles and UnboundedSource#generateInitialSplits
produce multiple source objects for a single transform so that the
transform can be read in parallel.

Also includes some cleanups:
 - Use proper scoping and interfaces in BoundedReadEvaluator
 - Use BoundedReader instead of Reader
 - Make contentsRemaining method-scoped rather than instance-scoped


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/536e4397
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/536e4397
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/536e4397

Branch: refs/heads/master
Commit: 536e43970e3535df0190ed8e41d93f26fa1dc177
Parents: fba1259
Author: Thomas Groh <tgroh@google.com>
Authored: Thu Mar 31 10:40:37 2016 -0700
Committer: bchambers <bchambers@google.com>
Committed: Fri Apr 1 09:32:32 2016 -0700

----------------------------------------------------------------------
 .../inprocess/BoundedReadEvaluatorFactory.java  | 26 +++++++++++---------
 .../UnboundedReadEvaluatorFactory.java          | 14 ++++++++---
 2 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/536e4397/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
index caec1fc..f034e2f 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
@@ -20,7 +20,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
 import com.google.cloud.dataflow.sdk.io.BoundedSource;
 import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
 import com.google.cloud.dataflow.sdk.io.Read.Bounded;
-import com.google.cloud.dataflow.sdk.io.Source.Reader;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
@@ -62,8 +61,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory
{
 
   private <OutputT> TransformEvaluator<?> getTransformEvaluator(
       final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>>
transform,
-      final InProcessEvaluationContext evaluationContext)
-      throws IOException {
+      final InProcessEvaluationContext evaluationContext) {
     BoundedReadEvaluator<?> evaluator =
         getTransformEvaluatorQueue(transform, evaluationContext).poll();
     if (evaluator == null) {
@@ -93,8 +91,9 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory
{
       if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
         // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
         // factory for this transform
+        BoundedSource<OutputT> source = transform.getTransform().getSource();
         BoundedReadEvaluator<OutputT> evaluator =
-            new BoundedReadEvaluator<OutputT>(transform, evaluationContext);
+            new BoundedReadEvaluator<OutputT>(transform, evaluationContext, source);
         evaluatorQueue.offer(evaluator);
       } else {
         // otherwise return the existing Queue that arrived before us
@@ -116,13 +115,19 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory
{
   private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object>
{
     private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>>
transform;
     private final InProcessEvaluationContext evaluationContext;
-    private boolean contentsRemaining;
+    /**
+     * The source being read from by this {@link BoundedReadEvaluator}. This may not be the
same
+     * as the source derived from {@link #transform} due to splitting.
+     */
+    private BoundedSource<OutputT> source;
 
     public BoundedReadEvaluator(
         AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-        InProcessEvaluationContext evaluationContext) {
+        InProcessEvaluationContext evaluationContext,
+        BoundedSource<OutputT> source) {
       this.transform = transform;
       this.evaluationContext = evaluationContext;
+      this.source = source;
     }
 
     @Override
@@ -130,12 +135,9 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory
{
 
     @Override
     public InProcessTransformResult finishBundle() throws IOException {
-      try (final Reader<OutputT> reader =
-              transform
-                  .getTransform()
-                  .getSource()
-                  .createReader(evaluationContext.getPipelineOptions());) {
-        contentsRemaining = reader.start();
+      try (final BoundedReader<OutputT> reader =
+              source.createReader(evaluationContext.getPipelineOptions());) {
+        boolean contentsRemaining = reader.start();
         UncommittedBundle<OutputT> output =
             evaluationContext.createRootBundle(transform.getOutput());
         while (contentsRemaining) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/536e4397/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
index fa16290..0f2e4f4 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
@@ -90,8 +90,10 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory
{
       if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
         // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
         // factory for this transform
+        UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
         UnboundedReadEvaluator<OutputT> evaluator =
-            new UnboundedReadEvaluator<OutputT>(transform, evaluationContext, evaluatorQueue);
+            new UnboundedReadEvaluator<OutputT>(
+                transform, evaluationContext, source, evaluatorQueue);
         evaluatorQueue.offer(evaluator);
       } else {
         // otherwise return the existing Queue that arrived before us
@@ -116,15 +118,22 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory
{
     private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>>
transform;
     private final InProcessEvaluationContext evaluationContext;
     private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
+    /**
+     * The source being read from by this {@link UnboundedReadEvaluator}. This may not be
the same
+     * source as derived from {@link #transform} due to splitting.
+     */
+    private final UnboundedSource<OutputT, ?> source;
     private CheckpointMark checkpointMark;
 
     public UnboundedReadEvaluator(
         AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>>
transform,
         InProcessEvaluationContext evaluationContext,
+        UnboundedSource<OutputT, ?> source,
         Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
       this.transform = transform;
       this.evaluationContext = evaluationContext;
       this.evaluatorQueue = evaluatorQueue;
+      this.source = source;
       this.checkpointMark = null;
     }
 
@@ -135,8 +144,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory
{
     public InProcessTransformResult finishBundle() throws IOException {
       UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
       try (UnboundedReader<OutputT> reader =
-              createReader(
-                  transform.getTransform().getSource(), evaluationContext.getPipelineOptions());)
{
+              createReader(source, evaluationContext.getPipelineOptions());) {
         int numElements = 0;
         if (reader.start()) {
           do {


Mime
View raw message