beam-commits mailing list archives

From echauc...@apache.org
Subject [beam] 11/20: Refactoring: -move batch/streaming common translation visitor and utility methods to PipelineTranslator -rename batch dedicated classes to Batch* to differentiate with their streaming counterparts -Introduce TranslationContext for common batch/streaming components
Date Tue, 27 Nov 2018 15:39:21 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3ce792dfa5258ee3d28e97eebf9266b81161f186
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Wed Nov 21 16:51:40 2018 +0100

    Refactoring:
    - move the batch/streaming common translation visitor and utility methods to PipelineTranslator
    - rename batch-dedicated classes to Batch* to differentiate them from their streaming counterparts
    - introduce TranslationContext for common batch/streaming components
---
 .../translation/PipelineTranslator.java            | 73 ++++++++++++++++++-
 .../translation/TransformTranslator.java           | 11 +++
 .../translation/TranslationContext.java            | 13 ++++
 .../batch/BatchCombinePerKeyTranslator.java        | 17 +++++
 .../batch/BatchFlattenPCollectionTranslator.java   | 16 ++++
 .../batch/BatchGroupByKeyTranslator.java           | 17 +++++
 .../translation/batch/BatchParDoTranslator.java    | 16 ++++
 .../translation/batch/BatchPipelineTranslator.java | 85 +++-------------------
 .../batch/BatchReadSourceTranslator.java           | 15 ++++
 .../batch/BatchReshuffleTranslator.java            | 12 +++
 .../batch/BatchTransformTranslator.java            | 11 ---
 .../translation/batch/BatchTranslationContext.java | 12 +--
 .../batch/BatchWindowAssignTranslator.java         | 14 ++++
 .../batch/CombinePerKeyTranslatorBatch.java        | 14 ----
 .../batch/FlattenPCollectionTranslatorBatch.java   | 13 ----
 .../batch/GroupByKeyTranslatorBatch.java           | 14 ----
 .../translation/batch/ParDoTranslatorBatch.java    | 13 ----
 .../batch/ReadSourceTranslatorBatch.java           | 12 ---
 .../batch/ReshuffleTranslatorBatch.java            | 11 ---
 .../batch/WindowAssignTranslatorBatch.java         | 12 ---
 .../streaming/StreamingPipelineTranslator.java     |  6 ++
 .../streaming/StreamingTranslationContext.java     |  7 ++
 22 files changed, 227 insertions(+), 187 deletions(-)
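
For orientation, the sketch below shows how a translator plugs into the pieces this commit rearranges: a hypothetical BatchFooTranslator (the name and transform shape are illustrative, not part of the commit) implements the shared TransformTranslator interface and receives the shared TranslationContext, mirroring the Batch* translators introduced in the diff.

    // Illustrative sketch only -- BatchFooTranslator is hypothetical, not a class
    // added by this commit.
    package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

    import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
    import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;

    class BatchFooTranslator<T>
        implements TransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {

      @Override
      public void translateNode(
          PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {
        // The translation into Spark Datasets would go here; the translators in
        // this commit are still empty placeholders.
      }
    }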

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index 8eb1fb6..62e87f2 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -1,5 +1,6 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation;
 
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineResources;
 import org.apache.beam.runners.spark.SparkTransformOverrides;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
@@ -7,11 +8,11 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.batch.Batch
 import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 /**
  /**
 * The role of this class is to detect the pipeline mode and to translate the Beam operators to their Spark counterparts. If we have
@@ -19,7 +20,11 @@ import org.slf4j.LoggerFactory;
  * case, i.e. for a batch job, a {@link BatchPipelineTranslator} is created. Correspondingly,
  */
 
-public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
+public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
+  private int depth = 0;
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class);
+  protected TranslationContext translationContext;
+
 
   // --------------------------------------------------------------------------------------------
   //  Pipeline preparation methods
@@ -103,7 +108,7 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
    * @param n number of spaces to generate
    * @return String with "|" followed by n spaces
    */
-  protected static String genSpaces(int n) {
+  private static String genSpaces(int n) {
     StringBuilder builder = new StringBuilder();
     for (int i = 0; i < n; i++) {
       builder.append("|   ");
@@ -111,8 +116,31 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
     return builder.toString();
   }
 
+  /**
+   * get a {@link TransformTranslator} for the given {@link TransformHierarchy.Node}
+   * @param node
+   * @return
+   */
+  protected abstract TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node);
+
+  private <T extends PTransform<?, ?>> void translateNode(
+      TransformHierarchy.Node node,
+      TransformTranslator<?> transformTranslator) {
+
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) node.getTransform();
+
+    @SuppressWarnings("unchecked")
+    TransformTranslator<T> typedTransformTranslator = (TransformTranslator<T>) transformTranslator;
+
+    // create the applied PTransform on the translationContext
+    translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
+    typedTransformTranslator.translateNode(typedTransform, translationContext);
+  }
+
+
   // --------------------------------------------------------------------------------------------
-  //  Pipeline visitor methods
+  //  Pipeline visitor entry point
   // --------------------------------------------------------------------------------------------
 
   /**
@@ -121,11 +149,48 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
    * @param pipeline The pipeline to be translated
    */
   public void translate(Pipeline pipeline) {
+    LOG.info("starting translation of the pipeline using {}", getClass().getName());
     pipeline.traverseTopologically(this);
   }
 
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline Visitor Methods
+  // --------------------------------------------------------------------------------------------
 
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    LOG.info("{} enterCompositeTransform- {}", genSpaces(depth), node.getFullName());
+    depth++;
 
+    TransformTranslator<?> transformTranslator = getTransformTranslator(node);
 
+    if (transformTranslator != null) {
+      translateNode(node, transformTranslator);
+      LOG.info("{} translated- {}", genSpaces(depth), node.getFullName());
+      return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+    } else {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+  }
 
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    depth--;
+    LOG.info("{} leaveCompositeTransform- {}", genSpaces(depth), node.getFullName());
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    LOG.info("{} visitPrimitiveTransform- {}", genSpaces(depth), node.getFullName());
+
+    // get the transformation corresponding to the node we are
+    // currently visiting and translate it into its Spark alternative.
+    TransformTranslator<?> transformTranslator = getTransformTranslator(node);
+    if (transformTranslator == null) {
+      String transformUrn = PTransformTranslation.urnForTransform(node.getTransform());
+      throw new UnsupportedOperationException(
+          "The transform " + transformUrn + " is currently not supported.");
+    }
+    translateNode(node, transformTranslator);
+  }
 }
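
With the visitor methods now in the abstract base class, a concrete translator only has to supply getTransformTranslator(). How the runner drives this is not part of this diff; the following is a sketch of the presumed wiring (the isStreaming() flag on SparkPipelineOptions is an assumption here, not shown in the commit).

    // Sketch of presumed runner-side usage; not code from this commit.
    import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
    import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
    import org.apache.beam.runners.spark.structuredstreaming.translation.batch.BatchPipelineTranslator;
    import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
    import org.apache.beam.sdk.Pipeline;

    class TranslatorUsageSketch {
      static void translatePipeline(Pipeline pipeline, SparkPipelineOptions options) {
        // Pick the translator for the detected mode, then walk the pipeline once;
        // translate() runs pipeline.traverseTopologically(this) on the chosen visitor.
        PipelineTranslator translator =
            options.isStreaming() // assumption: the mode flag lives on the options
                ? new StreamingPipelineTranslator(options)
                : new BatchPipelineTranslator(options);
        translator.translate(pipeline);
      }
    }
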
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
new file mode 100644
index 0000000..51cdd99
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TransformTranslator.java
@@ -0,0 +1,11 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+public interface TransformTranslator<TransformT extends PTransform> {
+
+  /** A translator of a {@link PTransform}. */
+
+  void translateNode(TransformT transform, TranslationContext context);
+  }
+
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
new file mode 100644
index 0000000..341ed49
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -0,0 +1,13 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation;
+
+import org.apache.beam.sdk.runners.AppliedPTransform;
+
+public class TranslationContext {
+
+  private AppliedPTransform<?, ?, ?> currentTransform;
+
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+    this.currentTransform = currentTransform;
+  }
+
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchCombinePerKeyTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchCombinePerKeyTranslator.java
new file mode 100644
index 0000000..c9cae47
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchCombinePerKeyTranslator.java
@@ -0,0 +1,17 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+class BatchCombinePerKeyTranslator<K, InputT, AccumT, OutputT> implements
+    TransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
+
+  @Override public void translateNode(
+      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
+      TranslationContext context) {
+
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchFlattenPCollectionTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchFlattenPCollectionTranslator.java
new file mode 100644
index 0000000..77f6fdb
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchFlattenPCollectionTranslator.java
@@ -0,0 +1,16 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+class BatchFlattenPCollectionTranslator<T> implements
+    TransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
+
+  @Override public void translateNode(PTransform<PCollectionList<T>, PCollection<T>> transform,
+      TranslationContext context) {
+
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchGroupByKeyTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchGroupByKeyTranslator.java
new file mode 100644
index 0000000..1bd42f5
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchGroupByKeyTranslator.java
@@ -0,0 +1,17 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+class BatchGroupByKeyTranslator<K, InputT> implements
+    TransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
+
+  @Override public void translateNode(
+      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
+      TranslationContext context) {
+
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchParDoTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchParDoTranslator.java
new file mode 100644
index 0000000..cf8c896
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchParDoTranslator.java
@@ -0,0 +1,16 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+
+class BatchParDoTranslator<InputT, OutputT> implements
+    TransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> {
+
+  @Override public void translateNode(PTransform<PCollection<InputT>, PCollectionTuple> transform,
+      TranslationContext context) {
+
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
index 1bf660f..ff92d89 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
@@ -6,13 +6,10 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.spark.SparkConf;
-import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a Spark batch job. */
 
@@ -23,37 +20,34 @@ public class BatchPipelineTranslator extends PipelineTranslator {
   //  Transform Translator Registry
   // --------------------------------------------------------------------------------------------
 
-  private BatchTranslationContext translationContext;
-  private int depth = 0;
-
   @SuppressWarnings("rawtypes")
-  private static final Map<String, BatchTransformTranslator> TRANSFORM_TRANSLATORS = new HashMap<>();
+  private static final Map<String, TransformTranslator> TRANSFORM_TRANSLATORS = new HashMap<>();
 
   static {
     TRANSFORM_TRANSLATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN,
-        new CombinePerKeyTranslatorBatch());
+        new BatchCombinePerKeyTranslator());
     TRANSFORM_TRANSLATORS
-        .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslatorBatch());
-    TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
+        .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new BatchGroupByKeyTranslator());
+    TRANSFORM_TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new BatchReshuffleTranslator());
 
     TRANSFORM_TRANSLATORS
-        .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
+        .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new BatchFlattenPCollectionTranslator());
 
     TRANSFORM_TRANSLATORS
-        .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());
+        .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new BatchWindowAssignTranslator());
 
-    TRANSFORM_TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
+    TRANSFORM_TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new BatchParDoTranslator());
 
-    TRANSFORM_TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
+    TRANSFORM_TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new BatchReadSourceTranslator());
   }
-  private static final Logger LOG = LoggerFactory.getLogger(BatchPipelineTranslator.class);
 
   public BatchPipelineTranslator(SparkPipelineOptions options) {
     translationContext = new BatchTranslationContext(options);
   }
 
   /** Returns a translator for the given node, if it is possible, otherwise null. */
-  private static BatchTransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
+  @Override
+  protected TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
     @Nullable PTransform<?, ?> transform = node.getTransform();
     // Root of the graph is null
     if (transform == null) {
@@ -64,61 +58,4 @@ public class BatchPipelineTranslator extends PipelineTranslator {
   }
 
 
-  // --------------------------------------------------------------------------------------------
-  //  Pipeline Visitor Methods
-  // --------------------------------------------------------------------------------------------
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    LOG.info("{} enterCompositeTransform- {}", genSpaces(depth), node.getFullName());
-    depth++;
-
-    BatchTransformTranslator<?> transformTranslator = getTransformTranslator(node);
-
-    if (transformTranslator != null) {
-      translateNode(node, transformTranslator);
-      LOG.info("{} translated- {}", genSpaces(depth), node.getFullName());
-      return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
-    } else {
-      return CompositeBehavior.ENTER_TRANSFORM;
-    }
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformHierarchy.Node node) {
-    depth--;
-    LOG.info("{} leaveCompositeTransform- {}", genSpaces(depth), node.getFullName());
-  }
-
-  @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    LOG.info("{} visitPrimitiveTransform- {}", genSpaces(depth), node.getFullName());
-
-    // get the transformation corresponding to the node we are
-    // currently visiting and translate it into its Spark alternative.
-    BatchTransformTranslator<?> transformTranslator = getTransformTranslator(node);
-    if (transformTranslator == null) {
-      String transformUrn = PTransformTranslation.urnForTransform(node.getTransform());
-      throw new UnsupportedOperationException(
-          "The transform " + transformUrn + " is currently not supported.");
-    }
-    translateNode(node, transformTranslator);
-  }
-
-  private <T extends PTransform<?, ?>> void translateNode(
-      TransformHierarchy.Node node,
-      BatchTransformTranslator<?> transformTranslator) {
-
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) node.getTransform();
-
-    @SuppressWarnings("unchecked")
-    BatchTransformTranslator<T> typedTransformTranslator = (BatchTransformTranslator<T>) transformTranslator;
-
-    // create the applied PTransform on the translationContext
-    translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
-    typedTransformTranslator.translateNode(typedTransform, translationContext);
-  }
-
-
 }
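
The URN-keyed registry above is also the extension point for further transforms: supporting a new primitive would mean adding a Batch* translator and registering it in the static block. For example (BatchImpulseTranslator is hypothetical and not part of this commit, and the use of an impulse URN constant on PTransformTranslation is an assumption):

    // Hypothetical registration, following the pattern of the static block above.
    TRANSFORM_TRANSLATORS.put(
        PTransformTranslation.IMPULSE_TRANSFORM_URN, new BatchImpulseTranslator());
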
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReadSourceTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReadSourceTranslator.java
new file mode 100644
index 0000000..f5f0351
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReadSourceTranslator.java
@@ -0,0 +1,15 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+
+class BatchReadSourceTranslator<T> implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
+
+  @Override public void translateNode(PTransform<PBegin, PCollection<T>> transform,
+      TranslationContext context) {
+
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReshuffleTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReshuffleTranslator.java
new file mode 100644
index 0000000..5fab1c8
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchReshuffleTranslator.java
@@ -0,0 +1,12 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.Reshuffle;
+
+class BatchReshuffleTranslator<K, InputT> implements TransformTranslator<Reshuffle<K, InputT>> {
+
+  @Override public void translateNode(Reshuffle<K, InputT> transform, TranslationContext context) {
+
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTransformTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTransformTranslator.java
deleted file mode 100644
index ab0cf68..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTransformTranslator.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import org.apache.beam.sdk.transforms.PTransform;
-
-public interface BatchTransformTranslator<TransformT extends PTransform> {
-
-  /** A translator of a {@link PTransform} in batch mode. */
-
-  void translateNode(TransformT transform, BatchTranslationContext context);
-  }
-
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
index b53aa19..71ef315 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
@@ -3,6 +3,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.spark.SparkConf;
@@ -12,7 +13,7 @@ import org.apache.spark.sql.SparkSession;
 /**
  * Keeps track of the {@link Dataset} and the step the translation is in.
  */
-public class BatchTranslationContext {
+public class BatchTranslationContext extends TranslationContext {
   private final Map<PValue, Dataset<?>> datasets;
 
   /**
@@ -24,9 +25,6 @@ public class BatchTranslationContext {
   private SparkSession sparkSession;
   private final SparkPipelineOptions options;
 
-  private AppliedPTransform<?, ?, ?> currentTransform;
-
-
   public BatchTranslationContext(SparkPipelineOptions options) {
     SparkConf sparkConf = new SparkConf();
     sparkConf.setMaster(options.getSparkMaster());
@@ -35,7 +33,7 @@ public class BatchTranslationContext {
       sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
     }
 
-    SparkSession sparkSession = SparkSession
+    this.sparkSession = SparkSession
         .builder()
         .config(sparkConf)
         .getOrCreate();
@@ -43,8 +41,4 @@ public class BatchTranslationContext {
     this.datasets = new HashMap<>();
     this.danglingDataSets = new HashMap<>();
   }
-
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
-    this.currentTransform = currentTransform;
-  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchWindowAssignTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchWindowAssignTranslator.java
new file mode 100644
index 0000000..fbbced5
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchWindowAssignTranslator.java
@@ -0,0 +1,14 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+
+class BatchWindowAssignTranslator<T> implements
+    TransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
+
+  @Override public void translateNode(PTransform<PCollection<T>, PCollection<T>> transform,
+      TranslationContext context) {
+  }
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
deleted file mode 100644
index 6099fbc..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT> implements BatchTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
-
-  @Override public void translateNode(
-      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
-      BatchTranslationContext context) {
-
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
deleted file mode 100644
index 281eda9..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenPCollectionTranslatorBatch.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-
-class FlattenPCollectionTranslatorBatch<T> implements BatchTransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
-
-  @Override public void translateNode(PTransform<PCollectionList<T>, PCollection<T>> transform,
-      BatchTranslationContext context) {
-
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
deleted file mode 100644
index bb0ccc1..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-class GroupByKeyTranslatorBatch<K, InputT> implements BatchTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
-
-  @Override public void translateNode(
-      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
-      BatchTranslationContext context) {
-
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
deleted file mode 100644
index 4477853..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-
-class ParDoTranslatorBatch<InputT, OutputT> implements BatchTransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> {
-
-  @Override public void translateNode(PTransform<PCollection<InputT>, PCollectionTuple> transform,
-      BatchTranslationContext context) {
-
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
deleted file mode 100644
index a30fa70..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-
-class ReadSourceTranslatorBatch<T> implements BatchTransformTranslator<PTransform<PBegin, PCollection<T>>> {
-
-  @Override public void translateNode(PTransform<PBegin, PCollection<T>> transform, BatchTranslationContext context) {
-
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReshuffleTranslatorBatch.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReshuffleTranslatorBatch.java
deleted file mode 100644
index 6283fdb..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReshuffleTranslatorBatch.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import org.apache.beam.sdk.transforms.Reshuffle;
-
-class ReshuffleTranslatorBatch<K, InputT> implements BatchTransformTranslator<Reshuffle<K, InputT>> {
-
-  @Override public void translateNode(Reshuffle<K, InputT> transform,
-      BatchTranslationContext context) {
-
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
deleted file mode 100644
index 21b71b9..0000000
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-
-class WindowAssignTranslatorBatch<T> implements BatchTransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
-
-  @Override public void translateNode(PTransform<PCollection<T>, PCollection<T>> transform,
-      BatchTranslationContext context) {
-
-  }
-}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
index 7bed930..9303d59 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
@@ -2,9 +2,15 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
 
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 
 public class StreamingPipelineTranslator extends PipelineTranslator {
 
   public StreamingPipelineTranslator(SparkPipelineOptions options) {
   }
+
+  @Override protected TransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
+    return null;
+  }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
new file mode 100644
index 0000000..460dbf6
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
@@ -0,0 +1,7 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
+
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+
+public class StreamingTranslationContext extends TranslationContext {
+
+}

