beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/3] incubator-beam git commit: Remove Some Code leftover from Earlier Refactoring
Date Mon, 30 May 2016 12:49:28 GMT
Remove Some Code leftover from Earlier Refactoring

This only removes unused parts of code.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4fe7010b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4fe7010b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4fe7010b

Branch: refs/heads/master
Commit: 4fe7010bdb7692914fc6c2821c95caea0cab770d
Parents: 36a27f5
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Sun May 29 08:04:39 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon May 30 14:23:19 2016 +0200

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |   5 -
 .../FlinkBatchTransformTranslators.java         | 131 +------------------
 .../translation/wrappers/SinkOutputFormat.java  |  97 --------------
 3 files changed, 3 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fe7010b/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 757ac9c..b60cba1 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -57,11 +57,6 @@
       <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
       <version>${flink.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-avro_2.10</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
 
     <!-- Beam -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fe7010b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 8358807..200e4af 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -28,7 +28,6 @@ import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFun
 import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
-import org.apache.beam.runners.flink.translation.wrappers.SinkOutputFormat;
 import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -36,11 +35,8 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -69,27 +65,19 @@ import com.google.common.collect.Maps;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.operators.GroupCombineOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.SingleInputUdfOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -100,7 +88,7 @@ import java.util.Map;
  * Translators for transforming {@link PTransform PTransforms} to
  * Flink {@link DataSet DataSets}.
  */
-public class FlinkBatchTransformTranslators {
+class FlinkBatchTransformTranslators {
 
   // --------------------------------------------------------------------------------------------
   //  Transform Translator Registry
@@ -128,7 +116,7 @@ public class FlinkBatchTransformTranslators {
   }
 
 
-  public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
+  static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
       PTransform<?, ?> transform) {
     return TRANSLATORS.get(transform.getClass());
   }
@@ -154,119 +142,6 @@ public class FlinkBatchTransformTranslators {
     }
   }
 
-  private static class WriteSinkTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
-
-    @Override
-    public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) {
-      String name = transform.getName();
-      PValue input = context.getInput(transform);
-      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
-
-      inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions()))
-          .name(name);
-    }
-  }
-
-  private static class AvroIOWriteTranslatorBatch<T> implements
-      FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
-    private static final Logger LOG = LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class);
-
-
-    @Override
-    public void translateNode(
-        AvroIO.Write.Bound<T> transform,
-        FlinkBatchTranslationContext context) {
-      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(context.getInput(transform));
-
-      String filenamePrefix = transform.getFilenamePrefix();
-      String filenameSuffix = transform.getFilenameSuffix();
-      int numShards = transform.getNumShards();
-      String shardNameTemplate = transform.getShardNameTemplate();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn(
-          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
-          filenameSuffix);
-      LOG.warn(
-          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
-          shardNameTemplate);
-
-      // This is super hacky, but unfortunately we cannot get the type otherwise
-      Class<T> extractedAvroType;
-      try {
-        Field typeField = transform.getClass().getDeclaredField("type");
-        typeField.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        Class<T> avroType = (Class<T>) typeField.get(transform);
-        extractedAvroType = avroType;
-      } catch (NoSuchFieldException | IllegalAccessException e) {
-        // we know that the field is there and it is accessible
-        throw new RuntimeException("Could not access type from AvroIO.Bound", e);
-      }
-
-      MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
-          new MapFunction<WindowedValue<T>, T>() {
-            @Override
-            public T map(WindowedValue<T> value) throws Exception {
-              return value.getValue();
-            }
-          }).returns(new CoderTypeInformation<>(context.getInput(transform).getCoder()));
-
-
-      DataSink<T> dataSink = valueStream.output(
-          new AvroOutputFormat<>(new Path(filenamePrefix), extractedAvroType));
-
-      if (numShards > 0) {
-        dataSink.setParallelism(numShards);
-      }
-    }
-  }
-
-  private static class TextIOWriteTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
-    private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class);
-
-    @Override
-    public void translateNode(
-        TextIO.Write.Bound<T> transform,
-        FlinkBatchTranslationContext context) {
-      PValue input = context.getInput(transform);
-      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
-
-      String filenamePrefix = transform.getFilenamePrefix();
-      String filenameSuffix = transform.getFilenameSuffix();
-      boolean needsValidation = transform.needsValidation();
-      int numShards = transform.getNumShards();
-      String shardNameTemplate = transform.getShardNameTemplate();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn(
-          "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
-          needsValidation);
-      LOG.warn(
-          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
-          filenameSuffix);
-      LOG.warn(
-          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
-          shardNameTemplate);
-
-      MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
-          new MapFunction<WindowedValue<T>, T>() {
-            @Override
-            public T map(WindowedValue<T> value) throws Exception {
-              return value.getValue();
-            }
-          }).returns(new CoderTypeInformation<>(transform.getCoder()));
-
-      DataSink<T> dataSink = valueStream.writeAsText(filenamePrefix);
-
-      if (numShards > 0) {
-        dataSink.setParallelism(numShards);
-      }
-    }
-  }
-
   private static class WindowBoundTranslatorBatch<T>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Bound<T>> {
 
@@ -431,7 +306,7 @@ public class FlinkBatchTransformTranslators {
   private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
     @Override
     public List<T> createAccumulator() {
-      return new ArrayList<T>();
+      return new ArrayList<>();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fe7010b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
deleted file mode 100644
index c0a7132..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.io.Sink;
-import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.WindowedValue;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.AbstractID;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-
-/**
- * Wrapper for executing a {@link Sink} on Flink as an {@link OutputFormat}.
- *
- * @param <T> The type of the incoming records.
- */
-public class SinkOutputFormat<T> implements OutputFormat<WindowedValue<T>> {
-
-  private final Sink<T> sink;
-
-  private final SerializedPipelineOptions serializedOptions;
-
-  private Sink.WriteOperation<T, ?> writeOperation;
-  private Sink.Writer<T, ?> writer;
-
-  private AbstractID uid = new AbstractID();
-
-  public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
-    this.sink = transform.getSink();
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-  }
-
-  @Override
-  public void configure(Configuration configuration) {
-    writeOperation = sink.createWriteOperation(serializedOptions.getPipelineOptions());
-    try {
-      writeOperation.initialize(serializedOptions.getPipelineOptions());
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to initialize the write operation.", e);
-    }
-  }
-
-  @Override
-  public void open(int taskNumber, int numTasks) throws IOException {
-    try {
-      writer = writeOperation.createWriter(serializedOptions.getPipelineOptions());
-    } catch (Exception e) {
-      throw new IOException("Couldn't create writer.", e);
-    }
-    try {
-      writer.open(uid + "-" + String.valueOf(taskNumber));
-    } catch (Exception e) {
-      throw new IOException("Couldn't open writer.", e);
-    }
-  }
-
-  @Override
-  public void writeRecord(WindowedValue<T> record) throws IOException {
-    try {
-      writer.write(record.getValue());
-    } catch (Exception e) {
-      throw new IOException("Couldn't write record.", e);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      writer.close();
-    } catch (Exception e) {
-      throw new IOException("Couldn't close writer.", e);
-    }
-  }
-
-}


Mime
View raw message