beam-commits mailing list archives

From echauc...@apache.org
Subject [beam] 41/50: Split batch and streaming sources and translators
Date Fri, 04 Jan 2019 10:39:03 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1ca4192fadaa80d327655e80e0a8bf3eb22ea932
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Thu Dec 27 17:20:21 2018 +0100

    Split batch and streaming sources and translators
---
 .../translation/batch/DatasetSourceBatch.java      | 148 +++++++++++++++++++++
 .../DatasetSourceMockBatch.java}                   |   4 +-
 .../batch/ReadSourceTranslatorBatch.java           |  20 +--
 .../batch/ReadSourceTranslatorMockBatch.java       |   5 +-
 .../DatasetStreamingSource.java}                   |   4 +-
 5 files changed, 158 insertions(+), 23 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
new file mode 100644
index 0000000..1ad16eb
--- /dev/null
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static scala.collection.JavaConversions.asScalaBuffer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous
+ * streaming is tagged experimental in spark, this class does not implement
+ * {@link ContinuousReadSupport}. This class is just a mix-in.
+ */
+public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
+
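+  // Note: context and source are never assigned anywhere in this commit; how the
+  // Spark-side instantiation should wire them is the open TODO in ReadSourceTranslatorBatch.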
+  private int numPartitions;
+  private Long bundleSize;
+  private TranslationContext context;
+  private BoundedSource<T> source;
+
+  @Override public DataSourceReader createReader(DataSourceOptions options) {
+    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
+    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+    this.bundleSize = context.getOptions().getBundleSize();
+    return new DatasetReader();
+  }
+
+  /** This class can be mapped to Beam {@link BoundedSource}. */
+  private class DatasetReader implements DataSourceReader {
+
+    private Optional<StructType> schema;
+    private String checkpointLocation;
+    private DataSourceOptions options;
+
+    @Override
+    public StructType readSchema() {
+      return new StructType();
+    }
+
+    @Override
+    public List<InputPartition<InternalRow>> planInputPartitions() {
+      List<InputPartition<InternalRow>> result = new ArrayList<>();
+      long desiredSizeBytes;
+      SparkPipelineOptions options = context.getOptions();
+      try {
+        desiredSizeBytes =
+            (bundleSize == null)
+                ? source.getEstimatedSizeBytes(options) / numPartitions
+                : bundleSize;
+        List<? extends BoundedSource<T>> sources = source.split(desiredSizeBytes, options);
+        for (BoundedSource<T> source : sources) {
+          result.add(
+              new InputPartition<InternalRow>() {
+
+                @Override
+                public InputPartitionReader<InternalRow> createPartitionReader() {
+                  BoundedReader<T> reader = null;
+                  try {
+                    reader = source.createReader(options);
+                  } catch (IOException e) {
+                    // reader is still null when createReader throws, so report the source class
+                    throw new RuntimeException(
+                        "Error creating BoundedReader for source "
+                            + source.getClass().getCanonicalName(),
+                        e);
+                  }
+                  return new DatasetPartitionReader(reader);
+                }
+              });
+        }
+        return result;
+
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Error in splitting BoundedSource " + source.getClass().getCanonicalName(), e);
+      }
+    }
+  }
+
+  /** This class can be mapped to Beam {@link BoundedReader}. */
+  private class DatasetPartitionReader implements InputPartitionReader<InternalRow> {
+
+    BoundedReader<T> reader;
+    private boolean started;
+    private boolean closed;
+
+    DatasetPartitionReader(BoundedReader<T> reader) {
+      this.reader = reader;
+      this.started = false;
+      this.closed = false;
+    }
+
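+    /**
+     * Delegates to {@link BoundedReader#start()} on the first call and to
+     * {@link BoundedReader#advance()} afterwards; returns false once closed.
+     */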
+    @Override
+    public boolean next() throws IOException {
+      if (!started) {
+        started = true;
+        return reader.start();
+      } else {
+        return !closed && reader.advance();
+      }
+    }
+
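+    /**
+     * Wraps the current element as a timestamped value in the global window and
+     * packs that single {@link WindowedValue} into an {@link InternalRow}.
+     */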
+    @Override
+    public InternalRow get() {
+      List<Object> list = new ArrayList<>();
+      list.add(
+          WindowedValue.timestampedValueInGlobalWindow(
+              reader.getCurrent(), reader.getCurrentTimestamp()));
+      return InternalRow.apply(asScalaBuffer(list).toList());
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      reader.close();
+    }
+  }
+}
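
For orientation, a minimal sketch of how Spark picks this source up by provider class
name (it mirrors the read path that ReadSourceTranslatorBatch wires up below; the
session setup and master URL are illustrative assumptions, not part of this commit):

    // Spark instantiates DatasetSourceBatch reflectively from the provider class name,
    // then drives createReader() -> planInputPartitions() -> DatasetPartitionReader.
    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate(); // assumed setup
    Dataset<Row> rows =
        spark.read().format(DatasetSourceBatch.class.getCanonicalName()).load();
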
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
similarity index 97%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
index f722377..b616a6f 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming.translation.io;
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import static scala.collection.JavaConversions.asScalaBuffer;
 
@@ -37,7 +37,7 @@ import org.joda.time.Instant;
 /**
  * This is a mock source that gives values between 0 and 999.
  */
-public class DatasetSourceMock implements DataSourceV2, ReadSupport {
+public class DatasetSourceMockBatch implements DataSourceV2, ReadSupport {
 
   @Override public DataSourceReader createReader(DataSourceOptions options) {
     return new DatasetReader();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index aed016a..370e3f4 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -21,32 +21,23 @@ import java.io.IOException;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
-import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.DatasetStreamingSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalog.Catalog;
-import org.apache.spark.sql.catalyst.catalog.CatalogTable;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.streaming.DataStreamReader;
 
 class ReadSourceTranslatorBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSource.class.getCanonicalName();
+  private String SOURCE_PROVIDER_CLASS = DatasetSourceBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -64,12 +55,11 @@ class ReadSourceTranslatorBatch<T>
       throw new RuntimeException(e);
     }
     SparkSession sparkSession = context.getSparkSession();
-    DataStreamReader dataStreamReader = sparkSession.readStream().format(providerClassName);
 
-    Dataset<Row> rowDataset = dataStreamReader.load();
+    Dataset<Row> rowDataset = sparkSession.read().format(providerClassName).load();
 
-    //TODO initialize source : how, to get a reference to the DatasetSource instance that spark
-    // instantiates to be able to call DatasetSource.initialize(). How to pass in a DatasetCatalog?
+    //TODO initialize source: how to get a reference to the DatasetStreamingSource instance
+    // that spark instantiates, to be able to call DatasetStreamingSource.initialize().
+    // How to pass in a DatasetCatalog?
     MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() {
       @Override public WindowedValue<T> call(Row value) throws Exception {
         //there is only one value put in each Row by the InputPartitionReader
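
The excerpt above ends just inside the MapFunction. A plausible completion of the
row-unwrapping step, hedged since the remaining lines fall outside this hunk, reads the
single WindowedValue back out of ordinal 0, where DatasetPartitionReader.get() stored it:

    // Sketch only: unwrap the one WindowedValue<T> packed into each Row.
    MapFunction<Row, WindowedValue<T>> func =
        new MapFunction<Row, WindowedValue<T>>() {
          @Override
          public WindowedValue<T> call(Row value) throws Exception {
            // there is only one value put in each Row by the InputPartitionReader
            return value.<WindowedValue<T>>getAs(0); // ordinal 0 is an assumption
          }
        };
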
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 184d24c..758ff1d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSourceMock;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
@@ -29,8 +28,6 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.streaming.DataStreamReader;
-
 
 /**
  * Mock translator that generates a source of 0 to 999 and prints it.
@@ -39,7 +36,7 @@ import org.apache.spark.sql.streaming.DataStreamReader;
 class ReadSourceTranslatorMockBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSourceMock.class.getCanonicalName();
+  private String SOURCE_PROVIDER_CLASS = DatasetSourceMockBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
similarity index 99%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
index deacdf4..8701a83 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming.translation.io;
+package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static scala.collection.JavaConversions.asScalaBuffer;
@@ -56,7 +56,7 @@ import scala.collection.immutable.Map;
  * is tagged experimental in spark, this class does not implement {@link ContinuousReadSupport}.
  * This class is just a mix-in.
  */
-public class DatasetSource<T> implements DataSourceV2, MicroBatchReadSupport{
+public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSupport {
 
   private int numPartitions;
   private Long bundleSize;
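
For contrast with the batch path, MicroBatchReadSupport (which DatasetStreamingSource now
declares) hands Spark a micro-batch reader instead of ReadSupport.createReader. A hedged
sketch of that entry point against the Spark 2.4 DataSourceV2 API, with a hypothetical
DatasetMicroBatchReader, would look like:

    // Sketch only: the entry point MicroBatchReadSupport requires; Spark calls it when
    // the source is loaded through sparkSession.readStream() rather than read().
    @Override
    public MicroBatchReader createMicroBatchReader(
        Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
      return new DatasetMicroBatchReader(schema, checkpointLocation, options); // hypothetical
    }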

